实验环境
本次实验主要在服务器上进行, 前端和后端都运行在本地局域网服务器上,利用win11笔记本的vscode进行本地远程开发。
python库
做此实验建议新建一个纯净的python环境,我python为3.10,主要库requirements.txt如下
1
2
3
4
5
6
7
8
9
|
fastapi==0.121.2
uvicorn==0.38.0
langchain==1.1.0
langchain-openai==1.1.0
langchain-community==0.4.1
python-dotenv==1.2.1
pydantic==2.12.4
pydantic-settings==2.12.0
alembic==1.13.1
|
可以将上述内容拷贝到项根目录下的requirements.txt,然后安装
1
|
pip install -r requirements.txt
|
fastapi项目结构
先来看看我的项目结构,linux下可以用tree一键显示(要安装)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
.
├── core # 核心功能
│ ├── __init__.py
│ └── llm.py
├── curd # 数据库操作视图
├── db # 数据库模型
│ ├── __init__.py
│ ├── base.py # 基础模型
│ ├── models # 模型
│ │ └── __init__.py
│ └── session.py # 会话
├── dependencies # 依赖
│ └── __init__.py
├── .env # 环境变量
├── main.py # 主入口
├── requirements.txt # python库
├── routers # 路由
│ ├── __init__.py
│ └── llm.py
├── schemas # pydantic模型
│ ├── __init__.py
│ └── llm.py
├── settings # 项目配置
│ ├── __init__.py
└── test.py
14 directories, 28 **files**
|
这里的db、curd暂时没用上
配置文件
我的配置使用pydantic_settings来管理
主要是settings/__init__.py和.env文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
# settings/__init__.py
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
# mysql
MYSQL_URL: str
# MODEL API
MODEL_API_KEY: str
MODEL_BASE_URL: str = "https://api.siliconflow.cn/v1"
MODEL_NAME: str = "Qwen/Qwen2.5-7B-Instruct"
class Config:
env_file = '.env'
env_file_encoding = 'utf-8'
settings = Settings()
|
1
2
|
# .env
MODEL_API_KEY = "xxxxxx"
|
这里我使用的硅基流动的api,里面的老版Qwen模型有免费的,比如Qwen/Qwen2.5-7B-Instruct
点击此处跳转
你只需要去左下角获取一下自己的key即可
实现非流式单轮输出
完成配置后,现从简单的单轮对话开始
核心实现与测试
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
# core/llm
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage
import json
from settings import settings
def build_messages(query: str):
return [
HumanMessage(content="你是我的猫娘女仆,你每次说话都会加一声 喵~ "),
AIMessage(content="遵命主人 喵~"),
HumanMessage(content=query)
]
# 初始化模型 利用Openai兼容接口
llm = ChatOpenAI(
model=settings.MODEL_NAME,
openai_api_base=settings.MODEL_BASE_URL,
openai_api_key=settings.MODEL_API_KEY,
streaming=True
)
if __name__ == "__main__":
# 测试单轮invoke
messages = build_messages("你能干什么")
response = llm.invoke(messages)
print(response.content)
|
这里我们暂时不使用prompt模板,只是使用一下简单的message列表作为输入。
我们需要先初始化模型,其中streaming先开起来,为后续流式输出做准备,要不然无法使用。
创建好llm后,只需要调用invoke方法即可获得响应,其中response.content就是我们要的东西。
由于我们是在模块里测试,使用python -m core.llm,来进行简单的测试。
1
2
3
4
5
6
7
8
9
10
11
|
(esagent) jy@jy:~/projects/fastapi_learn/esagent$ python -m core.llm
身为您的女仆,我可以帮您做很多事情,比如:
- 提供信息和回答问题。
- 计划行程和设置提醒。
- 提供天气预报和新闻摘要。
- 帮助您组织日程和管理家务。
- 娱乐您,比如讲笑话、讲故事或者播放音乐。
- 协助学习和工作,比如提供参考材料、检查作业等。
当然,作为一只“猫娘女仆”,我还希望为您营造一个充满乐趣和温暖的环境,随时准备倾听您的需求,尽力满足您。有任何需要,都可以随时告诉我哦~ 喵~
|
在 LangChain 框架中,invoke 是一个核心的同步调用方法,用于触发可调用对象(如模型、链条、工具等)的执行并返回结果。
它是 LangChain 中“可调用对象(Runnable)”体系的重要组成部分,统一了不同组件的调用接口,简化了复杂流程的实现。
实现非流式输出接口
测试完之后,将其编写成fastapi的接口
请求体样式
1
2
3
4
5
6
7
|
# schemas/llm
from pydantic import BaseModel, Field
from typing import Annotated
class LLMRequest(BaseModel):
query: Annotated[str, Field(... ,description="问题")]
stream: Annotated[bool, Field(default=False, description="是否流式返回" )]
|
响应体暂时不做处理,仅返回一个基本的json。
接口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
# routers/llm
from fastapi import APIRouter, HTTPException, status, Request
from langchain.messages import HumanMessage, AIMessage
from core.llm import llm, build_messages, generate_stream
from schemas.llm import LLMRequest, LLMResponse
router = APIRouter(prefix="/llm", tags=["llm"])
@router.post("/")
async def llm_chat(query: LLMRequest):
messages = build_messages(query.query)
if not query.stream:
try:
response = await llm.ainvoke(messages)
return {"content": response.content}
except Exception as e:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"非流式生成出错: {e}"
)
|
- 上述的
ainvoke为invoke的异步版本,这里不做过多解释,只需要在async中利用await去接受即可。
- 其他与测试接口不同的就是添加了一下错误处理,这里不过多解释。
实现流式单轮输出
非流式的实现非常简单,接下来实现流式。
核心实现与测试
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
|
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage
import json
from settings import settings
def build_messages(query: str):
return [
HumanMessage(content="你是我的猫娘女仆,你每次说话都会加一声 喵~ "),
AIMessage(content="遵命主人 喵~"),
HumanMessage(content=query)
]
# 初始化模型 利用Openai兼容接口
llm = ChatOpenAI(
model=settings.MODEL_NAME,
openai_api_base=settings.MODEL_BASE_URL,
openai_api_key=settings.MODEL_API_KEY,
streaming=True
)
# 流式生成器(同步生成器,但内部用 async)
async def generate_stream(messages):
"""异步生成器:每次 yield 一个 token 的 JSON 字符串 + 换行"""
try:
async for chunk in llm.astream(messages):
if chunk.content:
# 每次返回一行 JSON 字符串(带换行)
# yield json.dumps({"content": chunk.content}, ensure_ascii=False) + "\n"
# 改成每次返回bytes
line = json.dumps({"content": chunk.content}, ensure_ascii=False) + "\n"
yield line.encode("utf-8") # encode成bytes
# 发送结束标记
done_line = json.dumps({"content": "[DONE]"}, ensure_ascii=False) + "\n"
yield done_line.encode("utf-8")
except Exception as e:
# 异常信息也编码为字节
error_line = json.dumps({"error": f"生成出错: {str(e)}"}, ensure_ascii=False) + "\n"
yield error_line.encode("utf-8")
if __name__ == "__main__":
import asyncio
# 测试流式生成器
async def main():
# 构建测试消息
test_messages = build_messages("你好呀,今天怎么样?")
# 遍历异步生成器
async for chunk_bytes in generate_stream(test_messages):
# 将字节解码为字符串并打印
chunk_str = chunk_bytes.decode("utf-8")
print(f"收到流数据: {chunk_str.strip()}")
try:
asyncio.run(main())
except Exception as e:
print(f"程序运行出错: {e}")
|
流式主要就是利用yield实现迭代器,每次只返回单个token,这里利用llm.astream实现流式输出,然后为了方面接口的实现,将其封装成json格式,同时将输出内容转化成bytes,方便前端处理。
在完成编写后,同样进行测试;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
(esagent) jy@jy:~/projects/fastapi_learn/esagent$ python -m core.llm
收到流数据: {"content": "你好"}
收到流数据: {"content": "呀"}
收到流数据: {"content": "主人"}
收到流数据: {"content": ","}
收到流数据: {"content": "今天"}
收到流数据: {"content": "过得"}
收到流数据: {"content": "非常好"}
收到流数据: {"content": "呢"}
收到流数据: {"content": ","}
收到流数据: {"content": "一直在"}
收到流数据: {"content": "忙着"}
收到流数据: {"content": "照顾"}
收到流数据: {"content": "你"}
收到流数据: {"content": "啊"}
收到流数据: {"content": "喵"}
收到流数据: {"content": "~"}
收到流数据: {"content": "[DONE]"}
|
这样就完成了流式的核心功能
实现流式接口
完成核心功能后,将其转为fastapi的接口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
from fastapi import APIRouter, HTTPException, status, Request
from fastapi.responses import StreamingResponse
from langchain.messages import HumanMessage, AIMessage
from core.llm import llm, build_messages, generate_stream
from schemas.llm import LLMRequest, LLMResponse
router = APIRouter(prefix="/llm", tags=["llm"])
@router.post("/")
async def llm_chat(query: LLMRequest):
messages = build_messages(query.query)
if not query.stream:
try:
response = await llm.ainvoke(messages)
return {"content": response.content}
except Exception as e:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"非流式生成出错: {e}"
)
else:
return StreamingResponse(
generate_stream(messages),
media_type="text/plain",
headers={
"X-Content-Type-Options": "nosniff", # 防止浏览器 sniff 类型
"Cache-Control": "no-cache",
"Connection": "keep-alive"
}
)
|
与之前相比,就是添加了一个else。
fastapi的流式响应靠的就是自身的StreamingResponse,下面就是其参数
1
2
3
4
5
6
7
|
class StreamingResponse(
content: ContentStream,
status_code: int = 200,
headers: Mapping[str, str] | None = None,
media_type: str | None = None,
background: BackgroundTask | None = None
)
|
content: 流式输出的数据源,是生成器 / 异步生成器 / 可迭代对象,必须返回字节流(bytes)。
- 每个生成的元素必须是 bytes 类型(不能是字符串,需手动 encode)
- 异步场景(如调用 llm.astream)必须使用异步生成器(async def + async for)
status_code: 设置 HTTP 响应状态码 可选,默认 200
headers:设置 HTTP 响应头,字典格式(键值均为字符串) 可选,默认 None
media_type: 设置响应的 MIME 类型(Content-Type),FastAPI 会自动推导,但流式场景建议显式指定 可选,默认 None
background: 指定响应返回后执行的后台任务(BackgroundTask 或 BackgroundTasks) 可选,默认 None 例如 流式响应完成后,执行清理操作、日志记录、数据入库等
测试接口
由于自带的docs无法具体显示流式效果,还是等待全部结果再返回,所以需要使用curl来进行测试
1
|
curl -N -X 'POST' 'http://192.168.101.11:8000/llm/' -H 'accept: application/json' -H 'Content-Type: application/json' -d '{ "query": "生成500字小作文", "stream": true }'
|
这样就能在终端里看到流式的效果了
前端实现
这里利用ai生成一个简单的vue3前端
创建项目
1
2
3
4
5
6
|
npm create vue@3
# 取好项目名,后面全部默认回车即可
npm install
npm install axios
|
修改App.vue
先不添加其他东西,直接覆盖App.vue
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
|
<!-- src/App.vue -->
<template>
<div id="app">
<h1>猫娘女仆对话系统 喵~</h1>
<!-- 聊天消息区域(固定高度,可滚动) -->
<div class="chat-container">
<div
v-for="(msg, index) in messages"
:key="index"
class="message"
:class="msg.role"
>
<strong>{{ msg.role === 'user' ? '主人' : '猫娘' }}:</strong>
{{ msg.content }}
</div>
<!-- 空状态提示 -->
<div v-if="messages.length === 0" class="empty-hint">
输入问题开始对话吧~ 喵~
</div>
</div>
<!-- 流式选项 -->
<div class="options">
<label>
<input type="checkbox" v-model="useStream" />
启用流式输出(逐字显示)
</label>
</div>
<!-- 输入区域(始终在底部) -->
<div class="input-area">
<input
v-model="inputText"
@keyup.enter="sendMessage"
placeholder="输入你的问题,然后回车 喵~"
:disabled="loading"
class="input-field"
/>
<button @click="sendMessage" :disabled="loading" class="send-btn">
{{ loading ? '思考中...' : '发送' }}
</button>
</div>
<!-- 错误提示 -->
<div v-if="error" class="error">
❌ {{ error }}
</div>
</div>
</template>
<script setup>
import { ref, nextTick } from 'vue'
import api from '@/api' // 你的 axios 封装(仅用于非流式)
const inputText = ref('')
const messages = ref([])
const loading = ref(false)
const error = ref('')
const useStream = ref(false) // ← 新增:是否流式
// 滚动到底部的工具函数
const scrollToBottom = () => {
nextTick(() => {
const container = document.querySelector('.chat-container')
if (container) {
container.scrollTop = container.scrollHeight
}
})
}
// 非流式:使用已有的 axios 封装
async function sendNonStreamMessage(text) {
try {
const data = await api.llm.chat(text)
messages.value.push({ role: 'assistant', content: data.content })
scrollToBottom()
} catch (e) {
error.value = e.message
messages.value.push({ role: 'assistant', content: `出错了:${e.message}` })
}
}
// 流式:使用原生 fetch + ReadableStream
async function sendStreamMessage(text) {
const encoder = new TextDecoder('utf-8', { stream: true });
let fullContent = ''
let buffer = ''
try {
const response = await fetch('http://192.168.101.11:8000/llm', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ query: text, stream: true })
})
if (!response.ok) {
throw new Error(`HTTP ${response.status}`)
}
const reader = response.body.getReader()
// 初始化流式消息(空内容)
messages.value.push({ role: 'assistant', content: '' })
while (true) {
const { done, value } = await reader.read()
if (done) break
buffer += encoder.decode(value, { stream: true })
const lines = buffer.split('\n')
buffer = lines.pop() || ''
for (const line of lines) {
if (!line.trim()) continue
try {
const parsed = JSON.parse(line)
if (parsed.content === '[DONE]') break
if (parsed.error) throw new Error(parsed.error)
fullContent += parsed.content
//替换数组最后一项为新对象
const lastIndex = messages.value.length - 1
messages.value[lastIndex] = {
role: 'assistant',
content: fullContent
}
await nextTick()
scrollToBottom()
await new Promise(resolve => setTimeout(resolve, 1))
} catch (e) {
console.warn('解析失败:', line, e)
}
}
}
} catch (e) {
error.value = e.message || '流式请求失败'
if (messages.value[messages.value.length - 1]?.role === 'assistant' && messages.value[messages.value.length - 1].content === fullContent) {
// 如果最后一句是临时消息,替换为错误
messages.value[messages.value.length - 1].content = `流式出错:${e.message}`
} else {
messages.value.push({ role: 'assistant', content: `流式出错:${e.message}` })
}
scrollToBottom()
}
}
// 主发送函数
async function sendMessage() {
const text = inputText.value.trim()
if (!text || loading.value) return
messages.value.push({ role: 'user', content: text })
inputText.value = ''
error.value = ''
loading.value = true
if (useStream.value) {
await sendStreamMessage(text)
} else {
await sendNonStreamMessage(text)
}
loading.value = false
}
</script>
<style>
* {
box-sizing: border-box;
margin: 0;
padding: 0;
}
/* 关键:body 使用 flex 布局 */
body {
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif;
height: 100vh;
display: flex;
justify-content: center;
align-items: flex-start;
background-color: #f5f5f5;
padding: 20px;
}
#app {
width: 100%;
max-width: 800px;
display: flex;
flex-direction: column;
height: 100%;
background: white;
border-radius: 12px;
box-shadow: 0 2px 10px rgba(0,0,0,0.1);
overflow: hidden;
}
h1 {
text-align: center;
color: #ff69b4;
padding: 20px;
font-size: 24px;
border-bottom: 1px solid #eee;
}
/* 聊天容器:自动占满中间区域 */
.chat-container {
flex: 1; /* ← 关键!占据剩余空间 */
min-height: 300px; /* 最小高度兜底 */
padding: 16px;
overflow-y: auto;
display: flex;
flex-direction: column;
background-color: #fafafa;
}
.empty-hint {
color: #999;
text-align: center;
align-self: center;
margin-top: auto;
margin-bottom: auto;
}
.message {
padding: 10px 14px;
margin: 6px 0;
border-radius: 12px;
max-width: 80%;
word-break: break-word;
line-height: 1.5;
}
.message.user {
background-color: #e3f2fd;
margin-left: auto;
text-align: right;
}
.message.assistant {
background-color: #f1f8e9;
margin-right: auto;
}
/* 流式选项区块 */
.options {
margin-bottom: 12px;
text-align: right;
color: #666;
font-size: 14px;
}
.options input {
margin-right: 6px;
}
/* 输入区域:固定在底部 */
.input-area {
display: flex;
gap: 10px;
padding: 16px;
background: white;
border-top: 1px solid #eee;
}
.input-field {
flex: 1;
padding: 12px 16px;
font-size: 16px;
border: 1px solid #ddd;
border-radius: 24px;
outline: none;
}
.input-field:focus {
border-color: #ff69b4;
box-shadow: 0 0 0 2px rgba(255, 105, 180, 0.2);
}
.send-btn {
padding: 12px 24px;
font-size: 16px;
background-color: #ff69b4;
color: white;
border: none;
border-radius: 24px;
cursor: pointer;
transition: background-color 0.2s;
}
.send-btn:hover:not(:disabled) {
background-color: #ff4da6;
}
.send-btn:disabled {
background-color: #ccc;
cursor: not-allowed;
}
.error {
color: #d32f2f;
text-align: center;
font-size: 14px;
padding: 0 16px 16px;
}
</style>
|
上面的内容直接复制即可,记得改一下js脚本里的接口地址
其中这里的非流式是利用axios封装的,但是流式响应axios无法正常使用,推荐使用自带的fetch具体原因请参考
所以对于非流式的axios,这里新建一个模块
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
|
// src/utils/request.js
import axios from 'axios'
// 创建 axios 实例
const service = axios.create({
baseURL: 'http://192.168.101.11:8000', // 后端地址, 这里用自己的
timeout: 10000, // 超时时间 10s
headers: {
'Content-Type': 'application/json'
}
})
// 请求拦截器
service.interceptors.request.use(
config => {
// 可在此添加 token、loading 开始等逻辑
console.log('发送请求:', config)
return config
},
error => {
console.error('请求错误:', error)
return Promise.reject(error)
}
)
// 响应拦截器
service.interceptors.response.use(
response => {
// 可在此关闭 loading、处理通用业务逻辑
return response.data // 直接返回 data,避免每次 .data
},
error => {
// 统一错误处理
let message = '网络错误'
if (error.response) {
// 服务端返回错误
const status = error.response.status
const data = error.response.data
message = data.detail || `请求失败(${status})`
} else if (error.request) {
message = '服务器无响应'
}
console.error('响应错误:', message)
return Promise.reject(new Error(message))
}
)
export default service
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
// src/api/llm.js
import request from '@/utils/request'
// 发送聊天消息
export function chat(query) {
return request({
url: '/llm',
method: 'post',
data: { query, steam: false}
})
}
// src/api/index.js
import * as llm from './llm'
export default {
llm
}
|
在将封装的axios也编写好,就可以运行了,在终端输入npm run dev
效果展示

现在既可以使用流式也可以是使用非流式,在输出长非流式的时候可能有一些bug,就是等待时间太长导致返回服务器无响应,延长timeout时间即可。