事件循環(huán)?
源代碼: Lib/asyncio/events.py, Lib/asyncio/base_events.py
前言
事件循環(huán)是每個 asyncio 應用的核心。 事件循環(huán)會運行異步任務和回調,執(zhí)行網絡 IO 操作,以及運行子進程。
應用開發(fā)者通常應當使用高層級的 asyncio 函數(shù),例如 asyncio.run(),應當很少有必要引用循環(huán)對象或調用其方法。 本節(jié)所針對的主要是低層級代碼、庫和框架的編寫者,他們需要更細致地控制事件循環(huán)行為。
獲取事件循環(huán)
以下低層級函數(shù)可被用于獲取、設置或創(chuàng)建事件循環(huán):
- asyncio.get_running_loop()?
返回當前 OS 線程中正在運行的事件循環(huán)。
如果沒有正在運行的事件循環(huán)則會引發(fā)
RuntimeError。 此函數(shù)只能由協(xié)程或回調來調用。3.7 新版功能.
- asyncio.get_event_loop()?
獲取當前事件循環(huán)。
如果當前 OS 線程沒有設置當前事件循環(huán),該 OS 線程為主線程,并且
set_event_loop()還沒有被調用,則 asyncio 將創(chuàng)建一個新的事件循環(huán)并將其設為當前事件循環(huán)。由于此函數(shù)具有相當復雜的行為(特別是在使用了自定義事件循環(huán)策略的時候),更推薦在協(xié)程和回調中使用
get_running_loop()函數(shù)而非get_event_loop()。應該考慮使用
asyncio.run()函數(shù)而非使用低層級函數(shù)來手動創(chuàng)建和關閉事件循環(huán)。3.10 版后已移除: 如果沒有正在運行的事件循環(huán)則會發(fā)出棄用警告。 在未來的 Python 發(fā)行版中,此函數(shù)將成為
get_running_loop()的別名。
- asyncio.set_event_loop(loop)?
將 loop 設置為當前 OS 線程的當前事件循環(huán)。
- asyncio.new_event_loop()?
Create and return a new event loop object.
請注意 get_event_loop(), set_event_loop() 以及 new_event_loop() 函數(shù)的行為可以通過 設置自定義事件循環(huán)策略 來改變。
目錄
本文檔包含下列小節(jié):
事件循環(huán)方法集 章節(jié)是事件循環(huán)APIs的參考文檔;
回調處理 章節(jié)是從調度方法 例如
loop.call_soon()和loop.call_later()中返回Handle和TimerHandle實例的文檔。Server Objects 章節(jié)記錄了從事件循環(huán)方法返回的類型,比如
loop.create_server();Event Loop Implementations 章節(jié)記錄了
SelectorEventLoop和ProactorEventLoop類;Examples 章節(jié)展示了如何使用某些事件循環(huán)API。
事件循環(huán)方法集?
事件循環(huán)有下列 低級 APIs:
運行和停止循環(huán)?
- loop.run_until_complete(future)?
運行直到 future (
Future的實例 ) 被完成。如果參數(shù)是 coroutine object ,將被隱式調度為
asyncio.Task來運行。返回 Future 的結果 或者引發(fā)相關異常。
- loop.run_forever()?
運行事件循環(huán)直到
stop()被調用。如果
stop()在調用run_forever()之前被調用,循環(huán)將輪詢一次 I/O 選擇器并設置超時為零,再運行所有已加入計劃任務的回調來響應 I/O 事件(以及已加入計劃任務的事件),然后退出。如果
stop()在run_forever()運行期間被調用,循環(huán)將運行當前批次的回調然后退出。 請注意在此情況下由回調加入計劃任務的新回調將不會運行;它們將會在下次run_forever()或run_until_complete()被調用時運行。
- loop.stop()?
停止事件循環(huán)。
- loop.is_running()?
返回
True如果事件循環(huán)當前正在運行。
- loop.is_closed()?
如果事件循環(huán)已經被關閉,返回
True。
- loop.close()?
關閉事件循環(huán)。
當這個函數(shù)被調用的時候,循環(huán)必須處于非運行狀態(tài)。pending狀態(tài)的回調將被丟棄。
此方法清除所有的隊列并立即關閉執(zhí)行器,不會等待執(zhí)行器完成。
這個方法是冪等的和不可逆的。事件循環(huán)關閉后,不應調用其他方法。
- coroutine loop.shutdown_asyncgens()?
安排所有當前打開的 asynchronous generator 對象通過
aclose()調用來關閉。 在調用此方法后,如果有新的異步生成器被迭代事件循環(huán)將會發(fā)出警告。 這應當被用來可靠地完成所有已加入計劃任務的異步生成器。請注意當使用
asyncio.run()時不必調用此函數(shù)。示例:
try: loop.run_forever() finally: loop.run_until_complete(loop.shutdown_asyncgens()) loop.close()
3.6 新版功能.
- coroutine loop.shutdown_default_executor()?
安排默認執(zhí)行器的關閉并等待它合并
ThreadPoolExecutor中的所有線程。 在調用此方法后,如果在使用默認執(zhí)行器期間調用了loop.run_in_executor()則將會引發(fā)RuntimeError。請注意當使用
asyncio.run()時不必調用此函數(shù)。3.9 新版功能.
安排回調?
- loop.call_soon(callback, *args, context=None)?
安排 callback callback 在事件循環(huán)的下一次迭代時附帶 args 參數(shù)被調用。
回調按其注冊順序被調用。每個回調僅被調用一次。
可選鍵值類的參數(shù) context 允許 callback 運行在一個指定的自定義
contextvars.Context對象中。如果沒有提供 context ,則使用當前上下文。返回一個能用來取消回調的
asyncio.Handle實例。這個方法不是線程安全的。
- loop.call_soon_threadsafe(callback, *args, context=None)?
call_soon()的線程安全變體。必須被用于安排 來自其他線程 的回調。如果在已被關閉的循環(huán)上調用則會引發(fā)
RuntimeError。 這可能會在主應用程序被關閉時在二級線程上發(fā)生。參見 concurrency and multithreading 部分的文檔。
在 3.7 版更改: 加入鍵值類形參 context。請參閱 PEP 567 查看更多細節(jié)。
備注
大多數(shù) asyncio 的調度函數(shù)不讓傳遞關鍵字參數(shù)。為此,請使用 functools.partial() :
# will schedule "print("Hello", flush=True)"
loop.call_soon(
functools.partial(print, "Hello", flush=True))
使用 partial 對象通常比使用lambda更方便,asyncio 在調試和錯誤消息中能更好的呈現(xiàn) partial 對象。
調度延遲回調?
事件循環(huán)提供安排調度函數(shù)在將來某個時刻調用的機制。事件循環(huán)使用單調時鐘來跟蹤時間。
- loop.call_later(delay, callback, *args, context=None)?
安排 callback 在給定的 delay 秒(可以是 int 或者 float)后被調用。
返回一個
asyncio.TimerHandle實例,該實例能用于取消回調。callback 只被調用一次。如果兩個回調被安排在同樣的時間點,執(zhí)行順序未限定。
可選的位置參數(shù) args 在被調用的時候傳遞給 callback 。 如果你想把關鍵字參數(shù)傳遞給 callback ,請使用
functools.partial()。可選鍵值類的參數(shù) context 允許 callback 運行在一個指定的自定義
contextvars.Context對象中。如果沒有提供 context ,則使用當前上下文。在 3.7 版更改: 加入鍵值類形參 context。請參閱 PEP 567 查看更多細節(jié)。
在 3.8 版更改: 在 Python 3.7 和更早版本的默認事件循環(huán)實現(xiàn)中, delay 不能超過一天。 這在 Python 3.8 中已被修復。
- loop.call_at(when, callback, *args, context=None)?
安排 callback 在給定的絕對時間戳 when (int 或 float) 被調用,使用與
loop.time()同樣的時間參考。本方法的行為和
call_later()方法相同。返回一個
asyncio.TimerHandle實例,該實例能用于取消回調。在 3.7 版更改: 加入鍵值類形參 context。請參閱 PEP 567 查看更多細節(jié)。
在 3.8 版更改: 在 Python 3.7 和更早版本的默認事件循環(huán)實現(xiàn)中,when 和當前時間相差不能超過一天。 在這 Python 3.8 中已被修復。
備注
在 3.8 版更改: 在 Python 3.7 和更早版本中超時 (相對的 delay 或絕對的 when) 不能超過一天。 這在 Python 3.8 中已被修復。
參見
asyncio.sleep() 函數(shù)。
創(chuàng)建 Future 和 Task?
- loop.create_future()?
創(chuàng)建一個附加到事件循環(huán)中的
asyncio.Future對象。這是在asyncio中創(chuàng)建Futures的首選方式。這讓第三方事件循環(huán)可以提供Future 對象的替代實現(xiàn)(更好的性能或者功能)。
3.5.2 新版功能.
- loop.create_task(coro, *, name=None, context=None)?
安排一個 協(xié)程 的執(zhí)行。返回一個
Task對象。第三方的事件循環(huán)可以使用它們自己的
Task子類來滿足互操作性。這種情況下結果類型是一個Task的子類。如果提供了 name 參數(shù)且不為
None,它會使用Task.set_name()來設為任務的名稱。An optional keyword-only context argument allows specifying a custom
contextvars.Contextfor the coro to run in. The current context copy is created when no context is provided.在 3.8 版更改: Added the name parameter.
在 3.11 版更改: Added the context parameter.
- loop.set_task_factory(factory)?
設置一個任務工廠,它將由
loop.create_task()來使用。If factory is
Nonethe default task factory will be set. Otherwise, factory must be a callable with the signature matching(loop, coro, context=None), where loop is a reference to the active event loop, and coro is a coroutine object. The callable must return aasyncio.Future-compatible object.
- loop.get_task_factory()?
返回一個任務工廠,或者如果是使用默認值則返回
None。
打開網絡連接?
- coroutine loop.create_connection(protocol_factory, host=None, port=None, *, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, happy_eyeballs_delay=None, interleave=None)?
打開一個流式傳輸連接,連接到由 host 和 port 指定的地址。
套接字族可以是
AF_INET或AF_INET6,具體取決于 host (或 family 參數(shù),如果有提供的話)。套接字類型將為
SOCK_STREAM。protocol_factory 必須為一個返回 asyncio 協(xié)議 實現(xiàn)的可調用對象。
這個方法會嘗試在后臺創(chuàng)建連接。當創(chuàng)建成功,返回
(transport, protocol)組合。底層操作的大致的執(zhí)行順序是這樣的:
創(chuàng)建連接并為其創(chuàng)建一個 傳輸。
不帶參數(shù)地調用 protocol_factory 并預期返回一個 協(xié)議 實例。
協(xié)議實例通過調用其
connection_made()方法與傳輸進行配對。成功時返回一個
(transport, protocol)元組。
創(chuàng)建的傳輸是一個具體實現(xiàn)相關的雙向流。
其他參數(shù):
ssl: 如果給定該參數(shù)且不為假值,則會創(chuàng)建一個 SSL/TLS 傳輸(默認創(chuàng)建一個純 TCP 傳輸)。 如果 ssl 是一個
ssl.SSLContext對象,則會使用此上下文來創(chuàng)建傳輸對象;如果 ssl 為True,則會使用從ssl.create_default_context()返回的默認上下文。server_hostname 設置或重載目標服務器的證書將要匹配的主機名。 應當只在 ssl 不為
None時傳入。 默認情況下會使用 host 參數(shù)的值。 如果 host 為空那就沒有默認值,你必須為 server_hostname 傳入一個值。 如果 server_hostname 為空字符串,則主機名匹配會被禁用(這是一個嚴重的安全風險,使得潛在的中間人攻擊成為可能)。family, proto, flags 是可選的地址族、協(xié)議和標志,它們會被傳遞給 getaddrinfo() 來對 host 進行解析。如果要指定的話,這些都應該是來自于
socket模塊的對應常量。如果給出 happy_eyeballs_delay,它將為此鏈接啟用 Happy Eyeballs。 該函數(shù)應當為一個表示在開始下一個并行嘗試之前要等待連接嘗試完成的秒數(shù)的浮點數(shù)。 這也就是在 RFC 8305 中定義的 "連接嘗試延遲"。 該 RFC 所推薦的合理默認值為
0.25(250 毫秒)。interleave 控制當主機名解析為多個 IP 地址時的地址重排序。 如果該參數(shù)為
0或未指定,則不會進行重排序,這些地址會按getaddrinfo()所返回的順序進行嘗試。 如果指定了一個正整數(shù),這些地址會按地址族交錯排列,而指定的整數(shù)會被解讀為 RFC 8305 所定義的 "首個地址族計數(shù)"。 如果 happy_eyeballs_delay 未指定則默認值為0,否則為1。如果給出 sock,它應當是一個已存在、已連接并將被傳輸所使用的
socket.socket對象。 如果給出了 sock,則 host, port, family, proto, flags, happy_eyeballs_delay, interleave 和 local_addr 都不應當被指定。如果給出 local_addr,它應當是一個用來在本地綁定套接字的
(local_host, local_port)元組。 local_host 和 local_port 會使用getaddrinfo()來查找,這與 host 和 port 類似。ssl_handshake_timeout 是(用于 TLS 連接的)在放棄連接之前要等待 TLS 握手完成的秒數(shù)。 如果參數(shù)為
None則使用 (默認的)60.0。ssl_shutdown_timeout is the time in seconds to wait for the SSL shutdown to complete before aborting the connection.
30.0seconds ifNone(default).
在 3.5 版更改:
ProactorEventLoop類中添加 SSL/TLS 支持。在 3.6 版更改: 套接字選項
TCP_NODELAY默認已為所有 TCP 連接進行了設置。在 3.7 版更改: Added the ssl_handshake_timeout parameter.
在 3.8 版更改: 增加了 happy_eyeballs_delay 和 interleave 形參。
Happy Eyeballs 算法:成功使用雙棧主機。 當服務器的 IPv4 路徑和協(xié)議工作正常,但服務器的 IPv6 路徑和協(xié)議工作不正常時,雙棧客戶端應用程序相比單獨 IPv4 客戶端會感受到明顯的連接延遲。 這是不可接受的因為它會導致雙棧客戶端糟糕的用戶體驗。 此文檔指明了減少這種用戶可見延遲的算法要求并提供了具體的算法。
在 3.11 版更改: Added the ssl_shutdown_timeout parameter.
參見
open_connection()函數(shù)是一個高層級的替代 API。 它返回一對 (StreamReader,StreamWriter),可在 async/await 代碼中直接使用。
- coroutine loop.create_datagram_endpoint(protocol_factory, local_addr=None, remote_addr=None, *, family=0, proto=0, flags=0, reuse_port=None, allow_broadcast=None, sock=None)?
創(chuàng)建一個數(shù)據(jù)報連接。
套接字族可以是
AF_INET,AF_INET6或AF_UNIX,具體取決于 host (或 family 參數(shù),如果有提供的話)。socket類型將是
SOCK_DGRAM。protocol_factory 必須為一個返回 協(xié)議 實現(xiàn)的可調用對象。
成功時返回一個
(transport, protocol)元組。其他參數(shù):
如果給出 local_addr,它應當是一個用來在本地綁定套接字的
(local_host, local_port)元組。 local_host 和 local_port 是使用getaddrinfo()來查找的。remote_addr,如果指定的話,就是一個
(remote_host, remote_port)元組,用于同一個遠程地址連接。remote_host 和 remote_port 是使用getaddrinfo()來查找的。family, proto, flags 是可選的地址族,協(xié)議和標志,其會被傳遞給
getaddrinfo()來完成 host 的解析。如果要指定的話,這些都應該是來自于socket模塊的對應常量。reuse_port 告知內核,只要在創(chuàng)建時都設置了這個旗標,就允許此端點綁定到其他現(xiàn)有端點所綁定的相同端口上。 這個選項在 Windows 和某些 Unix 上不受支持。 如果
SO_REUSEPORT常量未定義則此功能就是不受支持的。allow_broadcast 告知內核允許此端點向廣播地址發(fā)送消息。
sock 可選擇通過指定此值用于使用一個預先存在的,已經處于連接狀態(tài)的
socket.socket對象,并將其提供給此傳輸對象使用。如果指定了這個值, local_addr 和 remote_addr 就應該被忽略 (必須為None)。
參見 UDP echo 客戶端協(xié)議 和 UDP echo 服務端協(xié)議 的例子。
在 3.4.4 版更改: The family, proto, flags, reuse_address, reuse_port, allow_broadcast, and sock parameters were added.
在 3.8.1 版更改: The reuse_address parameter is no longer supported, as using
SO_REUSEADDRposes a significant security concern for UDP. Explicitly passingreuse_address=Truewill raise an exception.當具有不同 UID 的多個進程將套接字賦給具有
SO_REUSEADDR的相同 UDP 套接字地址時,傳入的數(shù)據(jù)包可能會在套接字間隨機分配。對于受支持的平臺,reuse_port 可以被用作類似功能的替代。 通過 reuse_port 將改用
SO_REUSEPORT,它能夠防止具有不同 UID 的進程將套接字賦給相同的套接字地址。在 3.8 版更改: 添加WIndows的支持。
在 3.11 版更改: The reuse_address parameter, disabled since Python 3.9.0, 3.8.1, 3.7.6 and 3.6.10, has been entirely removed.
- coroutine loop.create_unix_connection(protocol_factory, path=None, *, ssl=None, sock=None, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None)?
創(chuàng)建Unix 連接
套接字族將為
AF_UNIX;套接字類型將為SOCK_STREAM。成功時返回一個
(transport, protocol)元組。path 是所要求的 Unix 域套接字的名字,除非指定了 sock 形參。 抽象的 Unix 套接字,
str,bytes和Path路徑都是受支持的。請查看
loop.create_connection()方法的文檔了解有關此方法的參數(shù)的信息。可用性: Unix。
在 3.7 版更改: Added the ssl_handshake_timeout parameter. The path parameter can now be a path-like object.
在 3.11 版更改: Added the ssl_shutdown_timeout parameter.
創(chuàng)建網絡服務?
- coroutine loop.create_server(protocol_factory, host=None, port=None, *, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, start_serving=True)?
創(chuàng)建TCP服務 (socket 類型
SOCK_STREAM) 監(jiān)聽 host 地址的 port 端口。返回一個
Server對象。參數(shù):
protocol_factory 必須為一個返回 協(xié)議 實現(xiàn)的可調用對象。
host 形參可被設為幾種類型,它確定了服務器所應監(jiān)聽的位置:
如果 host 是一個字符串,則 TCP 服務器會被綁定到 host 所指明的單一網絡接口。
如果 host 是一個字符串序列,則 TCP 服務器會被綁定到序列所指明的所有網絡接口。
如果 host 是一個空字符串或
None,則會應用所有接口并將返回包含多個套接字的列表(通常是一個 IPv4 的加一個 IPv6 的)。
The port parameter can be set to specify which port the server should listen on. If
0orNone(the default), a random unused port will be selected (note that if host resolves to multiple network interfaces, a different random port will be selected for each interface).family 可被設為
socket.AF_INET或AF_INET6以強制此套接字使用 IPv4 或 IPv6。 如果未設定,則 family 將通過主機名稱來確定 (默認為AF_UNSPEC)。flags 是用于
getaddrinfo()的位掩碼。可以選擇指定 sock 以便使用預先存在的套接字對象。 如果指定了此參數(shù),則不可再指定 host 和 port。
backlog 是傳遞給
listen()的最大排隊連接的數(shù)量(默認為100)。ssl 可被設置為一個
SSLContext實例以在所接受的連接上啟用 TLS。reuse_address 告知內核要重用一個處于
TIME_WAIT狀態(tài)的本地套接字,而不是等待其自然超時失效。 如果未指定此參數(shù)則在 Unix 上將自動設置為True。reuse_port 告知內核,只要在創(chuàng)建的時候都設置了這個標志,就允許此端點綁定到其它端點列表所綁定的同樣的端口上。這個選項在 Windows 上是不支持的。
ssl_handshake_timeout 是(用于 TLS 服務器的)在放棄連接之前要等待 TLS 握手完成的秒數(shù)。 如果參數(shù)為 (默認值)
None則為60.0秒。ssl_shutdown_timeout is the time in seconds to wait for the SSL shutdown to complete before aborting the connection.
30.0seconds ifNone(default).start_serving 設置成
True(默認值) 會導致創(chuàng)建server并立即開始接受連接。設置成False,用戶需要等待Server.start_serving()或者Server.serve_forever()以使server開始接受連接。
在 3.5 版更改:
ProactorEventLoop類中添加 SSL/TLS 支持。在 3.5.1 版更改: host 形參可以是一個字符串的序列。
在 3.6 版更改: Added ssl_handshake_timeout and start_serving parameters. The socket option
TCP_NODELAYis set by default for all TCP connections.在 3.11 版更改: Added the ssl_shutdown_timeout parameter.
參見
start_server()函數(shù)是一個高層級的替代 API,它返回一對StreamReader和StreamWriter,可在 async/await 代碼中使用。
- coroutine loop.create_unix_server(protocol_factory, path=None, *, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, start_serving=True)?
與
loop.create_server()類似但是專用于AF_UNIX套接字族。path 是必要的 Unix 域套接字名稱,除非提供了 sock 參數(shù)。 抽象的 Unix 套接字,
str,bytes和Path路徑都是受支持的。請查看
loop.create_server()方法的文檔了解有關此方法的參數(shù)的信息。可用性: Unix。
在 3.7 版更改: Added the ssl_handshake_timeout and start_serving parameters. The path parameter can now be a
Pathobject.在 3.11 版更改: Added the ssl_shutdown_timeout parameter.
- coroutine loop.connect_accepted_socket(protocol_factory, sock, *, ssl=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None)?
將已被接受的連接包裝成一個傳輸/協(xié)議對。
此方法可被服務器用來接受 asyncio 以外的連接,但是使用 asyncio 來處理它們。
參數(shù):
protocol_factory 必須為一個返回 協(xié)議 實現(xiàn)的可調用對象。
sock 是一個預先存在的套接字對象,它是由
socket.accept返回的。ssl 可被設置為一個
SSLContext以在接受的連接上啟用 SSL。ssl_handshake_timeout 是(為一個SSL連接)在中止連接前,等待SSL握手完成的時間【單位秒】。如果為
None(缺省) 則是60.0秒。ssl_shutdown_timeout is the time in seconds to wait for the SSL shutdown to complete before aborting the connection.
30.0seconds ifNone(default).
返回一個
(transport, protocol)對。3.5.3 新版功能.
在 3.7 版更改: Added the ssl_handshake_timeout parameter.
在 3.11 版更改: Added the ssl_shutdown_timeout parameter.
傳輸文件?
- coroutine loop.sendfile(transport, file, offset=0, count=None, *, fallback=True)?
將 file 通過 transport 發(fā)送。 返回所發(fā)送的字節(jié)總數(shù)。
如果可用的話,該方法將使用高性能的
os.sendfile()。file 必須是個二進制模式打開的常規(guī)文件對象。
offset 指明從何處開始讀取文件。 如果指定了 count,它是要傳輸?shù)淖止?jié)總數(shù)而不再一直發(fā)送文件直至抵達 EOF。 文件位置總是會被更新,即使此方法引發(fā)了錯誤,并可以使用
file.tell()來獲取實際發(fā)送的字節(jié)總數(shù)。fallback 設為
True會使得 asyncio 在平臺不支持 sendfile 系統(tǒng)調用時手動讀取并發(fā)送文件(例如 Windows 或 Unix 上的 SSL 套接字)。如果系統(tǒng)不支持 sendfile 系統(tǒng)調用且 fallback 為
False則會引發(fā)SendfileNotAvailableError。3.7 新版功能.
TLS 升級?
- coroutine loop.start_tls(transport, protocol, sslcontext, *, server_side=False, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None)?
將現(xiàn)有基于傳輸?shù)倪B接升級到 TLS。
返回一個新的傳輸實例,其中 protocol 必須在 await 之后立即開始使用。 傳給 start_tls 方法的 transport 實例應永遠不會被再次使用。
參數(shù):
transport 和 protocol 實例的方法與
create_server()和create_connection()所返回的類似。sslcontext :一個已經配置好的
SSLContext實例。當服務端連接已升級時 (如
create_server()所創(chuàng)建的對象) server_side 會傳入True。server_hostname :設置或者覆蓋目標服務器證書中相對應的主機名。
ssl_handshake_timeout 是(用于 TLS 連接的)在放棄連接之前要等待 TLS 握手完成的秒數(shù)。 如果參數(shù)為
None則使用 (默認的)60.0。ssl_shutdown_timeout is the time in seconds to wait for the SSL shutdown to complete before aborting the connection.
30.0seconds ifNone(default).
3.7 新版功能.
在 3.11 版更改: Added the ssl_shutdown_timeout parameter.
監(jiān)控文件描述符?
- loop.add_reader(fd, callback, *args)?
開始監(jiān)視 fd 文件描述符以獲取讀取的可用性,一旦 fd 可用于讀取,使用指定的參數(shù)調用 callback 。
- loop.remove_reader(fd)?
停止對文件描述符 fd 讀取可用性的監(jiān)視。
- loop.add_writer(fd, callback, *args)?
開始監(jiān)視 fd 文件描述符的寫入可用性,一旦 fd 可用于寫入,使用指定的參數(shù)調用 callback 。
使用
functools.partial()傳遞關鍵字參數(shù) 給 callback.
- loop.remove_writer(fd)?
停止對文件描述符 fd 的寫入可用性監(jiān)視。
另請查看 平臺支持 一節(jié)了解以上方法的某些限制。
直接使用 socket 對象?
通常,使用基于傳輸?shù)?API 的協(xié)議實現(xiàn),例如 loop.create_connection() 和 loop.create_server() 比直接使用套接字的實現(xiàn)更快。 但是,在某些應用場景下性能并不非常重要,直接使用 socket 對象會更方便。
- coroutine loop.sock_recv(sock, nbytes)?
從 sock 接收至多 nbytes。
socket.recv()的異步版本。返回接收到的數(shù)據(jù)【bytes對象類型】。
sock 必須是個非阻塞socket。
在 3.7 版更改: 雖然這個方法總是被記錄為協(xié)程方法,但它在 Python 3.7 之前的發(fā)行版中會返回一個
Future。 從 Python 3.7 開始它則是一個async def方法。
- coroutine loop.sock_recv_into(sock, buf)?
從 sock 接收數(shù)據(jù)放入 buf 緩沖區(qū)。 模仿了阻塞型的
socket.recv_into()方法。返回寫入緩沖區(qū)的字節(jié)數(shù)。
sock 必須是個非阻塞socket。
3.7 新版功能.
- coroutine loop.sock_recvfrom(sock, bufsize)?
Receive a datagram of up to bufsize from sock. Asynchronous version of
socket.recvfrom().Return a tuple of (received data, remote address).
sock 必須是個非阻塞socket。
3.11 新版功能.
- coroutine loop.sock_recvfrom_into(sock, buf, nbytes=0)?
Receive a datagram of up to nbytes from sock into buf. Asynchronous version of
socket.recvfrom_into().Return a tuple of (number of bytes received, remote address).
sock 必須是個非阻塞socket。
3.11 新版功能.
- coroutine loop.sock_sendall(sock, data)?
將 data 發(fā)送到 sock 套接字。
socket.sendall()的異步版本。此方法會持續(xù)發(fā)送數(shù)據(jù)到套接字直至 data 中的所有數(shù)據(jù)發(fā)送完畢或是有錯誤發(fā)生。 當成功時會返回
None。 當發(fā)生錯誤時,會引發(fā)一個異常。 此外,沒有辦法能確定有多少數(shù)據(jù)或是否有數(shù)據(jù)被連接的接收方成功處理。sock 必須是個非阻塞socket。
在 3.7 版更改: 雖然這個方法一直被標記為協(xié)程方法。但是,Python 3.7 之前,該方法返回
Future,從Python 3.7 開始,這個方法是async def方法。
- coroutine loop.sock_sendto(sock, data, address)?
Send a datagram from sock to address. Asynchronous version of
socket.sendto().Return the number of bytes sent.
sock 必須是個非阻塞socket。
3.11 新版功能.
- coroutine loop.sock_connect(sock, address)?
將 sock 連接到位于 address 的遠程套接字。
socket.connect()的異步版本。sock 必須是個非阻塞socket。
在 3.5.2 版更改:
address不再需要被解析。sock_connect將嘗試檢查 address 是否已通過調用socket.inet_pton()被解析。 如果沒有,則將使用loop.getaddrinfo()來解析 address。
- coroutine loop.sock_accept(sock)?
接受一個連接。 模仿了阻塞型的
socket.accept()方法。此 scoket 必須綁定到一個地址上并且監(jiān)聽連接。返回值是一個
(conn, address)對,其中 conn 是一個 新*的套接字對象,用于在此連接上收發(fā)數(shù)據(jù),*address 是連接的另一端的套接字所綁定的地址。sock 必須是個非阻塞socket。
在 3.7 版更改: 雖然這個方法一直被標記為協(xié)程方法。但是,Python 3.7 之前,該方法返回
Future,從Python 3.7 開始,這個方法是async def方法。
- coroutine loop.sock_sendfile(sock, file, offset=0, count=None, *, fallback=True)?
在可能的情況下使用高性能的
os.sendfile發(fā)送文件。 返回所發(fā)送的字節(jié)總數(shù)。socket.sendfile()的異步版本。sock 必須為非阻塞型的
socket.SOCK_STREAMsocket。file 必須是個用二進制方式打開的常規(guī)文件對象。
offset 指明從何處開始讀取文件。 如果指定了 count,它是要傳輸?shù)淖止?jié)總數(shù)而不再一直發(fā)送文件直至抵達 EOF。 文件位置總是會被更新,即使此方法引發(fā)了錯誤,并可以使用
file.tell()來獲取實際發(fā)送的字節(jié)總數(shù)。當 fallback 被設為
True時,會使用 asyncio 在平臺不支持 sendfile 系統(tǒng)調用時手動讀取并發(fā)送文件(例如 Windows 或 Unix 上的 SSL 套接字)。如果系統(tǒng)不支持 sendfile 并且 fallback 為
False,引發(fā)SendfileNotAvailableError異常。sock 必須是個非阻塞socket。
3.7 新版功能.
DNS?
- coroutine loop.getaddrinfo(host, port, *, family=0, type=0, proto=0, flags=0)?
異步版的
socket.getaddrinfo()。
- coroutine loop.getnameinfo(sockaddr, flags=0)?
異步版的
socket.getnameinfo()。
在 3.7 版更改: getaddrinfo 和 getnameinfo 方法一直被標記返回一個協(xié)程,但是Python 3.7之前,實際返回的是 asyncio.Future 對象。從Python 3.7 開始,這兩個方法是協(xié)程。
使用管道?
- coroutine loop.connect_read_pipe(protocol_factory, pipe)?
在事件循環(huán)中注冊 pipe 的讀取端。
protocol_factory 必須為一個返回 asyncio 協(xié)議 實現(xiàn)的可調用對象。
pipe 是個 類似文件型對象.
返回一對
(transport, protocol),其中 transport 支持ReadTransport接口而 protocol 是由 protocol_factory 所實例化的對象。使用
SelectorEventLoop事件循環(huán), pipe 被設置為非阻塞模式。
- coroutine loop.connect_write_pipe(protocol_factory, pipe)?
在事件循環(huán)中注冊 pipe 的寫入端。
protocol_factory 必須為一個返回 asyncio 協(xié)議 實現(xiàn)的可調用對象。
pipe 是個 類似文件型對象.
返回一對
(transport, protocol),其中 transport 支持WriteTransport接口而 protocol 是由 protocol_factory 所實例化的對象。使用
SelectorEventLoop事件循環(huán), pipe 被設置為非阻塞模式。
備注
在 Windows 中 SelectorEventLoop 不支持上述方法。 對于 Windows 請改用 ProactorEventLoop。
Unix 信號?
- loop.add_signal_handler(signum, callback, *args)?
設置 callback 作為 signum 信號的處理程序。
此回調將與該事件循環(huán)中其他加入隊列的回調和可運行協(xié)程一起由 loop 發(fā)起調用。 不同與使用
signal.signal()注冊的信號處理程序,使用此函數(shù)注冊的回調可以與事件循環(huán)進行交互。如果信號數(shù)字非法或者不可捕獲,就拋出一個
ValueError。如果建立處理器的過程中出現(xiàn)問題,會拋出一個RuntimeError。使用
functools.partial()傳遞關鍵字參數(shù) 給 callback.和
signal.signal()一樣,這個函數(shù)只能在主線程中調用。
- loop.remove_signal_handler(sig)?
移除 sig 信號的處理程序。
如果信號處理程序被移除則返回
True,否則如果給定信號未設置處理程序則返回False。可用性: Unix。
參見
signal 模塊。
在線程或者進程池中執(zhí)行代碼。?
- awaitable loop.run_in_executor(executor, func, *args)?
安排在指定的執(zhí)行器中調用 func 。
The executor argument should be an
concurrent.futures.Executorinstance. The default executor is used if executor isNone.示例:
import asyncio import concurrent.futures def blocking_io(): # File operations (such as logging) can block the # event loop: run them in a thread pool. with open('/dev/urandom', 'rb') as f: return f.read(100) def cpu_bound(): # CPU-bound operations will block the event loop: # in general it is preferable to run them in a # process pool. return sum(i * i for i in range(10 ** 7)) async def main(): loop = asyncio.get_running_loop() ## Options: # 1. Run in the default loop's executor: result = await loop.run_in_executor( None, blocking_io) print('default thread pool', result) # 2. Run in a custom thread pool: with concurrent.futures.ThreadPoolExecutor() as pool: result = await loop.run_in_executor( pool, blocking_io) print('custom thread pool', result) # 3. Run in a custom process pool: with concurrent.futures.ProcessPoolExecutor() as pool: result = await loop.run_in_executor( pool, cpu_bound) print('custom process pool', result) asyncio.run(main())
這個方法返回一個
asyncio.Future對象。使用
functools.partial()傳遞關鍵字參數(shù) 給 func 。在 3.5.3 版更改:
loop.run_in_executor()不會再配置它所創(chuàng)建的線程池執(zhí)行器的max_workers,而是將其留給線程池執(zhí)行器 (ThreadPoolExecutor) 來設置默認值。
- loop.set_default_executor(executor)?
Set executor as the default executor used by
run_in_executor(). executor must be an instance ofThreadPoolExecutor.在 3.11 版更改: executor must be an instance of
ThreadPoolExecutor.
錯誤處理API?
允許自定義事件循環(huán)中如何去處理異常。
- loop.set_exception_handler(handler)?
將 handler 設置為新的事件循環(huán)異常處理器。
如果 handler 為
None,將設置默認的異常處理程序。 在其他情況下,handler 必須是一個可調用對象且簽名匹配(loop, context),其中loop是對活動事件循環(huán)的引用,而context是一個包含異常詳情的dict(請查看call_exception_handler()文檔來獲取關于上下文的更多信息)。
- loop.get_exception_handler()?
返回當前的異常處理器,如果沒有設置異常處理器,則返回
None。3.5.2 新版功能.
- loop.default_exception_handler(context)?
默認的異常處理器。
此方法會在發(fā)生異常且未設置異常處理程序時被調用。 此方法也可以由想要具有不同于默認處理程序的行為的自定義異常處理程序來調用。
context 參數(shù)和
call_exception_handler()中的同名參數(shù)完全相同。
- loop.call_exception_handler(context)?
調用當前事件循環(huán)的異常處理器。
context 是個包含下列鍵的
dict對象(未來版本的Python可能會引入新鍵):'message': 錯誤消息;
'exception' (可選): 異常對象;
'future' (可選):
asyncio.Future實例;'task' (可選):
asyncio.Task實例;'handle' (可選):
asyncio.Handle實例;'protocol' (可選): Protocol 實例;
'transport' (可選): Transport 實例;
'socket' (可選):
socket.socket實例;- 'asyncgen' (可選): 異步生成器,它導致了
這個異常
備注
此方法不應在子類化的事件循環(huán)中被重載。 對于自定義的異常處理,請使用
set_exception_handler()方法。
開啟調試模式?
- loop.get_debug()?
獲取事件循環(huán)調試模式設置(
bool)。如果環(huán)境變量
PYTHONASYNCIODEBUG是一個非空字符串,就返回True,否則就返回False。
- loop.set_debug(enabled: bool)?
設置事件循環(huán)的調試模式。
在 3.7 版更改: 現(xiàn)在也可以通過新的 Python 開發(fā)模式 來啟用調試模式。
運行子進程?
本小節(jié)所描述的方法都是低層級的。 在常規(guī) async/await 代碼中請考慮改用高層級的 asyncio.create_subprocess_shell() 和 asyncio.create_subprocess_exec() 便捷函數(shù)。
備注
On Windows, the default event loop ProactorEventLoop supports
subprocesses, whereas SelectorEventLoop does not. See
Subprocess Support on Windows for
details.
- coroutine loop.subprocess_exec(protocol_factory, *args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, **kwargs)?
用 args 指定的一個或者多個字符串型參數(shù)創(chuàng)建一個子進程。
args 必須是個由下列形式的字符串組成的列表:
str;或者由 文件系統(tǒng)編碼格式 編碼的
bytes。
第一個字符串指定可執(zhí)行程序,其余的字符串指定其參數(shù)。 所有字符串參數(shù)共同組成了程序的
argv。此方法類似于調用標準庫
subprocess.Popen類,設置shell=False并將字符串列表作為第一個參數(shù)傳入;但是,Popen只接受一個單獨的字符串列表參數(shù),而 subprocess_exec 接受多個字符串參數(shù)。protocol_factory 必須為一個返回
asyncio.SubprocessProtocol類的子類的可調用對象。其他參數(shù):
stdin 可以是以下對象之一:
一個文件類對象,表示要使用
connect_write_pipe()連接到子進程的標準輸入流的管道subprocess.PIPE常量(默認),將創(chuàng)建并連接一個新的管道。None值,這將使得子進程繼承來自此進程的文件描述符subprocess.DEVNULL常量,這表示將使用特殊的os.devnull文件
stdout 可以是以下對象之一:
一個文件類對象,表示要使用
connect_write_pipe()連接到子進程的標準輸出流的管道subprocess.PIPE常量(默認),將創(chuàng)建并連接一個新的管道。None值,這將使得子進程繼承來自此進程的文件描述符subprocess.DEVNULL常量,這表示將使用特殊的os.devnull文件
stderr 可以是以下對象之一:
一個文件類對象,表示要使用
connect_write_pipe()連接到子進程的標準錯誤流的管道subprocess.PIPE常量(默認),將創(chuàng)建并連接一個新的管道。None值,這將使得子進程繼承來自此進程的文件描述符subprocess.DEVNULL常量,這表示將使用特殊的os.devnull文件subprocess.STDOUT常量,將把標準錯誤流連接到進程的標準輸出流
所有其他關鍵字參數(shù)會被不加解釋地傳給
subprocess.Popen,除了 bufsize, universal_newlines, shell, text, encoding 和 errors,它們都不應當被指定。asyncio子進程 API 不支持將流解碼為文本。 可以使用bytes.decode()來將從流返回的字節(jié)串轉換為文本。
其他參數(shù)的文檔,請參閱
subprocess.Popen類的構造函數(shù)。返回一對
(transport, protocol),其中 transport 來自asyncio.SubprocessTransport基類而 protocol 是由 protocol_factory 所實例化的對象。
- coroutine loop.subprocess_shell(protocol_factory, cmd, *, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, **kwargs)?
基于 cmd 創(chuàng)建一個子進程,該參數(shù)可以是一個
str或者按 文件系統(tǒng)編碼格式 編碼得到的bytes,使用平臺的 "shell" 語法。這類似與用
shell=True調用標準庫的subprocess.Popen類。protocol_factory 必須為一個返回
SubprocessProtocol類的子類的可調用對象。請參閱
subprocess_exec()了解有關其余參數(shù)的詳情。返回一對
(transport, protocol),其中 transport 來自SubprocessTransport基類而 protocol 是由 protocol_factory 所實例化的對象。
備注
應用程序要負責確保正確地轉義所有空白字符和特殊字符以防止 shell 注入 漏洞。 shlex.quote() 函數(shù)可以被用來正確地轉義字符串中可能被用來構造 shell 命令的空白字符和特殊字符。
回調處理?
- class asyncio.Handle?
由
loop.call_soon(),loop.call_soon_threadsafe()所返回的回調包裝器對象。- cancel()?
取消回調。 如果此回調已被取消或已被執(zhí)行,此方法將沒有任何效果。
- cancelled()?
如果此回調已被取消則返回
True。3.7 新版功能.
- class asyncio.TimerHandle?
由
loop.call_later()和loop.call_at()所返回的回調包裝器對象。這個類是
Handle的子類。- when()?
返回加入計劃任務的回調時間,以
float值表示的秒數(shù)。時間值是一個絕對時間戳,使用與
loop.time()相同的時間引用。3.7 新版功能.
Server 對象?
Server 對象可使用 loop.create_server(), loop.create_unix_server(), start_server() 和 start_unix_server() 等函數(shù)來創(chuàng)建。
請不要直接實例化該類。
- class asyncio.Server?
Server 對象是異步上下文管理器。當用于
async with語句時,異步上下文管理器可以確保 Server 對象被關閉,并且在async with語句完成后,不接受新的連接。srv = await loop.create_server(...) async with srv: # some code # At this point, srv is closed and no longer accepts new connections.
在 3.7 版更改: Python3.7 開始,Server 對象是一個異步上下文管理器。
- close()?
停止服務:關閉監(jiān)聽的套接字并且設置
sockets屬性為None。用于表示已經連進來的客戶端連接會保持打開的狀態(tài)。
服務器是被異步關閉的,使用
wait_closed()協(xié)程來等待服務器關閉。
- get_loop()?
返回與服務器對象相關聯(lián)的事件循環(huán)。
3.7 新版功能.
- coroutine start_serving()?
開始接受連接。
This method is idempotent, so it can be called when the server is already serving.
傳給
loop.create_server()和asyncio.start_server()的 start_serving 僅限關鍵字形參允許創(chuàng)建不接受初始連接的 Server 對象。 在此情況下可以使用Server.start_serving()或Server.serve_forever()讓 Server 對象開始接受連接。3.7 新版功能.
- coroutine serve_forever()?
開始接受連接,直到協(xié)程被取消。
serve_forever任務的取消將導致服務器被關閉。如果服務器已經在接受連接了,這個方法可以被調用。每個 Server 對象,僅能有一個
serve_forever任務。示例:
async def client_connected(reader, writer): # Communicate with the client with # reader/writer streams. For example: await reader.readline() async def main(host, port): srv = await asyncio.start_server( client_connected, host, port) await srv.serve_forever() asyncio.run(main('127.0.0.1', 0))
3.7 新版功能.
- is_serving()?
如果服務器正在接受新連接的狀態(tài),返回
True。3.7 新版功能.
- sockets?
服務器監(jiān)聽的
socket.socket對象列表。在 3.7 版更改: 在 Python 3.7 之前
Server.sockets會直接返回內部的服務器套接字列表。 在 3.7 版則會返回該列表的副本。
事件循環(huán)實現(xiàn)?
asyncio 帶有兩種不同的事件循環(huán)實現(xiàn): SelectorEventLoop 和 ProactorEventLoop。
默認情況下 asyncio 被配置為在 Unix 上使用 SelectorEventLoop 而在 Windows 上使用 ProactorEventLoop。
- class asyncio.SelectorEventLoop?
基于
selectors模塊的事件循環(huán)。使用給定平臺中最高效的可用 selector。 也可以手動配置要使用的特定 selector:
import asyncio import selectors selector = selectors.SelectSelector() loop = asyncio.SelectorEventLoop(selector) asyncio.set_event_loop(loop)
可用性: Unix, Windows。
- class asyncio.ProactorEventLoop?
用 "I/O Completion Ports" (IOCP) 構建的專為Windows 的事件循環(huán)。
可用性: Windows。
- class asyncio.AbstractEventLoop?
asyncio 兼容事件循環(huán)的抽象基類。
事件循環(huán)方法 一節(jié)列出了
AbstractEventLoop的替代實現(xiàn)應當定義的所有方法。
例子?
請注意本節(jié)中的所有示例都 有意地 演示了如何使用低層級的事件循環(huán) API,例如 loop.run_forever() 和 loop.call_soon()。 現(xiàn)代的 asyncio 應用很少需要以這樣的方式編寫;請考慮使用高層級的函數(shù)例如 asyncio.run()。
call_soon() 的 Hello World 示例。?
一個使用 loop.call_soon() 方法來安排回調的示例。 回調會顯示 "Hello World" 然后停止事件循環(huán):
import asyncio
def hello_world(loop):
"""A callback to print 'Hello World' and stop the event loop"""
print('Hello World')
loop.stop()
loop = asyncio.get_event_loop()
# Schedule a call to hello_world()
loop.call_soon(hello_world, loop)
# Blocking call interrupted by loop.stop()
try:
loop.run_forever()
finally:
loop.close()
參見
一個類似的 Hello World 示例,使用協(xié)程和 run() 函數(shù)創(chuàng)建。
使用 call_later() 來展示當前的日期?
一個每秒刷新顯示當前日期的示例。 回調使用 loop.call_later() 方法在 5 秒后將自身重新加入計劃日程,然后停止事件循環(huán):
import asyncio
import datetime
def display_date(end_time, loop):
print(datetime.datetime.now())
if (loop.time() + 1.0) < end_time:
loop.call_later(1, display_date, end_time, loop)
else:
loop.stop()
loop = asyncio.get_event_loop()
# Schedule the first call to display_date()
end_time = loop.time() + 5.0
loop.call_soon(display_date, end_time, loop)
# Blocking call interrupted by loop.stop()
try:
loop.run_forever()
finally:
loop.close()
參見
一個類似的 current date 示例,使用協(xié)程和 run() 函數(shù)創(chuàng)建。
監(jiān)控一個文件描述符的讀事件?
使用 loop.add_reader() 方法,等到文件描述符收到一些數(shù)據(jù),然后關閉事件循環(huán):
import asyncio
from socket import socketpair
# Create a pair of connected file descriptors
rsock, wsock = socketpair()
loop = asyncio.get_event_loop()
def reader():
data = rsock.recv(100)
print("Received:", data.decode())
# We are done: unregister the file descriptor
loop.remove_reader(rsock)
# Stop the event loop
loop.stop()
# Register the file descriptor for read event
loop.add_reader(rsock, reader)
# Simulate the reception of data from the network
loop.call_soon(wsock.send, 'abc'.encode())
try:
# Run the event loop
loop.run_forever()
finally:
# We are done. Close sockets and the event loop.
rsock.close()
wsock.close()
loop.close()
參見
一個類似的 示例,使用傳輸、協(xié)議和
loop.create_connection()方法創(chuàng)建。另一個類似的 示例,使用了高層級的
asyncio.open_connection()函數(shù)和流。
為SIGINT和SIGTERM設置信號處理器?
(這個 signals 示例只適用于 Unix。)
使用 loop.add_signal_handler() 方法為信號 SIGINT 和 SIGTERM 注冊處理程序:
import asyncio
import functools
import os
import signal
def ask_exit(signame, loop):
print("got signal %s: exit" % signame)
loop.stop()
async def main():
loop = asyncio.get_running_loop()
for signame in {'SIGINT', 'SIGTERM'}:
loop.add_signal_handler(
getattr(signal, signame),
functools.partial(ask_exit, signame, loop))
await asyncio.sleep(3600)
print("Event loop running for 1 hour, press Ctrl+C to interrupt.")
print(f"pid {os.getpid()}: send SIGINT or SIGTERM to exit.")
asyncio.run(main())