rustyconover opened a new issue, #9837:
URL: https://github.com/apache/arrow-rs/issues/9837

   **Is your feature request related to a problem or challenge? Please describe 
what you are trying to do.**
   
   When the entire IPC stream is already in memory (e.g. an HTTP body, a 
memory-mapped file, a shared-memory segment, or a `bytes::Bytes` from object 
storage), `StreamReader<R: Read>` is the natural fit ergonomically — but it's 
significantly slower than necessary because it copies every message body.
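
   To see why a `Read`-based API forces the copy: the reader cannot alias the caller's memory through `R: Read`, so each message body has to land in a fresh allocation before array views can be built over it. A minimal stdlib-only illustration (not arrow-rs internals; `read_message_body` is a hypothetical helper):

   ```rust
   use std::io::{Cursor, Read};

   // Illustration only: a `Read`-based parser cannot borrow from the
   // underlying source, so every message body is copied into a fresh
   // allocation before anything can be built over it.
   fn read_message_body(reader: &mut impl Read, len: usize) -> std::io::Result<Vec<u8>> {
       let mut body = vec![0u8; len]; // per-message allocation
       reader.read_exact(&mut body)?; // per-message memcpy out of the source
       Ok(body)
   }

   fn main() -> std::io::Result<()> {
       let stream: Vec<u8> = (0..8).collect();
       let mut cursor = Cursor::new(&stream[..]);
       let body = read_message_body(&mut cursor, 8)?;
       // `body` is a copy: it lives at a different address than `stream`.
       assert_ne!(body.as_ptr(), stream.as_ptr());
       println!("copied {} bytes", body.len());
       Ok(())
   }
   ```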
   
   Concrete numbers from a microbench (`Float64Array`, single column, in-memory 
IPC stream, `arrow-rs` from current `main`):
   
   ```
          rows           phase   med_ms     GB/s
          1000      serialize    0.003     2.59
          1000     deser_read    0.001     6.40    StreamReader<Cursor<&[u8]>>
          1000   deser_buffer    0.001    10.10    StreamDecoder + Buffer
        100000      serialize    0.102     7.86
        100000     deser_read    0.057    14.13    StreamReader<Cursor<&[u8]>>
        100000   deser_buffer    0.001   960.38    StreamDecoder + Buffer
       1000000      serialize    2.125     3.77
       1000000     deser_read    1.070     7.48    StreamReader<Cursor<&[u8]>>
       1000000   deser_buffer    0.001  8000.00    StreamDecoder + Buffer
      10000000      serialize   22.639     3.53
      10000000     deser_read   33.357     2.40    StreamReader<Cursor<&[u8]>>
      10000000   deser_buffer    0.004 18462.96    StreamDecoder + Buffer  <-- zero-copy
   ```
   
   `StreamDecoder` with a single owned `Buffer` is already zero-copy — every 
column's `Buffer` is constructed via `buffer.slice_with_length(...)`, which 
just bumps the `Arc<Bytes>` refcount. So the underlying machinery exists; 
what's missing is a *pull-based* `Iterator<Item = RecordBatch>` constructor 
that takes a buffer directly, mirroring how `StreamReader<R>` is normally used.
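
   For reference, this is roughly the loop callers have to write by hand today. A sketch under the assumption that `StreamDecoder::decode`/`finish` behave as on current `main` (`read_all` is a hypothetical helper name, and crate paths may differ by version):

   ```rust
   use arrow_array::RecordBatch;
   use arrow_buffer::Buffer;
   use arrow_ipc::reader::StreamDecoder;
   use arrow_schema::ArrowError;

   /// Drain a fully-materialized IPC stream. Each yielded batch's column
   /// buffers alias `buffer` -- `decode` slices it rather than copying.
   fn read_all(mut buffer: Buffer) -> Result<Vec<RecordBatch>, ArrowError> {
       let mut decoder = StreamDecoder::new();
       let mut batches = Vec::new();
       // `decode` consumes from the front of `buffer`, returning
       // `Ok(Some(batch))` per record batch and `Ok(None)` once it needs
       // more bytes or has seen the end-of-stream marker; dictionary
       // messages are absorbed internally.
       while let Some(batch) = decoder.decode(&mut buffer)? {
           batches.push(batch);
       }
       decoder.finish()?; // errors if the stream ended mid-message
       Ok(batches)
   }
   ```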
   
   This shows up clearly when comparing to pyarrow over the same payload: pyarrow's `ipc.open_stream(buf).read_next_batch()` constructs Array views over the input buffer with no body copy and lands at "essentially free" deserialize times (microbenched at >7000 GB/s for the same shape, i.e. method-call cost). Today, in arrow-rs, you can match that with the push-based `StreamDecoder`, but most callers reach for `StreamReader::try_new(Cursor::new(bytes), None)` — and pay an unnecessary memcpy per message.
   
   Related performance reports:
   - #6670 (Arrow Flight Performance: Rust vs Python (C++)) — same pattern, 
observed in Flight.
   - #9388 (`split_batch_for_grpc_response` over-splits) — confirms that on the 
deserialize side, all column buffers slice one shared `Bytes`; the slicing is 
already zero-copy.
   - #9307 (LZ4 frame decode temp copy) — sibling issue on the compressed path.
   
   **Describe the solution you'd like**
   
   A new pull-based reader type that consumes an owned [`Buffer`] (or anything 
cheaply convertible to one) and yields `Result<RecordBatch>` via `Iterator`, 
internally driving `StreamDecoder` so each batch's column buffers alias the 
input.
   
   API sketch (final naming up for discussion):
   
   ```rust
   /// Pull-based, zero-copy IPC stream reader over an owned in-memory 
[`Buffer`].
   ///
   /// Where [`StreamReader`] reads from an arbitrary `R: Read` and copies
   /// each message body, this reader takes a fully-materialized buffer
   /// and yields [`RecordBatch`]es whose column [`Buffer`]s alias the
   /// input — no per-message memcpy.
   ///
   /// For a streaming push-based interface (chunked input, e.g. from
   /// object storage), see [`StreamDecoder`].
   pub struct BufferStreamReader { ... }
   
   impl BufferStreamReader {
       pub fn try_new(buffer: Buffer) -> Result<Self, ArrowError>;
       pub fn try_new_from_bytes(bytes: bytes::Bytes) -> Result<Self, ArrowError> {
           Self::try_new(Buffer::from(bytes))
       }
       pub fn schema(&self) -> &SchemaRef;
   }
   
   impl Iterator for BufferStreamReader {
       type Item = Result<RecordBatch, ArrowError>;
       fn next(&mut self) -> Option<Self::Item>;
   }
   ```
   
   The implementation is essentially a thin wrapper around `StreamDecoder` + the input `Buffer`, eagerly draining the schema message (and any leading dictionary messages) at construction so that `schema()` is cheap and infallible afterwards.
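
   To make that concrete, the `Iterator` impl could be driven like this (a sketch, not a final implementation: `decoder` and `buffer` are hypothetical fields holding the `StreamDecoder` and the remaining unconsumed input, and `decode` is assumed to behave as on current `main`):

   ```rust
   impl Iterator for BufferStreamReader {
       type Item = Result<RecordBatch, ArrowError>;

       fn next(&mut self) -> Option<Self::Item> {
           match self.decoder.decode(&mut self.buffer) {
               Ok(Some(batch)) => Some(Ok(batch)),
               // With a fully-materialized buffer, `None` can only mean
               // end-of-stream, never "need more bytes".
               Ok(None) => None,
               Err(e) => Some(Err(e)),
           }
       }
   }
   ```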
   
   **Describe alternatives you've considered**
   
   1. **Tell users to use `StreamDecoder`.** Already possible, but the 
push-based API is awkward when you have a single buffer; you have to write the 
loop yourself, handle the `Ok(None)` cases for dictionaries vs. EOS, etc. This 
is exactly the kind of thing `StreamReader` does for the `Read` case — there 
should be a parallel for the buffer case.
   
   2. **Add `StreamReader::try_new_from_buffer(Buffer)` returning 
`StreamReader<Cursor<Buffer>>`.** Keeps one type, but requires reworking 
`StreamReader` to use `StreamDecoder` internally instead of `MessageReader<R>` 
so the zero-copy path is taken — bigger blast radius, and `StreamReader<R: 
Read>` has to keep working for streaming sources, so the internals would split 
anyway.
   
   3. **Make `StreamReader<R: Read>` zero-copy in the special case of 
`Cursor<Buffer>`.** Would require generic specialization or a runtime type 
check; both worse than just adding a sibling type.
   
   A new sibling type with a focused purpose ("I have all the bytes, just give me batches fast") is the cleanest API.
   
   **Additional context**
   
   I'd like to send a PR. Implementation lives almost entirely on top of 
`StreamDecoder` (~80 LoC + tests), and the existing `StreamReader<R>` is 
untouched.
