rustyconover opened a new issue, #9837:
URL: https://github.com/apache/arrow-rs/issues/9837
**Is your feature request related to a problem or challenge? Please describe
what you are trying to do.**
When the entire IPC stream is already in memory (e.g. an HTTP body, a
memory-mapped file, a shared-memory segment, or a `bytes::Bytes` from object
storage), `StreamReader<R: Read>` is the natural fit ergonomically — but it's
significantly slower than necessary because it copies every message body.
Concrete numbers from a microbench (`Float64Array`, single column, in-memory
IPC stream, `arrow-rs` from current `main`):
```
    rows  phase         med_ms      GB/s
    1000  serialize      0.003      2.59
    1000  deser_read     0.001      6.40  StreamReader<Cursor<&[u8]>>
    1000  deser_buffer   0.001     10.10  StreamDecoder + Buffer
  100000  serialize      0.102      7.86
  100000  deser_read     0.057     14.13  StreamReader<Cursor<&[u8]>>
  100000  deser_buffer   0.001    960.38  StreamDecoder + Buffer
 1000000  serialize      2.125      3.77
 1000000  deser_read     1.070      7.48  StreamReader<Cursor<&[u8]>>
 1000000  deser_buffer   0.001   8000.00  StreamDecoder + Buffer
10000000  serialize     22.639      3.53
10000000  deser_read    33.357      2.40  StreamReader<Cursor<&[u8]>>
10000000  deser_buffer   0.004  18462.96  StreamDecoder + Buffer  <-- zero-copy
```
`StreamDecoder` with a single owned `Buffer` is already zero-copy — every
column's `Buffer` is constructed via `buffer.slice_with_length(...)`, which
just bumps the `Arc<Bytes>` refcount. So the underlying machinery exists;
what's missing is a *pull-based* `Iterator<Item = RecordBatch>` constructor
that takes a buffer directly, mirroring how `StreamReader<R>` is normally used.
This shows up clearly when comparing to pyarrow over the same payload:
pyarrow's `ipc.open_stream(buf).read_next_batch()` constructs Array views over
the input buffer with no body copy and lands at "essentially free"
deserialize times (microbenched at >7000 GB/s for the same shape, i.e.
method-call cost). Today, in arrow-rs, you can match that with the push-based
`StreamDecoder`, but most callers reach for
`StreamReader::try_new(Cursor::new(bytes), None)` — and pay an unnecessary
memcpy per message.
Related performance reports:
- #6670 (Arrow Flight Performance: Rust vs Python (C++)) — same pattern,
observed in Flight.
- #9388 (`split_batch_for_grpc_response` over-splits) — confirms that on the
deserialize side, all column buffers slice one shared `Bytes`; the slicing is
already zero-copy.
- #9307 (LZ4 frame decode temp copy) — sibling issue on the compressed path.
**Describe the solution you'd like**
A new pull-based reader type that consumes an owned [`Buffer`] (or anything
cheaply convertible to one) and yields `Result<RecordBatch>` via `Iterator`,
internally driving `StreamDecoder` so each batch's column buffers alias the
input.
API sketch (final naming up for discussion):
```rust
/// Pull-based, zero-copy IPC stream reader over an owned in-memory [`Buffer`].
///
/// Where [`StreamReader`] reads from an arbitrary `R: Read` and copies
/// each message body, this reader takes a fully-materialized buffer
/// and yields [`RecordBatch`]es whose column [`Buffer`]s alias the
/// input — no per-message memcpy.
///
/// For a streaming push-based interface (chunked input, e.g. from
/// object storage), see [`StreamDecoder`].
pub struct BufferStreamReader { ... }

impl BufferStreamReader {
    pub fn try_new(buffer: Buffer) -> Result<Self, ArrowError>;

    pub fn try_new_from_bytes(bytes: bytes::Bytes) -> Result<Self, ArrowError> {
        Self::try_new(Buffer::from_bytes(bytes))
    }

    pub fn schema(&self) -> &SchemaRef;
}

impl Iterator for BufferStreamReader {
    type Item = Result<RecordBatch, ArrowError>;

    fn next(&mut self) -> Option<Self::Item>;
}
```
The implementation is essentially a thin wrapper around `StreamDecoder` +
the input `Buffer`, eagerly draining schema (and any leading dictionary
messages) at construction so `schema()` is cheap and infallible afterwards.
**Describe alternatives you've considered**
1. **Tell users to use `StreamDecoder`.** Already possible, but the
push-based API is awkward when you have a single buffer; you have to write the
loop yourself, handle the `Ok(None)` cases for dictionaries vs. EOS, etc. This
is exactly the kind of thing `StreamReader` does for the `Read` case — there
should be a parallel for the buffer case.
2. **Add `StreamReader::try_new_from_buffer(Buffer)` returning
`StreamReader<Cursor<Buffer>>`.** Keeps one type, but requires reworking
`StreamReader` to use `StreamDecoder` internally instead of `MessageReader<R>`
so the zero-copy path is taken — bigger blast radius, and `StreamReader<R:
Read>` has to keep working for streaming sources, so the internals would split
anyway.
3. **Make `StreamReader<R: Read>` zero-copy in the special case of
`Cursor<Buffer>`.** Would require generic specialization or a runtime type
check; both worse than just adding a sibling type.
A new sibling type with a focused purpose ("I have all the bytes, just give
me batches fast") is the cleanest API.
**Additional context**
I'd like to send a PR. Implementation lives almost entirely on top of
`StreamDecoder` (~80 LoC + tests), and the existing `StreamReader<R>` is
untouched.