xubinlaile removed a comment on issue #8270:
URL: https://github.com/apache/arrow/issues/8270#issuecomment-699019657


   > > may this help:
   > > https://stackoverflow.com/questions/50916422/python-typeerror-object-of-type-int64-is-not-json-serializable
   >
   > Maybe I didn't read it carefully but that does not look like a problem with Pandas.
   Code as follows. Following the Stack Overflow answers, I modified pandas_compat.py and it now works (a sketch of the change is after the script).
   
   from multiprocessing import Pool
   import numpy as np
   import pandas as pd
   import pyarrow as pa
   import pyarrow.plasma as plasma
   import subprocess
   import time
   
   
   client = None
   object_store_size = 2 * 10 ** 9  # 2 GB
   num_cores = 8
   num_rows = 200000
   num_cols = 2
   column_names = [str(i) for i in range(num_cols)]
   column_to_sort = column_names[0]
   
   
    # Connect each process to the plasma store.
   def connect():
       global client
       client = plasma.connect('/tmp/store')
       np.random.seed(int(time.time() * 10e7) % 10000000)
   
   
   def put_df(df):
       record_batch = pa.RecordBatch.from_pandas(df)
   
       # Get size of record batch and schema
       mock_sink = pa.MockOutputStream()
        stream_writer = pa.RecordBatchStreamWriter(mock_sink, record_batch.schema)
       stream_writer.write_batch(record_batch)
       data_size = mock_sink.size()
   
       # Generate an ID and allocate a buffer in the object store for the
       # serialized DataFrame
       object_id = plasma.ObjectID(np.random.bytes(20))
       buf = client.create(object_id, data_size)
   
       # Write the serialized DataFrame to the object store
       sink = pa.FixedSizeBufferWriter(buf)
       stream_writer = pa.RecordBatchStreamWriter(sink, record_batch.schema)
       stream_writer.write_batch(record_batch)
   
       # Seal the object
       client.seal(object_id)
   
       return object_id
   
   
   def get_dfs(object_ids):
       """Retrieve dataframes from the object store given their object IDs."""
       buffers = client.get_buffers(object_ids)
       return [pa.RecordBatchStreamReader(buf).read_next_batch().to_pandas()
               for buf in buffers]
   
   
   
   
   if __name__ == '__main__':
       # Start the plasma store.
       p = subprocess.Popen(['plasma_store',
                             '-s', '/tmp/store',
                             '-m', str(object_store_size)])
   
       # Connect to the plasma store.
       connect()
   
        # Start a worker pool whose processes also connect to the store
        # (the pool is not used further in this reproduction).
        pool = Pool(initializer=connect, initargs=(), processes=num_cores)
   
       # Create a DataFrame from a numpy array.
       df = pd.DataFrame(np.random.randn(num_rows, num_cols),
                         columns=column_names)
   
       partition_ids = [put_df(partition) for partition
                        in np.split(df, num_cores)]
   
       print(partition_ids)
        dfs = get_dfs(partition_ids)
        print(dfs)
       # Kill the object store.
       p.kill()
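
   For reference, the fix follows the Stack Overflow answers above: make json.dumps accept numpy scalar types where pandas_compat.py serializes the pandas metadata. This is only a minimal sketch of the idea, not my exact patch; the helper name _json_default is illustrative, and it assumes the TypeError comes from numpy integer/float scalars in the metadata:

    import json
    import numpy as np

    def _json_default(obj):
        # Cast numpy scalars to native Python types so json.dumps accepts them.
        if isinstance(obj, np.integer):
            return int(obj)
        if isinstance(obj, np.floating):
            return float(obj)
        raise TypeError('Object of type %s is not JSON serializable'
                        % type(obj).__name__)

    # Wherever pandas_compat.py calls json.dumps on the metadata, pass the hook:
    # json.dumps(metadata, default=_json_default)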

