multiprocessing.shared_memory --- 可從進(jìn)程直接訪問的共享內(nèi)存?

源代碼: Lib/multiprocessing/shared_memory.py

3.8 新版功能.


該模塊提供了一個(gè) SharedMemory 類,用于分配和管理多核或?qū)ΨQ多處理器(SMP)機(jī)器上進(jìn)程間的共享內(nèi)存。為了協(xié)助管理不同進(jìn)程間的共享內(nèi)存生命周期,multiprocessing.managers 模塊也提供了一個(gè) BaseManager 的子類: SharedMemoryManager

本模塊中,共享內(nèi)存是指 "System V 類型" 的共享內(nèi)存塊(雖然可能和它實(shí)現(xiàn)方式不完全一致)而不是 “分布式共享內(nèi)存”。這種類型的的共享內(nèi)存允許不同進(jìn)程讀寫一片公共(或者共享)的易失性存儲區(qū)域。一般來說,進(jìn)程被限制只能訪問屬于自己進(jìn)程空間的內(nèi)存,但是共享內(nèi)存允許跨進(jìn)程共享數(shù)據(jù),從而避免通過進(jìn)程間發(fā)送消息的形式傳遞數(shù)據(jù)。相比通過磁盤、套接字或者其他要求序列化、反序列化和復(fù)制數(shù)據(jù)的共享形式,直接通過內(nèi)存共享數(shù)據(jù)擁有更出色性能。

class multiprocessing.shared_memory.SharedMemory(name=None, create=False, size=0)?

創(chuàng)建一個(gè)新的共享內(nèi)存塊或者連接到一片已經(jīng)存在的共享內(nèi)存塊。每個(gè)共享內(nèi)存塊都被指定了一個(gè)全局唯一的名稱。通過這種方式,進(jìn)程可以使用一個(gè)特定的名字創(chuàng)建共享內(nèi)存區(qū)塊,然后其他進(jìn)程使用同樣的名字連接到這個(gè)共享內(nèi)存塊。

作為一種跨進(jìn)程共享數(shù)據(jù)的方式,共享內(nèi)存塊的壽命可能超過創(chuàng)建它的原始進(jìn)程。一個(gè)共享內(nèi)存塊可能同時(shí)被多個(gè)進(jìn)程使用,當(dāng)一個(gè)進(jìn)程不再需要訪問這個(gè)共享內(nèi)存塊的時(shí)候,應(yīng)該調(diào)用 close() 方法。當(dāng)一個(gè)共享內(nèi)存塊不被任何進(jìn)程使用的時(shí)候,應(yīng)該調(diào)用 unlink() 方法以執(zhí)行必要的清理。

name 是共享內(nèi)存的唯一名稱,字符串類型。如果創(chuàng)建一個(gè)新共享內(nèi)存塊的時(shí)候,名稱指定為 None (默認(rèn)值),將會隨機(jī)產(chǎn)生一個(gè)新名稱。

create 指定創(chuàng)建一個(gè)新的共享內(nèi)存塊 (True) 還是連接到已存在的共享內(nèi)存塊 (False) 。

如果是新創(chuàng)建共享內(nèi)存塊則 size 用于指定塊的大小為多少字節(jié)。由于某些平臺是以內(nèi)存頁大小為最小單位來分配內(nèi)存的,最終得到的內(nèi)存塊大小可能大于或等于要求的大小。如果是連接到已經(jīng)存在的共享內(nèi)存塊, size 參數(shù)會被忽略。

close()?

關(guān)閉實(shí)例對于共享內(nèi)存的訪問連接。所有實(shí)例確認(rèn)自己不再需要使用共享內(nèi)存的時(shí)候都應(yīng)該調(diào)用 close() ,以保證必要的資源清理。調(diào)用 close() 并不會銷毀共享內(nèi)存區(qū)域。

請求銷毀底層的共享內(nèi)存塊。為了執(zhí)行必要的資源清理, 在所有使用這個(gè)共享內(nèi)存塊的進(jìn)程中, unlink() 應(yīng)該調(diào)用一次(且只能調(diào)用一次) 。發(fā)出此銷毀請求后,共享內(nèi)存塊可能會、也可能不會立即銷毀,且此行為在不同操作系統(tǒng)之間可能不同。調(diào)用 unlink() 后再嘗試方位其中的數(shù)據(jù)可能導(dǎo)致內(nèi)存錯誤。注意: 最后一個(gè)關(guān)閉共享內(nèi)存訪問權(quán)限的進(jìn)程可以以任意順序調(diào)用 unlink()close() 。

buf?

共享內(nèi)存塊內(nèi)容的 memoryview 。

name?

共享內(nèi)存塊的唯一標(biāo)識,只讀屬性。

size?

共享內(nèi)存塊的字節(jié)大小,只讀屬性。

以下示例展示了 SharedMemory 底層的用法:

>>>
>>> from multiprocessing import shared_memory
>>> shm_a = shared_memory.SharedMemory(create=True, size=10)
>>> type(shm_a.buf)
<class 'memoryview'>
>>> buffer = shm_a.buf
>>> len(buffer)
10
>>> buffer[:4] = bytearray([22, 33, 44, 55])  # Modify multiple at once
>>> buffer[4] = 100                           # Modify single byte at a time
>>> # Attach to an existing shared memory block
>>> shm_b = shared_memory.SharedMemory(shm_a.name)
>>> import array
>>> array.array('b', shm_b.buf[:5])  # Copy the data into a new array.array
array('b', [22, 33, 44, 55, 100])
>>> shm_b.buf[:5] = b'howdy'  # Modify via shm_b using bytes
>>> bytes(shm_a.buf[:5])      # Access via shm_a
b'howdy'
>>> shm_b.close()   # Close each SharedMemory instance
>>> shm_a.close()
>>> shm_a.unlink()  # Call unlink only once to release the shared memory

以下示例展示了一個(gè)現(xiàn)實(shí)中的例子,使用 SharedMemory 類和 NumPy arrays 結(jié)合, 從兩個(gè) Python shell 中訪問同一個(gè) numpy.ndarray :

>>>
>>> # In the first Python interactive shell
>>> import numpy as np
>>> a = np.array([1, 1, 2, 3, 5, 8])  # Start with an existing NumPy array
>>> from multiprocessing import shared_memory
>>> shm = shared_memory.SharedMemory(create=True, size=a.nbytes)
>>> # Now create a NumPy array backed by shared memory
>>> b = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf)
>>> b[:] = a[:]  # Copy the original data into shared memory
>>> b
array([1, 1, 2, 3, 5, 8])
>>> type(b)
<class 'numpy.ndarray'>
>>> type(a)
<class 'numpy.ndarray'>
>>> shm.name  # We did not specify a name so one was chosen for us
'psm_21467_46075'

>>> # In either the same shell or a new Python shell on the same machine
>>> import numpy as np
>>> from multiprocessing import shared_memory
>>> # Attach to the existing shared memory block
>>> existing_shm = shared_memory.SharedMemory(name='psm_21467_46075')
>>> # Note that a.shape is (6,) and a.dtype is np.int64 in this example
>>> c = np.ndarray((6,), dtype=np.int64, buffer=existing_shm.buf)
>>> c
array([1, 1, 2, 3, 5, 8])
>>> c[-1] = 888
>>> c
array([  1,   1,   2,   3,   5, 888])

>>> # Back in the first Python interactive shell, b reflects this change
>>> b
array([  1,   1,   2,   3,   5, 888])

>>> # Clean up from within the second Python shell
>>> del c  # Unnecessary; merely emphasizing the array is no longer used
>>> existing_shm.close()

>>> # Clean up from within the first Python shell
>>> del b  # Unnecessary; merely emphasizing the array is no longer used
>>> shm.close()
>>> shm.unlink()  # Free and release the shared memory block at the very end
class multiprocessing.managers.SharedMemoryManager([address[, authkey]])?

BaseManager 的子類,可用于管理跨進(jìn)程的共享內(nèi)存塊。

調(diào)用 SharedMemoryManager 實(shí)例上的 start() 方法會啟動一個(gè)新進(jìn)程。這個(gè)新進(jìn)程的唯一目的就是管理所有由它創(chuàng)建的共享內(nèi)存塊的生命周期。想要釋放此進(jìn)程管理的所有共享內(nèi)存塊,可以調(diào)用實(shí)例的 shutdown() 方法。這會觸發(fā)執(zhí)行它管理的所有 SharedMemory 對象的 SharedMemory.unlink() 方法,然后停止這個(gè)進(jìn)程。通過 SharedMemoryManager 創(chuàng)建 SharedMemory 實(shí)例,我們可以避免手動跟蹤和釋放共享內(nèi)存資源。

這個(gè)類提供了創(chuàng)建和返回 SharedMemory 實(shí)例的方法,以及以共享內(nèi)存為基礎(chǔ)創(chuàng)建一個(gè)列表類對象 (ShareableList) 的方法。

有關(guān)繼承的可選輸入?yún)?shù) addressauthkey 以及他們?nèi)绾斡糜趶倪M(jìn)程連接已經(jīng)存在的 SharedMemoryManager 服務(wù),參見 multiprocessing.managers.BaseManager 。

SharedMemory(size)?

使用 size 參數(shù),創(chuàng)建一個(gè)新的指定字節(jié)大小的 SharedMemory 對象并返回。

ShareableList(sequence)?

創(chuàng)建并返回一個(gè)新的 ShareableList 對象,通過輸入?yún)?shù) sequence 初始化。

下面的案例展示了 SharedMemoryManager 的基本機(jī)制:

>>>
>>> from multiprocessing.managers import SharedMemoryManager
>>> smm = SharedMemoryManager()
>>> smm.start()  # Start the process that manages the shared memory blocks
>>> sl = smm.ShareableList(range(4))
>>> sl
ShareableList([0, 1, 2, 3], name='psm_6572_7512')
>>> raw_shm = smm.SharedMemory(size=128)
>>> another_sl = smm.ShareableList('alpha')
>>> another_sl
ShareableList(['a', 'l', 'p', 'h', 'a'], name='psm_6572_12221')
>>> smm.shutdown()  # Calls unlink() on sl, raw_shm, and another_sl

以下案例展示了 SharedMemoryManager 對象的一種可能更方便的使用方式,通過 with 語句來保證所有共享內(nèi)存塊在使用完后被釋放。

>>>
>>> with SharedMemoryManager() as smm:
...     sl = smm.ShareableList(range(2000))
...     # Divide the work among two processes, storing partial results in sl
...     p1 = Process(target=do_work, args=(sl, 0, 1000))
...     p2 = Process(target=do_work, args=(sl, 1000, 2000))
...     p1.start()
...     p2.start()  # A multiprocessing.Pool might be more efficient
...     p1.join()
...     p2.join()   # Wait for all work to complete in both processes
...     total_result = sum(sl)  # Consolidate the partial results now in sl

with 語句中使用 SharedMemoryManager  對象的時(shí)候,使用這個(gè)管理器創(chuàng)建的共享內(nèi)存塊會在 with 語句代碼塊結(jié)束后被釋放。

class multiprocessing.shared_memory.ShareableList(sequence=None, *, name=None)?

提供一個(gè)可修改的類 list 對象,其中所有值都存放在共享內(nèi)存塊中。這限制了可被存儲在其中的值只能是 int, float, bool, str (每條數(shù)據(jù)小于10M), bytes (每條數(shù)據(jù)小于10M)以及 None 這些內(nèi)置類型。它另一個(gè)顯著區(qū)別于內(nèi)置 list 類型的地方在于它的長度無法修改(比如,沒有 append, insert 等操作)且不支持通過切片操作動態(tài)創(chuàng)建新的 ShareableList  實(shí)例。

sequence 會被用來為一個(gè)新的 ShareableList 填充值。 設(shè)為 None 則會基于唯一的共享內(nèi)存名稱關(guān)聯(lián)到已經(jīng)存在的 ShareableList。

name 是所請求的共享內(nèi)存的唯一名稱,與 SharedMemory 的定義中所描述的一致。 當(dāng)關(guān)聯(lián)到現(xiàn)有的 ShareableList 時(shí),則指明其共享內(nèi)存塊的唯一名稱并將 sequence 設(shè)為 None。

count(value)?

返回 value 出現(xiàn)的次數(shù)。

index(value)?

返回 value 首次出現(xiàn)的位置,如果 value 不存在, 則拋出 ValueError 異常。

format?

包含由所有當(dāng)前存儲值所使用的 struct 打包格式的只讀屬性。

shm?

存儲了值的 SharedMemory 實(shí)例。

下面的例子演示了 ShareableList 實(shí)例的基本用法:

>>>
>>> from multiprocessing import shared_memory
>>> a = shared_memory.ShareableList(['howdy', b'HoWdY', -273.154, 100, None, True, 42])
>>> [ type(entry) for entry in a ]
[<class 'str'>, <class 'bytes'>, <class 'float'>, <class 'int'>, <class 'NoneType'>, <class 'bool'>, <class 'int'>]
>>> a[2]
-273.154
>>> a[2] = -78.5
>>> a[2]
-78.5
>>> a[2] = 'dry ice'  # Changing data types is supported as well
>>> a[2]
'dry ice'
>>> a[2] = 'larger than previously allocated storage space'
Traceback (most recent call last):
  ...
ValueError: exceeds available storage for existing str
>>> a[2]
'dry ice'
>>> len(a)
7
>>> a.index(42)
6
>>> a.count(b'howdy')
0
>>> a.count(b'HoWdY')
1
>>> a.shm.close()
>>> a.shm.unlink()
>>> del a  # Use of a ShareableList after call to unlink() is unsupported

下面的例子演示了一個(gè)、兩個(gè)或多個(gè)進(jìn)程如何通過提供下層的共享內(nèi)存塊名稱來訪問同一個(gè) ShareableList:

>>>
>>> b = shared_memory.ShareableList(range(5))         # In a first process
>>> c = shared_memory.ShareableList(name=b.shm.name)  # In a second process
>>> c
ShareableList([0, 1, 2, 3, 4], name='...')
>>> c[-1] = -999
>>> b[-1]
-999
>>> b.shm.close()
>>> c.shm.close()
>>> c.shm.unlink()

The following examples demonstrates that ShareableList (and underlying SharedMemory) objects can be pickled and unpickled if needed. Note, that it will still be the same shared object. This happens, because the deserialized object has the same unique name and is just attached to an existing object with the same name (if the object is still alive):

>>>
>>> import pickle
>>> from multiprocessing import shared_memory
>>> sl = shared_memory.ShareableList(range(10))
>>> list(sl)
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
>>>
>>> deserialized_sl = pickle.loads(pickle.dumps(sl))
>>> list(deserialized_sl)
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
>>>
>>> sl[0] = -1
>>> deserialized_sl[1] = -2
>>> list(sl)
[-1, -2, 2, 3, 4, 5, 6, 7, 8, 9]
>>> list(deserialized_sl)
[-1, -2, 2, 3, 4, 5, 6, 7, 8, 9]
>>>
>>> sl.shm.close()
>>> sl.shm.unlink()