assignUser commented on issue #43929:
URL: https://github.com/apache/arrow/issues/43929#issuecomment-2327641943

I created a quick and very simple PoC. Run `producer.py` first; it creates a table and writes it to shared memory as a stream (I think `new_file` should also work, but I had issues getting it to accept the shared memory). Then run `consumer.py` in a separate shell.
   
```py
# producer.py
import time
from multiprocessing import shared_memory

import numpy as np
import pyarrow as pa

data_size = 1024 * 1024
# Estimate the number of elements needed to roughly hit the ~1MB target,
# assuming equal element counts for each type (not equal bytes)
num_elements = data_size // (
    4 + 8
)  # total space divided by the sum of bytes per element of each type

# Generate random data for each type
int_data = np.random.randint(0, 100000, size=num_elements, dtype=np.int32)
float_data = np.random.random(size=num_elements) * 100.0

int_array = pa.array(int_data)
float_array = pa.array(float_data)

table = pa.Table.from_arrays([int_array, float_array], names=["integers", "floats"])
print("rows: ", table.num_rows)
print(table)

# Create shared memory twice the size of our data
# (re-use would need to check for existence, locks in case the blocking
# process hasn't finished processing, etc.)
new_share = shared_memory.SharedMemory("new_share", create=True, size=data_size * 2)

# Turn the shared memory buffer into a pyarrow stream
sink = pa.output_stream(new_share.buf)

with pa.ipc.new_stream(sink, table.schema) as writer:
    writer.write_table(table, max_chunksize=1000)
# leaving the `with` block closes the writer, which writes the EOS marker

# Keep the process alive so the shared memory isn't cleared
while True:
    time.sleep(1)
```
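
As a side note, the producer over-allocates ("twice the size of our data"). If that heuristic is a concern, the exact serialized size could be measured up front with a mock sink that only counts bytes. A minimal sketch, assuming the `table` and chunk size from `producer.py`; `ipc_stream_size` is just a name I made up:

```py
# size_probe.py -- hypothetical helper, assumes `table` from producer.py
import pyarrow as pa

def ipc_stream_size(table: pa.Table, max_chunksize: int = 1000) -> int:
    """Return the exact number of bytes the IPC stream for `table` occupies."""
    mock = pa.MockOutputStream()  # counts bytes written without storing them
    with pa.ipc.new_stream(mock, table.schema) as writer:
        writer.write_table(table, max_chunksize=max_chunksize)
    return mock.size()

# e.g.:
# shared_memory.SharedMemory("new_share", create=True,
#                            size=ipc_stream_size(table))
```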
```py
# consumer.py
from multiprocessing import shared_memory

import pyarrow as pa

# Open the existing shared memory block
share = shared_memory.SharedMemory("new_share")

# Turn the shared memory buffer into a pyarrow stream
source = pa.input_stream(share.buf)

reader = pa.ipc.open_stream(source)
shared_table = reader.read_all()

print("rows: ", shared_table.num_rows)
print(shared_table)

# Remove all objects that hold a reference to the shared memory
# before closing and unlinking it
del shared_table
del source
del reader
share.close()
share.unlink()
```
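
To see that the read is actually zero-copy (i.e. the table's buffers point straight into the shared memory block), something like the following could be run in `consumer.py` before the cleanup `del` lines; `region` and `data_buf` are names I made up for the check:

```py
# zero_copy_check.py -- illustrative only; uses `share` and `shared_table`
# from consumer.py, and must run *before* the del / close / unlink lines
import pyarrow as pa

region = pa.py_buffer(share.buf)  # wraps the memoryview, exposes .address
start, end = region.address, region.address + region.size

# buffers()[0] is the (absent) validity bitmap, buffers()[1] the data buffer
data_buf = shared_table.column("integers").chunk(0).buffers()[1]
assert start <= data_buf.address < end, "read was not zero-copy"

del region, data_buf  # drop these extra references before share.close()
```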
This will of course also need some form of lock to make sure data is not overwritten before it has been processed by the next process, etc. (but I assume you have something like that in place already). Using the [SharedMemoryManager](https://docs.python.org/3/library/multiprocessing.shared_memory.html#multiprocessing.managers.SharedMemoryManager) might also offer advantages, but I didn't look into it; a rough sketch of how that might look is below.
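
This is untested and purely illustrative: `run_consumer` is a placeholder, and the point is that the manager creates the block and unlinks it automatically on exit instead of the manual `unlink()` above.

```py
# manager_poc.py -- hypothetical sketch, not part of the PoC above
from multiprocessing import Process, shared_memory
from multiprocessing.managers import SharedMemoryManager

import pyarrow as pa

def run_consumer(name: str) -> None:
    # Placeholder consumer: attach to the block by name and read the stream
    share = shared_memory.SharedMemory(name)
    table = pa.ipc.open_stream(pa.input_stream(share.buf)).read_all()
    print("rows:", table.num_rows)
    del table       # drop buffer references into the shared memory
    share.close()   # the manager owns unlinking

if __name__ == "__main__":
    table = pa.table({"integers": [1, 2, 3]})
    with SharedMemoryManager() as smm:
        # The manager creates (and later unlinks) the block for us
        shm = smm.SharedMemory(size=4096)
        with pa.ipc.new_stream(pa.output_stream(shm.buf), table.schema) as writer:
            writer.write_table(table)
        del writer  # release the exported memoryview before cleanup
        p = Process(target=run_consumer, args=(shm.name,))
        p.start()
        p.join()
    # leaving the `with` block shuts the manager down and frees the memory
```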

