assignUser commented on issue #43929:
URL: https://github.com/apache/arrow/issues/43929#issuecomment-2327641943
I created a quick and very simple PoC. Run `producer.py` first; it will create a table and write it to shared memory as an IPC stream (I think `new_file` should also work, but I had issues getting it to accept the shared memory; see the sketch after the two scripts). Then run `consumer.py` in a separate shell.
```py
# producer.py
import time
from multiprocessing import shared_memory

import numpy as np
import pyarrow as pa

data_size = 1024 * 1024

# Estimate the number of elements to roughly hit the ~1 MB target:
# total space divided by the combined bytes per element of both columns
# (int32 = 4 bytes, float64 = 8 bytes), i.e. equal element counts, not equal bytes
num_elements = data_size // (4 + 8)

# Generate random data for each type
int_data = np.random.randint(0, 100000, size=num_elements, dtype=np.int32)
float_data = np.random.random(size=num_elements) * 100.0

int_array = pa.array(int_data)
float_array = pa.array(float_data)
table = pa.Table.from_arrays([int_array, float_array], names=["integers", "floats"])
print("rows: ", table.num_rows)
print(table)

# Create shared memory twice the size of our data
# (re-use would need to check for existence, locks in case the blocking
# process hasn't finished processing, etc.)
new_share = shared_memory.SharedMemory("new_share", create=True, size=data_size * 2)

# Turn the shared memory buffer into a pyarrow output stream
sink = pa.output_stream(new_share.buf)
with pa.ipc.new_stream(sink, table.schema) as writer:
    writer.write_table(table, max_chunksize=1000)
# leaving the `with` block closes the writer and writes the EOS marker

# Keep the process alive so the shared memory isn't cleared
while True:
    time.sleep(1)
```
```py
# consumer.py
from multiprocessing import shared_memory

import pyarrow as pa

# Open the existing shared memory segment
share = shared_memory.SharedMemory("new_share")

# Turn the shared memory buffer into a pyarrow input stream
source = pa.input_stream(share.buf)
reader = pa.ipc.open_stream(source)
shared_table = reader.read_all()
print("rows: ", shared_table.num_rows)
print(shared_table)

# Remove all objects that hold a reference to the shared memory,
# otherwise close() raises a BufferError
del shared_table
del source
del reader
share.close()
share.unlink()
```
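
On the `new_file` point above: one plausible reason it's awkward here is that the IPC *file* format locates its footer relative to the end of the file, and the shared memory buffer is larger than what was actually written. Below is a rough, untested sketch of a workaround; the 8-byte length prefix is my own invention (not an Arrow convention), and both sides are collapsed into one script for brevity.
```py
# file_variant.py -- hedged sketch only, not verified against the issue above.
# Record how many bytes the IPC file actually occupies (here as an 8-byte
# prefix) so the consumer can slice the oversized buffer down to the real
# file before handing it to open_file.
import struct
from multiprocessing import shared_memory

import pyarrow as pa

table = pa.table({"integers": [1, 2, 3], "floats": [0.1, 0.2, 0.3]})

shm = shared_memory.SharedMemory("file_share", create=True, size=1024 * 1024)

# Write the IPC file after the 8-byte length prefix
sink = pa.output_stream(shm.buf[8:])
with pa.ipc.new_file(sink, table.schema) as writer:
    writer.write_table(table)
struct.pack_into("<Q", shm.buf, 0, sink.tell())  # store the written length

# --- consumer side (same idea as consumer.py) ---
(length,) = struct.unpack_from("<Q", shm.buf, 0)
reader = pa.ipc.open_file(pa.BufferReader(shm.buf[8 : 8 + length]))
print(reader.read_all())
# (releasing buffer references and close()/unlink() omitted for brevity)
```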
This will of course also need some form of locking to make sure data isn't overwritten before the next process has consumed it (but I assume you have something like that in place already). Using the [SharedMemoryManager](https://docs.python.org/3/library/multiprocessing.shared_memory.html#multiprocessing.managers.SharedMemoryManager) might also offer advantages, but I didn't look into it; a rough sketch follows.
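For completeness, here's what that could look like. Again both sides are squashed into one script, and the names and sizes are arbitrary; treat it as a sketch, not a tested pattern.
```py
# manager_sketch.py -- rough sketch only; both sides in one process for brevity.
# The manager auto-generates segment names and unlinks every segment it
# created when the `with` block exits, so no manual close()/unlink() is needed.
from multiprocessing.managers import SharedMemoryManager

import pyarrow as pa

with SharedMemoryManager() as smm:
    shm = smm.SharedMemory(size=1024 * 1024)

    table = pa.table({"integers": [1, 2, 3], "floats": [0.1, 0.2, 0.3]})
    sink = pa.output_stream(shm.buf)
    with pa.ipc.new_stream(sink, table.schema) as writer:
        writer.write_table(table)

    # In a real setup you'd pass shm.name to the consumer process instead
    reader = pa.ipc.open_stream(pa.input_stream(shm.buf))
    print(reader.read_all())

    # Release buffer references before the manager tears the segment down
    del sink, reader
```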