If you think the problem is the context manager, then don't use it. As with file handles, the expectation is that exiting a context manager closes the thing it's managing. But even if you don't close it, the positions should still be valid when you open it again.

Also, looking at the example again, I see you were using tell(), which I somehow missed; but you're doing it outside of the context manager. If you move it inside the context manager, then you don't have to worry about the stream being closed when you call tell().
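
Roughly what I mean, as an untested sketch against your first example (using your sink and table; the point is just that tell() is called while the stream is still open):

with pa.CompressedOutputStream(sink, 'zstd') as compressed_sink:
    df_offset = sink.tell()
    with ipc.RecordBatchStreamWriter(compressed_sink, table.schema) as table_writer:
        table_writer.write_table(table)
    compressed_sink.flush()
    # still inside the `with`, so neither compressed_sink nor sink has been closed yet
    df_size = sink.tell() - df_offset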
As far as your use of `file_sink` in your new code, I don't see it opened nor used with a context manager. My eyes may be failing me again, but it seems like it would have the same issue as using tell() with a closed BufferOutputStream?

On Thu, Oct 10, 2024 at 08:33, Robert McLeod <[email protected]> wrote:
Aldrin,

I think my trouble is coming from the fact that a `CompressedOutputStream` closes the `BufferOutputStream` when it exits its context manager. I don't know if that is required or if it is simply an oversight, because no one else has tried to fetch individual objects from a compressed stream before.

I have gone through and made a new example that uses a 'meta-sink' (`file_sink` in the code below): it writes one `BufferOutputStream` per object and then re-writes that compressed data into the meta-sink. It is a bit wasteful to have that extra copy operation. I'll take a quick look at the Arrow source to see if there's feasibly a better solution.

import pyarrow as pa
import pyarrow.ipc as ipc
import numpy as np
import numpy.testing as npt
import pandas.testing as pdt

data = {
    'column1': [1, 2, 3],
    'column2': ['a', 'b', 'c']
}
data2 = {
    'column1': [7, 8, 9],
    'column2': ['d', 'e', 'f']
}

table = pa.table(data)
table2 = pa.table(data2)
tensor = pa.Tensor.from_numpy(np.array([5, 6, 7]))
tensor2 = pa.Tensor.from_numpy(np.array([4, 7, 4, 6, 7, 8]))

items = [table, table2, tensor, tensor2]
n_items = len(items)

# Compress each object into its own BufferOutputStream
sinks = [None] * n_items
for i, item in enumerate(items):
    sink = sinks[i] = pa.BufferOutputStream()
    with pa.CompressedOutputStream(sink, 'zstd') as compressed_sink:
        if isinstance(item, pa.Table):
            with ipc.RecordBatchStreamWriter(compressed_sink, item.schema) as table_writer:
                table_writer.write_table(item)
        elif isinstance(item, pa.Tensor):
            ipc.write_tensor(item, compressed_sink)
        else:
            raise NotImplementedError(f'Unhandled type: {type(item)}')
        # compressed_sink.flush()  # Unnecessary

# Write each chunk from sinks into a file
offsets = [None] * n_items
sizes = [None] * n_items
file_sink = pa.BufferOutputStream()
for i, sink in enumerate(sinks):
    offsets[i] = file_sink.tell()
    file_sink.write(sink.getvalue())
    sizes[i] = file_sink.tell() - offsets[i]

# Convert to bytes to finalize the file_sink buffer
chunk = file_sink.getvalue()

# How can we go about reading only one object in the source buffer?
import random
for i in random.sample(tuple(range(len(items))), len(items)):
    print(f'Loading item #{i}')
    item = items[i]
    # Here we would have to implement some table of contents that gives us
    # a map of type, offset, and size.
    block = chunk[offsets[i]:offsets[i] + sizes[i]]
    source = pa.BufferReader(block)
    with pa.CompressedInputStream(source, 'zstd') as decompressed_source:
        if isinstance(item, pa.Table):
            with ipc.RecordBatchStreamReader(decompressed_source) as table_reader:
                read_item = table_reader.read_all()
            pdt.assert_frame_equal(item.to_pandas(), read_item.to_pandas())
        elif isinstance(item, pa.Tensor):
            read_item = ipc.read_tensor(decompressed_source)
            npt.assert_almost_equal(item.to_numpy(), read_item.to_numpy())
        else:
            raise NotImplementedError(f'Unhandled type: {type(item)}')
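
One way the table-of-contents comment above could be fleshed out is to append the map of type/offset/size to the meta-sink before finalizing it. This is only a rough sketch of the idea; the JSON layout and the 8-byte footer are arbitrary choices of mine, not anything Arrow prescribes:

import json
import struct

# Sketch: append a JSON table of contents plus an 8-byte footer holding its
# offset. This would go right after the loop that fills file_sink, *before*
# calling file_sink.getvalue().
toc = [
    {'type': type(item).__name__, 'offset': offsets[i], 'size': sizes[i]}
    for i, item in enumerate(items)
]
toc_offset = file_sink.tell()
file_sink.write(json.dumps(toc).encode('utf-8'))
file_sink.write(struct.pack('<Q', toc_offset))
chunk = file_sink.getvalue()

# A reader recovers the table of contents from the footer, then slices out and
# decompresses only the block it needs.
footer_offset = struct.unpack('<Q', chunk[chunk.size - 8:].to_pybytes())[0]
toc = json.loads(chunk[footer_offset:chunk.size - 8].to_pybytes())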
On Wed, Oct 9, 2024 at 4:17 PM Aldrin <[email protected]> wrote:
I could be wrong, but I think zstd naively (or by default) requires the whole stream to be decompressed before you can access any data within it (it is not "splittable" and does not support random access). There are ways to provide this capability by essentially compressing in segments. The best concise description of this that I've found is at [1].
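
A quick way to see this against your example (untested; it assumes the whole stream was written as a single zstd frame): the compressed buffer starts with the zstd frame magic, but the slice you take at tensor_offset does not, which is why decompression reports an unknown frame descriptor.

# Hypothetical check, reusing `chunk` and `tensor_offset` from your example.
# A zstd frame begins with the magic bytes 28 B5 2F FD (little-endian 0xFD2FB528).
ZSTD_MAGIC = b'\x28\xb5\x2f\xfd'
print(chunk[:4].to_pybytes() == ZSTD_MAGIC)                               # True: start of the frame
print(chunk[tensor_offset:tensor_offset + 4].to_pybytes() == ZSTD_MAGIC)  # False: mid-frame slice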
Either way, I also think that accessing a compressed stream using purely data parameters (e.g. tensor_offset and tensor_size) doesn't accommodate any structural information of the stream (e.g. schema, metadata, and various data boundaries). So, I think aside from the choice of compression, the approach you mentioned may be too naive anyway.

I don't know what a good way to do this is, but you'd either have to dig into how the Arrow library writes data to the compressed stream (I think around [2]) to get some inspiration, or you can try looking at possible solutions involving other compression algorithms (bzip2 should be splittable, and snappy may or may not be?).

Something you can also try is writing in batches rather than the whole table, so that if nothing else you can "seek" to where a batch was written [3]. If you want to do your own book-keeping, you can potentially note positions in the stream when writing to it [4] and try to read from that position later.

Doing any of these things, or anything at this level, might require you to look into the C++ code being called from Python to figure things out (in which case [2] will be useful), but I'm not sure how far down that rabbit hole you want to go.
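
A rough, untested sketch of that last book-keeping idea, using your `table` (I'm not certain the recorded positions are actually usable for seeking back into the compressed stream, but it shows where [3] and [4] would come in):

sink = pa.BufferOutputStream()
batch_positions = []
with pa.CompressedOutputStream(sink, 'zstd') as compressed_sink:
    with ipc.RecordBatchStreamWriter(compressed_sink, table.schema) as writer:
        # write batch by batch instead of the whole table, noting the position
        # of the compressed stream before each batch
        for batch in table.to_batches():
            batch_positions.append(compressed_sink.tell())
            writer.write_batch(batch)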
[1]: https://github.com/circulosmeos/gztool?tab=readme-ov-file#background
[2]: https://github.com/apache/arrow/blob/main/cpp/src/arrow/io/compressed.cc#L42
[3]: https://arrow.apache.org/docs/python/generated/pyarrow.ipc.RecordBatchStreamWriter.html#pyarrow.ipc.RecordBatchStreamWriter.write_batch
[4]: https://arrow.apache.org/docs/python/generated/pyarrow.CompressedOutputStream.html#pyarrow.CompressedOutputStream.tell
# ------------------------------
# Aldrin

https://github.com/drin/
https://gitlab.com/octalene
https://keybase.io/octalene
On Wednesday, October 9th, 2024 at 15:12, Robert McLeod <[email protected]> wrote:
I am trying to write multiple tables/tensors into a single stream/file. Writing is straightforward, and I can read everything back out, but every effort I have tried to pick an individual element out of a compressed stream has failed. E.g. I would like to only extract Tensor #1 from the stream. I provide some sample code:

import pyarrow as pa
import pyarrow.ipc as ipc
import numpy as np

data = {
    'column1': [1, 2, 3],
    'column2': ['a', 'b', 'c']
}
data2 = {
    'column1': [7, 8, 9],
    'column2': ['d', 'e', 'f']
}

table = pa.table(data)
table2 = pa.table(data2)

tensor = pa.Tensor.from_numpy(np.array([5, 6, 7]))
tensor2 = pa.Tensor.from_numpy(np.array([4, 7, 4, 6, 7, 8]))

sink = pa.BufferOutputStream()

df_offset = sink.tell()
# We are not using the context manager here because CompressedOutputStream closes
# the sink stream when it exits.
compressed_sink = pa.CompressedOutputStream(sink, 'zstd')
with ipc.RecordBatchStreamWriter(compressed_sink, table.schema) as table_writer:
    table_writer.write_table(table)
compressed_sink.flush()
df_size = sink.tell() - df_offset

df2_offset = sink.tell()
with ipc.RecordBatchStreamWriter(compressed_sink, table2.schema) as table_writer:
    table_writer.write_table(table2)
compressed_sink.flush()
df2_size = sink.tell() - df2_offset

# Write our tensors
tensor_offset = sink.tell()
ipc.write_tensor(tensor, compressed_sink)
compressed_sink.flush()
tensor_size = sink.tell() - tensor_offset

tensor_offset2 = sink.tell()
ipc.write_tensor(tensor2, compressed_sink)
compressed_sink.flush()
tensor_size2 = sink.tell() - tensor_offset2

# Convert to bytes to finalize the sink buffer
chunk = sink.getvalue()
source = pa.BufferReader(chunk)

# We can read out every element in the stream without any problems
decompressed_source = pa.CompressedInputStream(source, 'zstd')
with ipc.RecordBatchStreamReader(decompressed_source) as table_reader:
    read_table = table_reader.read_all()
with ipc.RecordBatchStreamReader(decompressed_source) as table_reader2:
    read_table2 = table_reader2.read_all()
read_tensor = ipc.read_tensor(decompressed_source)
read_tensor2 = ipc.read_tensor(decompressed_source)

# But, how can we go about reading only one object in the source buffer?
block = chunk[tensor_offset:tensor_offset + tensor_size]
decompressed2 = pa.CompressedInputStream(pa.BufferReader(block), 'zstd')
random_tensor = ipc.read_tensor(decompressed2)
# Traceback (most recent call last):
#   File "<stdin>", line 1, in <module>
#   File "pyarrow\ipc.pxi", line 1257, in pyarrow.lib.read_tensor
#   File "pyarrow\error.pxi", line 154, in pyarrow.lib.pyarrow_internal_check_status
#   File "pyarrow\error.pxi", line 91, in pyarrow.lib.check_status
# OSError: ZSTD decompress failed: Unknown frame descriptor
Has anyone else tried this approach and found a workable solution to pull individual objects from a (compressed) stream?

--
Robert McLeod
[email protected]
[email protected]
