Uvicorn 原理及源码分析

Uvicorn[1]是一个基于 ASGI(Asynchronous Server Gateway Interface) 的轻量级 Web 服务器,专为异步 Web 应用程序而设计。

uvicorn

想要弄清楚 Uvicorn 的原理,并非一件很难的事,本文中将会对其原理及源码做详细的解析。在 Uvicorn 中涉及到的专业术语会比较多,我们会在下面的文章中详细阐述。在这里需要知道的是, ASGI 是 Python 社区为了支持异步 Web 开发而制定的一种标准,它允许 Web 框架和服务器之间通过异步的方式进行通信。目的是利用了 ASGI 的异步特性,提供了高性能、低延迟的Web服务。

1. 背景

要理解 Uvicorn 的基本原理,首先需要了解其产生的背景,这里面就会涉及到大量的专业术语。

1.1. Web 服务

首先, Uvicorn 是服务于 Python 的 Web 应用服务的。通常,我们利用一些 Python 的 Web 框架,如 Django[2],Flask[3] 等,去构建一个 Web 服务(包括网站或者接口服务等),此时我们称之为 Web 应用程序,用于处理用户的具体需求。在实际的开发过程中,我们或许并没有感知,以为开发完成的 Web 应用程序就是一个完整的 Web 服务,有这种感受是因为在这些 Web 框架中都自带了 Web 服务器,以 Flask 框架为例,其中就自带了 Werkzeug[4] 工具库。

注意: Werkzeug 并不是一个 Web 服务器或 Web 框架,而只是一个工具包,它封装了许多 Web 框架所需的功能。

这些自带的 Web 服务器或者框架性能相对比较差,一般不适合在生产环境投入使用,以 Flask 为例,有如下简单的代码:

from flask import Flask

app = Flask(__name__)

@app.route("/")
def hello_world():
	return "Hello, World!"

if __name__ == "__main__":
	app.run(host='0.0.0.0', port=9000)

保存成文件 app.py ,此时使用 python app.py 启动程序后会出现“ WARNING: This is a development server. Do not use it in a production deployment. Use a production WSGI server instead. ”,这也是在告诉我们此时的 Web 服务器的性能相对较弱,在生产环境中需要更换更加强大的 WSGI 服务器。至此,对于一个完整的 Web 服务而言,由两部分组成,一部分是 Web 服务器,另一部分是 Web 应用程序。Web 服务器负责接收用户的请求,并将请求转发给 Web 应用程序完成相对应的功能,并将结果返回给用户,两者之间的关系如下所示:

WSGI

1.2. 从 WSGI 到 ASGI

上面提到在 Web 框架中提供的 Web 服务器的性能相对较弱,这时候就得寄希望于一些专业的 Web 服务器,目前使用较多的如 Nginx,Apache等,同时还有 Python 中使用较多的 gunicorn ,uvicorn 等。对于一个完整的 Web 服务而言,由两部分组成,一部分是 Web 服务器,另一部分是 Web 应用程序。 Web 服务器负责接收用户的请求,并将请求转发给 Web 应用程序完成相对应的功能,并将结果返回给用户,这其中就需要有一个通信的规范,能够实现不同的 Web 服务器与不同的 Web 应用程序顺畅的通信,这三者之间的关系如下所示:

WSAI

在 Python 中,服务器网关接口(Web Server Gateway Interface,WSGI)就是这样的通信规范,通信的双方需要遵循这样的规范,有了这样的通信规范,就可以实现 Web 服务器与应用程序的解耦,让我们在编写应用程序时更加方便、高效,也更具通用的可移植性,我们也能更加专注在应用程序的功能开发上面。

以 Python 内置的 wsgiref 为例构建一个 WSGI 规范的 Web 服务为例,代码如下:

from wsgiref.simple_server import make_server

def simple_wsgi_app(environ, start_response):
	# 设置响应状态和头部
	status = '200 OK'
	headers = [('Content-type', 'text/plain')]
	start_response(status, headers)

	# 返回响应内容
	return [b'Hello, World!']

if __name__ == '__main__':
	# 创建WSGI服务器
	server = make_server('', 8000, simple_wsgi_app)
	print("Serving on port 8000...")
	server.serve_forever()

代码中通过 wsgiref 提供的 make_server 创建了一个简单的 WEB 服务器,并监听 8000 端口。一旦有请求到达,便会自动执行 simple_wsgi_app 函数,该函数只是简单地返回响应内容。在这里应用程序接口只是一个 Python 可调用对象(代码中的 simple_wsgi_app 函数),它接受两个参数,分别是包含 HTTP 请求信息的字典( environ )和用于发送 HTTP 响应的回调函数( start_response )。基于 WSGI 的网络服务存在一些明显的不足[1]:

● WSGI 是同步的,当一个请求到达时,WSGI 应用程序会阻塞处理该请求,直到完成并返回响应,然后才能处理下一个请求。
● WSGI 只支持 HTTP 协议,只能处理 HTTP 请求和响应,不支持 WebSocket 等协议。

相应的,人们引入了 ASGI ,着力要去解决上面的两个不足,这也成为了ASGI 的优势:

● 纯异步支持,这意味着基于 ASGI 的应用程序具有无与伦比的性能。
● 长连接支持,ASGI 原生支持 WebSocket 协议,无需额外实现独立于协议的功能。

1.3. Uvicorn

Python Uvicorn 是一个快速的 ASGI(Asynchronous ServerGateway Interface)服务器,用于构建异步 Web 服务。它基于 asyncio 库,支持高性能的异步请求处理,适用于各种类型的 Web 应用程序。

Uvicorn 是由 Starlette 框架的作者编写的 ASGI 服务器,旨在提供高性能的异步请求处理能力。它使用 asyncio 库实现异步 I/O 操作,支持 HTTP 和 WebSocket 协议,可与各种 ASGI 应用程序框架(如 FastAPI 、Django 、Starlette 等)配合使用。 Uvicorn 的特点包括:

  1. 高性能:基于 uvloop 和 httptools 的高性能实现,能够处理大量并发请求。
  2. 易用性:简洁的 API 和命令行接口,方便快速部署和调试。
  3. 兼容性:支持多种 ASGI 框架,如 Starlette 、 FastAPI 等。
  4. 灵活性:支持多种配置选项,可以根据需要进行定制。

2. Uvicorn 基础知识

要想理解 Uvicorn 的源码,需要一些必要的基础知识,如 Python Socket 网络编程。

2.1. Python Socket 网络编程

Socket 是对 TCP/IP 协议的封装, Socket 本身并不是协议,而是一个调用接口,通过 Socket ,我们才能使用 TCP/IP 协议。通过内置的 socket 模块可以实现在 Python 中的 Socket 网络编程,可以方便地创建网络应用程序。对于服务端,一般需要如下的几个步骤:

1)创建套接字。使用 socket.socket() 函数创建一个 TCP 套接字。套接字可以是流式(TCP)或数据报式(UDP)

server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

其中,socket.AF_INET:表示使用IPv4地址。 socket.SOCK_STREAM:表示套接字类型为流式套接字,即 TCP。

2)绑定地址和端口。使用 bind() 方法将套接字绑定到特定的 IP 地址和端口号上,以便客户端能够连接到该地址和端口。 IP 可以不指定表示本机地址。

server_address = ('localhost', 8080)
server_socket.bind(server_address)

3)监听连接请求。使用 listen() 方法开始监听客户端的连接请求。参数 backlog 指定等待连接的最大数量。当服务器忙于处理当前连接而不能立即处理新的连接请求时,这些新的连接请求将会被放入一个等待队列中,直到服务器有能力处理它们。 backlog 参数定义了这个等待队列的最大长度。

server_socket.listen(5) # 最多允许5个等待连接的请求

4)接受客户端连接。使用 accept() 方法接受客户端的连接。该方法会阻塞程序,直到有客户端连接进来。一旦有连接,accept() 方法返回一个新的套接字(client_socket)和客户端的地址信息(client_address)。

client_socket, client_address = server_socket.accept()

其中, client_socket 是与客户端通信的新套接字, client_address 是客户端的地址信息,通常是一个元组(IP地址, 端口号)。

5)与客户端通信。使用返回的 client_socket 对象与客户端进行数据交换。可以使用 recv() 方法接收客户端发送的数据, recv 方法会阻塞直到接收到数据,并使用 sendall() 方法发送响应给客户端。

data = client_socket.recv(1024) # 接收数据,指定缓冲区大小为1024字节
client_socket.sendall(data) # 发送响应数据给客户端

6)关闭连接。当通信结束后,使用 close() 方法关闭客户端套接字和服务器套接字,释放资源。

server_socket.close() # 关闭服务器套接字

一个完整的 Socket 程序应该包含客户端和服务端两个部分,两者之间的调用关系如下图所示:

socket

对于服务端,将上述的过程结合起来便是一个完整的 Socket 程序,完整的代码如下所示:

import socket

# 服务器地址和端口
SERVER_HOST = 'localhost'
SERVER_PORT = 8888

def handle_client(client_socket):
	try:
		# 接收客户端的数据
		data = client_socket.recv(1024)
		print(f'收到客户端消息: {data.decode()}') # decode:把字节数据转换成字符串数据
		# 发送响应给客户端
		ret = "Hello World"
		message = 'HTTP/1.1 200 OK\r\nContent-Type: text/html;charset=utf-8\r\n\r\n'+ret
		client_socket.sendall(message.encode()) # encode:把字符串数据转换成字节数据
	except Exception as e:
		print(f'处理客户端连接时出错: {e}')
	finally:
		# 关闭客户端连接
		client_socket.close()

def start_server():
	# 创建TCP套接字
	server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
	# 设置端口复用选项
	server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
	
	try:
		# 绑定服务器地址和端口
		server_socket.bind((SERVER_HOST, SERVER_PORT))
		# 开始监听连接
		server_socket.listen(128)
		print(f'服务器启动,监听 {SERVER_HOST}:{SERVER_PORT}')
		while True:
			# 等待客户端连接
			client_socket, client_address = server_socket.accept()
			print(f'接受来自 {client_address} 的连接')
			
			handle_client(client_socket)
	except Exception as e:
		print(f'服务器启动失败: {e}')
	finally:
		# 关闭服务器套接字
		server_socket.close()

if __name__ == "__main__":
	start_server()

2.2. 利用 asyncio 库实现高效网络 I/O

上述用 Python 内置的 Socket 库实现了网络的连接,这样的代码也只能是概念上的解释,无法与高效相关联,为此,就得寻求更高效的网络 I/O 库, asyncio [5]就是这样的库。 asyncio 是 Python 中用来编写并发代码的库,在 Uvicorn 中,使用 asyncio 来构建网络服务,并且 Python 在 asyncio 库中,提供了一种简单的网络传输模型,称为协议与传输( Protocol & Transport ),这将会大大减少像使用 Socket 那样写大段的代码用于处理网络的连接,传输等,协议和传输是在 Socket 的基础上进行的封装,是更高一层次的应用。 asyncio 的内容也是比较多的,无法在这篇文章中都涉及到,本文中只会提及到 Uvicorn 中使用到的一些概念。对应于 Socket 部分,在使用 asyncio 构建网络服务时,也分为如下的几个部分:

1)设置当前事件循环策略。通过 asyncio.set_event_loop_policy() 方法对当前的环境下设置事件循环策略,如在 Uvicorn 中提供了 uvloop 事件循环和 asyncio 事件循环,这将会在下面介绍到。

2)获取当前 OS 线程中正在运行的事件循环。通过 asyncio.get_running_loop() 方法可以获取到当前正在运行的事件循环。

3)创建 TCP 服务。通过 loop.create_server() 方法可以创建一个 TCP 服务,并返回一个 Server 对象。 create_server 方法的定义如下:

coroutine loop.create_server(
	protocol_factory,
	host=None,
	port=None,
	*,
	family=socket.AF_UNSPEC,
	flags=socket.AI_PASSIVE,
	sock=None,
	backlog=100,
	ssl=None,
	reuse_address=None,
	reuse_port=None,
	ssl_handshake_timeout=None,
	start_serving=True)

其中, protocol_factory 是一个可以调用的工厂函数,其返回一个协议 asyncio.Protocol 实例,以参考文献[6]给出的 “TCP Echo Server” 为例,这里对其进行稍微的修改

import asyncio

class EchoServerProtocol(asyncio.Protocol):
	def connection_made(self, transport):
		# transport创建成功后的回调,这里需要跟Protocol进行绑定
		peername = transport.get_extra_info('peername')
		print('Connection from {}'.format(peername))
		self.transport = transport

	def data_received(self, data):
		# socket收到数据的回调事件,这里先接收消息,再把内容返回给客户端最后再关闭
		message = data.decode()
		print('Data received: {!r}'.format(message))

		ret = "Hello World"
		ans = 'HTTP/1.1 200 OK\r\nContent-Type: text/html;charset=utf-8\r\n\r\n'+ret
		self.transport.write(bytes(ans, "utf-8"))

		print('Close the client socket')
		self.transport.close()

async def main():
	# 返回当前 OS 线程中正在运行的事件循环。
	loop = asyncio.get_running_loop()
	# 创建TCP服务
	server = await loop.create_server(
		lambda: EchoServerProtocol(), host='127.0.0.1', port=8888
	)
	async with server:
		await server.serve_forever()

asyncio.run(main())

4)负责连接与数据传输的协议与传输。Protocol 类继承自 BaseProtocol 类,用于实现流式协议,如 TCP , Unix 套接字等等的基类。这个类里面的主要的方法有:

● connection_made:连接建立时被调用
● connection_lost:连接丢失或关闭时将被调用
● eof_received:当发出信号的另一端不再继续发送数据时被调用
● data_received(data):当收到数据时被调用。 data 为包含入站数据的非空字节串对象

除了 Protocol 类之外,还有 Transport 类,在 asyncio 的定义中, Protocol & Transport 是无法分开的一个整体,它们一起定义了网络 I/O 和进程间 I/O 的抽象接口,对于开发者来说可以简单的把 Protocol 理解为专门负责处理被动调用的,也就是连接什么时候建立,连接什么时候接收了数据;而 Transport 则是提供了许多开发者可以主动调用的接口,包括了向连接发送数据,关闭连接等等。

5)除了上述,还有一个是保证服务器不被关闭的函数: server.serve_forever()。

这样便利用 asyncio 实现了一个简易的 HTTP 服务。其实在 Uvicorn 中,并未使用到 server.serve_forever() 这个函数使得服务器不断接收请求,而是做了另一种方式,如下代码:

import asyncio
import uvloop

class EchoServerProtocol(asyncio.Protocol):
	def connection_made(self, transport):
		# transport创建成功后的回调,这里需要跟Protocol进行绑定
		peername = transport.get_extra_info('peername')
		print('Connection from {}'.format(peername))
		self.transport = transport

	def data_received(self, data):
		# socket收到数据的回调事件,这里先接收消息,再把内容返回给客户端最后再关闭
		message = data.decode()
		print('Data received: {!r}'.format(message))
		
		ret = "Hello World"
		ans = 'HTTP/1.1 200 OK\r\nContent-Type: text/html;charset=utf-8\r\n\r\n'+ret
		self.transport.write(bytes(ans, "utf-8"))
		
		print('Close the client socket')
		self.transport.close()
		
async def main_loop():
	while True:
		await asyncio.sleep(0.1)

async def main():
	# 返回当前 OS 线程中正在运行的事件循环。
	loop = asyncio.get_running_loop()
	# 创建TCP服务
	server = await loop.create_server(
		lambda: EchoServerProtocol(), host='127.0.0.1', port=8888
	)
	# 让主进程循环执行
	await main_loop()

# 设置当前事件循环策略
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
asyncio.run(main())

2.3. uvloop 和 httptools

Uvicorn 的工作原理主要基于 uvloop 和 httptools 两个库。其中:

● uvloop 是 Python 标准库 asyncio 的一个高性能替代品,它使用 libuv 库来提供更快的异步 I/O 操作。
● httptools 是一个用 C 编写的 HTTP 解析器,它比 Python 内置的 HTTP 解析器更快、更高效。

当 Uvicorn 启动一个 Web 服务器时,它会监听指定的端口和地址,等待客户端的请求。一旦接收到请求, Uvicorn 会将其解析为一个 HTTP 请求对象,并将其传递给 ASGI 应用程序。 ASGI 应用程序会异步地处理请求,并生成一个 HTTP 响应对象。 Uvicorn 会将这个响应对象转换为一个 HTTP 响应,并发送给客户端。在整个过程中, Uvicorn 充分利用了 uvloop 和 httptools 的性能优势,确保了 Web 服务器的高性能和低延迟。

2.4. ASGI 应用程序

ASGI[7] 既然是一种规范,那么在书写时就必须符合一些基本的要求,在 ASGI 的规范中规定, ASGI 应用程序是一个异步可调用对象,接受三个参数:

● scope: 链接范围字典,至少包含一个 type 表明协议类型
● receive: 一个可等待的可调用对象,用于读取数据
● send: 一个可等待的可调用对象,用于发送数据

以 Uvicorn 为例,我们有如下的例子:

import uvicorn

async def app(scope, receive, send):
	assert scope['type'] == 'http'

	print(f'''receive: {vars(receive)}\nsend: {send}''')

	await send({
		'type': 'http.response.start',
		'status': 200,
		'headers': [
			(b'content-type', b'text/plain'),
		],
	})
	await send({
		'type': 'http.response.body',
		'body': b'Hello, world!',
	})

if __name__ == "__main__":
	uvicorn.run(app)

3. Uvicorn 源码解析

为了剖析 Uvicorn 的源码,同时有了上面的基础,我们先从整体上把握一下基于 Uvicorn 搭建的 Web 应用的执行方式,如下图所示:

在核心的代码模块中,我将其分成了两个部分,分别为启动服务部分,即上图中的浅红色部分,另一部分是服务部分,即上述的浅绿色部分。在启动服务部分,根据不同的参数选择不同的服务启动方式,这部分会在下面的内容中介绍;在服务部分,相对于启动服务部分,则会显得更加复杂,首先是主进程 Server ,在此基础上,又通过异步方式创建了一个 task (lifespan),立一个是异步的 TCP server 。其中,lifespan 的作用是控制和管理 ASGI 应用程序;TCP server 的作用是接收用户的请求,将其转发至 ASGI 应用程序处理,最终将 ASGI 应用程序的结果返回给用户,详细的分析会在下面的内容中介绍。

3.1. 启动服务

在代码 main.py 中是服务的启动流程,在启动服务的过程中,根据参数的不同,有三种模式:

  • 根据 should_reload 设置服务的可重载方式,若代码发生变更,服务可自动重载;
  • 根据 workers 设置进程个数,可实现多进程启动 socket ;
  • 一般的启动模式,本文也是沿着一般的启动模式分析 Uvicorn 的原理
try:
	if config.should_reload: # 可重载的方式:代码发生变更,自动的重载
		sock = config.bind_socket()
		ChangeReload(config, target=server.run, sockets=[sock]).run()
	elif config.workers > 1: # 多进程启动 socket
		sock = config.bind_socket()
		Multiprocess(config, target=server.run, sockets=[sock]).run()
	else: # 一般模式
		server.run() # 入口,服务的启动
except KeyboardInterrupt:
	pass # pragma: full coverage
finally:
	if config.uds and os.path.exists(config.uds):
		os.remove(config.uds) # pragma: py-win32

3.2. 服务

服务部分除了自身的运行,还需要通过异步方式创建一个 Task (lifespan)和一个 TCP server ,其中,lifespan 是用于控制和管理 application 的,TCP server 是用于接收用户的请求并给用户返回系统的响应。

启动服务的代码如下:

async def _serve(self, sockets: list[socket.socket] | None = None) -> None:
	process_id = os.getpid()
	
	config = self.config
	if not config.loaded:
		config.load() # 加载配置

	self.lifespan = config.lifespan_class(config) # lifespan

	message = "Started server process [%d]"
	color_message = "Started server process [" + click.style("%d", fg="cyan") + "]"
	logger.info(message, process_id, extra={"color_message": color_message})

await self.startup(sockets=sockets) # 异步方式创建 Task 和 TCP Server
if self.should_exit:
	return
await self.main_loop()
await self.shutdown(sockets=sockets)

message = "Finished server process [%d]"
color_message = "Finished server process [" + click.style("%d", fg="cyan") + "]"
logger.info(message, process_id, extra={"color_message": color_message})

startup 部分用于启动 lifespan 和 TCP Server 的代码如下:

async def startup(self, sockets: list[socket.socket] | None = None) -> None:
	await self.lifespan.startup() # 启动 lifespan
	(省略无关代码)

	loop = asyncio.get_running_loop() # 获取当前运行的事件循环首选函数

	listeners: Sequence[socket.SocketType]
	if sockets is not None: # pragma: full coverage
	elif config.fd is not None: # pragma: py-win32
	elif config.uds is not None: # pragma: py-win32
	else:
		# Standard case. Create a socket from a host/port pair.
		# 创建一个 socket 服务
		try:
			# 创建一个 TCP server
			server = await loop.create_server(
				create_protocol, # protocol_factory
				host=config.host,
				port=config.port,
				ssl=config.ssl,
				backlog=config.backlog,
			)
		except OSError as exc:
			logger.error(exc)
			await self.lifespan.shutdown()
			sys.exit(1)

		assert server.sockets is not None
		listeners = server.sockets
		self.servers = [server]

	if sockets is None:
		self._log_started_message(listeners)
	else:
		# We're most likely running multiple workers, so a message has already been
		# logged by `config.bind_socket()`.
		pass # pragma: full coverage

self.started = True

在这里注意的是,在上图中,用红色虚线标记出的部分用于控制整个 Uvicorn 的执行,根据 should_exit 参数判断是否继续执行,而对于该参数的修改则是通过信号机制完成的。

3.2.1. 信号机制

在服务的启动过程中,使用到了信息的处理,信号 signal 是一种进程间的通信机制,当某个进程接收到信号时,就会执行相应的处理程序,实现对进程的控制和管理。而对进程的控制和管理是通过事件处理函数完成的,简单来说,信号机制就是事先对于不同的信号绑定一个事件处理函数,如在 Uvicorn 代码中有一个退出的事件处理函数:

def handle_exit(self, sig: int, frame: FrameType | None) -> None:
	self._captured_signals.append(sig)
	if self.should_exit and sig == signal.SIGINT:
		self.force_exit = True # pragma: full coverage
	else:
		self.should_exit = True # 设置 should_exit

在 Uvicorn 代码中也是提供了两种信号处理机制:

HANDLED_SIGNALS = (
	signal.SIGINT, # Unix signal 2. Sent by Ctrl+C.
	signal.SIGTERM, # Unix signal 15. Sent by `kill <pid>`.
)

更加巧妙的是整个信号处理机制被设计成上下文管理器的形式:

@contextlib.contextmanager
def capture_signals(self) -> Generator[None, None, None]:
	# Signals can only be listened to from the main thread.
	if threading.current_thread() is not threading.main_thread():
		yield
		return
	# always use signal.signal, even if loop.add_signal_handler is available
	# this allows to restore previous signal handlers later on
	original_handlers = {sig: signal.signal(sig, self.handle_exit) for sig in HANDLED_SIGNALS} # 注册信号处理函数
	try:
		yield
	finally:
		for sig, handler in original_handlers.items():
			signal.signal(sig, handler) # 注册信号处理函数
	# If we did gracefully shut down due to a signal, try to
	# trigger the expected behaviour now; multiple signals would be
	# done LIFO, see https://stackoverflow.com/questions/48434964
	for captured_signal in reversed(self._captured_signals):
		signal.raise_signal(captured_signal) # 发送信号

调用处的代码:

async def serve(self, sockets: list[socket.socket] | None = None) -> None:
	with self.capture_signals():
		await self._serve(sockets)

在 Uvicorn 的代码库中提供了对 http/1.1 和 websocket 两种协议的支持。其中, http 协议支持如下几种选项:

HTTP_PROTOCOLS: Dict[HTTPProtocolType, str] = {
	"auto": "uvicorn.protocols.http.auto:AutoHTTPProtocol",
	"h11": "uvicorn.protocols.http.h11_impl:H11Protocol",
	"httptools": "uvicorn.protocols.http.httptools_impl:HttpToolsProtocol",
}

3.2.2. Lifespan 的作用

在 Uvicorn 中,lifespan 是 ASGI 协议的一部分,它用于在应用的启动和关闭过程中对应用程序管理和与应用程序通信。在 lifespan 中,有两个重要的函数调用:startup 和 shutdown。要想弄清楚 lifespan 的工作原理,需要配合一个完善的 Application 框架一起,在这里,我选择了 starlette[8]。可能很多人对于 starlette 还比较陌生,熟悉 FastApi 的人就会知道,FastApi 是在 starlette 的基础上构建的。

我们知道,lifespan 的一大功能就是与 Application 通信,如上图中的 1,同时会将 receive 函数和 send 函数作为回调函数传递给应用程序:

async def main(self) -> None:
	try:
		app = self.config.loaded_app # ASGI 应用程序
		scope: LifespanScope = {
			"type": "lifespan",
			"asgi": {"version": self.config.asgi_version, "spec_version": "2.0"},
			"state": self.state,
		}
		await app(scope, self.receive, self.send) # 调用 ASCI 的应用程序
	except BaseException as exc:
		self.asgi = None
		self.error_occured = True
		if self.startup_failed or self.shutdown_failed:
			return
		if self.config.lifespan == "auto":
			msg = "ASGI 'lifespan' protocol appears unsupported."
			self.logger.info(msg)
		else:
			msg = "Exception in 'lifespan' protocol\n"
			self.logger.error(msg, exc_info=exc)
	finally:
		self.startup_event.set()
		self.shutdown_event.set()

当应用程序后会对 lifespan 传递过来的信息处理:

async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
	scope["app"] = self
	if self.middleware_stack is None:
		self.middleware_stack = self.build_middleware_stack()
	await self.middleware_stack(scope, receive, send) # 进一步处理

可能你会对这里的 middleware_stack 感到一筹莫展,其实顺藤摸瓜,这里其实就是对应到 Router 类中的 app 函数。

注意:类似于 FastApi 等框架中都是以路由来区分不同的应用的。

在路由的 app 函数中,根据信息中的 type 信息区分不同的类型,因为我们是要分析 lifespan,在下面的代码中,我们去掉了与 lifespan 无关的代码:

async def app(self, scope: Scope, receive: Receive, send: Send) -> None:
	assert scope["type"] in ("http", "websocket", "lifespan")

	if "router" not in scope:
		scope["router"] = self

	if scope["type"] == "lifespan":
		await self.lifespan(scope, receive, send)
		return
	# (去掉与lifespan无关的代码)

接下来就比较清晰了,lifespan 函数的代码如下:

async def lifespan(self, scope: Scope, receive: Receive, send: Send) -> None:
	"""
	Handle ASGI lifespan messages, which allows us to manage application
	startup and shutdown events.
	"""
	started = False
	app: typing.Any = scope.get("app")
	await receive()
	try:
		async with self.lifespan_context(app) as maybe_state:
			if maybe_state is not None:
				if "state" not in scope:
					raise RuntimeError('The server does not support "state" in the lifespan scope.')
				scope["state"].update(maybe_state)
			await send({"type": "lifespan.startup.complete"})
			started = True
			await receive()
	except BaseException:
		exc_text = traceback.format_exc()
		if started:
			await send({"type": "lifespan.shutdown.failed", "message": exc_text})
		else:
			await send({"type": "lifespan.startup.failed", "message": exc_text})
		raise
	else:
		print(f"send: {send}")
		await send({"type": "lifespan.shutdown.complete"})

这段代码也就对应了上图中的 2,3,4。接下来就是在 Uvicorn 的 lifespan 中的 receive 函数和 send 函数,其中,self.receive 的代码如下:

async def receive(self) -> LifespanReceiveMessage:
	return await self.receive_queue.get()

self.send 的代码如下:

async def send(self, message: LifespanSendMessage) -> None:
	assert message["type"] in ( # 接收应用程序返回的类型主要分为四种
		"lifespan.startup.complete",
		"lifespan.startup.failed",
		"lifespan.shutdown.complete",
		"lifespan.shutdown.failed",
	)

	if message["type"] == "lifespan.startup.complete":
		assert not self.startup_event.is_set(), STATE_TRANSITION_ERROR
		assert not self.shutdown_event.is_set(), STATE_TRANSITION_ERROR
		self.startup_event.set()

	elif message["type"] == "lifespan.startup.failed":
		assert not self.startup_event.is_set(), STATE_TRANSITION_ERROR
		assert not self.shutdown_event.is_set(), STATE_TRANSITION_ERROR
		self.startup_event.set()
		self.startup_failed = True
		if message.get("message"):
			self.logger.error(message["message"])

	elif message["type"] == "lifespan.shutdown.complete":
		assert self.startup_event.is_set(), STATE_TRANSITION_ERROR
		assert not self.shutdown_event.is_set(), STATE_TRANSITION_ERROR
		self.shutdown_event.set()

	elif message["type"] == "lifespan.shutdown.failed":
		assert self.startup_event.is_set(), STATE_TRANSITION_ERROR
		assert not self.shutdown_event.is_set(), STATE_TRANSITION_ERROR
		self.shutdown_event.set()
		self.shutdown_failed = True
		if message.get("message"):
			self.logger.error(message["message"])

Lifespan 管理的消息类型主要分为四种:

  • lifespan.startup.complete
  • lifespan.startup.failed
  • lifespan.shutdown.complete
  • lifespan.shutdown.failed

针对地会触发不同的事件类型,如图上的 5,6。

3.2.3. 协议 protocols

在 Uvicorn 的代码库中提供了对 http/1.1 和 websocket 两种协议的支持。其中, http 协议支持如下几种选项:

HTTP_PROTOCOLS: Dict[HTTPProtocolType, str] = {
	"auto": "uvicorn.protocols.http.auto:AutoHTTPProtocol",
	"h11": "uvicorn.protocols.http.h11_impl:H11Protocol",
	"httptools": "uvicorn.protocols.http.httptools_impl:HttpToolsProtocol",
}

H11 是一个实现 http 协议库。Uvicorn 用了 HTTP 协议库做了相应的 Protocol。交由 asyncio 提供的网络应用服务处理

4. 总结

至此,我们对 Uvicorn 的基本原理以及部分源码进行了分析,了解到基本的工作流程,但是完整的 Uvicorn 中的内容还远不止这些,本文中也只是沿着 Uvicorn 的工作流程做简单的梳理,要想对所有相关的内容有详细的认知,还得对以下的部分做详细分析:

  • Uvicorn 热更新:ChangeReload 函数部分
  • 多进程启动服务:Multiprocess 函数部分
  • 协议部分:包括了 H11Protocol,HttpToolsProtocol,WSProtocol,WebSocketProtocol
  • 在与 Application 通信的过程中使用到的 ProxyHeadersMiddleware

参考文献

[1] https://www.uvicorn.org/
[2] https://docs.djangoproject.com/zh-hans/5.1/
[3] https://flask.palletsprojects.com/en/3.0.x/
[4] https://werkzeug.palletsprojects.com/en/3.0.x/
[5] https://docs.python.org/zh-cn/3/library/asyncio.html
[6] https://docs.python.org/3/library/asyncio-protocol.html#tcp-echo-server
[7] https://asgi.readthedocs.io/en/latest/
[8] https://www.starlette.io/