Aldrin,
I think my trouble is coming from the fact that a `CompressedOutputStream`
closes the `BufferedOutputStream` when it exits its context manager. I
don't know if that is required or if it is simply an oversight because no
one else has tried to fetch individual objects from a compressed stream
before.
I have gone through and made a new example code that uses a 'meta-sink'
(`file_sink` in the code below) and then writes one `BufferedOutputStream`
for each object and then re-write that compressed data into the meta-sink.
This is a bit wasteful to have that extra copy operation. I'll take a quick
look at the source for Arrow to see if there's feasibly a better solution.
import pyarrow as pa
import pyarrow.ipc as ipc
import numpy as np
import numpy.testing as npt
import pandas.testing as pdt
data = {
'column1': [1, 2, 3],
'column2': ['a', 'b', 'c']
}
data2 = {
'column1': [7, 8, 9],
'column2': ['d', 'e', 'f']
}
table = pa.table(data)
table2 = pa.table(data2)
tensor = pa.Tensor.from_numpy(np.array([5,6,7]))
tensor2 = pa.Tensor.from_numpy(np.array([4,7,4,6,7,8]))
items = [table, table2, tensor, tensor2]
n_items = len(items)
sinks = [None] * n_items
for I, item in enumerate(items):
sink = sinks[I] = pa.BufferOutputStream()
with pa.CompressedOutputStream(sink, 'zstd') as compressed_sink:
if isinstance(item, pa.Table):
with ipc.RecordBatchStreamWriter(compressed_sink, item.schema)
as table_writer:
table_writer.write_table(item)
elif isinstance(item, pa.Tensor):
ipc.write_tensor(item, compressed_sink)
else:
raise NotImplementedError(f'Unhandled type: {type(item)}')
# compressed_sink.flush() # Unnecessary
# Write each chunk from sinks into a file
offsets = [None] * n_items
sizes = [None] * n_items
file_sink = pa.BufferOutputStream()
for I, sink in enumerate(sinks):
offsets[I] = file_sink.tell()
file_sink.write(sink.getvalue())
sizes[I] = file_sink.tell() - offsets[I]
# Convert to bytes to finalize the file_sink buffer
chunk = file_sink.getvalue()
# How can we go about reading only one object in the source buffer?
import random
for I in random.sample(tuple(range(len(items))), len(items)):
print(f'Loading item #{I}')
item = items[I]
# Here we would have to implement some table of contents that gives us
# a map of type, offset, and size.
block = chunk[offsets[I]:offsets[I] + sizes[I]]
source = pa.BufferReader(block)
with pa.CompressedInputStream(block, 'zstd') as decompressed_source:
if isinstance(item, pa.Table):
with ipc.RecordBatchStreamReader(decompressed_source) as
table_reader:
read_item = table_reader.read_all()
pdt.assert_frame_equal(item.to_pandas(), read_item.to_pandas())
elif isinstance(item, pa.Tensor):
read_item = ipc.read_tensor(decompressed_source)
npt.assert_almost_equal(item.to_numpy(), read_item.to_numpy())
else:
raise NotImplementedError(f'Unhandled type: {type(item)}')
On Wed, Oct 9, 2024 at 4:17 PM Aldrin <[email protected]> wrote:
> I could be wrong, but I think zstd naively (or by default) requires the
> whole stream to be decompressed before you can access any data within it
> (it is not "splittable" and does not support random access). There are ways
> to provide this capability by essentially compressing in segments. The best
> concise description of this that I've found is at [1].
>
> Either way, I also think that accessing a compressed stream using purely
> data parameters (e.g. tensor_offset and tensor_size) doesn't accommodate
> any structural information of the stream (e.g. schema, metadata, and
> various data boundaries). So, I think aside from the choice of compression,
> the approach you mentioned may be too naive anyways.
>
> I don't know what a good way to do this is, but you'd either have to dig
> into how the arrow library writes data to the compressed stream (I think
> around [2]) to get some inspiration, or you can try looking at possible
> solutions involving other compression algorithms (bzip2 should be
> splittable, and snappy may or may not be?).
>
> Something you can also try doing is writing in batches rather than the
> whole table, so that if nothing else you can "seek" to where a batch was
> written [3]. If you want to do your own book-keeping, you can potentially
> note positions in the stream when writing to it [4] and you can try and
> read from that position later.
>
> Doing any of these things, or anything at this level, might require you to
> look into the c++ code being called from python to figure out (in which
> case [2] will be useful), but I'm not sure how far down that rabbit hole
> you want to go.
>
>
> [1]: https://github.com/circulosmeos/gztool?tab=readme-ov-file#background
> [2]:
> https://github.com/apache/arrow/blob/main/cpp/src/arrow/io/compressed.cc#L42
> [3]:
> https://arrow.apache.org/docs/python/generated/pyarrow.ipc.RecordBatchStreamWriter.html#pyarrow.ipc.RecordBatchStreamWriter.write_batch
> [4]:
> https://arrow.apache.org/docs/python/generated/pyarrow.CompressedOutputStream.html#pyarrow.CompressedOutputStream.tell
>
>
> # ------------------------------
> # Aldrin
>
> https://github.com/drin/
> https://gitlab.com/octalene
> https://keybase.io/octalene
>
> On Wednesday, October 9th, 2024 at 15:12, Robert McLeod <
> [email protected]> wrote:
>
> I am trying to write multiple tables/tensors into a single stream/file.
> Writing is straightforward, and I can read everything back out, but every
> effort I have tried to pick an individual element out of a compressed
> stream has failed. E.g. I would like to only extract Tensor #1 from the
> stream. I provide some sample code:
>
> import pyarrow as pa
> import pyarrow.ipc as ipc
> import numpy as np
>
> data = {
> 'column1': [1, 2, 3],
> 'column2': ['a', 'b', 'c']
> }
> data2 = {
> 'column1': [7, 8, 9],
> 'column2': ['d', 'e', 'f']
> }
>
> table = pa.table(data)
> table2 = pa.table(data2)
>
> tensor = pa.Tensor.from_numpy(np.ndarray([5,6,7]))
> tensor2 = pa.Tensor.from_numpy(np.ndarray([4,7,4,6,7,8]))
>
> sink = pa.BufferOutputStream()
>
> df_offset = sink.tell()
> # We are not using the context manager here because CompressedOutputStream
> closes
> # the the sink stream when it exits.
> compressed_sink = pa.CompressedOutputStream(sink, 'zstd')
> with ipc.RecordBatchStreamWriter(compressed_sink, table.schema) as
> table_writer:
> table_writer.write_table(table)
> compressed_sink.flush()
> df_size = sink.tell() - df_offset
>
> df2_offset = sink.tell()
> with ipc.RecordBatchStreamWriter(compressed_sink, table2.schema) as
> table_writer:
> table_writer.write_table(table2)
> compressed_sink.flush()
> df2_size = sink.tell() - df2_offset
>
> # Write our tensors
> tensor_offset = sink.tell()
> ipc.write_tensor(tensor, compressed_sink)
> compressed_sink.flush()
> tensor_size = sink.tell() - tensor_offset
>
> tensor_offset2 = sink.tell()
> ipc.write_tensor(tensor2, compressed_sink)
> compressed_sink.flush()
> tensor_size2 = sink.tell() - tensor_offset2
>
> # Convert to bytes to finalize the sink buffer
> chunk = sink.getvalue()
>
> source = pa.BufferReader(chunk)
> # We can read out every element in the stream without any problems
> decompressed_source = pa.CompressedInputStream(source, 'zstd')
> with ipc.RecordBatchStreamReader(decompressed_source) as table_reader:
> read_table = table_reader.read_all()
> with ipc.RecordBatchStreamReader(decompressed_source) as table_reader2:
> read_table2 = table_reader2.read_all()
>
> read_tensor = ipc.read_tensor(decompressed_source)
> read_tensor2 = ipc.read_tensor(decompressed_source)
>
> # But, how can we go about reading only one object in the source buffer?
> block = chunk[tensor_offset:tensor_offset + tensor_size]
> decompressed2 = pa.CompressedInputStream(block, 'zstd')
> random_tensor = ipc.read_tensor(decompressed2)
> # Traceback (most recent call last):
> # File "<stdin>", line 1, in <module>
> # File "pyarrow\ipc.pxi", line 1257, in pyarrow.lib.read_tensor
> # File "pyarrow\error.pxi", line 154, in
> pyarrow.lib.pyarrow_internal_check_status
> # File "pyarrow\error.pxi", line 91, in pyarrow.lib.check_status
> # OSError: ZSTD decompress failed: Unknown frame descriptor
>
> Has anyone else tried this approach and found a workable solution to pull
> individual objects from a (compressed) stream?
>
> --
> Robert McLeod
> [email protected]
> [email protected]
>
>
>
--
Robert McLeod
[email protected]
[email protected]