assignUser commented on issue #43929:
URL: https://github.com/apache/arrow/issues/43929#issuecomment-2327641943
I created a quick and very simple PoC. Run `producer.py` first; it creates a table and writes it to shared memory as a stream (I think `new_file` should also work, but I had issues getting it to accept the shared memory). Then run `consumer.py` in a separate shell.

```py
# producer.py
from multiprocessing import shared_memory

import numpy as np
import pyarrow as pa

data_size = 1024 * 1024

# Estimate the number of elements needed to roughly hit the ~1MB target:
# total size divided by the bytes per row (int32 + float64), i.e. equal
# element counts per column, not equal bytes per column.
num_elements = data_size // (4 + 8)

# Generate random data for each type
int_data = np.random.randint(0, 100000, size=num_elements, dtype=np.int32)
float_data = np.random.random(size=num_elements) * 100.0

int_array = pa.array(int_data)
float_array = pa.array(float_data)
table = pa.Table.from_arrays([int_array, float_array], names=["integers", "floats"])

print("rows: ", table.num_rows)
print(table)

# Create shared memory twice the size of our data
# (re-use would need checks for existence, locks in case the consuming process
# hasn't finished processing, etc.)
new_share = shared_memory.SharedMemory("new_share", create=True, size=data_size * 2)

# Turn the shared memory buffer into a pyarrow stream
sink = pa.output_stream(new_share.buf)

with pa.ipc.new_stream(sink, table.schema) as writer:
    writer.write_table(table, max_chunksize=1000)
    writer.close()  # write EOS marker

# Keep the process alive so the shared memory isn't cleared
while True:
    pass
```

```py
# consumer.py
from multiprocessing import shared_memory

import pyarrow as pa

# Open the existing memory share
share = shared_memory.SharedMemory("new_share")

# Turn the shared memory buffer into a pyarrow stream
source = pa.input_stream(share.buf)

reader = pa.ipc.open_stream(source)
shared_table = reader.read_all()

print("rows: ", shared_table.num_rows)
print(shared_table)

# Remove all objects that hold a reference to the shared memory
del shared_table
del source
del reader

share.close()
share.unlink()
```

This will of course also need some form of lock to make sure data isn't overwritten before the next process has finished with it (but I assume you have something like that in place already); a rough, untested sketch of one option is below. Using the [SharedMemoryManager](https://docs.python.org/3/library/multiprocessing.shared_memory.html#multiprocessing.managers.SharedMemoryManager) might also offer advantages; I didn't look into it in detail, but a rough sketch of how it could look follows as well.
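For the locking part, here is a rough, untested sketch of one way to coordinate the two processes without an external lock object: reserve the first byte of the shared block as a status flag that the producer sets once the stream is fully written and the consumer polls before reading. The names here (`FLAG_OFFSET`, `write_table_to_share`, `read_table_from_share`) are made up for illustration; the actual Arrow calls are the same ones the PoC above uses.

```py
# sync_sketch.py -- rough, untested idea; builds on producer.py / consumer.py above
import time
from multiprocessing import shared_memory

import pyarrow as pa

# Byte 0 of the block is a status flag (0 = empty, 1 = stream written);
# the Arrow IPC stream starts at byte 1.
FLAG_OFFSET = 1


def write_table_to_share(shm: shared_memory.SharedMemory, table: pa.Table) -> None:
    payload = shm.buf[FLAG_OFFSET:]  # writable view of everything after the flag byte
    sink = pa.output_stream(payload)
    with pa.ipc.new_stream(sink, table.schema) as writer:
        writer.write_table(table)
    del sink, payload  # release the buffer views so the block can be closed cleanly later
    shm.buf[0] = 1     # signal the consumer that the stream is complete


def read_table_from_share(shm: shared_memory.SharedMemory) -> pa.Table:
    while shm.buf[0] != 1:  # naive polling; a real version would want a timeout
        time.sleep(0.01)
    reader = pa.ipc.open_stream(pa.input_stream(shm.buf[FLAG_OFFSET:]))
    table = reader.read_all()
    # The returned table still references the shared block, so drop it before
    # close()/unlink(), exactly as consumer.py does above.
    return table
```

The producer would call `write_table_to_share` right after creating the block and the consumer would call `read_table_from_share` instead of reading the stream directly; everything else stays as in the PoC.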
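For the `SharedMemoryManager` idea, the main advantage I'd expect is that blocks created through the manager are unlinked automatically when it shuts down, so the manual `close()`/`unlink()` bookkeeping and the `while True` keep-alive go away. The trade-off is that the manager-owning process has to spawn the consumer (here via `multiprocessing.Process`) instead of the consumer being started from a separate shell. A rough, untested sketch:

```py
# manager_sketch.py -- rough, untested sketch using SharedMemoryManager
from multiprocessing import Process
from multiprocessing.managers import SharedMemoryManager
from multiprocessing.shared_memory import SharedMemory

import numpy as np
import pyarrow as pa


def consume(name: str) -> None:
    # Attach to the block by name and read the Arrow stream back, as in consumer.py
    shm = SharedMemory(name)
    source = pa.input_stream(shm.buf)
    reader = pa.ipc.open_stream(source)
    table = reader.read_all()
    print("rows: ", table.num_rows)
    del table, reader, source  # drop all references into the shared buffer before closing
    shm.close()  # no unlink here, the manager owns the block


if __name__ == "__main__":
    table = pa.table({
        "integers": pa.array(np.random.randint(0, 100000, size=1000, dtype=np.int32)),
        "floats": pa.array(np.random.random(size=1000) * 100.0),
    })

    with SharedMemoryManager() as smm:
        # Blocks created through the manager are unlinked automatically on exit
        shm = smm.SharedMemory(size=1024 * 1024)
        sink = pa.output_stream(shm.buf)
        with pa.ipc.new_stream(sink, table.schema) as writer:
            writer.write_table(table, max_chunksize=1000)
        del sink  # release the buffer view before the manager cleans the block up

        consumer = Process(target=consume, args=(shm.name,))
        consumer.start()
        consumer.join()
```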