在 FastAPI 的 @app.on_event("startup") 中启动后台线程

在 FastAPI 项目中,某些任务可能需要在应用启动时运行,并在后台持续执行,如定时清理数据库、监控资源或定时获取数据等。为此,我们可以在 FastAPI 的 @app.on_event("startup") 中启动后台线程,确保应用在启动后立刻开始这些任务。

背景知识

FastAPI 是一个支持异步编程的 Python Web 框架,运行在异步事件循环上,能够高效处理 I/O 密集型任务。不过,在应用启动的过程中,有些任务(如 CPU 密集型任务)更适合使用传统的多线程来异步执行,以避免阻塞主事件循环。这时,可以使用 Python 标准库的 threading 模块,结合 @app.on_event("startup") 启动线程,以便在后台执行这些任务。

startup 中启动后台线程的方式

有几种方式可以在 startup 中启动后台线程,以下是几种常见的方法。

1. 使用 threading.Thread 启动后台线程

threading.Thread 是 Python 标准库提供的多线程模块,可以在启动应用时启动一个守护线程,让它在后台运行。

示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import threading
import time
from fastapi import FastAPI

app = FastAPI()

def background_task():
while True:
print("Running background task...")
time.sleep(10) # 模拟一个需要重复执行的任务

@app.on_event("startup")
async def startup():
# 创建并启动一个守护线程,以便在应用启动时开始执行后台任务
thread = threading.Thread(target=background_task, daemon=True)
thread.start()
print("Background thread has started.")

@app.on_event("shutdown")
async def shutdown():
print("Application is shutting down.")

解释

  • 守护线程daemon=True 表示该线程是守护线程,应用关闭时不会等待该线程结束,确保应用可以快速关闭。
  • 持续运行background_task 函数中的 while True 循环使得该线程持续执行,可以通过 time.sleep() 控制任务的频率。

2. 使用 concurrent.futures.ThreadPoolExecutor 启动线程池

如果有多个后台任务需要并行运行,concurrent.futures.ThreadPoolExecutor 是更好的选择,它可以创建一个线程池来管理多个后台任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
from concurrent.futures import ThreadPoolExecutor
import time
from fastapi import FastAPI

app = FastAPI()
executor = ThreadPoolExecutor(max_workers=3)

def task_one():
while True:
print("Running task one...")
time.sleep(10)

def task_two():
while True:
print("Running task two...")
time.sleep(15)

@app.on_event("startup")
async def startup():
# 使用线程池启动多个后台任务
executor.submit(task_one)
executor.submit(task_two)
print("Background tasks have started.")

@app.on_event("shutdown")
async def shutdown():
executor.shutdown(wait=False) # 停止所有后台任务
print("Application is shutting down.")

解释

  • 多个任务:每个任务可以独立定义,并通过 executor.submit() 添加到线程池。
  • 线程池ThreadPoolExecutor 会自动管理线程,最大线程数通过 max_workers 控制。

3. 使用 asyncio.create_task 启动异步任务

如果任务是异步的,可以使用 asyncio.create_task 启动一个异步任务,这样就不需要创建新的线程。异步任务适合用于 I/O 密集型操作,比如网络请求、数据库访问等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import asyncio
from fastapi import FastAPI

app = FastAPI()

async def async_background_task():
while True:
print("Running async background task...")
await asyncio.sleep(10) # 模拟异步操作

@app.on_event("startup")
async def startup():
# 使用 asyncio 启动异步任务
asyncio.create_task(async_background_task())
print("Async background task has started.")

@app.on_event("shutdown")
async def shutdown():
print("Application is shutting down.")

解释

  • 异步任务async_background_task 使用 async def 声明,是一个异步函数。
  • 事件循环asyncio.create_task 在主事件循环上创建任务,而不需要额外线程。这对 I/O 密集型任务非常高效,因为异步任务会在等待 I/O 操作时让出控制权,不会阻塞主线程。

注意事项

  1. 守护线程:确保后台线程设置为守护线程(daemon=True),否则 FastAPI 关闭时会等待线程结束。
  2. 资源清理:在 @app.on_event("shutdown") 中执行清理操作,如释放数据库连接、关闭文件等,防止资源泄露。
  3. 选择合适的方式:根据任务类型选择合适的实现方式:
    • I/O 密集型任务使用 asyncio.create_task 启动异步任务。
    • CPU 密集型任务使用 threading.Thread 启动后台线程。
    • 多个并发任务使用 ThreadPoolExecutor 来管理。

结语

在 FastAPI 中启动后台线程,可以帮助我们实现一些自动化或定时操作,确保在应用启动时即开始执行这些任务。选择适合的方式可以使后台任务更加高效,同时减少对主线程的影响,从而提升应用的响应速度。希望本文帮助大家理解在 FastAPI 中启动后台线程的不同方法。