主页
文章
分类
标签
关于
Fastapi的ORM初始化以及增删改查
发布于: 2025-7-4   更新于: 2025-7-4   收录于: Fastapi
文章字数: 4040   阅读时间: 9 分钟   阅读量:

FastAPI + SQLAlchemy 异步 ORM

基于 SQLAlchemy 2.0+ 异步 API + FastAPI
数据库:MySQL,通过 aiomysql
核心目标:用 ORM 操作数据库,无需手写 SQL


一、什么是ORM

ORM(Object-Relational Mapping,对象关系映射)
让开发者通过操作 Python 对象(模型类)来操作关系型数据库,自动转换为 SQL 语句

作用

  • 减少重复 SQL,避免手写易错语句
  • 代码可读性强,面向对象风格
  • 自动防 SQL 注入,参数化查询
  • 自动管理连接、事务、连接池

FastAPI 官方推荐 ORM:SQLAlchemy ,支持同步/异步


二、环境准备

1
2
# 安装依赖
pip install "sqlalchemy[asyncio]" aiomysql
  • sqlalchemy[asyncio]:启用异步支持
  • aiomysql:MySQL 的异步驱动

三、数据库连接与模型定义

1. 配置数据库 URL

1
2
3
4
# 确保数据库已创建:
# CREATE DATABASE fastapi_test CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;

ASYNC_DATABASE_URL = "mysql+aiomysql://root:123456@localhost:3306/fastapi_test?charset=utf8mb4"

2. 创建异步引擎

1
2
3
4
5
6
7
8
from sqlalchemy.ext.asyncio import create_async_engine

async_engine = create_async_engine(
    ASYNC_DATABASE_URL,
    echo=True,           # 打印 SQL 日志(开发用)
    pool_size=10,        # 连接池大小
    max_overflow=20,     # 超出 pool_size 的最大连接数
)

3. 定义模型(使用 SQLAlchemy 2.0 风格)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from sqlalchemy import Integer, String, Float, DateTime, func
from datetime import datetime

class Base(DeclarativeBase):
    """基类:通用字段"""
    create_time: Mapped[datetime] = mapped_column(
        DateTime, 
        default=func.now(), 
        comment="创建时间"
    )
    update_time: Mapped[datetime] = mapped_column(
        DateTime, 
        default=func.now(), 
        onupdate=func.now(), 
        comment="更新时间"
    )

class Book(Base):
    __tablename__ = "book"
    
    id: Mapped[int] = mapped_column(Integer, primary_key=True, comment="书籍ID")
    book_name: Mapped[str] = mapped_column(String(255), comment="书名")
    author: Mapped[str] = mapped_column(String(255), comment="作者")
    price: Mapped[float] = mapped_column(Float, comment="价格")
    publisher: Mapped[str] = mapped_column(String(255), comment="出版社")

class User(Base):
    __tablename__ = "user"
    
    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    name: Mapped[str] = mapped_column(String(255))
    password: Mapped[str] = mapped_column(String(255))

注意

  • 使用 Mapped + mapped_column 是 SQLAlchemy 2.0 推荐方式,类型安全且简洁。
  • Mapped[int]:告诉类型检查器,这个属性在实例上是 int 类型,利用python原生类型进行定义
  • mapped_column(Integer, xxx):对应数据库列行为,定义orm的数据类型Integer
  • 运行时,book.id 是 int,不是 Column 对象,如果说是老版本的写法就是id = Column(Integer, …),那id就是Column对象,而不是原生int对象,所以在2.0版本Fastapi推荐Mappedmapped_cloumn的结合用法

四、应用生命周期 启动应用自动建表

不要用已废弃的 @app.on_event,使用 lifespan

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
from fastapi import FastAPI
from contextlib import asynccontextmanager
from sqlalchemy.ext.asyncio import AsyncGenerator

async def create_tables():
    async with async_engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator:
    # 启动时:建表
    await create_tables()
    yield  # 应用运行中
    # 关闭时 这里没有

app = FastAPI(lifespan=lifespan)

表只建一次;若表结构变更,需手动 DROP 或用 Alembic等迁移工具。


五、数据库会话依赖注入

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker

AsyncSessionLocal = async_sessionmaker(
    bind=async_engine,
    class_=AsyncSession,
    expire_on_commit=False,  # 提交后对象不过期,可继续使用
)

async def get_db():
    """依赖:提供数据库会话"""
    async with AsyncSessionLocal() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise
        finally:
            await session.close()

在路由中通过 Depends(get_db) 注入会话,自动管理事务与连接。


六、数据库查询操作

一般格式

1
2
result = await db.execute(select(Model).where(...))
data = result.scalars().all()  # 或 .first(), .scalar_one_or_none() 等

1. 基础查询

操作 代码示例
查所有 await db.execute(select(Book))scalars().all()
查单条(主键) await db.get(Book, 2)
查第一条 result.scalars().first()
查唯一或 None result.scalar_one_or_none()
查唯一值并直接返回结果 result.scalar()

简单示例

1
2
3
4
5
6
7
@app.get("/books")
async def get_books(db: AsyncSession = Depends(get_db)):
    stmt = select(Book)  # 构造 Select 对象
    result = session.execute(stmt)  # 执行后得到 Result 对象
    # 正常可以直接放一起写 result = session.execute(select(Book))
    books = result.scalars().all()
    return books

select(xxx) 返回的对象类型

在 SQLAlchemy 2.0+ 中,select(Book) 返回一个 Select 对象,属于 SQL 表达式构造器(SQL Expression Construct),尚未执行查询。

  • select(Book) 等价于 SELECT * FROM book(假设 Book 映射到 book 表)。
  • 此对象需通过 Session 执行(如 前面示例当中 session.execute(stmt))才能获取结果。
  • 执行后返回 Result 对象,用于进一步提取数据。

scalar() 与 scalars() 的区别

首先要说的是对于不同的对象,他们的使用方法不同,sqlalchemy当中不止一个类型有这两种方法

Result 对象中,也就是excute()的结果,用于从查询结果中提取数据,但语义和返回类型不同。

scalars()

  • 用途:当查询返回单列结果(即使该列是实体对象)时,将每一行的第一列值提取为一个可迭代序列
  • 返回类型ScalarResult[Book],可迭代,支持 all(), first(), one() 等方法
  • 适用场景:查询返回list,里面包含多个 Book 实体

示例:

1
2
stmt = select(Book).where(Book.id < 5)
books = session.execute(stmt).scalars().all()  # List[Book]

scalar()

  • 用途:期望查询仅返回一行且一列,并直接返回该值
  • 返回类型:单个值 而不是列表(如 Book 实例、intstr),若无结果返回 None
  • 严格校验
    • 若结果为空,返回 None
    • 若结果多于一行或一列,抛出 MultipleResultsFoundNoResultFound

示例:

1
2
stmt = select(Book).where(Book.id == 1)
book = session.execute(stmt).scalar()  # Book | None

使用建议

  • 查询多个对象 → 用 scalars().all()
  • 查询唯一对象 → 用 scalar()
  • 不确定结果数量 → 用 scalars().first()(返回第一个或 None,不抛异常)

注意:若 select() 包含多列(如 select(Book.id, Book.name)),scalars() 仅返回第一列(id),其余列被忽略。


session对象中,scalarscalars的使用方法其实就是上述的简化,少了excute操作

举个例子马上就知道了

1
2
3
stmt = select(Book).where(Book.id < 5)
# books = session.execute(stmt).scalars().all()  # 原来的Result对象的方法
books = session.scalars(stmt).all()

在session中直接使用scalar与scalars方法就是将excute的过程省略掉了(实际上是自动执行掉了),想具体知道怎么回事的,去看库里的具体实现方式

在官方文档里面,其快速入门教学用的是第二种方法

两者的实际使用的区别就是一个当没有参数的方法用,一个将查询语句当参数去查,再具体项目中,哪个都可以用,但是第二种看上去简短一点,我再查询资料的时候发现都有人用,没有说哪种更标准,毕竟它两种用法都存在,就有存在的意义。


2. 条件查询(.where()

比较查询

1
2
3
4
5
# id == 5
select(Book).where(Book.id == 5)

# price > 100
select(Book).where(Book.price > 100)

主要是等于、大于、小于、大于等于、小于等于

模糊查询(like

1
2
3
4
# 作者名以 "张" 开头
select(Book).where(Book.author.like("张%"))
# 作者名为 "张某"
 select(Book).where(Book.author.like("张_"))
  • %: 后面跟 零个、一个或多个字符
  • _: 后面跟 一个单个字符

逻辑组合(注意括号

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
from sqlalchemy import and_, or_, not_

# 与:必须加括号
.where((Book.author.like("张%")) & (Book.price > 100))

# 或
.where((Book.price < 10) | (Book.price > 100))

# 非
.where(~Book.author.like("王%"))

& / | / ~ 优先级高,必须用括号包裹条件

包含查询(in_

1
2
id_list = [1, 3, 5]
select(Book).where(Book.id.in_(id_list))

3. 聚合查询(func.xxx

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
from sqlalchemy import func

# 统计总数
select(func.count(Book.id))

# 平均价格
select(func.avg(Book.price))

# 最大/最小/求和
func.max(Book.price), func.min(Book.price), func.sum(Book.price)

聚合结果用 .scalar() 提取单个值。

1
2
3
4
@app.get("/book-avg-price")
async def avg_price(db: AsyncSession = Depends(get_db)):
    result = await db.execute(select(func.avg(Book.price)))
    return {"avg_price": result.scalar()}

4. 分页查询(.offset().limit()

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
@app.get("/books")
async def get_books_page(
    page: int = 1,
    page_size: int = 10,
    db: AsyncSession = Depends(get_db)
):
    skip = (page - 1) * page_size
    stmt = select(Book).offset(skip).limit(page_size)
    books = (await db.execute(stmt)).scalars().all()
    return books

page=2, page_size=10 → 跳过前 10 条,取第 11~20 条。

5.升降序查询 (order_by)

通过 Select.order_by() 方法对查询结果按指定字段排序。

升序(默认)

1
2
3
from sqlalchemy import select

stmt = select(Book).order_by(Book.price)  # 按 price 升序

等价于 SQL:SELECT * FROM book ORDER BY price ASC

降序

使用 desc() 函数:

1
2
3
from sqlalchemy import select, desc

stmt = select(Book).order_by(desc(Book.price))  # 按 price 降序

等价于 SQL:SELECT * FROM book ORDER BY price DESC

多字段排序

可传入多个排序条件,按优先级依次生效:

1
2
3
from sqlalchemy import select, desc

stmt = select(Book).order_by(Book.author, desc(Book.price))

表示:先按 author 升序,若 author 相同,再按 price 降序。

执行示例

1
2
3
4
5
6
7
8
from sqlalchemy import select, desc
from sqlalchemy.ext.asyncio import AsyncSession

async def get_books_sorted(session: AsyncSession):
    stmt = select(Book).order_by(desc(Book.price))
    result = await session.execute(stmt)
    books = result.scalars().all()
    return books

注意事项:

  • order_by() 接收列对象(如 Book.price)或 desc()/asc() 包装的列。
  • asc() 可显式写出,但通常省略(默认即升序)。
  • 排序字段必须是模型中定义的列,不能是任意表达式(除非使用 text()column() 构造)。

所有排序操作在 SELECT 语句生成阶段完成,不涉及 Python 层排序,性能由数据库保证。


七、数据库增删改

1. 前置准备:Pydantic 模型(用于请求体)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
from pydantic import BaseModel, Field

class BookCreate(BaseModel):
    book_name: str = Field(..., max_length=255)
    author: str = Field(..., max_length=255)
    price: float = Field(gt=0)
    publisher: str = Field(..., max_length=255)

class BookUpdate(BaseModel):
    book_name: str | None = Field(None, max_length=255)
    author: str | None = Field(None, max_length=255)
    price: float | None = Field(None, gt=0)
    publisher: str | None = Field(None, max_length=255)

使用 Field 做字段校验(长度、范围等)
Update 模型字段为可选(| None),支持部分更新


2、新增(Create)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
@app.post("/books", status_code=201)
async def create_book(
    book: BookCreate,
    db: AsyncSession = Depends(get_db)
):
    # 方法1:字典展开 适用于字段名完全一致
    db_book = Book(**book.model_dump())  
    
    # 方法2:显式赋值 更安全,可处理字段映射差异
    # db_book = Book(
    #     book_name=book.book_name,
    #     author=book.author,
    #     price=book.price,
    #     publisher=book.publisher
    # )
    
    db.add(db_book)
    await db.commit()
    await db.refresh(db_book)  # 刷新对象,获取数据库生成的值(如 id, create_time)
    return db_book

注意:

  • status_code=201:RESTful 规范,创建成功返回 201
  • db.refresh():确保返回的对象包含数据库自动生成的字段(如时间戳)
  • .model_dump().__dict__ 结果相同,但是Pydantic v2推荐.model_dump()

3、更新(Update)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
@app.put("/books/{book_id}")
async def update_book(
    book_id: int,
    book_update: BookUpdate,
    db: AsyncSession = Depends(get_db)
):
    # 1. 查找
    db_book = await db.get(Book, book_id)
    if not db_book:
        raise HTTPException(status_code=404, detail="Book not found")
    
    # 2. 更新字段(仅更新非 None 字段)
    update_data = book_update.model_dump(exclude_unset=True)  # 只取传入的字段
    for key, value in update_data.items():
        setattr(db_book, key, value)
    
    # 3. 提交
    await db.commit()
    await db.refresh(db_book)  # 获取更新后的完整对象(含 update_time)
    return db_book

关键优化

  • exclude_unset=True:只更新客户端实际发送的字段(避免覆盖为 None
  • 使用 setattr 动态赋值,避免手动写每个字段
  • db.refresh() 获取更新后的 update_time

4、删除(Delete)

1
2
3
4
5
6
7
8
9
@app.delete("/books/{book_id}", status_code=204)
async def delete_book(book_id: int, db: AsyncSession = Depends(get_db)):
    db_book = await db.get(Book, book_id)
    if not db_book:
        raise HTTPException(status_code=404, detail="Book not found")
    
    await db.delete(db_book)
    await db.commit()
    # 204 No Content:删除成功不返回 body

RESTful 规范:删除成功返回 204 No Content
仍需先查后删,确保资源存在(避免误删)


八、CRUD 操作对比表

操作 HTTP 方法 状态码 关键方法 注意事项
Create POST 201 db.add() + commit() refresh() 获取自增 ID
Read GET 200 select() / db.get() 处理 404
Update PUT/PATCH 200 setattr + commit() 支持部分更新(exclude_unset
Delete DELETE 204 db.delete() + commit() 先查后删

PUT vs PATCH

  • PUT:全量更新(所有字段必填)
  • PATCH:部分更新(字段可选)→ 当前用 BookUpdate 实际是 PATCH 语义,可考虑改用 @app.patch

九、问题

忘记 await db.commit()

数据不会写入数据库

更新时覆盖为 None

1
2
# 错误:客户端没传 author,author 变成 None
book.author = book_update.author  # 如果 author 未传,值为 None

正确:在转化模型时,使用 exclude_unset=True + setattr

不处理 404

客户端无法区分“ID 不存在”和“服务器错误”