This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new cecbc72edb removed clippy ignore statement (#10111)
cecbc72edb is described below
commit cecbc72edbb4cdc93ef6e78d38493afb91ada02e
Author: RIchard Baah <[email protected]>
AuthorDate: Thu Jun 11 10:48:11 2026 -0400
removed clippy ignore statement (#10111)
# Which issue does this PR close?
<!--
We generally require a GitHub issue to be filed for all bug fixes and
enhancements and this helps us generate change logs for our releases.
You can link an issue to this PR using the GitHub syntax.
-->
Resolves this review comment from #10044:
https://github.com/apache/arrow-rs/pull/10044#discussion_r3381759987
# Rationale for this change
<!--
Why are you proposing this change? If this is already explained clearly
in the issue then this section is not needed.
Explaining clearly why changes are proposed helps reviewers understand
your changes and offer better suggestions for fixes.
-->
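Code in this file is hard to navigate, and it's unclear what is happening:
`write_array_data()` took ten arguments, several of them redundant, and
needed a `#[allow(clippy::too_many_arguments)]` suppression to pass clippy.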
# What changes are included in this PR?
<!--
There is no need to duplicate the description in the issue here but it
is sometimes worth providing a summary of the individual changes in this
PR.
-->
This PR introduces `IpcMetadataBuilder`, a struct that groups the `nodes`
and `buffers` vectors previously passed separately into `write_array_data()`,
and removes the redundant `num_rows`/`null_count` parameters by deriving
them from `array_data` directly. Together these changes reduce
`write_array_data()` from 10 arguments to 7, which eliminates the
`#[allow(clippy::too_many_arguments)]` suppression. `encode_sink_buffer()`
likewise now takes `&mut IpcMetadataBuilder` instead of the bare `buffers`
vector. Doc comments are also added to clarify the two-channel output model:
`IpcMetadataBuilder` holds the flatbuffer header metadata, and `IpcBodySink`
holds the raw Arrow data bytes.
# Are these changes tested?
yes
<!--
We typically require tests for all PRs in order to:
1. Prevent the code from being accidentally broken by subsequent changes
2. Serve as another way to document the expected behavior of the code
If tests are not included in your PR, please explain why (for example,
are they covered by existing tests)?
If this PR claims a performance improvement, please include evidence
such as benchmark results.
-->
# Are there any user-facing changes?
no
<!--
If there are user-facing changes then we may require documentation to be
updated before approving the PR.
If there are any breaking changes to public APIs, please call them out.
-->
---
arrow-ipc/src/writer.rs | 112 +++++++++++++++++++++---------------------------
1 file changed, 50 insertions(+), 62 deletions(-)
diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs
index d05a5dbc37..4142858ce8 100644
--- a/arrow-ipc/src/writer.rs
+++ b/arrow-ipc/src/writer.rs
@@ -97,7 +97,20 @@ impl EncodedBuffer {
}
}
}
-/// Destination for per-buffer encoded output produced by [`write_array_data`].
+/// Accumulates the IPC metadata produced by [`write_array_data`].
+///
+/// `nodes` and `buffers` are serialised into the flatbuffer `RecordBatch` (or `DictionaryBatch`)
+/// header. The companion [`IpcBodySink`] holds the actual encoded bytes.
+#[derive(Default)]
+struct IpcMetadataBuilder {
+ nodes: Vec<crate::FieldNode>, // one (length, null count) entry per array node visited
+ buffers: Vec<crate::Buffer>, // one (offset, length) entry per body buffer written
+}
+
+/// Destination for the raw Arrow data bytes (the IPC message body) produced by [`write_array_data`].
+///
+/// The companion [`IpcMetadataBuilder`] accumulates the flatbuffer metadata
+/// (offset + length of each buffer in the body); together they form a complete IPC message.
enum IpcBodySink<'a> {
/// Serialize buffer bytes (with padding) into a contiguous byte vec.
Write(&'a mut Vec<u8>),
@@ -681,9 +694,6 @@ impl IpcDataGenerator {
) -> Result<(Vec<u8>, usize, usize), ArrowError> {
let mut fbb = FlatBufferBuilder::new();
- let mut nodes: Vec<crate::FieldNode> = vec![];
- let mut buffers: Vec<crate::Buffer> = vec![];
-
let batch_compression_type = write_options.batch_compression_type;
let compression = batch_compression_type.map(|batch_compression_type| {
@@ -698,18 +708,16 @@ impl IpcDataGenerator {
let alignment = write_options.alignment;
let mut variadic_buffer_counts = vec![];
+ let mut meta = IpcMetadataBuilder::default();
let mut offset = 0i64;
for array in batch.columns() {
let array_data = array.to_data();
offset = write_array_data(
&array_data,
- &mut buffers,
+ &mut meta,
sink,
- &mut nodes,
offset,
- array.len(),
- array.null_count(),
compression_codec,
compression_context,
write_options,
@@ -720,8 +728,8 @@ impl IpcDataGenerator {
let tail_pad = pad_to_alignment(alignment, offset as usize);
let body_len = offset as usize + tail_pad;
- let buffers = fbb.create_vector(&buffers);
- let nodes = fbb.create_vector(&nodes);
+ let buffers = fbb.create_vector(&meta.buffers);
+ let nodes = fbb.create_vector(&meta.nodes);
let variadic_buffer = if variadic_buffer_counts.is_empty() {
None
} else {
@@ -765,8 +773,6 @@ impl IpcDataGenerator {
) -> Result<EncodedData, ArrowError> {
let mut fbb = FlatBufferBuilder::new();
- let mut nodes: Vec<crate::FieldNode> = vec![];
- let mut buffers: Vec<crate::Buffer> = vec![];
let mut arrow_data: Vec<u8> = vec![];
// get the type of compression
@@ -784,15 +790,13 @@ impl IpcDataGenerator {
.transpose()?;
let alignment = write_options.alignment;
+ let mut meta = IpcMetadataBuilder::default();
let mut sink = IpcBodySink::Write(&mut arrow_data);
let offset = write_array_data(
array_data,
- &mut buffers,
+ &mut meta,
&mut sink,
- &mut nodes,
0,
- array_data.len(),
- array_data.null_count(),
compression_codec,
compression_context,
write_options,
@@ -807,8 +811,8 @@ impl IpcDataGenerator {
arrow_data.extend_from_slice(&PADDING[..tail_pad]);
// write data
- let buffers = fbb.create_vector(&buffers);
- let nodes = fbb.create_vector(&nodes);
+ let buffers = fbb.create_vector(&meta.buffers);
+ let nodes = fbb.create_vector(&meta.nodes);
let variadic_buffer = if variadic_buffer_counts.is_empty() {
None
} else {
@@ -1945,29 +1949,28 @@ fn get_or_truncate_buffer(array_data: &ArrayData) -> Buffer {
/// Recursively encodes `array_data` into its IPC representation.
///
/// Output goes to two separate channels:
-/// - `buffers` / `nodes`: IPC metadata (offsets, lengths, null counts) that will be
-/// serialised into the flatbuffer `RecordBatch` header.
+/// - `meta`: accumulates IPC metadata (`nodes` and `buffers`) for the flatbuffer header.
/// - `sink`: the raw Arrow data bytes that form the IPC message body.
-#[allow(clippy::too_many_arguments)]
fn write_array_data(
array_data: &ArrayData,
- buffers: &mut Vec<crate::Buffer>,
+ meta: &mut IpcMetadataBuilder,
sink: &mut IpcBodySink<'_>,
- nodes: &mut Vec<crate::FieldNode>,
offset: i64,
- num_rows: usize,
- null_count: usize,
compression_codec: Option<CompressionCodec>,
compression_context: &mut CompressionContext,
write_options: &IpcWriteOptions,
) -> Result<i64, ArrowError> {
let mut offset = offset;
+ let num_rows = array_data.len();
if !matches!(array_data.data_type(), DataType::Null) {
- nodes.push(crate::FieldNode::new(num_rows as i64, null_count as i64));
+ meta.nodes.push(crate::FieldNode::new(
+ num_rows as i64,
+ array_data.null_count() as i64,
+ ));
} else {
- // NullArray's null_count equals to len, but the `null_count` passed in is from ArrayData
- // where null_count is always 0.
- nodes.push(crate::FieldNode::new(num_rows as i64, num_rows as i64));
+ // NullArray's null_count equals to len, but ArrayData null_count is always 0.
+ meta.nodes
+ .push(crate::FieldNode::new(num_rows as i64, num_rows as i64));
}
if has_validity_bitmap(array_data.data_type(), write_options) {
// write null buffer if exists
@@ -1984,7 +1987,7 @@ fn write_array_data(
offset = encode_sink_buffer(
null_buffer,
- buffers,
+ meta,
sink,
offset,
compression_codec,
@@ -1999,7 +2002,7 @@ fn write_array_data(
for buffer in [offsets, values] {
offset = encode_sink_buffer(
buffer,
- buffers,
+ meta,
sink,
offset,
compression_codec,
@@ -2017,7 +2020,7 @@ fn write_array_data(
let views = get_or_truncate_buffer(array_data);
offset = encode_sink_buffer(
views,
- buffers,
+ meta,
sink,
offset,
compression_codec,
@@ -2028,7 +2031,7 @@ fn write_array_data(
for buffer in array_data.buffers().iter().skip(1) {
offset = encode_sink_buffer(
buffer.clone(),
- buffers,
+ meta,
sink,
offset,
compression_codec,
@@ -2041,7 +2044,7 @@ fn write_array_data(
for buffer in [offsets, values] {
offset = encode_sink_buffer(
buffer,
- buffers,
+ meta,
sink,
offset,
compression_codec,
@@ -2062,7 +2065,7 @@ fn write_array_data(
let buffer = get_or_truncate_buffer(array_data);
offset = encode_sink_buffer(
buffer,
- buffers,
+ meta,
sink,
offset,
compression_codec,
@@ -2078,7 +2081,7 @@ fn write_array_data(
let buffer = buffer.bit_slice(array_data.offset(), array_data.len());
offset = encode_sink_buffer(
buffer,
- buffers,
+ meta,
sink,
offset,
compression_codec,
@@ -2101,7 +2104,7 @@ fn write_array_data(
};
offset = encode_sink_buffer(
offsets,
- buffers,
+ meta,
sink,
offset,
compression_codec,
@@ -2110,12 +2113,9 @@ fn write_array_data(
)?;
offset = write_array_data(
&sliced_child_data,
- buffers,
+ meta,
sink,
- nodes,
offset,
- sliced_child_data.len(),
- sliced_child_data.null_count(),
compression_codec,
compression_context,
write_options,
@@ -2136,7 +2136,7 @@ fn write_array_data(
offset = encode_sink_buffer(
offsets,
- buffers,
+ meta,
sink,
offset,
compression_codec,
@@ -2145,7 +2145,7 @@ fn write_array_data(
)?;
offset = encode_sink_buffer(
sizes,
- buffers,
+ meta,
sink,
offset,
compression_codec,
@@ -2155,12 +2155,9 @@ fn write_array_data(
offset = write_array_data(
&child_data,
- buffers,
+ meta,
sink,
- nodes,
offset,
- child_data.len(),
- child_data.null_count(),
compression_codec,
compression_context,
write_options,
@@ -2176,12 +2173,9 @@ fn write_array_data(
offset = write_array_data(
&child_data,
- buffers,
+ meta,
sink,
- nodes,
offset,
- child_data.len(),
- child_data.null_count(),
compression_codec,
compression_context,
write_options,
@@ -2191,7 +2185,7 @@ fn write_array_data(
for buffer in array_data.buffers() {
offset = encode_sink_buffer(
buffer.clone(),
- buffers,
+ meta,
sink,
offset,
compression_codec,
@@ -2211,12 +2205,9 @@ fn write_array_data(
// write the nested data (e.g list data)
offset = write_array_data(
data_ref,
- buffers,
+ meta,
sink,
- nodes,
offset,
- data_ref.len(),
- data_ref.null_count(),
compression_codec,
compression_context,
write_options,
@@ -2229,12 +2220,9 @@ fn write_array_data(
// write the nested data (e.g list data)
offset = write_array_data(
data_ref,
- buffers,
+ meta,
sink,
- nodes,
offset,
- data_ref.len(),
- data_ref.null_count(),
compression_codec,
compression_context,
write_options,
@@ -2260,7 +2248,7 @@ fn write_array_data(
/// Returns the updated `offset` (advanced by the encoded length plus any alignment padding).
fn encode_sink_buffer(
buffer: Buffer,
- buffers: &mut Vec<crate::Buffer>,
+ ipc_meta_data: &mut IpcMetadataBuilder,
sink: &mut IpcBodySink<'_>,
offset: i64,
compression_codec: Option<CompressionCodec>,
@@ -2284,7 +2272,7 @@ fn encode_sink_buffer(
let pad_len = pad_to_alignment(alignment, len as usize);
sink.write(pad_len, encoded);
- buffers.push(crate::Buffer::new(offset, len));
+ ipc_meta_data.buffers.push(crate::Buffer::new(offset, len));
Ok(offset + len + pad_len as i64)
}