If you think the problem is the context manager, then don't use it. As with file handles, the expectation is that exiting a context manager closes the thing it's managing. But even if you don't close it, the positions should be relevant when you open it again.
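Something along these lines is what I have in mind (an untested sketch, reusing the names from your example):

import pyarrow as pa
import pyarrow.ipc as ipc

table = pa.table({'column1': [1, 2, 3], 'column2': ['a', 'b', 'c']})

sink = pa.BufferOutputStream()
# No `with` block: nothing exits, so nothing closes the sink for us.
compressed_sink = pa.CompressedOutputStream(sink, 'zstd')

with ipc.RecordBatchStreamWriter(compressed_sink, table.schema) as table_writer:
    table_writer.write_table(table)
compressed_sink.flush()  # push the buffered compressed bytes through to the sink

# The sink is still open here, so you can keep writing to it (or inspect its
# current position) before eventually closing things yourself.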
Also, looking at the example again I see you were using tell(), which I somehow missed; but you're calling it outside of the context manager. If you move it inside the context manager then you don't have to worry about the stream being closed when you call tell(). As for your use of `file_sink` in your new code, I don't see it opened or used with a context manager. My eyes may be failing me again, but it seems like it would have the same issue as calling tell() on a closed BufferOutputStream?

Sent from Proton Mail for iOS

On Thu, Oct 10, 2024 at 08:33, Robert McLeod <[email protected]> wrote:

Aldrin,

I think my trouble is coming from the fact that a `CompressedOutputStream` closes the `BufferOutputStream` when it exits its context manager. I don't know if that is required or if it is simply an oversight, because no one else has tried to fetch individual objects from a compressed stream before. I have written a new example that uses a 'meta-sink' (`file_sink` in the code below), writes one `BufferOutputStream` per object, and then re-writes that compressed data into the meta-sink. It is a bit wasteful to have that extra copy operation. I'll take a quick look at the Arrow source to see if there's feasibly a better solution.

import pyarrow as pa
import pyarrow.ipc as ipc
import numpy as np
import numpy.testing as npt
import pandas.testing as pdt

data = {'column1': [1, 2, 3], 'column2': ['a', 'b', 'c']}
data2 = {'column1': [7, 8, 9], 'column2': ['d', 'e', 'f']}
table = pa.table(data)
table2 = pa.table(data2)
tensor = pa.Tensor.from_numpy(np.array([5, 6, 7]))
tensor2 = pa.Tensor.from_numpy(np.array([4, 7, 4, 6, 7, 8]))

items = [table, table2, tensor, tensor2]
n_items = len(items)

# Compress each object into its own BufferOutputStream
sinks = [None] * n_items
for i, item in enumerate(items):
    sink = sinks[i] = pa.BufferOutputStream()
    with pa.CompressedOutputStream(sink, 'zstd') as compressed_sink:
        if isinstance(item, pa.Table):
            with ipc.RecordBatchStreamWriter(compressed_sink, item.schema) as table_writer:
                table_writer.write_table(item)
        elif isinstance(item, pa.Tensor):
            ipc.write_tensor(item, compressed_sink)
        else:
            raise NotImplementedError(f'Unhandled type: {type(item)}')
    # compressed_sink.flush()  # Unnecessary

# Write each chunk from sinks into a file
offsets = [None] * n_items
sizes = [None] * n_items
file_sink = pa.BufferOutputStream()
for i, sink in enumerate(sinks):
    offsets[i] = file_sink.tell()
    file_sink.write(sink.getvalue())
    sizes[i] = file_sink.tell() - offsets[i]

# Convert to bytes to finalize the file_sink buffer
chunk = file_sink.getvalue()

# How can we go about reading only one object in the source buffer?
import random
for i in random.sample(tuple(range(len(items))), len(items)):
    print(f'Loading item #{i}')
    item = items[i]
    # Here we would have to implement some table of contents that gives us
    # a map of type, offset, and size (see the sketch after this code).
    block = chunk[offsets[i]: offsets[i] + sizes[i]]
    source = pa.BufferReader(block)
    with pa.CompressedInputStream(source, 'zstd') as decompressed_source:
        if isinstance(item, pa.Table):
            with ipc.RecordBatchStreamReader(decompressed_source) as table_reader:
                read_item = table_reader.read_all()
            pdt.assert_frame_equal(item.to_pandas(), read_item.to_pandas())
        elif isinstance(item, pa.Tensor):
            read_item = ipc.read_tensor(decompressed_source)
            npt.assert_almost_equal(item.to_numpy(), read_item.to_numpy())
        else:
            raise NotImplementedError(f'Unhandled type: {type(item)}')
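For the table of contents mentioned in the comment above, something like the following might work (untested, and the footer layout is just a placeholder I made up); it would run in place of the `chunk = file_sink.getvalue()` step so the footer ends up inside the buffer:

import json

# Placeholder footer format: a JSON list of (kind, offset, size) entries,
# followed by 8 little-endian bytes giving the JSON length, so a reader can
# locate the footer from the end of the buffer.
toc = [
    {'kind': type(item).__name__, 'offset': offsets[i], 'size': sizes[i]}
    for i, item in enumerate(items)
]
toc_bytes = json.dumps(toc).encode('utf-8')
file_sink.write(toc_bytes)
file_sink.write(len(toc_bytes).to_bytes(8, 'little'))
chunk = file_sink.getvalue()

# Reading side: recover the footer, then slice out only the object we want.
raw = chunk.to_pybytes()
toc_len = int.from_bytes(raw[-8:], 'little')
toc = json.loads(raw[-8 - toc_len:-8])
entry = toc[2]  # e.g. the first tensor
block = raw[entry['offset']: entry['offset'] + entry['size']]
source = pa.BufferReader(block)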
On Wed, Oct 9, 2024 at 4:17 PM Aldrin <[email protected]> wrote:

I could be wrong, but I think zstd naively (or by default) requires the whole stream to be decompressed before you can access any data within it (it is not "splittable" and does not support random access). There are ways to provide this capability by essentially compressing in segments. The best concise description of this that I've found is at [1].

Either way, I also think that accessing a compressed stream using purely data parameters (e.g. tensor_offset and tensor_size) doesn't accommodate any structural information of the stream (e.g. schema, metadata, and various data boundaries). So, aside from the choice of compression, I think the approach you mentioned may be too naive anyway.

I don't know what a good way to do this is, but you'd either have to dig into how the Arrow library writes data to the compressed stream (I think around [2]) to get some inspiration, or you could look at possible solutions involving other compression algorithms (bzip2 should be splittable, and snappy may or may not be). Something you can also try is writing in batches rather than the whole table, so that if nothing else you can "seek" to where a batch was written [3]. If you want to do your own book-keeping, you can note positions in the stream as you write to it [4] and then try to read from that position later; a rough sketch follows below. Doing any of these things, or anything at this level, might require you to look into the C++ code being called from Python (in which case [2] will be useful), but I'm not sure how far down that rabbit hole you want to go.

[1]: https://github.com/circulosmeos/gztool?tab=readme-ov-file#background
[2]: https://github.com/apache/arrow/blob/main/cpp/src/arrow/io/compressed.cc#L42
[3]: https://arrow.apache.org/docs/python/generated/pyarrow.ipc.RecordBatchStreamWriter.html#pyarrow.ipc.RecordBatchStreamWriter.write_batch
[4]: https://arrow.apache.org/docs/python/generated/pyarrow.CompressedOutputStream.html#pyarrow.CompressedOutputStream.tell
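To make [3] and [4] a bit more concrete, here is a rough, untested sketch of the book-keeping idea (the names are made up, and it does not by itself make the zstd stream seekable):

import pyarrow as pa
import pyarrow.ipc as ipc

table = pa.table({'column1': list(range(10_000)), 'column2': ['x'] * 10_000})

sink = pa.BufferOutputStream()
compressed_sink = pa.CompressedOutputStream(sink, 'zstd')

positions = []
with ipc.RecordBatchStreamWriter(compressed_sink, table.schema) as writer:
    for batch in table.to_batches(max_chunksize=1024):
        writer.write_batch(batch)  # [3]: write batch-by-batch instead of write_table()
        compressed_sink.flush()
        # [4]: note positions as you go. I believe tell() on the compressed stream
        # counts bytes before compression, while tell() on the sink counts the
        # compressed bytes actually written.
        positions.append((compressed_sink.tell(), sink.tell()))

You would still have to decompress from the beginning of the stream to make use of those positions with plain zstd, which is where the segmenting idea described in [1] would come in.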
# ------------------------------
# Aldrin

https://github.com/drin/
https://gitlab.com/octalene
https://keybase.io/octalene

On Wednesday, October 9th, 2024 at 15:12, Robert McLeod <[email protected]> wrote:

I am trying to write multiple tables/tensors into a single stream/file. Writing is straightforward, and I can read everything back out, but every effort I have made to pick an individual element out of a compressed stream has failed. E.g. I would like to extract only Tensor #1 from the stream. I provide some sample code:

import pyarrow as pa
import pyarrow.ipc as ipc
import numpy as np

data = {'column1': [1, 2, 3], 'column2': ['a', 'b', 'c']}
data2 = {'column1': [7, 8, 9], 'column2': ['d', 'e', 'f']}
table = pa.table(data)
table2 = pa.table(data2)
tensor = pa.Tensor.from_numpy(np.array([5, 6, 7]))
tensor2 = pa.Tensor.from_numpy(np.array([4, 7, 4, 6, 7, 8]))

sink = pa.BufferOutputStream()
df_offset = sink.tell()
# We are not using the context manager here because CompressedOutputStream closes
# the sink stream when it exits.
compressed_sink = pa.CompressedOutputStream(sink, 'zstd')
with ipc.RecordBatchStreamWriter(compressed_sink, table.schema) as table_writer:
    table_writer.write_table(table)
compressed_sink.flush()
df_size = sink.tell() - df_offset

df2_offset = sink.tell()
with ipc.RecordBatchStreamWriter(compressed_sink, table2.schema) as table_writer:
    table_writer.write_table(table2)
compressed_sink.flush()
df2_size = sink.tell() - df2_offset

# Write our tensors
tensor_offset = sink.tell()
ipc.write_tensor(tensor, compressed_sink)
compressed_sink.flush()
tensor_size = sink.tell() - tensor_offset

tensor_offset2 = sink.tell()
ipc.write_tensor(tensor2, compressed_sink)
compressed_sink.flush()
tensor_size2 = sink.tell() - tensor_offset2

# Convert to bytes to finalize the sink buffer
chunk = sink.getvalue()
source = pa.BufferReader(chunk)

# We can read out every element in the stream without any problems
decompressed_source = pa.CompressedInputStream(source, 'zstd')
with ipc.RecordBatchStreamReader(decompressed_source) as table_reader:
    read_table = table_reader.read_all()
with ipc.RecordBatchStreamReader(decompressed_source) as table_reader2:
    read_table2 = table_reader2.read_all()
read_tensor = ipc.read_tensor(decompressed_source)
read_tensor2 = ipc.read_tensor(decompressed_source)

# But how can we go about reading only one object in the source buffer?
block = chunk[tensor_offset: tensor_offset + tensor_size]
decompressed2 = pa.CompressedInputStream(block, 'zstd')
random_tensor = ipc.read_tensor(decompressed2)
# Traceback (most recent call last):
#   File "<stdin>", line 1, in <module>
#   File "pyarrow\ipc.pxi", line 1257, in pyarrow.lib.read_tensor
#   File "pyarrow\error.pxi", line 154, in pyarrow.lib.pyarrow_internal_check_status
#   File "pyarrow\error.pxi", line 91, in pyarrow.lib.check_status
# OSError: ZSTD decompress failed: Unknown frame descriptor

Has anyone else tried this approach and found a workable solution for pulling individual objects from a (compressed) stream?

--
Robert McLeod
[email protected]
[email protected]
