FastAPI + SQLAlchemy 异步 ORM
基于 SQLAlchemy 2.0+ 异步 API + FastAPI
数据库:MySQL,通过 aiomysql
核心目标:用 ORM 操作数据库,无需手写 SQL
一、什么是ORM
ORM(Object-Relational Mapping,对象关系映射)
让开发者通过操作 Python 对象(模型类)来操作关系型数据库,自动转换为 SQL 语句。
作用
- 减少重复 SQL,避免手写易错语句
- 代码可读性强,面向对象风格
- 自动防 SQL 注入,参数化查询
- 自动管理连接、事务、连接池
FastAPI 官方推荐 ORM:SQLAlchemy ,支持同步/异步
二、环境准备
1
2
|
# 安装依赖
pip install "sqlalchemy[asyncio]" aiomysql
|
sqlalchemy[asyncio]:启用异步支持
aiomysql:MySQL 的异步驱动
三、数据库连接与模型定义
1. 配置数据库 URL
1
2
3
4
|
# 确保数据库已创建:
# CREATE DATABASE fastapi_test CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
ASYNC_DATABASE_URL = "mysql+aiomysql://root:123456@localhost:3306/fastapi_test?charset=utf8mb4"
|
2. 创建异步引擎
1
2
3
4
5
6
7
8
|
from sqlalchemy.ext.asyncio import create_async_engine
async_engine = create_async_engine(
ASYNC_DATABASE_URL,
echo=True, # 打印 SQL 日志(开发用)
pool_size=10, # 连接池大小
max_overflow=20, # 超出 pool_size 的最大连接数
)
|
3. 定义模型(使用 SQLAlchemy 2.0 风格)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from sqlalchemy import Integer, String, Float, DateTime, func
from datetime import datetime
class Base(DeclarativeBase):
"""基类:通用字段"""
create_time: Mapped[datetime] = mapped_column(
DateTime,
default=func.now(),
comment="创建时间"
)
update_time: Mapped[datetime] = mapped_column(
DateTime,
default=func.now(),
onupdate=func.now(),
comment="更新时间"
)
class Book(Base):
__tablename__ = "book"
id: Mapped[int] = mapped_column(Integer, primary_key=True, comment="书籍ID")
book_name: Mapped[str] = mapped_column(String(255), comment="书名")
author: Mapped[str] = mapped_column(String(255), comment="作者")
price: Mapped[float] = mapped_column(Float, comment="价格")
publisher: Mapped[str] = mapped_column(String(255), comment="出版社")
class User(Base):
__tablename__ = "user"
id: Mapped[int] = mapped_column(Integer, primary_key=True)
name: Mapped[str] = mapped_column(String(255))
password: Mapped[str] = mapped_column(String(255))
|
注意:
- 使用
Mapped + mapped_column 是 SQLAlchemy 2.0 推荐方式,类型安全且简洁。
Mapped[int]:告诉类型检查器,这个属性在实例上是 int 类型,利用python原生类型进行定义
mapped_column(Integer, xxx):对应数据库列行为,定义orm的数据类型Integer
- 运行时,book.id 是 int,不是 Column 对象,如果说是老版本的写法就是id = Column(Integer, …),那id就是Column对象,而不是原生int对象,所以在2.0版本Fastapi推荐
Mapped与mapped_cloumn的结合用法
四、应用生命周期 启动应用自动建表
不要用已废弃的 @app.on_event,使用 lifespan
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
from fastapi import FastAPI
from contextlib import asynccontextmanager
from sqlalchemy.ext.asyncio import AsyncGenerator
async def create_tables():
async with async_engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator:
# 启动时:建表
await create_tables()
yield # 应用运行中
# 关闭时 这里没有
app = FastAPI(lifespan=lifespan)
|
表只建一次;若表结构变更,需手动 DROP 或用 Alembic等迁移工具。
五、数据库会话依赖注入
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
AsyncSessionLocal = async_sessionmaker(
bind=async_engine,
class_=AsyncSession,
expire_on_commit=False, # 提交后对象不过期,可继续使用
)
async def get_db():
"""依赖:提供数据库会话"""
async with AsyncSessionLocal() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
finally:
await session.close()
|
在路由中通过 Depends(get_db) 注入会话,自动管理事务与连接。
六、数据库查询操作
一般格式
1
2
|
result = await db.execute(select(Model).where(...))
data = result.scalars().all() # 或 .first(), .scalar_one_or_none() 等
|
1. 基础查询
| 操作 |
代码示例 |
| 查所有 |
await db.execute(select(Book)) → scalars().all() |
| 查单条(主键) |
await db.get(Book, 2) |
| 查第一条 |
result.scalars().first() |
| 查唯一或 None |
result.scalar_one_or_none() |
| 查唯一值并直接返回结果 |
result.scalar() |
简单示例:
1
2
3
4
5
6
7
|
@app.get("/books")
async def get_books(db: AsyncSession = Depends(get_db)):
stmt = select(Book) # 构造 Select 对象
result = session.execute(stmt) # 执行后得到 Result 对象
# 正常可以直接放一起写 result = session.execute(select(Book))
books = result.scalars().all()
return books
|
select(xxx) 返回的对象类型
在 SQLAlchemy 2.0+ 中,select(Book) 返回一个 Select 对象,属于 SQL 表达式构造器(SQL Expression Construct),尚未执行查询。
select(Book) 等价于 SELECT * FROM book(假设 Book 映射到 book 表)。
- 此对象需通过 Session 执行(如 前面示例当中
session.execute(stmt))才能获取结果。
- 执行后返回
Result 对象,用于进一步提取数据。
scalar() 与 scalars() 的区别
首先要说的是对于不同的对象,他们的使用方法不同,sqlalchemy当中不止一个类型有这两种方法
在 Result 对象中,也就是excute()的结果,用于从查询结果中提取数据,但语义和返回类型不同。
scalars()
- 用途:当查询返回单列结果(即使该列是实体对象)时,将每一行的第一列值提取为一个可迭代序列
- 返回类型:
ScalarResult[Book],可迭代,支持 all(), first(), one() 等方法
- 适用场景:查询返回list,里面包含多个
Book 实体
示例:
1
2
|
stmt = select(Book).where(Book.id < 5)
books = session.execute(stmt).scalars().all() # List[Book]
|
scalar()
- 用途:期望查询仅返回一行且一列,并直接返回该值
- 返回类型:单个值 而不是列表(如
Book 实例、int、str),若无结果返回 None
- 严格校验:
- 若结果为空,返回
None
- 若结果多于一行或一列,抛出
MultipleResultsFound 或 NoResultFound 异
示例:
1
2
|
stmt = select(Book).where(Book.id == 1)
book = session.execute(stmt).scalar() # Book | None
|
使用建议
- 查询多个对象 → 用
scalars().all()
- 查询唯一对象 → 用
scalar()
- 不确定结果数量 → 用
scalars().first()(返回第一个或 None,不抛异常)
注意:若 select() 包含多列(如 select(Book.id, Book.name)),scalars() 仅返回第一列(id),其余列被忽略。
在session对象中,scalar与scalars的使用方法其实就是上述的简化,少了excute操作
举个例子马上就知道了
1
2
3
|
stmt = select(Book).where(Book.id < 5)
# books = session.execute(stmt).scalars().all() # 原来的Result对象的方法
books = session.scalars(stmt).all()
|
在session中直接使用scalar与scalars方法就是将excute的过程省略掉了(实际上是自动执行掉了),想具体知道怎么回事的,去看库里的具体实现方式
在官方文档里面,其快速入门教学用的是第二种方法
两者的实际使用的区别就是一个当没有参数的方法用,一个将查询语句当参数去查,再具体项目中,哪个都可以用,但是第二种看上去简短一点,我再查询资料的时候发现都有人用,没有说哪种更标准,毕竟它两种用法都存在,就有存在的意义。
2. 条件查询(.where())
比较查询
1
2
3
4
5
|
# id == 5
select(Book).where(Book.id == 5)
# price > 100
select(Book).where(Book.price > 100)
|
主要是等于、大于、小于、大于等于、小于等于
模糊查询(like)
1
2
3
4
|
# 作者名以 "张" 开头
select(Book).where(Book.author.like("张%"))
# 作者名为 "张某"
select(Book).where(Book.author.like("张_"))
|
%: 后面跟 零个、一个或多个字符
_: 后面跟 一个单个字符
逻辑组合(注意括号)
1
2
3
4
5
6
7
8
9
10
|
from sqlalchemy import and_, or_, not_
# 与:必须加括号
.where((Book.author.like("张%")) & (Book.price > 100))
# 或
.where((Book.price < 10) | (Book.price > 100))
# 非
.where(~Book.author.like("王%"))
|
& / | / ~ 优先级高,必须用括号包裹条件
包含查询(in_)
1
2
|
id_list = [1, 3, 5]
select(Book).where(Book.id.in_(id_list))
|
3. 聚合查询(func.xxx)
1
2
3
4
5
6
7
8
9
10
|
from sqlalchemy import func
# 统计总数
select(func.count(Book.id))
# 平均价格
select(func.avg(Book.price))
# 最大/最小/求和
func.max(Book.price), func.min(Book.price), func.sum(Book.price)
|
聚合结果用 .scalar() 提取单个值。
1
2
3
4
|
@app.get("/book-avg-price")
async def avg_price(db: AsyncSession = Depends(get_db)):
result = await db.execute(select(func.avg(Book.price)))
return {"avg_price": result.scalar()}
|
4. 分页查询(.offset().limit())
1
2
3
4
5
6
7
8
9
10
|
@app.get("/books")
async def get_books_page(
page: int = 1,
page_size: int = 10,
db: AsyncSession = Depends(get_db)
):
skip = (page - 1) * page_size
stmt = select(Book).offset(skip).limit(page_size)
books = (await db.execute(stmt)).scalars().all()
return books
|
page=2, page_size=10 → 跳过前 10 条,取第 11~20 条。
5.升降序查询 (order_by)
通过 Select.order_by() 方法对查询结果按指定字段排序。
升序(默认)
1
2
3
|
from sqlalchemy import select
stmt = select(Book).order_by(Book.price) # 按 price 升序
|
等价于 SQL:SELECT * FROM book ORDER BY price ASC
降序
使用 desc() 函数:
1
2
3
|
from sqlalchemy import select, desc
stmt = select(Book).order_by(desc(Book.price)) # 按 price 降序
|
等价于 SQL:SELECT * FROM book ORDER BY price DESC
多字段排序
可传入多个排序条件,按优先级依次生效:
1
2
3
|
from sqlalchemy import select, desc
stmt = select(Book).order_by(Book.author, desc(Book.price))
|
表示:先按 author 升序,若 author 相同,再按 price 降序。
执行示例
1
2
3
4
5
6
7
8
|
from sqlalchemy import select, desc
from sqlalchemy.ext.asyncio import AsyncSession
async def get_books_sorted(session: AsyncSession):
stmt = select(Book).order_by(desc(Book.price))
result = await session.execute(stmt)
books = result.scalars().all()
return books
|
注意事项:
order_by() 接收列对象(如 Book.price)或 desc()/asc() 包装的列。
asc() 可显式写出,但通常省略(默认即升序)。
- 排序字段必须是模型中定义的列,不能是任意表达式(除非使用
text() 或 column() 构造)。
所有排序操作在 SELECT 语句生成阶段完成,不涉及 Python 层排序,性能由数据库保证。
七、数据库增删改
1. 前置准备:Pydantic 模型(用于请求体)
1
2
3
4
5
6
7
8
9
10
11
12
13
|
from pydantic import BaseModel, Field
class BookCreate(BaseModel):
book_name: str = Field(..., max_length=255)
author: str = Field(..., max_length=255)
price: float = Field(gt=0)
publisher: str = Field(..., max_length=255)
class BookUpdate(BaseModel):
book_name: str | None = Field(None, max_length=255)
author: str | None = Field(None, max_length=255)
price: float | None = Field(None, gt=0)
publisher: str | None = Field(None, max_length=255)
|
使用 Field 做字段校验(长度、范围等)
Update 模型字段为可选(| None),支持部分更新
2、新增(Create)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
@app.post("/books", status_code=201)
async def create_book(
book: BookCreate,
db: AsyncSession = Depends(get_db)
):
# 方法1:字典展开 适用于字段名完全一致
db_book = Book(**book.model_dump())
# 方法2:显式赋值 更安全,可处理字段映射差异
# db_book = Book(
# book_name=book.book_name,
# author=book.author,
# price=book.price,
# publisher=book.publisher
# )
db.add(db_book)
await db.commit()
await db.refresh(db_book) # 刷新对象,获取数据库生成的值(如 id, create_time)
return db_book
|
注意:
status_code=201:RESTful 规范,创建成功返回 201
db.refresh():确保返回的对象包含数据库自动生成的字段(如时间戳)
.model_dump() 与 .__dict__ 结果相同,但是Pydantic v2推荐.model_dump()
3、更新(Update)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
@app.put("/books/{book_id}")
async def update_book(
book_id: int,
book_update: BookUpdate,
db: AsyncSession = Depends(get_db)
):
# 1. 查找
db_book = await db.get(Book, book_id)
if not db_book:
raise HTTPException(status_code=404, detail="Book not found")
# 2. 更新字段(仅更新非 None 字段)
update_data = book_update.model_dump(exclude_unset=True) # 只取传入的字段
for key, value in update_data.items():
setattr(db_book, key, value)
# 3. 提交
await db.commit()
await db.refresh(db_book) # 获取更新后的完整对象(含 update_time)
return db_book
|
关键优化:
exclude_unset=True:只更新客户端实际发送的字段(避免覆盖为 None)
- 使用
setattr 动态赋值,避免手动写每个字段
db.refresh() 获取更新后的 update_time
4、删除(Delete)
1
2
3
4
5
6
7
8
9
|
@app.delete("/books/{book_id}", status_code=204)
async def delete_book(book_id: int, db: AsyncSession = Depends(get_db)):
db_book = await db.get(Book, book_id)
if not db_book:
raise HTTPException(status_code=404, detail="Book not found")
await db.delete(db_book)
await db.commit()
# 204 No Content:删除成功不返回 body
|
RESTful 规范:删除成功返回 204 No Content
仍需先查后删,确保资源存在(避免误删)
八、CRUD 操作对比表
| 操作 |
HTTP 方法 |
状态码 |
关键方法 |
注意事项 |
| Create |
POST |
201 |
db.add() + commit() |
用 refresh() 获取自增 ID |
| Read |
GET |
200 |
select() / db.get() |
处理 404 |
| Update |
PUT/PATCH |
200 |
setattr + commit() |
支持部分更新(exclude_unset) |
| Delete |
DELETE |
204 |
db.delete() + commit() |
先查后删 |
PUT vs PATCH:
PUT:全量更新(所有字段必填)
PATCH:部分更新(字段可选)→ 当前用 BookUpdate 实际是 PATCH 语义,可考虑改用 @app.patch
九、问题
忘记 await db.commit()
数据不会写入数据库
更新时覆盖为 None
1
2
|
# 错误:客户端没传 author,author 变成 None
book.author = book_update.author # 如果 author 未传,值为 None
|
正确:在转化模型时,使用 exclude_unset=True + setattr
不处理 404
客户端无法区分“ID 不存在”和“服务器错误”