This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 301eb26bb9 Reduce copies in Arrow IPC writer (#10044)
301eb26bb9 is described below
commit 301eb26bb92362e531bd6c39980292ebcae8e8aa
Author: RIchard Baah <[email protected]>
AuthorDate: Wed Jun 10 14:54:51 2026 -0400
Reduce copies in Arrow IPC writer (#10044)
# Which issue does this PR close?
- Closes #10029.
[A document that provides a bit of
context](https://github.com/user-attachments/files/28477762/Arrow.flight.speed.up.2.pdf)
# Rationale for this change
Compression is the most compute- and memory-intensive part of the
arrow-ipc encoding pipeline, and it runs per buffer, not per record
batch. For a Flight stream of 10 batches with 5 primitive arrays each,
that is at least 100 compression calls (two buffers per array: validity
and values), [more for string and struct
arrays](https://arrow.apache.org/docs/format/Columnar.html#compression).
Each of those calls produced an owned compressed Vec that was then
copied a second time into a flat arrow_data accumulator before being
written to the output. The uncompressed path had the same problem:
Arc-backed buffer slices that needed no compression were still copied
into that accumulator.
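As a rough check on the figure above, the lower bound is batches times arrays per batch times buffers per array. The sketch below assumes two buffers per primitive array (validity and values); `min_compression_calls` is a hypothetical helper for illustration, not part of arrow-ipc.

```rust
/// Lower bound on per-buffer compression calls for a stream.
///
/// `buffers_per_array` is assumed to be 2 for primitive arrays
/// (validity bitmap + values); string and nested types need more.
fn min_compression_calls(
    batches: usize,
    arrays_per_batch: usize,
    buffers_per_array: usize,
) -> usize {
    batches * arrays_per_batch * buffers_per_array
}
```

With 10 batches, 5 arrays per batch, and 2 buffers per array, this gives the 100 calls quoted above.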
Separately, the original **write_message()** function flushed after
every dictionary and every record batch. For writers that are not backed
by an in-memory vector, this caused repeated small OS write calls per
batch.
The goal was to eliminate both problems: stop copying buffers that do
not need to be copied, and stop flushing on every message.
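The cost of flushing per message can be illustrated with a small standalone sketch. It does not use the arrow-ipc API: `CountingWriter`, `write_flush_each`, and `write_flush_once` are hypothetical names, and flushing once at the end is only one possible policy for deferring the flush.

```rust
use std::io::{self, Write};

/// Hypothetical writer that records how often it is flushed.
#[derive(Default)]
struct CountingWriter {
    bytes: Vec<u8>,
    flushes: usize,
}

impl Write for CountingWriter {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.bytes.extend_from_slice(buf);
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        self.flushes += 1;
        Ok(())
    }
}

/// Old shape: flush after every message.
fn write_flush_each<W: Write>(w: &mut W, messages: &[&[u8]]) -> io::Result<()> {
    for m in messages {
        w.write_all(m)?;
        w.flush()?;
    }
    Ok(())
}

/// Deferred shape: write every message, then flush once.
fn write_flush_once<W: Write>(w: &mut W, messages: &[&[u8]]) -> io::Result<()> {
    for m in messages {
        w.write_all(m)?;
    }
    w.flush()
}
```

Both functions produce the same bytes; only the number of flushes differs. For a buffered file or socket writer, each flush typically turns into an OS write call.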
# What changes are included in this PR?
- Introduced EncodedBuffer, an enum that wraps either a raw Arc-backed
Buffer for the uncompressed path or an owned Vec for the compressed
path, so both can be held in a uniform collection without an extra copy
into a flat accumulator
- Changed write_array_data to push EncodedBuffer segments instead of
copying bytes into arrow_data
- FileWriter and StreamWriter both now call
**IpcDataGenerator::write()**, eliminating the flush-per-message
behavior and the intermediate copy on the hot path
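The segment idea in the list above can be sketched with the standard library only. This is a simplified stand-in under stated assumptions, not the PR's code: `Segment` plays the role of `EncodedBuffer`, `Arc<[u8]>` stands in for Arrow's `Buffer`, and `pad_len` and `write_segments` are hypothetical helpers that assume a power-of-two alignment of at most 64.

```rust
use std::io::{self, Write};
use std::sync::Arc;

/// Stand-in for `EncodedBuffer`: shared bytes or owned compressed bytes.
enum Segment {
    /// Uncompressed: shares the original allocation, no byte copy.
    Raw(Arc<[u8]>),
    /// Compressed: owned bytes produced by a codec.
    Compressed(Vec<u8>),
}

impl Segment {
    fn as_slice(&self) -> &[u8] {
        match self {
            Segment::Raw(b) => &b[..],
            Segment::Compressed(v) => v.as_slice(),
        }
    }
}

const PADDING: [u8; 64] = [0; 64];

/// Bytes needed to round `len` up to a multiple of `alignment`
/// (assumed to be a power of two, at most 64).
fn pad_len(alignment: usize, len: usize) -> usize {
    let a = alignment - 1;
    ((len + a) & !a) - len
}

/// Writes each segment followed by its alignment padding and
/// returns the total number of body bytes written.
fn write_segments<W: Write>(
    w: &mut W,
    segments: &[Segment],
    alignment: usize,
) -> io::Result<usize> {
    let mut total = 0;
    for seg in segments {
        let bytes = seg.as_slice();
        let pad = pad_len(alignment, bytes.len());
        w.write_all(bytes)?;
        w.write_all(&PADDING[..pad])?;
        total += bytes.len() + pad;
    }
    Ok(total)
}
```

Cloning the `Arc` into `Segment::Raw` only bumps a reference count, so the bytes are copied once, when they are written to the output, rather than first into a flat accumulator.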
# Are these changes tested?
These changes are intended to be completely seamless. I did not write
new unit tests because nothing changed externally. All existing tests
still pass.
## Benchmarks
[**main** -> `cargo bench --bench ipc_writer -- "StreamWriter/write_10$"
--sample-size 100`]
[**my branch** -> `cargo bench --bench ipc_writer --
"StreamWriter/write_10$" --sample-size 100` ]
<img width="1832" height="982" alt="Image 6-1-26 at 3 19 PM"
src="https://github.com/user-attachments/assets/8e6253a4-8a53-4d03-bdab-d0321edc2561"
/>
[**main** -> `cargo bench --bench ipc_writer -- --sample-size 1000`]
[**my branch** -> `cargo bench --bench ipc_writer -- --sample-size
1000`]
<img width="1944" height="1000" alt="Image 6-1-26 at 3 20 PM"
src="https://github.com/user-attachments/assets/dc8015e8-ed60-487c-aa66-06f5d35499fe"
/>
# Are there any user-facing changes?
No.
---
arrow-ipc/src/writer.rs | 469 ++++++++++++++++++++++++++++++++++--------------
1 file changed, 335 insertions(+), 134 deletions(-)
diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs
index 46e2dd7739..d05a5dbc37 100644
--- a/arrow-ipc/src/writer.rs
+++ b/arrow-ipc/src/writer.rs
@@ -70,6 +70,69 @@ pub struct IpcWriteOptions {
dictionary_handling: DictionaryHandling,
}
+/// A single buffer segment ready to be written to the output stream.
+///
+/// For the uncompressed path the original Arc-backed [`Buffer`] is stored
+/// directly (zero copy). For the compressed path the compressed bytes are
+/// owned by a scratch `Vec<u8>`.
+enum EncodedBuffer {
+ /// Uncompressed: Arc-backed reference to the original array buffer.
+ Raw(Buffer),
+ /// Compressed: owned scratch bytes produced by the codec.
+ Compressed(Vec<u8>),
+}
+
+impl EncodedBuffer {
+ fn as_slice(&self) -> &[u8] {
+ match self {
+ EncodedBuffer::Raw(b) => b.as_slice(),
+ EncodedBuffer::Compressed(v) => v.as_slice(),
+ }
+ }
+
+ fn len(&self) -> usize {
+ match self {
+ EncodedBuffer::Raw(b) => b.len(),
+ EncodedBuffer::Compressed(v) => v.len(),
+ }
+ }
+}
+/// Destination for per-buffer encoded output produced by [`write_array_data`].
+enum IpcBodySink<'a> {
+ /// Serialize buffer bytes (with padding) into a contiguous byte vec.
+ Write(&'a mut Vec<u8>),
+ /// Accumulate pre-encoded buffer segments for deferred zero-copy streaming.
+ Collect(&'a mut Vec<EncodedBuffer>),
+}
+impl<'a> IpcBodySink<'a> {
+ /// Writes the encoded buffer to the sink.
+ pub fn write(&mut self, pad_len: usize, buffer: EncodedBuffer) {
+ match self {
+ IpcBodySink::Write(vec) => {
+ vec.extend_from_slice(buffer.as_slice());
+ vec.extend_from_slice(&PADDING[..pad_len]);
+ }
+ IpcBodySink::Collect(vec) => {
+ vec.push(buffer);
+ }
+ }
+ }
+}
+
+/// Per-message sizes produced by [`IpcDataGenerator::write`].
+///
+/// [`FileWriter`] uses these to build the Block index entries required by the IPC footer for
+/// random-access reads.
+struct IpcWriteMetadata {
+ /// Per-dictionary `(padded_header_len, body_len)` for each dictionary batch written
+ /// before the record batch.
+ dictionary_block_sizes: Vec<(usize, usize)>,
+ /// Flatbuffer header size including continuation prefix and alignment padding.
+ padded_header_len: usize,
+ /// Total length of the record-batch body including trailing alignment padding.
+ body_len: usize,
+}
+
impl IpcWriteOptions {
/// Configures compression when writing IPC files.
///
@@ -474,16 +537,44 @@ impl IpcDataGenerator {
write_options: &IpcWriteOptions,
compression_context: &mut CompressionContext,
) -> Result<(Vec<EncodedData>, EncodedData), ArrowError> {
+ let encoded_dictionaries = self.encode_all_dicts(
+ batch,
+ dictionary_tracker,
+ write_options,
+ compression_context,
+ )?;
+ let mut arrow_data = Vec::new();
+ let (ipc_message, _, tail_pad) = self.record_batch_to_bytes(
+ batch,
+ write_options,
+ compression_context,
+ &mut IpcBodySink::Write(&mut arrow_data),
+ )?;
+ arrow_data.extend_from_slice(&PADDING[..tail_pad]);
+ Ok((
+ encoded_dictionaries,
+ EncodedData {
+ ipc_message,
+ arrow_data,
+ },
+ ))
+ }
+
+ /// Encode dictionary batches for all columns in `batch`.
+ fn encode_all_dicts(
+ &self,
+ batch: &RecordBatch,
+ dictionary_tracker: &mut DictionaryTracker,
+ write_options: &IpcWriteOptions,
+ compression_context: &mut CompressionContext,
+ ) -> Result<Vec<EncodedData>, ArrowError> {
let schema = batch.schema();
let mut encoded_dictionaries = Vec::with_capacity(schema.flattened_fields().len());
-
let mut dict_id = dictionary_tracker.dict_ids.clone().into_iter();
-
for (i, field) in schema.fields().iter().enumerate() {
- let column = batch.column(i);
self.encode_dictionaries(
field,
- column,
+ batch.column(i),
&mut encoded_dictionaries,
dictionary_tracker,
write_options,
@@ -491,10 +582,71 @@ impl IpcDataGenerator {
compression_context,
)?;
}
+ Ok(encoded_dictionaries)
+ }
+
+ /// Write dictionary batches and the record batch directly to `writer`, skipping the
+ /// intermediate body `Vec<u8>` allocations
+ /// Returns [`IpcWriteMetadata`] with the sizes needed to build footer blocks.
+ fn write<W: Write>(
+ &self,
+ batch: &RecordBatch,
+ dictionary_tracker: &mut DictionaryTracker,
+ write_options: &IpcWriteOptions,
+ compression_context: &mut CompressionContext,
+ writer: &mut W,
+ ) -> Result<IpcWriteMetadata, ArrowError> {
+ let encoded_dictionaries = self.encode_all_dicts(
+ batch,
+ dictionary_tracker,
+ write_options,
+ compression_context,
+ )?;
+
+ let mut dictionary_block_sizes = Vec::with_capacity(encoded_dictionaries.len());
+ for dict in encoded_dictionaries {
+ dictionary_block_sizes.push(write_message(&mut *writer, dict, write_options)?);
+ }
+
+ let capacity = batch
+ .columns()
+ .iter()
+ .map(|a| estimate_encoded_buffer_count(a.data_type()))
+ .sum();
+ let mut encoded_buffers: Vec<EncodedBuffer> = Vec::with_capacity(capacity);
+ let (ipc_message, body_len, tail_pad) = self.record_batch_to_bytes(
+ batch,
+ write_options,
+ compression_context,
+ &mut IpcBodySink::Collect(&mut encoded_buffers),
+ )?;
+
+ let alignment = write_options.alignment;
+ let a = usize::from(alignment - 1);
+ let prefix_size = if write_options.write_legacy_ipc_format {
+ 4
+ } else {
+ 8
+ };
+ let aligned_size = (ipc_message.len() + prefix_size + a) & !a;
+ write_continuation(
+ &mut *writer,
+ write_options,
+ (aligned_size - prefix_size) as i32,
+ )?;
+ writer.write_all(&ipc_message)?;
+ writer.write_all(&PADDING[..aligned_size - ipc_message.len() - prefix_size])?;
+ for enc in &encoded_buffers {
+ writer.write_all(enc.as_slice())?;
+ writer.write_all(&PADDING[..pad_to_alignment(alignment, enc.len())])?;
+ }
+ writer.write_all(&PADDING[..tail_pad])?;
- let encoded_message =
- self.record_batch_to_bytes(batch, write_options, compression_context)?;
- Ok((encoded_dictionaries, encoded_message))
+ Ok(IpcWriteMetadata {
+ dictionary_block_sizes,
+ padded_header_len: aligned_size,
+ body_len,
+ })
}
/// Encodes a batch to a number of [EncodedData] items (dictionary batches + the record batch).
@@ -515,22 +667,23 @@ impl IpcDataGenerator {
)
}
- /// Write a `RecordBatch` into two sets of bytes, one for the header (crate::Message) and the
- /// other for the batch's data
+ /// Encodes a `RecordBatch` into a flatbuffer IPC message and fills `sink` with the
+ /// serialised buffer data.
+ ///
+ /// Returns `(ipc_message, body_len, tail_pad)`: the flatbuffer header bytes, the
+ /// total body length including trailing padding, and the trailing alignment padding byte count.
fn record_batch_to_bytes(
&self,
batch: &RecordBatch,
write_options: &IpcWriteOptions,
compression_context: &mut CompressionContext,
- ) -> Result<EncodedData, ArrowError> {
+ sink: &mut IpcBodySink<'_>,
+ ) -> Result<(Vec<u8>, usize, usize), ArrowError> {
let mut fbb = FlatBufferBuilder::new();
let mut nodes: Vec<crate::FieldNode> = vec![];
let mut buffers: Vec<crate::Buffer> = vec![];
- let mut arrow_data: Vec<u8> = vec![];
- let mut offset = 0;
- // get the type of compression
let batch_compression_type = write_options.batch_compression_type;
let compression = batch_compression_type.map(|batch_compression_type| {
@@ -543,14 +696,16 @@ impl IpcDataGenerator {
let compression_codec: Option<CompressionCodec> =
batch_compression_type.map(TryInto::try_into).transpose()?;
+ let alignment = write_options.alignment;
let mut variadic_buffer_counts = vec![];
+ let mut offset = 0i64;
for array in batch.columns() {
let array_data = array.to_data();
offset = write_array_data(
&array_data,
&mut buffers,
- &mut arrow_data,
+ sink,
&mut nodes,
offset,
array.len(),
@@ -559,15 +714,12 @@ impl IpcDataGenerator {
compression_context,
write_options,
)?;
-
append_variadic_buffer_counts(&mut variadic_buffer_counts, &array_data);
}
- // pad the tail of body data
- let len = arrow_data.len();
- let pad_len = pad_to_alignment(write_options.alignment, len);
- arrow_data.extend_from_slice(&PADDING[..pad_len]);
- // write data
+ let tail_pad = pad_to_alignment(alignment, offset as usize);
+ let body_len = offset as usize + tail_pad;
+
let buffers = fbb.create_vector(&buffers);
let nodes = fbb.create_vector(&nodes);
let variadic_buffer = if variadic_buffer_counts.is_empty() {
@@ -584,27 +736,21 @@ impl IpcDataGenerator {
if let Some(c) = compression {
batch_builder.add_compression(c);
}
-
if let Some(v) = variadic_buffer {
batch_builder.add_variadicBufferCounts(v);
}
- let b = batch_builder.finish();
- b.as_union_value()
+ batch_builder.finish().as_union_value()
};
// create an crate::Message
let mut message = crate::MessageBuilder::new(&mut fbb);
message.add_version(write_options.metadata_version);
message.add_header_type(crate::MessageHeader::RecordBatch);
- message.add_bodyLength(arrow_data.len() as i64);
+ message.add_bodyLength(body_len as i64);
message.add_header(root);
let root = message.finish();
fbb.finish(root, None);
- let finished_data = fbb.finished_data();
- Ok(EncodedData {
- ipc_message: finished_data.to_vec(),
- arrow_data,
- })
+ Ok((fbb.finished_data().to_vec(), body_len, tail_pad))
}
/// Write dictionary values into two sets of bytes, one for the header (crate::Message) and the
@@ -637,10 +783,12 @@ impl IpcDataGenerator {
.map(|batch_compression_type| batch_compression_type.try_into())
.transpose()?;
- write_array_data(
+ let alignment = write_options.alignment;
+ let mut sink = IpcBodySink::Write(&mut arrow_data);
+ let offset = write_array_data(
array_data,
&mut buffers,
- &mut arrow_data,
+ &mut sink,
&mut nodes,
0,
array_data.len(),
@@ -654,9 +802,9 @@ impl IpcDataGenerator {
append_variadic_buffer_counts(&mut variadic_buffer_counts, array_data);
// pad the tail of body data
- let len = arrow_data.len();
- let pad_len = pad_to_alignment(write_options.alignment, len);
- arrow_data.extend_from_slice(&PADDING[..pad_len]);
+ let tail_pad = pad_to_alignment(alignment, offset as usize);
+ let body_len = offset as usize + tail_pad;
+ arrow_data.extend_from_slice(&PADDING[..tail_pad]);
// write data
let buffers = fbb.create_vector(&buffers);
@@ -693,7 +841,7 @@ impl IpcDataGenerator {
let mut message_builder = crate::MessageBuilder::new(&mut fbb);
message_builder.add_version(write_options.metadata_version);
message_builder.add_header_type(crate::MessageHeader::DictionaryBatch);
- message_builder.add_bodyLength(arrow_data.len() as i64);
+ message_builder.add_bodyLength(body_len as i64);
message_builder.add_header(root);
message_builder.finish()
};
@@ -1165,32 +1313,32 @@ impl<W: Write> FileWriter<W> {
));
}
- let (encoded_dictionaries, encoded_message) = self.data_gen.encode(
+ let meta = self.data_gen.write(
batch,
&mut self.dictionary_tracker,
&self.write_options,
&mut self.compression_context,
+ &mut self.writer,
)?;
- for encoded_dictionary in encoded_dictionaries {
- let (meta, data) =
- write_message(&mut self.writer, encoded_dictionary, &self.write_options)?;
-
- let block = crate::Block::new(self.block_offsets as i64, meta as i32, data as i64);
+ for (header_len, body_len) in meta.dictionary_block_sizes {
+ let block = crate::Block::new(
+ self.block_offsets as i64,
+ header_len as i32,
+ body_len as i64,
+ );
self.dictionary_blocks.push(block);
- self.block_offsets += meta + data;
+ self.block_offsets += header_len + body_len;
}
- let (meta, data) = write_message(&mut self.writer, encoded_message, &self.write_options)?;
-
// add a record block for the footer
let block = crate::Block::new(
self.block_offsets as i64,
- meta as i32, // TODO: is this still applicable?
- data as i64,
+ meta.padded_header_len as i32,
+ meta.body_len as i64,
);
self.record_blocks.push(block);
- self.block_offsets += meta + data;
+ self.block_offsets += meta.padded_header_len + meta.body_len;
Ok(())
}
@@ -1440,21 +1588,13 @@ impl<W: Write> StreamWriter<W> {
));
}
- let (encoded_dictionaries, encoded_message) = self
- .data_gen
- .encode(
- batch,
- &mut self.dictionary_tracker,
- &self.write_options,
- &mut self.compression_context,
- )
- .expect("StreamWriter is configured to not error on dictionary replacement");
-
- for encoded_dictionary in encoded_dictionaries {
- write_message(&mut self.writer, encoded_dictionary, &self.write_options)?;
- }
-
- write_message(&mut self.writer, encoded_message, &self.write_options)?;
+ self.data_gen.write(
+ batch,
+ &mut self.dictionary_tracker,
+ &self.write_options,
+ &mut self.compression_context,
+ &mut self.writer,
+ )?;
Ok(())
}
@@ -1786,7 +1926,7 @@ fn get_list_view_array_buffers<O: OffsetSizeTrait>(
/// the array's offset and length. This helps reduce the encoded size of sliced
/// arrays
///
-fn get_or_truncate_buffer(array_data: &ArrayData) -> &[u8] {
+fn get_or_truncate_buffer(array_data: &ArrayData) -> Buffer {
let buffer = &array_data.buffers()[0];
let layout = layout(array_data.data_type());
let spec = &layout.buffers[0];
@@ -1796,18 +1936,23 @@ fn get_or_truncate_buffer(array_data: &ArrayData) -> &[u8] {
if buffer_need_truncate(array_data.offset(), buffer, spec, min_length) {
let byte_offset = array_data.offset() * byte_width;
let buffer_length = min(min_length, buffer.len() - byte_offset);
- &buffer.as_slice()[byte_offset..(byte_offset + buffer_length)]
+ buffer.slice_with_length(byte_offset, buffer_length)
} else {
- buffer.as_slice()
+ buffer.clone()
}
}
-/// Write array data to a vector of bytes
+/// Recursively encodes `array_data` into its IPC representation.
+///
+/// Output goes to two separate channels:
+/// - `buffers` / `nodes`: IPC metadata (offsets, lengths, null counts) that will be
+/// serialised into the flatbuffer `RecordBatch` header.
+/// - `sink`: the raw Arrow data bytes that form the IPC message body.
#[allow(clippy::too_many_arguments)]
fn write_array_data(
array_data: &ArrayData,
buffers: &mut Vec<crate::Buffer>,
- arrow_data: &mut Vec<u8>,
+ sink: &mut IpcBodySink<'_>,
nodes: &mut Vec<crate::FieldNode>,
offset: i64,
num_rows: usize,
@@ -1837,10 +1982,10 @@ fn write_array_data(
Some(buffer) => buffer.inner().sliced(),
};
- offset = write_buffer(
- null_buffer.as_slice(),
+ offset = encode_sink_buffer(
+ null_buffer,
buffers,
- arrow_data,
+ sink,
offset,
compression_codec,
compression_context,
@@ -1852,10 +1997,10 @@ fn write_array_data(
if matches!(data_type, DataType::Binary | DataType::Utf8) {
let (offsets, values) = get_byte_array_buffers::<i32>(array_data);
for buffer in [offsets, values] {
- offset = write_buffer(
- buffer.as_slice(),
+ offset = encode_sink_buffer(
+ buffer,
buffers,
- arrow_data,
+ sink,
offset,
compression_codec,
compression_context,
@@ -1870,10 +2015,10 @@ fn write_array_data(
// If users wants to "compact" the arrays prior to sending them over IPC,
// they should consider the gc API suggested in #5513
let views = get_or_truncate_buffer(array_data);
- offset = write_buffer(
+ offset = encode_sink_buffer(
views,
buffers,
- arrow_data,
+ sink,
offset,
compression_codec,
compression_context,
@@ -1881,10 +2026,10 @@ fn write_array_data(
)?;
for buffer in array_data.buffers().iter().skip(1) {
- offset = write_buffer(
- buffer.as_slice(),
+ offset = encode_sink_buffer(
+ buffer.clone(),
buffers,
- arrow_data,
+ sink,
offset,
compression_codec,
compression_context,
@@ -1894,10 +2039,10 @@ fn write_array_data(
} else if matches!(data_type, DataType::LargeBinary | DataType::LargeUtf8) {
let (offsets, values) = get_byte_array_buffers::<i64>(array_data);
for buffer in [offsets, values] {
- offset = write_buffer(
- buffer.as_slice(),
+ offset = encode_sink_buffer(
+ buffer,
buffers,
- arrow_data,
+ sink,
offset,
compression_codec,
compression_context,
@@ -1915,10 +2060,10 @@ fn write_array_data(
assert_eq!(array_data.buffers().len(), 1);
let buffer = get_or_truncate_buffer(array_data);
- offset = write_buffer(
+ offset = encode_sink_buffer(
buffer,
buffers,
- arrow_data,
+ sink,
offset,
compression_codec,
compression_context,
@@ -1931,10 +2076,10 @@ fn write_array_data(
let buffer = &array_data.buffers()[0];
let buffer = buffer.bit_slice(array_data.offset(), array_data.len());
- offset = write_buffer(
- &buffer,
+ offset = encode_sink_buffer(
+ buffer,
buffers,
- arrow_data,
+ sink,
offset,
compression_codec,
compression_context,
@@ -1954,10 +2099,10 @@ fn write_array_data(
DataType::LargeList(_) => get_list_array_buffers::<i64>(array_data),
_ => unreachable!(),
};
- offset = write_buffer(
- offsets.as_slice(),
+ offset = encode_sink_buffer(
+ offsets,
buffers,
- arrow_data,
+ sink,
offset,
compression_codec,
compression_context,
@@ -1966,7 +2111,7 @@ fn write_array_data(
offset = write_array_data(
&sliced_child_data,
buffers,
- arrow_data,
+ sink,
nodes,
offset,
sliced_child_data.len(),
@@ -1989,20 +2134,19 @@ fn write_array_data(
_ => unreachable!(),
};
- offset = write_buffer(
- offsets.as_slice(),
+ offset = encode_sink_buffer(
+ offsets,
buffers,
- arrow_data,
+ sink,
offset,
compression_codec,
compression_context,
write_options.alignment,
)?;
-
- offset = write_buffer(
- sizes.as_slice(),
+ offset = encode_sink_buffer(
+ sizes,
buffers,
- arrow_data,
+ sink,
offset,
compression_codec,
compression_context,
@@ -2012,7 +2156,7 @@ fn write_array_data(
offset = write_array_data(
&child_data,
buffers,
- arrow_data,
+ sink,
nodes,
offset,
child_data.len(),
@@ -2033,7 +2177,7 @@ fn write_array_data(
offset = write_array_data(
&child_data,
buffers,
- arrow_data,
+ sink,
nodes,
offset,
child_data.len(),
@@ -2045,10 +2189,10 @@ fn write_array_data(
return Ok(offset);
} else {
for buffer in array_data.buffers() {
- offset = write_buffer(
- buffer,
+ offset = encode_sink_buffer(
+ buffer.clone(),
buffers,
- arrow_data,
+ sink,
offset,
compression_codec,
compression_context,
@@ -2068,7 +2212,7 @@ fn write_array_data(
offset = write_array_data(
data_ref,
buffers,
- arrow_data,
+ sink,
nodes,
offset,
data_ref.len(),
@@ -2086,7 +2230,7 @@ fn write_array_data(
offset = write_array_data(
data_ref,
buffers,
- arrow_data,
+ sink,
nodes,
offset,
data_ref.len(),
@@ -2101,50 +2245,107 @@ fn write_array_data(
Ok(offset)
}
-/// Write a buffer into `arrow_data`, a vector of bytes, and adds its
-/// [`crate::Buffer`] to `buffers`. Returns the new offset in `arrow_data`
+/// Encodes a single Arrow [`Buffer`] into the IPC body and records its metadata.
///
+/// - `buffer`: the Arrow data buffer to encode (validity bitmap, offsets, values, etc.)
+/// - `buffers`: in-progress list of IPC `Buffer` metadata entries (body offset + length) that
+/// will eventually be serialised into the flatbuffer `RecordBatch` header.
+/// - `sink`: destination for the actual encoded bytes; either a contiguous `Vec<u8>` for
+/// in-memory writes, or a list of [`EncodedBuffer`] segments for deferred zero-copy streaming.
+/// - `offset`: running byte offset into the IPC message body, used to compute the metadata entry.
+/// - `compression_codec` / `compression_context`: if `Some`, the buffer is compressed before
+/// writing; `compression_context` provides reusable scratch space across calls.
+/// - `alignment`: each buffer is padded to this many bytes so the next buffer starts aligned.
///
-/// From <https://github.com/apache/arrow/blob/6a936c4ff5007045e86f65f1a6b6c3c955ad5103/format/Message.fbs#L58>
-/// Each constituent buffer is first compressed with the indicated
-/// compressor, and then written with the uncompressed length in the first 8
-/// bytes as a 64-bit little-endian signed integer followed by the compressed
-/// buffer bytes (and then padding as required by the protocol). The
-/// uncompressed length may be set to -1 to indicate that the data that
-/// follows is not compressed, which can be useful for cases where
-/// compression does not yield appreciable savings.
-fn write_buffer(
- buffer: &[u8], // input
- buffers: &mut Vec<crate::Buffer>, // output buffer descriptors
- arrow_data: &mut Vec<u8>, // output stream
- offset: i64, // current output stream offset
+/// Returns the updated `offset` (advanced by the encoded length plus any alignment padding).
+fn encode_sink_buffer(
+ buffer: Buffer,
+ buffers: &mut Vec<crate::Buffer>,
+ sink: &mut IpcBodySink<'_>,
+ offset: i64,
compression_codec: Option<CompressionCodec>,
compression_context: &mut CompressionContext,
alignment: u8,
) -> Result<i64, ArrowError> {
- let len: i64 = match compression_codec {
- Some(compressor) => compressor.compress_to_vec(buffer, arrow_data, compression_context)?,
+ let (encoded, len) = match compression_codec {
None => {
- arrow_data.extend_from_slice(buffer);
- buffer.len()
+ let len = buffer.len() as i64;
+ (EncodedBuffer::Raw(buffer), len)
}
- }
- .try_into()
- .map_err(|e| {
- ArrowError::InvalidArgumentError(format!("Could not convert compressed size to i64: {e}"))
- })?;
+ Some(codec) => {
+ let mut scratch = Vec::new();
+ let written =
+ codec.compress_to_vec(buffer.as_slice(), &mut scratch, compression_context)?;
+ let len = i64::try_from(written)
+ .map_err(|e| ArrowError::InvalidArgumentError(format!("{e}")))?;
+ (EncodedBuffer::Compressed(scratch), len)
+ }
+ };
- // make new index entry
- buffers.push(crate::Buffer::new(offset, len));
- // padding and make offset aligned
let pad_len = pad_to_alignment(alignment, len as usize);
- arrow_data.extend_from_slice(&PADDING[..pad_len]);
-
- Ok(offset + len + (pad_len as i64))
+ sink.write(pad_len, encoded);
+ buffers.push(crate::Buffer::new(offset, len));
+ Ok(offset + len + pad_len as i64)
}
const PADDING: [u8; 64] = [0; 64];
+/// Estimates the number of [`EncodedBuffer`] segments that [`write_array_data`]
+/// will produce for a column of the given type.
+///
+/// Based on the Arrow IPC buffer layout
+/// (<https://arrow.apache.org/docs/format/Columnar.html#recordbatch-message>):
+#[inline]
+fn estimate_encoded_buffer_count(dt: &DataType) -> usize {
+ match dt {
+ DataType::Null => 0,
+
+ DataType::Binary | DataType::Utf8 | DataType::LargeBinary | DataType::LargeUtf8 => 3,
+
+ DataType::BinaryView | DataType::Utf8View => 3,
+
+ DataType::List(f) | DataType::LargeList(f) | DataType::Map(f, _) => {
+ 2 + estimate_encoded_buffer_count(f.data_type())
+ }
+
+ DataType::ListView(f) | DataType::LargeListView(f) => {
+ 3 + estimate_encoded_buffer_count(f.data_type())
+ }
+
+ DataType::FixedSizeList(f, _) => 1 + estimate_encoded_buffer_count(f.data_type()),
+
+ DataType::Struct(fields) => {
+ 1 + fields
+ .iter()
+ .map(|f| estimate_encoded_buffer_count(f.data_type()))
+ .sum::<usize>()
+ }
+
+ // Dictionary indices only; dictionary body is a separate IPC message.
+ DataType::Dictionary(_, _) => 2,
+
+ DataType::Union(fields, UnionMode::Sparse) => {
+ 1 + fields
+ .iter()
+ .map(|(_, f)| estimate_encoded_buffer_count(f.data_type()))
+ .sum::<usize>()
+ }
+ DataType::Union(fields, UnionMode::Dense) => {
+ 2 + fields
+ .iter()
+ .map(|(_, f)| estimate_encoded_buffer_count(f.data_type()))
+ .sum::<usize>()
+ }
+
+ DataType::RunEndEncoded(run_ends, values) => {
+ estimate_encoded_buffer_count(run_ends.data_type())
+ + estimate_encoded_buffer_count(values.data_type())
+ }
+ // Primitive, Bool, temporal, Decimal*, FixedSizeBinary: validity + values.
+ _ => 2,
+ }
+}
+
/// Calculate an alignment boundary and return the number of bytes needed to pad to the alignment boundary
#[inline]
fn pad_to_alignment(alignment: u8, len: usize) -> usize {