tustvold commented on code in PR #2040:
URL: https://github.com/apache/arrow-rs/pull/2040#discussion_r918930065
##########
arrow/src/ipc/writer.rs:
##########
@@ -894,12 +1031,66 @@ fn write_array_data(
Some(buffer) => buffer.clone(),
};
- offset = write_buffer(&null_buffer, buffers, arrow_data, offset);
+ offset = write_buffer(null_buffer.as_slice(), buffers, arrow_data, offset);
}
- array_data.buffers().iter().for_each(|buffer| {
- offset = write_buffer(buffer, buffers, arrow_data, offset);
- });
+ let data_type = array_data.data_type();
Review Comment:
I wonder if we should add an option to IpcWriteOptions to enable/disable this
behaviour. I could see the rewriting not being worthwhile in some cases.
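Something along these lines, perhaps (very rough sketch; the struct, field and
builder method names are illustrative only, not part of the current
`IpcWriteOptions`):
```rust
// Hypothetical sketch only: `truncate_sliced_buffers` is an illustrative
// name, not an existing IpcWriteOptions field.
#[derive(Debug, Clone)]
struct WriteOptionsSketch {
    /// When false, sliced arrays are written with their full backing
    /// buffers, i.e. the behaviour before this PR.
    truncate_sliced_buffers: bool,
}

impl WriteOptionsSketch {
    fn with_truncate_sliced_buffers(mut self, truncate: bool) -> Self {
        self.truncate_sliced_buffers = truncate;
        self
    }
}
```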
##########
arrow/src/ipc/writer.rs:
##########
@@ -894,12 +1031,66 @@ fn write_array_data(
Some(buffer) => buffer.clone(),
};
- offset = write_buffer(&null_buffer, buffers, arrow_data, offset);
+ offset = write_buffer(null_buffer.as_slice(), buffers, arrow_data, offset);
Review Comment:
This seems suspect to me: we are truncating the values, but we don't appear to
be doing the same for the null mask?
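Roughly what I'd expect for the validity bitmap is something like this
(untested sketch; `bit_slice` copies the relevant bits so the result starts at
bit offset 0):
```rust
use arrow::array::ArrayData;
use arrow::buffer::Buffer;

// Untested sketch: slice the validity bitmap down to the array's
// offset/length, mirroring what the PR does for the value buffers.
fn truncated_null_buffer(array_data: &ArrayData) -> Option<Buffer> {
    array_data
        .null_buffer()
        .map(|b| b.bit_slice(array_data.offset(), array_data.len()))
}
```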
##########
arrow/src/ipc/writer.rs:
##########
@@ -861,6 +863,141 @@ fn has_validity_bitmap(data_type: &DataType, write_options: &IpcWriteOptions) ->
}
}
+/// Whether to truncate the buffer
+#[inline]
+fn buffer_need_truncate(
+ array_offset: usize,
+ buffer: &Buffer,
+ spec: &BufferSpec,
+ min_length: usize,
+) -> bool {
+ spec != &BufferSpec::AlwaysNull && (array_offset != 0 || min_length < buffer.len())
+}
+
+/// Returns byte width for a buffer spec. Only for `BufferSpec::FixedWidth`.
+#[inline]
+fn get_buffer_byte_width(spec: &BufferSpec) -> usize {
+ match spec {
+ BufferSpec::FixedWidth { byte_width } => *byte_width,
+ _ => 0,
+ }
+}
+
+/// Returns the number of total bytes in base binary arrays.
+fn get_total_bytes(array_data: &ArrayData) -> usize {
+ if array_data.is_empty() {
+ return 0;
+ }
+ match array_data.data_type() {
+ DataType::Binary => {
+ let array: BinaryArray = array_data.clone().into();
+ let offsets = array.value_offsets();
+ (offsets[array_data.len()] - offsets[0]) as usize
+ }
+ DataType::LargeBinary => {
+ let array: LargeBinaryArray = array_data.clone().into();
+ let offsets = array.value_offsets();
+ (offsets[array_data.len()] - offsets[0]) as usize
+ }
+ DataType::Utf8 => {
+ let array: StringArray = array_data.clone().into();
+ let offsets = array.value_offsets();
+ (offsets[array_data.len()] - offsets[0]) as usize
+ }
+ DataType::LargeUtf8 => {
+ let array: LargeStringArray = array_data.clone().into();
+ let offsets = array.value_offsets();
+ (offsets[array_data.len()] - offsets[0]) as usize
+ }
+ _ => unreachable!(),
+ }
+}
+
+/// Rebase value offsets for given ArrayData to zero-based.
+fn get_zero_based_value_offsets(array_data: &ArrayData) -> Buffer {
+ match array_data.data_type() {
+ DataType::Binary => {
+ let array: BinaryArray = array_data.clone().into();
+ let offsets = array.value_offsets();
+ let start_offset = offsets[0];
+
+ let mut new_offsets = vec![0_i32; array_data.len() + 1];
+ for (idx, x) in offsets.iter().enumerate() {
+ new_offsets[idx] = x - start_offset;
+ }
+
+ Buffer::from_slice_ref(&new_offsets)
+ }
+ DataType::LargeBinary => {
+ let array: LargeBinaryArray = array_data.clone().into();
+ let offsets = array.value_offsets();
+
+ let start_offset = offsets[0];
Review Comment:
It might be nice to extract the copy-pasted logic here into a generic
function on `O: OffsetSize`
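Roughly something like this (untested sketch, assuming the unified
`OffsetSizeTrait` and that its bounds cover the subtraction), with the concrete
Binary/LargeBinary/Utf8/LargeUtf8 arms just handing over their
`value_offsets()`:
```rust
use arrow::array::OffsetSizeTrait;
use arrow::buffer::Buffer;

// Untested sketch: one helper over the offset type instead of four
// near-identical match arms. The caller passes `array.value_offsets()`
// from the concrete array it already builds per data type.
fn zero_based_offsets<O: OffsetSizeTrait>(offsets: &[O]) -> Buffer {
    let start = offsets[0];
    let rebased: Vec<O> = offsets.iter().map(|&x| x - start).collect();
    Buffer::from_slice_ref(&rebased)
}
```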
##########
arrow/src/ipc/writer.rs:
##########
@@ -861,6 +863,141 @@ fn has_validity_bitmap(data_type: &DataType, write_options: &IpcWriteOptions) ->
}
}
+/// Whether to truncate the buffer
+#[inline]
+fn buffer_need_truncate(
+ array_offset: usize,
+ buffer: &Buffer,
+ spec: &BufferSpec,
+ min_length: usize,
+) -> bool {
+ spec != &BufferSpec::AlwaysNull && (array_offset != 0 || min_length < buffer.len())
+}
+
+/// Returns byte width for a buffer spec. Only for `BufferSpec::FixedWidth`.
+#[inline]
+fn get_buffer_byte_width(spec: &BufferSpec) -> usize {
+ match spec {
+ BufferSpec::FixedWidth { byte_width } => *byte_width,
+ _ => 0,
+ }
+}
+
+/// Returns the number of total bytes in base binary arrays.
+fn get_total_bytes(array_data: &ArrayData) -> usize {
+ if array_data.is_empty() {
+ return 0;
+ }
+ match array_data.data_type() {
+ DataType::Binary => {
+ let array: BinaryArray = array_data.clone().into();
+ let offsets = array.value_offsets();
+ (offsets[array_data.len()] - offsets[0]) as usize
+ }
+ DataType::LargeBinary => {
+ let array: LargeBinaryArray = array_data.clone().into();
+ let offsets = array.value_offsets();
+ (offsets[array_data.len()] - offsets[0]) as usize
+ }
+ DataType::Utf8 => {
+ let array: StringArray = array_data.clone().into();
+ let offsets = array.value_offsets();
+ (offsets[array_data.len()] - offsets[0]) as usize
+ }
+ DataType::LargeUtf8 => {
+ let array: LargeStringArray = array_data.clone().into();
+ let offsets = array.value_offsets();
+ (offsets[array_data.len()] - offsets[0]) as usize
+ }
+ _ => unreachable!(),
+ }
+}
+
+/// Rebase value offsets for given ArrayData to zero-based.
+fn get_zero_based_value_offsets(array_data: &ArrayData) -> Buffer {
+ match array_data.data_type() {
+ DataType::Binary => {
+ let array: BinaryArray = array_data.clone().into();
+ let offsets = array.value_offsets();
+ let start_offset = offsets[0];
+
+ let mut new_offsets = vec![0_i32; array_data.len() + 1];
+ for (idx, x) in offsets.iter().enumerate() {
+ new_offsets[idx] = x - start_offset;
+ }
+
+ Buffer::from_slice_ref(&new_offsets)
Review Comment:
Using `BufferBuilder` would be faster here, as it avoids a copy.
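i.e. something along the lines of (untested sketch):
```rust
use arrow::array::BufferBuilder;
use arrow::buffer::Buffer;

// Untested sketch: append the rebased offsets straight into a
// BufferBuilder so there is no intermediate Vec to copy out of.
fn zero_based_offsets_i32(offsets: &[i32]) -> Buffer {
    let start = offsets[0];
    let mut builder = BufferBuilder::<i32>::new(offsets.len());
    for &x in offsets {
        builder.append(x - start);
    }
    builder.finish()
}
```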
##########
arrow/src/ipc/writer.rs:
##########
@@ -894,12 +1031,66 @@ fn write_array_data(
Some(buffer) => buffer.clone(),
};
- offset = write_buffer(&null_buffer, buffers, arrow_data, offset);
+ offset = write_buffer(null_buffer.as_slice(), buffers, arrow_data, offset);
}
- array_data.buffers().iter().for_each(|buffer| {
- offset = write_buffer(buffer, buffers, arrow_data, offset);
- });
+ let data_type = array_data.data_type();
+ if matches!(
+ data_type,
+ DataType::Binary | DataType::LargeBinary | DataType::Utf8 | DataType::LargeUtf8
+ ) {
+ let total_bytes = get_total_bytes(array_data);
+ let value_buffer = &array_data.buffers()[1];
+ if buffer_need_truncate(
+ array_data.offset(),
+ value_buffer,
+ &BufferSpec::VariableWidth,
+ total_bytes,
+ ) {
+ // Rebase offsets and truncate values
+ let new_offsets = get_zero_based_value_offsets(array_data);
+ offset = write_buffer(new_offsets.as_slice(), buffers, arrow_data, offset);
+
+ let byte_offset = get_buffer_offset(array_data);
+ let buffer_length = min(total_bytes, value_buffer.len() - byte_offset);
+ let buffer_slice =
+ &value_buffer.as_slice()[byte_offset..(byte_offset + buffer_length)];
+ offset = write_buffer(buffer_slice, buffers, arrow_data, offset);
+ } else {
+ array_data.buffers().iter().for_each(|buffer| {
+ offset = write_buffer(buffer.as_slice(), buffers, arrow_data, offset);
+ });
+ }
+ } else if DataType::is_numeric(data_type)
+ || DataType::is_temporal(data_type)
+ || matches!(array_data.data_type(), DataType::FixedSizeBinary(_))
Review Comment:
I think this could also support DictionaryArray without modification?
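i.e. (untested, and `is_fixed_width_truncatable` is just an illustrative name)
the dictionary keys buffer is fixed-width integers in `buffers()[0]`, so the
same offset/length arithmetic ought to apply:
```rust
use arrow::datatypes::DataType;

// Untested sketch: extend the fixed-width branch to dictionary arrays,
// whose keys buffer is fixed-width just like the numeric/temporal cases.
fn is_fixed_width_truncatable(data_type: &DataType) -> bool {
    DataType::is_numeric(data_type)
        || DataType::is_temporal(data_type)
        || matches!(
            data_type,
            DataType::FixedSizeBinary(_) | DataType::Dictionary(_, _)
        )
}
```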
##########
arrow/src/ipc/writer.rs:
##########
@@ -894,12 +1031,66 @@ fn write_array_data(
Some(buffer) => buffer.clone(),
};
- offset = write_buffer(&null_buffer, buffers, arrow_data, offset);
+ offset = write_buffer(null_buffer.as_slice(), buffers, arrow_data, offset);
}
- array_data.buffers().iter().for_each(|buffer| {
- offset = write_buffer(buffer, buffers, arrow_data, offset);
- });
+ let data_type = array_data.data_type();
+ if matches!(
+ data_type,
+ DataType::Binary | DataType::LargeBinary | DataType::Utf8 | DataType::LargeUtf8
+ ) {
+ let total_bytes = get_total_bytes(array_data);
+ let value_buffer = &array_data.buffers()[1];
+ if buffer_need_truncate(
+ array_data.offset(),
+ value_buffer,
+ &BufferSpec::VariableWidth,
+ total_bytes,
+ ) {
+ // Rebase offsets and truncate values
+ let new_offsets = get_zero_based_value_offsets(array_data);
+ offset = write_buffer(new_offsets.as_slice(), buffers, arrow_data, offset);
Review Comment:
Definitely something that could be left to another PR, but I do wonder if
there might be some way to write the new offsets directly without buffering
them first. I'm not very familiar with the IPC code, so I'm not sure how
feasible this is.
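Something vaguely like this is the idea (very rough, untested sketch; it
glosses over the padding and buffer-metadata bookkeeping that `write_buffer`
also does):
```rust
// Very rough sketch: push each rebased offset's little-endian bytes
// straight into the output instead of materialising a Buffer first.
// Padding to the 8-byte boundary and recording the buffer metadata
// would still need to happen, as write_buffer does today.
fn write_offsets_directly(offsets: &[i32], arrow_data: &mut Vec<u8>) {
    let start = offsets[0];
    for &x in offsets {
        arrow_data.extend_from_slice(&(x - start).to_le_bytes());
    }
}
```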
##########
arrow/src/ipc/writer.rs:
##########
@@ -861,6 +863,141 @@ fn has_validity_bitmap(data_type: &DataType, write_options: &IpcWriteOptions) ->
}
}
+/// Whether to truncate the buffer
+#[inline]
+fn buffer_need_truncate(
+ array_offset: usize,
+ buffer: &Buffer,
+ spec: &BufferSpec,
+ min_length: usize,
+) -> bool {
+ spec != &BufferSpec::AlwaysNull && (array_offset != 0 || min_length < buffer.len())
+}
+
+/// Returns byte width for a buffer spec. Only for `BufferSpec::FixedWidth`.
+#[inline]
+fn get_buffer_byte_width(spec: &BufferSpec) -> usize {
+ match spec {
+ BufferSpec::FixedWidth { byte_width } => *byte_width,
+ _ => 0,
+ }
+}
+
+/// Returns the number of total bytes in base binary arrays.
+fn get_total_bytes(array_data: &ArrayData) -> usize {
Review Comment:
```suggestion
fn get_binary_buffer_len(array_data: &ArrayData) -> usize {
```
Or something to make it a bit clearer what it is the total of?
##########
arrow/src/ipc/writer.rs:
##########
@@ -861,6 +863,141 @@ fn has_validity_bitmap(data_type: &DataType, write_options: &IpcWriteOptions) ->
}
}
+/// Whether to truncate the buffer
+#[inline]
+fn buffer_need_truncate(
+ array_offset: usize,
+ buffer: &Buffer,
+ spec: &BufferSpec,
+ min_length: usize,
+) -> bool {
+ spec != &BufferSpec::AlwaysNull && (array_offset != 0 || min_length < buffer.len())
+}
+
+/// Returns byte width for a buffer spec. Only for `BufferSpec::FixedWidth`.
+#[inline]
+fn get_buffer_byte_width(spec: &BufferSpec) -> usize {
+ match spec {
+ BufferSpec::FixedWidth { byte_width } => *byte_width,
+ _ => 0,
+ }
+}
+
+/// Returns the number of total bytes in base binary arrays.
+fn get_total_bytes(array_data: &ArrayData) -> usize {
+ if array_data.is_empty() {
+ return 0;
+ }
+ match array_data.data_type() {
+ DataType::Binary => {
+ let array: BinaryArray = array_data.clone().into();
+ let offsets = array.value_offsets();
+ (offsets[array_data.len()] - offsets[0]) as usize
+ }
+ DataType::LargeBinary => {
+ let array: LargeBinaryArray = array_data.clone().into();
+ let offsets = array.value_offsets();
+ (offsets[array_data.len()] - offsets[0]) as usize
+ }
+ DataType::Utf8 => {
+ let array: StringArray = array_data.clone().into();
+ let offsets = array.value_offsets();
+ (offsets[array_data.len()] - offsets[0]) as usize
+ }
+ DataType::LargeUtf8 => {
+ let array: LargeStringArray = array_data.clone().into();
+ let offsets = array.value_offsets();
+ (offsets[array_data.len()] - offsets[0]) as usize
+ }
+ _ => unreachable!(),
+ }
+}
+
+/// Rebase value offsets for given ArrayData to zero-based.
+fn get_zero_based_value_offsets(array_data: &ArrayData) -> Buffer {
+ match array_data.data_type() {
+ DataType::Binary => {
+ let array: BinaryArray = array_data.clone().into();
+ let offsets = array.value_offsets();
+ let start_offset = offsets[0];
+
+ let mut new_offsets = vec![0_i32; array_data.len() + 1];
Review Comment:
I don't think we need to zero-initialize here
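e.g. for this arm (reusing `offsets` and `start_offset` from just above):
```rust
// Sketch: build the rebased offsets without the up-front zero fill,
// writing each element exactly once.
let new_offsets: Vec<i32> = offsets.iter().map(|x| x - start_offset).collect();
```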
##########
arrow/src/ipc/writer.rs:
##########
@@ -861,6 +863,141 @@ fn has_validity_bitmap(data_type: &DataType, write_options: &IpcWriteOptions) ->
}
}
+/// Whether to truncate the buffer
+#[inline]
+fn buffer_need_truncate(
+ array_offset: usize,
+ buffer: &Buffer,
+ spec: &BufferSpec,
+ min_length: usize,
+) -> bool {
+ spec != &BufferSpec::AlwaysNull && (array_offset != 0 || min_length < buffer.len())
+}
+
+/// Returns byte width for a buffer spec. Only for `BufferSpec::FixedWidth`.
+#[inline]
+fn get_buffer_byte_width(spec: &BufferSpec) -> usize {
Review Comment:
```suggestion
fn get_buffer_element_width(spec: &BufferSpec) -> usize {
```
##########
arrow/src/ipc/writer.rs:
##########
@@ -894,12 +1031,66 @@ fn write_array_data(
Some(buffer) => buffer.clone(),
};
- offset = write_buffer(&null_buffer, buffers, arrow_data, offset);
+ offset = write_buffer(null_buffer.as_slice(), buffers, arrow_data, offset);
}
- array_data.buffers().iter().for_each(|buffer| {
- offset = write_buffer(buffer, buffers, arrow_data, offset);
- });
+ let data_type = array_data.data_type();
+ if matches!(
+ data_type,
+ DataType::Binary | DataType::LargeBinary | DataType::Utf8 | DataType::LargeUtf8
+ ) {
+ let total_bytes = get_total_bytes(array_data);
+ let value_buffer = &array_data.buffers()[1];
+ if buffer_need_truncate(
+ array_data.offset(),
+ value_buffer,
+ &BufferSpec::VariableWidth,
+ total_bytes,
+ ) {
+ // Rebase offsets and truncate values
+ let new_offsets = get_zero_based_value_offsets(array_data);
+ offset = write_buffer(new_offsets.as_slice(), buffers, arrow_data, offset);
+
+ let byte_offset = get_buffer_offset(array_data);
+ let buffer_length = min(total_bytes, value_buffer.len() - byte_offset);
+ let buffer_slice =
+ &value_buffer.as_slice()[byte_offset..(byte_offset + buffer_length)];
+ offset = write_buffer(buffer_slice, buffers, arrow_data, offset);
+ } else {
+ array_data.buffers().iter().for_each(|buffer| {
+ offset = write_buffer(buffer.as_slice(), buffers, arrow_data, offset);
+ });
+ }
+ } else if DataType::is_numeric(data_type)
+ || DataType::is_temporal(data_type)
+ || matches!(array_data.data_type(), DataType::FixedSizeBinary(_))
+ {
+ // Truncate values
+ let layout = layout(data_type);
+
+ array_data
Review Comment:
This code could be simpler if it relied on the (valid) assumption that all of
these arrays have only a single data buffer.
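For instance (untested sketch; `truncated_value_slice` is just an illustrative
helper), the zip over the layout's buffer specs could collapse to plain index
math on `buffers()[0]`:
```rust
use arrow::array::ArrayData;

// Untested sketch: numeric, temporal and FixedSizeBinary arrays carry
// exactly one data buffer, so truncation is just offset/len * byte_width
// into buffers()[0]. `byte_width` is the FixedWidth element width.
fn truncated_value_slice(array_data: &ArrayData, byte_width: usize) -> &[u8] {
    let buffer = &array_data.buffers()[0];
    let start = array_data.offset() * byte_width;
    let end = start + array_data.len() * byte_width;
    &buffer.as_slice()[start..end]
}
```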
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]