This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 37e408b01e Refactor arrow-ipc: Move `create_*_array` methods into
`RecordBatchDecoder` (#7029)
37e408b01e is described below
commit 37e408b01e47faf51f856c0ff8c46832d10430d6
Author: Andrew Lamb <[email protected]>
AuthorDate: Sat Feb 8 09:24:07 2025 -0500
Refactor arrow-ipc: Move `create_*_array` methods into `RecordBatchDecoder`
(#7029)
* Move `create_primitive_array` into RecordBatchDecoder
* Move `create_list_array` into RecordBatchDecoder
* Move `create_dictionary_array` into RecordBatchDecoder
---
arrow-ipc/src/reader.rs | 228 ++++++++++++++++++++++--------------------------
1 file changed, 104 insertions(+), 124 deletions(-)
diff --git a/arrow-ipc/src/reader.rs b/arrow-ipc/src/reader.rs
index e79ab23211..2ab6181777 100644
--- a/arrow-ipc/src/reader.rs
+++ b/arrow-ipc/src/reader.rs
@@ -85,16 +85,15 @@ impl RecordBatchDecoder<'_> {
) -> Result<ArrayRef, ArrowError> {
let data_type = field.data_type();
match data_type {
- Utf8 | Binary | LargeBinary | LargeUtf8 => create_primitive_array(
- self.next_node(field)?,
- data_type,
- &[
+ Utf8 | Binary | LargeBinary | LargeUtf8 => {
+ let field_node = self.next_node(field)?;
+ let buffers = [
self.next_buffer()?,
self.next_buffer()?,
self.next_buffer()?,
- ],
- self.require_alignment,
- ),
+ ];
+ self.create_primitive_array(field_node, data_type, &buffers)
+ }
BinaryView | Utf8View => {
let count = variadic_counts
.pop_front()
@@ -105,42 +104,25 @@ impl RecordBatchDecoder<'_> {
let buffers = (0..count)
.map(|_| self.next_buffer())
.collect::<Result<Vec<_>, _>>()?;
- create_primitive_array(
- self.next_node(field)?,
- data_type,
- &buffers,
- self.require_alignment,
- )
+ let field_node = self.next_node(field)?;
+ self.create_primitive_array(field_node, data_type, &buffers)
+ }
+ FixedSizeBinary(_) => {
+ let field_node = self.next_node(field)?;
+ let buffers = [self.next_buffer()?, self.next_buffer()?];
+ self.create_primitive_array(field_node, data_type, &buffers)
}
- FixedSizeBinary(_) => create_primitive_array(
- self.next_node(field)?,
- data_type,
- &[self.next_buffer()?, self.next_buffer()?],
- self.require_alignment,
- ),
List(ref list_field) | LargeList(ref list_field) | Map(ref
list_field, _) => {
let list_node = self.next_node(field)?;
let list_buffers = [self.next_buffer()?, self.next_buffer()?];
let values = self.create_array(list_field, variadic_counts)?;
- create_list_array(
- list_node,
- data_type,
- &list_buffers,
- values,
- self.require_alignment,
- )
+ self.create_list_array(list_node, data_type, &list_buffers,
values)
}
FixedSizeList(ref list_field, _) => {
let list_node = self.next_node(field)?;
let list_buffers = [self.next_buffer()?];
let values = self.create_array(list_field, variadic_counts)?;
- create_list_array(
- list_node,
- data_type,
- &list_buffers,
- values,
- self.require_alignment,
- )
+ self.create_list_array(list_node, data_type, &list_buffers,
values)
}
Struct(struct_fields) => {
let struct_node = self.next_node(field)?;
@@ -205,12 +187,11 @@ impl RecordBatchDecoder<'_> {
))
})?;
- create_dictionary_array(
+ self.create_dictionary_array(
index_node,
data_type,
&index_buffers,
value_array.clone(),
- self.require_alignment,
)
}
Union(fields, mode) => {
@@ -265,107 +246,106 @@ impl RecordBatchDecoder<'_> {
// no buffer increases
Ok(Arc::new(NullArray::from(array_data)))
}
- _ => create_primitive_array(
- self.next_node(field)?,
- data_type,
- &[self.next_buffer()?, self.next_buffer()?],
- self.require_alignment,
- ),
+ _ => {
+ let field_node = self.next_node(field)?;
+ let buffers = [self.next_buffer()?, self.next_buffer()?];
+ self.create_primitive_array(field_node, data_type, &buffers)
+ }
}
}
-}
-/// Reads the correct number of buffers based on data type and null_count, and
creates a
-/// primitive array ref
-fn create_primitive_array(
- field_node: &FieldNode,
- data_type: &DataType,
- buffers: &[Buffer],
- require_alignment: bool,
-) -> Result<ArrayRef, ArrowError> {
- let length = field_node.length() as usize;
- let null_buffer = (field_node.null_count() >
0).then_some(buffers[0].clone());
- let builder = match data_type {
- Utf8 | Binary | LargeBinary | LargeUtf8 => {
- // read 3 buffers: null buffer (optional), offsets buffer and data
buffer
- ArrayData::builder(data_type.clone())
- .len(length)
- .buffers(buffers[1..3].to_vec())
- .null_bit_buffer(null_buffer)
- }
- BinaryView | Utf8View => ArrayData::builder(data_type.clone())
- .len(length)
- .buffers(buffers[1..].to_vec())
- .null_bit_buffer(null_buffer),
- _ if data_type.is_primitive() || matches!(data_type, Boolean |
FixedSizeBinary(_)) => {
- // read 2 buffers: null buffer (optional) and data buffer
- ArrayData::builder(data_type.clone())
+ /// Reads the correct number of buffers based on data type and null_count,
and creates a
+ /// primitive array ref
+ fn create_primitive_array(
+ &self,
+ field_node: &FieldNode,
+ data_type: &DataType,
+ buffers: &[Buffer],
+ ) -> Result<ArrayRef, ArrowError> {
+ let length = field_node.length() as usize;
+ let null_buffer = (field_node.null_count() >
0).then_some(buffers[0].clone());
+ let builder = match data_type {
+ Utf8 | Binary | LargeBinary | LargeUtf8 => {
+ // read 3 buffers: null buffer (optional), offsets buffer and
data buffer
+ ArrayData::builder(data_type.clone())
+ .len(length)
+ .buffers(buffers[1..3].to_vec())
+ .null_bit_buffer(null_buffer)
+ }
+ BinaryView | Utf8View => ArrayData::builder(data_type.clone())
.len(length)
- .add_buffer(buffers[1].clone())
- .null_bit_buffer(null_buffer)
- }
- t => unreachable!("Data type {:?} either unsupported or not
primitive", t),
- };
+ .buffers(buffers[1..].to_vec())
+ .null_bit_buffer(null_buffer),
+ _ if data_type.is_primitive() || matches!(data_type, Boolean |
FixedSizeBinary(_)) => {
+ // read 2 buffers: null buffer (optional) and data buffer
+ ArrayData::builder(data_type.clone())
+ .len(length)
+ .add_buffer(buffers[1].clone())
+ .null_bit_buffer(null_buffer)
+ }
+ t => unreachable!("Data type {:?} either unsupported or not
primitive", t),
+ };
- let array_data = builder.align_buffers(!require_alignment).build()?;
+ let array_data =
builder.align_buffers(!self.require_alignment).build()?;
- Ok(make_array(array_data))
-}
+ Ok(make_array(array_data))
+ }
-/// Reads the correct number of buffers based on list type and null_count, and
creates a
-/// list array ref
-fn create_list_array(
- field_node: &FieldNode,
- data_type: &DataType,
- buffers: &[Buffer],
- child_array: ArrayRef,
- require_alignment: bool,
-) -> Result<ArrayRef, ArrowError> {
- let null_buffer = (field_node.null_count() >
0).then_some(buffers[0].clone());
- let length = field_node.length() as usize;
- let child_data = child_array.into_data();
- let builder = match data_type {
- List(_) | LargeList(_) | Map(_, _) =>
ArrayData::builder(data_type.clone())
- .len(length)
- .add_buffer(buffers[1].clone())
- .add_child_data(child_data)
- .null_bit_buffer(null_buffer),
-
- FixedSizeList(_, _) => ArrayData::builder(data_type.clone())
- .len(length)
- .add_child_data(child_data)
- .null_bit_buffer(null_buffer),
-
- _ => unreachable!("Cannot create list or map array from {:?}",
data_type),
- };
+ /// Reads the correct number of buffers based on list type and null_count,
and creates a
+ /// list array ref
+ fn create_list_array(
+ &self,
+ field_node: &FieldNode,
+ data_type: &DataType,
+ buffers: &[Buffer],
+ child_array: ArrayRef,
+ ) -> Result<ArrayRef, ArrowError> {
+ let null_buffer = (field_node.null_count() >
0).then_some(buffers[0].clone());
+ let length = field_node.length() as usize;
+ let child_data = child_array.into_data();
+ let builder = match data_type {
+ List(_) | LargeList(_) | Map(_, _) =>
ArrayData::builder(data_type.clone())
+ .len(length)
+ .add_buffer(buffers[1].clone())
+ .add_child_data(child_data)
+ .null_bit_buffer(null_buffer),
- let array_data = builder.align_buffers(!require_alignment).build()?;
+ FixedSizeList(_, _) => ArrayData::builder(data_type.clone())
+ .len(length)
+ .add_child_data(child_data)
+ .null_bit_buffer(null_buffer),
- Ok(make_array(array_data))
-}
+ _ => unreachable!("Cannot create list or map array from {:?}",
data_type),
+ };
-/// Reads the correct number of buffers based on list type and null_count, and
creates a
-/// list array ref
-fn create_dictionary_array(
- field_node: &FieldNode,
- data_type: &DataType,
- buffers: &[Buffer],
- value_array: ArrayRef,
- require_alignment: bool,
-) -> Result<ArrayRef, ArrowError> {
- if let Dictionary(_, _) = *data_type {
- let null_buffer = (field_node.null_count() >
0).then_some(buffers[0].clone());
- let array_data = ArrayData::builder(data_type.clone())
- .len(field_node.length() as usize)
- .add_buffer(buffers[1].clone())
- .add_child_data(value_array.into_data())
- .null_bit_buffer(null_buffer)
- .align_buffers(!require_alignment)
- .build()?;
+ let array_data =
builder.align_buffers(!self.require_alignment).build()?;
Ok(make_array(array_data))
- } else {
- unreachable!("Cannot create dictionary array from {:?}", data_type)
+ }
+
+ /// Reads the correct number of buffers based on list type and null_count,
and creates a
+ /// list array ref
+ fn create_dictionary_array(
+ &self,
+ field_node: &FieldNode,
+ data_type: &DataType,
+ buffers: &[Buffer],
+ value_array: ArrayRef,
+ ) -> Result<ArrayRef, ArrowError> {
+ if let Dictionary(_, _) = *data_type {
+ let null_buffer = (field_node.null_count() >
0).then_some(buffers[0].clone());
+ let array_data = ArrayData::builder(data_type.clone())
+ .len(field_node.length() as usize)
+ .add_buffer(buffers[1].clone())
+ .add_child_data(value_array.into_data())
+ .null_bit_buffer(null_buffer)
+ .align_buffers(!self.require_alignment)
+ .build()?;
+
+ Ok(make_array(array_data))
+ } else {
+ unreachable!("Cannot create dictionary array from {:?}", data_type)
+ }
}
}