xubinlaile removed a comment on issue #8270: URL: https://github.com/apache/arrow/issues/8270#issuecomment-699019657
> > may this help: > > https://stackoverflow.com/questions/50916422/python-typeerror-object-of-type-int64-is-not-json-serializable > > Maybe I didn't read it carefully but that does not look like a problem with Pandas. > > may this help: > > https://stackoverflow.com/questions/50916422/python-typeerror-object-of-type-int64-is-not-json-serializable > > Maybe I didn't read it carefully but that does not look like a problem with Pandas. Code as follows. With the Stack Overflow answers, I modified pandas_compat.py, and it works. from multiprocessing import Pool import numpy as np import pandas as pd import pyarrow as pa import pyarrow.plasma as plasma import subprocess import time client = None object_store_size = 2 * 10 ** 9 # 2 GB num_cores = 8 num_rows = 200000 num_cols = 2 column_names = [str(i) for i in range(num_cols)] column_to_sort = column_names[0] # Connect to clients def connect(): global client client = plasma.connect('/tmp/store') np.random.seed(int(time.time() * 10e7) % 10000000) def put_df(df): record_batch = pa.RecordBatch.from_pandas(df) # Get size of record batch and schema mock_sink = pa.MockOutputStream() stream_writer = pa.RecordBatchStreamWriter(mock_sink, record_batch.schema) stream_writer.write_batch(record_batch) data_size = mock_sink.size() # Generate an ID and allocate a buffer in the object store for the # serialized DataFrame object_id = plasma.ObjectID(np.random.bytes(20)) buf = client.create(object_id, data_size) # Write the serialized DataFrame to the object store sink = pa.FixedSizeBufferWriter(buf) stream_writer = pa.RecordBatchStreamWriter(sink, record_batch.schema) stream_writer.write_batch(record_batch) # Seal the object client.seal(object_id) return object_id def get_dfs(object_ids): """Retrieve dataframes from the object store given their object IDs.""" buffers = client.get_buffers(object_ids) return [pa.RecordBatchStreamReader(buf).read_next_batch().to_pandas() for buf in buffers] if __name__ == '__main__': # Start the plasma store. 
p = subprocess.Popen(['plasma_store', '-s', '/tmp/store', '-m', str(object_store_size)]) # Connect to the plasma store. connect() # Connect the processes in the pool. pool = Pool(initializer=connect, initargs=(), processes=num_cores) # Create a DataFrame from a numpy array. df = pd.DataFrame(np.random.randn(num_rows, num_cols), columns=column_names) partition_ids = [put_df(partition) for partition in np.split(df, num_cores)] print(partition_ids) df = get_dfs(partition_ids) print(df) # Kill the object store. p.kill() ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
