2023-12-14 15:45:12 版本 : 【Python开发】FastAPI 08:Security 登录认证
作者: 系统管理员1 于 2023年12月14日 发布在分类 / 技术研发 / python 下,并于 2023年12月14日 编辑
 历史版本

备注 修改日期 修改人
内容更新 2023-12-14 15:48:24[当前版本] 系统管理员1
内容更新 2023-12-14 15:48:06 系统管理员1
内容更新 2023-12-14 15:47:45 系统管理员1
内容更新 2023-12-14 15:45:12 系统管理员1

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。

本文链接: https://blog.csdn.net/weixin_51407397/article/details/131031170


源码地址:

https://gitee.com/yinyuu/fast-api_study_yinyu

1 介绍

FastAPI 提供了多种工具,可帮助你以标准的方式轻松、快速地处理安全性,而无需研究和学习所有的安全规范,这相比花费大量的精力和代码处理安全性和身份认证很有好了(比如 java )。

首先我们来看一些小概念,如果已了解可直接看第二种~

1.1 OAuth2

OAuth2 是一个规范,它定义了几种处理身份认证和授权的方法。 它是一个相当广泛的规范,涵盖了一些复杂的使用场景,包括使用「第三方」进行身份认证的方法。

Facebook,Google,Twitter,GitHub 登录系统均是采用该机制。

OAuth 1

OAuth 1OAuth2 完全不同,并且更为复杂,它直接包含了有关如何加密通信的规范。

如今它已经 out 了,也就没多少人用了。

OAuth2 没有指定如何加密通信,它期望使用 HTTPS 进行通信。

1.2 OpenAPI

OpenAPI(以前称为 Swagger)是用于构建 API 的开放规范。

FastAPI 基于 OpenAPI,因此自动交互式文档界面,代码生成等成为可能。

OpenAPI 有一种定义多个安全「方案」的方法,你可以利用所有这些基于标准的工具,包括这些交互式文档系统。

OpenAPI 定义了以下安全方案:

  • apiKey:一个特定于应用程序的密钥,可以来自:
    • 查询参数。
    • 请求头。
    • cookie
  • http:标准的 HTTP 身份认证系统,包括:
    • bearer: 一个值为 Bearer 加令牌字符串的 Authorization 请求头。这是从 OAuth2 继承的。
    • HTTP Basic 认证方式。
    • HTTP Digest,等等。
  • oauth2:所有的 OAuth2 处理安全性的方式(称为「流程」)。
    • 以下几种流程适合构建 OAuth 2.0 身份认证的提供者(例如 Google,Facebook,Twitter,GitHub 等): * implicit * clientCredentials * authorizationCode
    • 但是有一个特定的「流程」可以完美地用于直接在同一应用程序中处理身份认证:
    • password:接下来将介绍它的示例。
  • openIdConnect:提供了一种定义如何自动发现 OAuth2 身份认证数据的方法。
    • 此自动发现机制是 OpenID Connect 规范中定义的内容。

FastAPI fastapi.security 模块中为每个安全方案提供了几种工具,这些工具简化了这些安全机制的使用方法。

2 安全基础

假设前后端分离开发,前端要使用后端的 username password 验证用户身份。

预先安装:pip install python-multipart

因为 OAuth2 使用表单数据发送 username password

2.1 使用 Bearer

本例使用 OAuth2 Password 流以及 Bearer 令牌(Token),为此要使用 OAuth2PasswordBearer 类。

 OAuth2PasswordBearer

创建 OAuth2PasswordBearer 的类实例时,要传递 tokenUrl 参数。该参数包含客户端(用户浏览器中运行的前端) 的 URL,用于发送 username password,并获取令牌。

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

tokenUrl="token" 指向的是暂未创建的相对 URL token,这个相对 URL 相当于 ./token。比如 API 位于 https://example.com/,则指向 https://example.com/token

oauth2_scheme 变量是 OAuth2PasswordBearer 的实例,也是可调用项。

比如以下边方式调用:

oauth2_scheme(some, parameters)

因此,Depends 可以调用 oauth2_scheme 变量。

使用

接下来,使用 Depends oauth2_scheme 传入依赖项。

from fastapi import Depends, FastAPI
from fastapi.security import OAuth2PasswordBearer
 
app = FastAPI()
 
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
 
@app.get("/items/")
async def read_items(token: str = Depends(oauth2_scheme)):
    return {"token": token}

该依赖项使用字符串(str)接收路径操作函数的参数 token

运行

使用 main 函数运行:

if __name__ == '__main__':
    import uvicorn
    uvicorn.run(app, host="127.0.0.1", port=8000)

文档

此时访问交互式文档 http://127.0.0.1:8000/docs#

页面右上角出现了一个「Authorize」按钮。 路径操作的右上角也出现了一个可以点击的小锁图标,代表这个接口需要登录认证。

那么点击 Authorize 按钮,弹出授权表单,输入 username password 及其它可选字段:

目前,由于是 demo,在表单中输入内容不会有任何反应,后文会逐步进行完善!

这个自动工具非常实用,可在文档中与所有 API 交互。

前端团队(可能就是开发者本人)可以使用本工具。

第三方应用与系统也可以调用本工具。 开发者也可以用它来调试、检查、测试应用。

文档中请求接口

在文档中请求该接口时,FastAPI 校验请求中的 Authorization 请求头,核对请求头的值是不是由 Bearer + 令牌组成, 并返回令牌字符串(str)。

如果没有找到 Authorization 请求头,或请求头的值不是 Bearer + 令牌。FastAPI 直接返回 401 错误状态码(UNAUTHORIZED):

 密码流

现在,再回过头来看一下。 Password 流是 OAuth2 定义的,用于处理安全与身份验证的方式(流)。 OAuth2 的设计目标是为了让后端或 API 独立于服务器验证用户身份。

但在本例中,FastAPI 应用会处理 API 与身份验证。

以下是简化的运行流程:

  • 用户在前端输入 username password,并点击回车
  • (用户浏览器中运行的)前端把 username password 发送至 API 中指定的 URL(使用 tokenUrl="token" 声明)
  • API 检查 username password,并用令牌(Token) 响应(暂未实现此功能):
  • 令牌只是用于验证用户的字符串
  • 一般来说,令牌会在一段时间后过期
    • 过时后,用户要再次登录
    • 这样一来,就算令牌被人窃取,风险也较低。因为它与永久密钥不同,在绝大多数情况下不会长期有效
  • 前端临时将令牌存储在某个位置
  • 用户点击前端,前往前端应用的其它部件
  • 前端需要从 API 中提取更多数据:
    • 为指定的端点(Endpoint)进行身份验证
    • 因此,用 API 验证身份时,要发送值为 Bearer + 令牌的请求头 Authorization
    • 假如令牌为 foobarAuthorization 请求头就是: Bearer foobar

看到了吧,只要多写三四行代码,就可以添加基础的安全表单。

2.2 获取当前用户

前边的安全系统向路径操作函数提供了一个 str 类型的 token,接下来进行完善,让它返回当前用户给我们。

 创建用户模型

首先,创建一个用户 Pydantic 模型。

与使用 Pydantic 声明请求体的方式相同,我们可以在其他任何地方使用它:

from pydantic import BaseModel
 
class User(BaseModel):
    username: str
    email: Union[str, None] = None
    full_name: Union[str, None] = None
    disabled: Union[bool, None] = None

创建 get_current_user 依赖项

接下来创建一个 get_current_user 依赖项,用于获取当前的用户,get_current_user 将具有一个我们之前所创建的同一个 oauth2_scheme 作为依赖项。

fake_decode_token (伪)工具函数接收 str 类型的令牌并返回我们的 Pydantic User 模型,get_current_user 将从子依赖项 oauth2_scheme 中接收一个 str 类型的 token

 
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
 
def fake_decode_token(token):
    return User(
        username=token + "fakedecoded", email="yinyu@example.com", full_name="yinyu"
    )
 
async def get_current_user(token: str = Depends(oauth2_scheme)):
    user = fake_decode_token(token)
    return user

注入当前用户

现在可以在路径操作中使用 get_current_user 作为 Depends 了,完整代码:

from typing import Union
from fastapi import Depends, FastAPI
from fastapi.security import OAuth2PasswordBearer
from pydantic import BaseModel
 
app = FastAPI()
 
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
 
class User(BaseModel):
    username: str
    email: Union[str, None] = None
    full_name: Union[str, None] = None
    disabled: Union[bool, None] = None
 
def fake_decode_token(token):
    return User(
        username = token + "fakedecoded", email="yinyu@example.com", full_name="yinyu"
    )
 
async def get_current_user(token: str = Depends(oauth2_scheme)):
    user = fake_decode_token(token)
    return user
 
@app.get("/users/me")
async def read_users_me(current_user: User = Depends(get_current_user)):
    return current_user

注意我们将 current_user 的类型声明为 Pydantic 模型 User,FastAPI 将自动进行代码补全和类型检查。

现在你可以直接在路径操作函数中获取当前用户。 接下来我们只需要再为用户/客户端添加一个真正发送 username password 的路径操作。

3 使用密码和 Bearer 的简单 OAuth2

接着上一章继续开发,添加缺少的部分以实现一个完整的安全性流程。

3.1流程

 获取 username password

回顾一下,流程第一步就是用户在前端输入 username password,那么此时要做的事便是获取该 username password,然后再进行校验,部分代码:

    from fastapi.security import OAuth2PasswordRequestForm
     
    async def login(form_data: OAuth2PasswordRequestForm = Depends()):

FastAPI 提供了 OAuth2PasswordRequestForm ,它的作用是接收请求表单中的数据,包括:

  • username
  • password
  • 一个可选的 scope 字段,是一个由空格分隔的字符串组成的大字符串
  • 一个可选的 grant_type
  • 一个可选的 client_id(我们的示例不需要它)
  • 一个可选的 client_secret(我们的示例不需要它)

OAuth2PasswordRequestForm 并不像 OAuth2PasswordBearer 一样是 FastAPI 的一个特殊的类。 OAuth2PasswordBearer 使得 FastAPI 明白它是一个安全方案。所以它得以通过这种方式添加到 OpenAPI 中。 但 OAuth2PasswordRequestForm 只是一个你可以自己编写的类依赖项,或者你也可以直接声明 Form 参数(请求表单),只是前者更为快捷方便。

使用表单数据

首先,使用表单字段中的 username 从(伪)数据库中获取用户数据。 如果没有这个用户,我们将返回一个错误消息,提示「用户名或密码错误」

...
 
fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "fakehashedsecret",
        "disabled": False,
    },
    "alice": {
        "username": "alice",
        "full_name": "Alice Wonderson",
        "email": "alice@example.com",
        "hashed_password": "fakehashedsecret2",
        "disabled": True,
    },
}
 
 
@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    user_dict = fake_users_db.get(form_data.username) #使用表单数据
    if not user_dict:
        raise HTTPException(status_code=400, detail="Incorrect username or password")
    ...

校验密码

目前我们已经从数据库中获取了用户数据,但尚未校验密码。

首先让我们将这些数据放入 Pydantic UserInDB 模型中。 永远不要保存明文密码,因此,我们将使用(伪)哈希密码系统。 如果密码不匹配,我们将返回同一个错误。

class User(BaseModel):
    username: str
    email: Union[str, None] = None
    full_name: Union[str, None] = None
    disabled: Union[bool, None] = None
 
class UserInDB(User):
    hashed_password: str
 
@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    user_dict = fake_users_db.get(form_data.username)
    if not user_dict:
        raise HTTPException(status_code=400, detail="Incorrect username or password")
    user = UserInDB(**user_dict)
    hashed_password = fake_hash_password(form_data.password)
    if not hashed_password == user.hashed_password:
        raise HTTPException(status_code=400, detail="Incorrect username or password")
 ...

hashed_password 可以理解为哈希密码(为了安全):「哈希」的意思是:将某些内容(在本例中为密码)转换为看起来像乱码的字节序列(只是一个字符串)。 每次你传入完全相同的内容(完全相同的密码)时,你都会得到完全相同的乱码。

UserInDB(**user_dict) 表示:

直接将 user_dict 的键和值作为关键字参数传递,等同于:

UserInDB(
    username = user_dict["username"],
    email = user_dict["email"],
    full_name = user_dict["full_name"],
    disabled = user_dict["disabled"],
    hashed_password = user_dict["hashed_password"],
)

这在【Python开发】FastAPI 04:响应模型 有所介绍。

返回令牌/token

token 端点的响应必须是一个 JSON 对象,它应该有一个 token_type。在我们的例子中,由于我们使用的是「Bearer」令牌,因此令牌类型应为「bearer」。 并且还应该有一个 access_token 字段,它是一个包含我们的访问令牌的字符串。

在此示例中,我们将极其不安全地返回相同的 username 作为令牌。

@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    user_dict = fake_users_db.get(form_data.username)
    if not user_dict:
        raise HTTPException(status_code=400, detail="Incorrect username or password")
    user = UserInDB(**user_dict)
    hashed_password = fake_hash_password(form_data.password)
    if not hashed_password == user.hashed_password:
        raise HTTPException(status_code=400, detail="Incorrect username or password")
 
    return {"access_token": user.username, "token_type": "bearer"}

在下一章中,你将看到一个真实的安全实现,使用了哈希密码和 JWT 令牌。

获取当前活跃用户

我们想要仅当此用户处于启用状态时才能获取 current_user。 因此,我们创建了一个额外的依赖项 get_current_active_user,而该依赖项又以 get_current_user 作为依赖项。

如果用户不存在或处于未启用状态,则这两个依赖项都将仅返回 HTTP 错误。 因此,在我们的端点中,只有当用户存在,身份认证通过且处于启用状态时,我们才能获得该用户:

async def get_current_user(token: str = Depends(oauth2_scheme)):
    user = fake_decode_token(token)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return user
 
async def get_current_active_user(current_user: User = Depends(get_current_user)):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user
 
@app.get("/users/me")
async def read_users_me(current_user: User = Depends(get_current_active_user)):
    return current_user

我们在此处返回的值为 Bearer 的额外响应头 WWW-Authenticate 也是规范的一部分。 任何的 401「未认证」HTTP(错误)状态码都应该返回 WWW-Authenticate 响应头。 对于 bearer 令牌(我们的例子),该响应头的值应为 Bearer。 不过你可以忽略这个额外的响应头,没什么影响。

3.2 实际效果

完整代码

from typing import Union
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
 
fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "fakehashedsecret1",
        "disabled": False,
    },
    "alice": {
        "username": "alice",
        "full_name": "Alice Wonderson",
        "email": "alice@example.com",
        "hashed_password": "fakehashedsecret2",
        "disabled": True,
    },
}
 
 
app = FastAPI()
 
 
def fake_hash_password(password: str):
    return "fakehashed" + password
 
 
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
 
 
class User(BaseModel):
    username: str
    email: Union[str, None] = None
    full_name: Union[str, None] = None
    disabled: Union[bool, None] = None
 
class UserInDB(User):
    hashed_password: str
 
 
def get_user(db: dict, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)
 
 
def fake_decode_token(token: str):
    # This doesn't provide any security at all
    # Check the next version
    user = get_user(fake_users_db, token)
    return user
 
 
async def get_current_user(token: str = Depends(oauth2_scheme)):
    user = fake_decode_token(token)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return user
 
 
async def get_current_active_user(current_user: User = Depends(get_current_user)):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user
 
 
@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    user_dict = fake_users_db.get(form_data.username)
    if not user_dict:
        raise HTTPException(status_code=400, detail="Incorrect username or password")
    user = UserInDB(**user_dict)
    hashed_password = fake_hash_password(form_data.password)
    if not hashed_password == user.hashed_password:
        raise HTTPException(status_code=400, detail="Incorrect username or password")
 
    return {"access_token": user.username, "token_type": "bearer"} #access_token 将被识别为 token
 
 
@app.get("/users/me")
async def read_users_me(current_user: User = Depends(get_current_active_user)):
    return current_user
 
if __name__ == '__main__':
    import uvicorn
    uvicorn.run(app, host="127.0.0.1", port=8080)

身份认证


历史版本-目录  [回到顶端]
    wcp知识库系统-京ICP备15024440号-1 -V 5.1.9 -wcp