This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 197c425285 Validate ArrayData Buffer Alignment and Automatically Align IPC buffers (#4255) (#4681)
197c425285 is described below

commit 197c425285219706f0a8393468c55e2ccd82e6e8
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Wed Aug 16 11:53:46 2023 +0100

    Validate ArrayData Buffer Alignment and Automatically Align IPC buffers (#4255) (#4681)
    
    * Automatically align misaligned IPC buffers (#4255)
    
    * Update test
    
    * Further test fix
    
    * Format
    
    * Review feedback
    
    * More docs
---
 arrow-array/src/array/list_array.rs |  12 +-
 arrow-array/src/types.rs            |   4 +-
 arrow-data/src/data.rs              | 230 +++++++++++++++++++++++++-----------
 arrow-ipc/src/reader.rs             | 120 ++++++++-----------
 arrow-ipc/src/writer.rs             |   2 +-
 arrow/tests/array_validation.rs     |   4 +-
 6 files changed, 223 insertions(+), 149 deletions(-)

diff --git a/arrow-array/src/array/list_array.rs b/arrow-array/src/array/list_array.rs
index f5b7ae77c3..3508e4f1c4 100644
--- a/arrow-array/src/array/list_array.rs
+++ b/arrow-array/src/array/list_array.rs
@@ -1037,13 +1037,17 @@ mod tests {
     #[should_panic(
         expected = "Memory pointer is not aligned with the specified scalar type"
     )]
+    // Different error messages, so skip for now
+    // https://github.com/apache/arrow-rs/issues/1545
+    #[cfg(not(feature = "force_validate"))]
     fn test_primitive_array_alignment() {
         let buf = Buffer::from_slice_ref([0_u64]);
         let buf2 = buf.slice(1);
-        let array_data = ArrayData::builder(DataType::Int32)
-            .add_buffer(buf2)
-            .build()
-            .unwrap();
+        let array_data = unsafe {
+            ArrayData::builder(DataType::Int32)
+                .add_buffer(buf2)
+                .build_unchecked()
+        };
         drop(Int32Array::from(array_data));
     }
 
diff --git a/arrow-array/src/types.rs b/arrow-array/src/types.rs
index 769dbf974b..d79b32a991 100644
--- a/arrow-array/src/types.rs
+++ b/arrow-array/src/types.rs
@@ -1494,7 +1494,6 @@ pub type LargeBinaryType = GenericBinaryType<i64>;
 mod tests {
     use super::*;
     use arrow_data::{layout, BufferSpec};
-    use std::mem::size_of;
 
     #[test]
     fn month_day_nano_should_roundtrip() {
@@ -1541,7 +1540,8 @@ mod tests {
         assert_eq!(
             spec,
             &BufferSpec::FixedWidth {
-                byte_width: size_of::<T::Native>()
+                byte_width: std::mem::size_of::<T::Native>(),
+                alignment: std::mem::align_of::<T::Native>(),
             }
         );
     }
diff --git a/arrow-data/src/data.rs b/arrow-data/src/data.rs
index 6ff8a824b2..0417e1d357 100644
--- a/arrow-data/src/data.rs
+++ b/arrow-data/src/data.rs
@@ -20,7 +20,7 @@
 
 use crate::bit_iterator::BitSliceIterator;
 use arrow_buffer::buffer::{BooleanBuffer, NullBuffer};
-use arrow_buffer::{bit_util, ArrowNativeType, Buffer, MutableBuffer};
+use arrow_buffer::{bit_util, i256, ArrowNativeType, Buffer, MutableBuffer};
 use arrow_schema::{ArrowError, DataType, UnionMode};
 use std::convert::TryInto;
 use std::mem;
@@ -451,7 +451,7 @@ impl ArrayData {
 
         for spec in layout.buffers.iter() {
             match spec {
-                BufferSpec::FixedWidth { byte_width } => {
+                BufferSpec::FixedWidth { byte_width, .. } => {
                     let buffer_size =
                         self.len.checked_mul(*byte_width).ok_or_else(|| {
                             ArrowError::ComputeError(
@@ -699,6 +699,23 @@ impl ArrayData {
         Self::new_null(data_type, 0)
     }
 
+    /// Verifies that the buffers meet the minimum alignment requirements for the data type
+    ///
+    /// Buffers that are not adequately aligned will be copied to a new aligned allocation
+    ///
+    /// This can be useful for when interacting with data sent over IPC or FFI, that may
+    /// not meet the minimum alignment requirements
+    fn align_buffers(&mut self) {
+        let layout = layout(&self.data_type);
+        for (buffer, spec) in self.buffers.iter_mut().zip(&layout.buffers) {
+            if let BufferSpec::FixedWidth { alignment, .. } = spec {
+                if buffer.as_ptr().align_offset(*alignment) != 0 {
+                    *buffer = Buffer::from_slice_ref(buffer.as_ref())
+                }
+            }
+        }
+    }
+
     /// "cheap" validation of an `ArrayData`. Ensures buffers are
     /// sufficiently sized to store `len` + `offset` total elements of
     /// `data_type` and performs other inexpensive consistency checks.
@@ -736,10 +753,11 @@ impl ArrayData {
             self.buffers.iter().zip(layout.buffers.iter()).enumerate()
         {
             match spec {
-                BufferSpec::FixedWidth { byte_width } => {
-                    let min_buffer_size = len_plus_offset
-                        .checked_mul(*byte_width)
-                        .expect("integer overflow computing min buffer size");
+                BufferSpec::FixedWidth {
+                    byte_width,
+                    alignment,
+                } => {
+                    let min_buffer_size = len_plus_offset.saturating_mul(*byte_width);
 
                     if buffer.len() < min_buffer_size {
                         return Err(ArrowError::InvalidArgumentError(format!(
@@ -747,6 +765,14 @@ impl ArrayData {
                             min_buffer_size, i, self.data_type, buffer.len()
                         )));
                     }
+
+                    let align_offset = buffer.as_ptr().align_offset(*alignment);
+                    if align_offset != 0 {
+                        return Err(ArrowError::InvalidArgumentError(format!(
+                            "Misaligned buffers[{i}] in array of type {:?}, offset from expected alignment of {alignment} by {}",
+                            self.data_type, align_offset.min(alignment - align_offset)
+                        )));
+                    }
                 }
                 BufferSpec::VariableWidth => {
                     // not cheap to validate (need to look at the
@@ -1493,7 +1519,8 @@ impl ArrayData {
 pub fn layout(data_type: &DataType) -> DataTypeLayout {
     // based on C/C++ implementation in
     // https://github.com/apache/arrow/blob/661c7d749150905a63dd3b52e0a04dac39030d95/cpp/src/arrow/type.h (and .cc)
-    use std::mem::size_of;
+    use arrow_schema::IntervalUnit::*;
+
     match data_type {
         DataType::Null => DataTypeLayout {
             buffers: vec![],
@@ -1503,44 +1530,52 @@ pub fn layout(data_type: &DataType) -> DataTypeLayout {
             buffers: vec![BufferSpec::BitMap],
             can_contain_null_mask: true,
         },
-        DataType::Int8
-        | DataType::Int16
-        | DataType::Int32
-        | DataType::Int64
-        | DataType::UInt8
-        | DataType::UInt16
-        | DataType::UInt32
-        | DataType::UInt64
-        | DataType::Float16
-        | DataType::Float32
-        | DataType::Float64
-        | DataType::Timestamp(_, _)
-        | DataType::Date32
-        | DataType::Date64
-        | DataType::Time32(_)
-        | DataType::Time64(_)
-        | DataType::Interval(_) => {
-            DataTypeLayout::new_fixed_width(data_type.primitive_width().unwrap())
-        }
-        DataType::Duration(_) => DataTypeLayout::new_fixed_width(size_of::<i64>()),
-        DataType::Binary => DataTypeLayout::new_binary(size_of::<i32>()),
-        DataType::FixedSizeBinary(bytes_per_value) => {
-            let bytes_per_value: usize = (*bytes_per_value)
-                .try_into()
-                .expect("negative size for fixed size binary");
-            DataTypeLayout::new_fixed_width(bytes_per_value)
+        DataType::Int8 => DataTypeLayout::new_fixed_width::<i8>(),
+        DataType::Int16 => DataTypeLayout::new_fixed_width::<i16>(),
+        DataType::Int32 => DataTypeLayout::new_fixed_width::<i32>(),
+        DataType::Int64 => DataTypeLayout::new_fixed_width::<i64>(),
+        DataType::UInt8 => DataTypeLayout::new_fixed_width::<u8>(),
+        DataType::UInt16 => DataTypeLayout::new_fixed_width::<u16>(),
+        DataType::UInt32 => DataTypeLayout::new_fixed_width::<u32>(),
+        DataType::UInt64 => DataTypeLayout::new_fixed_width::<u64>(),
+        DataType::Float16 => DataTypeLayout::new_fixed_width::<half::f16>(),
+        DataType::Float32 => DataTypeLayout::new_fixed_width::<f32>(),
+        DataType::Float64 => DataTypeLayout::new_fixed_width::<f64>(),
+        DataType::Timestamp(_, _) => DataTypeLayout::new_fixed_width::<i64>(),
+        DataType::Date32 => DataTypeLayout::new_fixed_width::<i32>(),
+        DataType::Date64 => DataTypeLayout::new_fixed_width::<i64>(),
+        DataType::Time32(_) => DataTypeLayout::new_fixed_width::<i32>(),
+        DataType::Time64(_) => DataTypeLayout::new_fixed_width::<i64>(),
+        DataType::Interval(YearMonth) => DataTypeLayout::new_fixed_width::<i32>(),
+        DataType::Interval(DayTime) => DataTypeLayout::new_fixed_width::<i64>(),
+        DataType::Interval(MonthDayNano) => DataTypeLayout::new_fixed_width::<i128>(),
+        DataType::Duration(_) => DataTypeLayout::new_fixed_width::<i64>(),
+        DataType::Decimal128(_, _) => DataTypeLayout::new_fixed_width::<i128>(),
+        DataType::Decimal256(_, _) => DataTypeLayout::new_fixed_width::<i256>(),
+        DataType::FixedSizeBinary(size) => {
+            let spec = BufferSpec::FixedWidth {
+                byte_width: (*size).try_into().unwrap(),
+                alignment: mem::align_of::<u8>(),
+            };
+            DataTypeLayout {
+                buffers: vec![spec],
+                can_contain_null_mask: true,
+            }
         }
-        DataType::LargeBinary => DataTypeLayout::new_binary(size_of::<i64>()),
-        DataType::Utf8 => DataTypeLayout::new_binary(size_of::<i32>()),
-        DataType::LargeUtf8 => DataTypeLayout::new_binary(size_of::<i64>()),
-        DataType::List(_) => DataTypeLayout::new_fixed_width(size_of::<i32>()),
+        DataType::Binary => DataTypeLayout::new_binary::<i32>(),
+        DataType::LargeBinary => DataTypeLayout::new_binary::<i64>(),
+        DataType::Utf8 => DataTypeLayout::new_binary::<i32>(),
+        DataType::LargeUtf8 => DataTypeLayout::new_binary::<i64>(),
         DataType::FixedSizeList(_, _) => DataTypeLayout::new_empty(), // all in child data
-        DataType::LargeList(_) => DataTypeLayout::new_fixed_width(size_of::<i64>()),
+        DataType::List(_) => DataTypeLayout::new_fixed_width::<i32>(),
+        DataType::LargeList(_) => DataTypeLayout::new_fixed_width::<i64>(),
+        DataType::Map(_, _) => DataTypeLayout::new_fixed_width::<i32>(),
         DataType::Struct(_) => DataTypeLayout::new_empty(), // all in child data,
         DataType::RunEndEncoded(_, _) => DataTypeLayout::new_empty(), // all in child data,
         DataType::Union(_, mode) => {
             let type_ids = BufferSpec::FixedWidth {
-                byte_width: size_of::<i8>(),
+                byte_width: mem::size_of::<i8>(),
+                alignment: mem::align_of::<i8>(),
             };
 
             DataTypeLayout {
@@ -1552,7 +1587,8 @@ pub fn layout(data_type: &DataType) -> DataTypeLayout {
                         vec![
                             type_ids,
                             BufferSpec::FixedWidth {
-                                byte_width: size_of::<i32>(),
+                                byte_width: mem::size_of::<i32>(),
+                                alignment: mem::align_of::<i32>(),
                             },
                         ]
                     }
@@ -1561,19 +1597,6 @@ pub fn layout(data_type: &DataType) -> DataTypeLayout {
             }
         }
         DataType::Dictionary(key_type, _value_type) => layout(key_type),
-        DataType::Decimal128(_, _) => {
-            // Decimals are always some fixed width; The rust implementation
-            // always uses 16 bytes / size of i128
-            DataTypeLayout::new_fixed_width(size_of::<i128>())
-        }
-        DataType::Decimal256(_, _) => {
-            // Decimals are always some fixed width.
-            DataTypeLayout::new_fixed_width(32)
-        }
-        DataType::Map(_, _) => {
-            // same as ListType
-            DataTypeLayout::new_fixed_width(size_of::<i32>())
-        }
     }
 }
 
@@ -1589,10 +1612,13 @@ pub struct DataTypeLayout {
 }
 
 impl DataTypeLayout {
-    /// Describes a basic numeric array where each element has a fixed width
-    pub fn new_fixed_width(byte_width: usize) -> Self {
+    /// Describes a basic numeric array where each element has type `T`
+    pub fn new_fixed_width<T>() -> Self {
         Self {
-            buffers: vec![BufferSpec::FixedWidth { byte_width }],
+            buffers: vec![BufferSpec::FixedWidth {
+                byte_width: mem::size_of::<T>(),
+                alignment: mem::align_of::<T>(),
+            }],
             can_contain_null_mask: true,
         }
     }
@@ -1608,14 +1634,15 @@ impl DataTypeLayout {
     }
 
     /// Describes a basic numeric array where each element has a fixed
-    /// with offset buffer of `offset_byte_width` bytes, followed by a
+    /// with offset buffer of type `T`, followed by a
     /// variable width data buffer
-    pub fn new_binary(offset_byte_width: usize) -> Self {
+    pub fn new_binary<T>() -> Self {
         Self {
             buffers: vec![
                 // offsets
                 BufferSpec::FixedWidth {
-                    byte_width: offset_byte_width,
+                    byte_width: mem::size_of::<T>(),
+                    alignment: mem::align_of::<T>(),
                 },
                 // values
                 BufferSpec::VariableWidth,
@@ -1628,8 +1655,18 @@ impl DataTypeLayout {
 /// Layout specification for a single data type buffer
 #[derive(Debug, PartialEq, Eq)]
 pub enum BufferSpec {
-    /// each element has a fixed width
-    FixedWidth { byte_width: usize },
+    /// Each element is a fixed width primitive, with the given `byte_width` and `alignment`
+    ///
+    /// `alignment` is the alignment required by Rust for an array of the corresponding primitive,
+    /// see [`Layout::array`](std::alloc::Layout::array) and [`std::mem::align_of`].
+    ///
+    /// Arrow-rs requires that all buffers are have at least this alignment, to allow for
+    /// [slice](std::slice) based APIs. We do not require alignment in excess of this to allow
+    /// for array slicing, and interoperability with `Vec` which in the absence of support
+    /// for custom allocators, cannot be over-aligned.
+    ///
+    /// Note that these alignment requirements will vary between architectures
+    FixedWidth { byte_width: usize, alignment: usize },
     /// Variable width, such as string data for utf8 data
     VariableWidth,
     /// Buffer holds a bitmap.
@@ -1741,6 +1778,15 @@ impl ArrayDataBuilder {
     /// apply.
     #[allow(clippy::let_and_return)]
     pub unsafe fn build_unchecked(self) -> ArrayData {
+        let data = self.build_impl();
+        // Provide a force_validate mode
+        #[cfg(feature = "force_validate")]
+        data.validate_data().unwrap();
+        data
+    }
+
+    /// Same as [`Self::build_unchecked`] but ignoring `force_validate` feature flag
+    unsafe fn build_impl(self) -> ArrayData {
         let nulls = self.nulls.or_else(|| {
             let buffer = self.null_bit_buffer?;
             let buffer = BooleanBuffer::new(buffer, self.offset, self.len);
@@ -1750,26 +1796,41 @@ impl ArrayDataBuilder {
             })
         });
 
-        let data = ArrayData {
+        ArrayData {
             data_type: self.data_type,
             len: self.len,
             offset: self.offset,
             buffers: self.buffers,
             child_data: self.child_data,
             nulls: nulls.filter(|b| b.null_count() != 0),
-        };
-
-        // Provide a force_validate mode
-        #[cfg(feature = "force_validate")]
-        data.validate_data().unwrap();
-        data
+        }
     }
 
     /// Creates an array data, validating all inputs
-    #[allow(clippy::let_and_return)]
     pub fn build(self) -> Result<ArrayData, ArrowError> {
-        let data = unsafe { self.build_unchecked() };
-        #[cfg(not(feature = "force_validate"))]
+        let data = unsafe { self.build_impl() };
+        data.validate_data()?;
+        Ok(data)
+    }
+
+    /// Creates an array data, validating all inputs, and aligning any buffers
+    ///
+    /// Rust requires that arrays are aligned to their corresponding primitive,
+    /// see [`Layout::array`](std::alloc::Layout::array) and [`std::mem::align_of`].
+    ///
+    /// [`ArrayData`] therefore requires that all buffers are have at least this alignment,
+    /// to allow for [slice](std::slice) based APIs. See [`BufferSpec::FixedWidth`].
+    ///
+    /// As this alignment is architecture specific, and not guaranteed by all arrow implementations,
+    /// this method is provided to automatically copy buffers to a new correctly aligned allocation
+    /// when necessary, making it useful when interacting with buffers produced by other systems,
+    /// e.g. IPC or FFI.
+    ///
+    /// This is unlike `[Self::build`] which will instead return an error on encountering
+    /// insufficiently aligned buffers.
+    pub fn build_aligned(self) -> Result<ArrayData, ArrowError> {
+        let mut data = unsafe { self.build_impl() };
+        data.align_buffers();
         data.validate_data()?;
         Ok(data)
     }
@@ -2057,4 +2118,31 @@ mod tests {
             assert_eq!(buffers.len(), layout.buffers.len());
         }
     }
+
+    #[test]
+    fn test_alignment() {
+        let buffer = Buffer::from_vec(vec![1_i32, 2_i32, 3_i32]);
+        let sliced = buffer.slice(1);
+
+        let mut data = ArrayData {
+            data_type: DataType::Int32,
+            len: 0,
+            offset: 0,
+            buffers: vec![buffer],
+            child_data: vec![],
+            nulls: None,
+        };
+        data.validate_full().unwrap();
+
+        data.buffers[0] = sliced;
+        let err = data.validate().unwrap_err();
+
+        assert_eq!(
+            err.to_string(),
+            "Invalid argument error: Misaligned buffers[0] in array of type Int32, offset from expected alignment of 4 by 1"
+        );
+
+        data.align_buffers();
+        data.validate_full().unwrap();
+    }
 }
diff --git a/arrow-ipc/src/reader.rs b/arrow-ipc/src/reader.rs
index 0908d580d5..b7d328977d 100644
--- a/arrow-ipc/src/reader.rs
+++ b/arrow-ipc/src/reader.rs
@@ -20,7 +20,6 @@
 //! The `FileReader` and `StreamReader` have similar interfaces,
 //! however the `FileReader` expects a reader that supports `Seek`ing
 
-use arrow_buffer::i256;
 use flatbuffers::VectorIter;
 use std::collections::HashMap;
 use std::fmt;
@@ -129,7 +128,7 @@ fn create_array(reader: &mut ArrayReader, field: &Field) -> Result<ArrayRef, Arr
                 .offset(0)
                 .add_child_data(run_ends.into_data())
                 .add_child_data(values.into_data())
-                .build()?;
+                .build_aligned()?;
 
             Ok(make_array(data))
         }
@@ -202,7 +201,7 @@ fn create_array(reader: &mut ArrayReader, field: &Field) -> Result<ArrayRef, Arr
             let data = ArrayData::builder(data_type.clone())
                 .len(length as usize)
                 .offset(0)
-                .build()
+                .build_aligned()
                 .unwrap();
             // no buffer increases
             Ok(Arc::new(NullArray::from(data)))
@@ -231,54 +230,17 @@ fn create_primitive_array(
                 .len(length)
                 .buffers(buffers[1..3].to_vec())
                 .null_bit_buffer(null_buffer)
-                .build()?
+                .build_aligned()?
         }
-        Int8
-        | Int16
-        | Int32
-        | UInt8
-        | UInt16
-        | UInt32
-        | Time32(_)
-        | Date32
-        | Interval(IntervalUnit::YearMonth)
-        | Interval(IntervalUnit::DayTime)
-        | FixedSizeBinary(_)
-        | Boolean
-        | Int64
-        | UInt64
-        | Float32
-        | Float64
-        | Time64(_)
-        | Timestamp(_, _)
-        | Date64
-        | Duration(_) => {
+        _ if data_type.is_primitive()
+            || matches!(data_type, Boolean | FixedSizeBinary(_)) =>
+        {
             // read 2 buffers: null buffer (optional) and data buffer
             ArrayData::builder(data_type.clone())
                 .len(length)
                 .add_buffer(buffers[1].clone())
                 .null_bit_buffer(null_buffer)
-                .build()?
-        }
-        Interval(IntervalUnit::MonthDayNano) | Decimal128(_, _) => {
-            let buffer = get_aligned_buffer::<i128>(&buffers[1], length);
-
-            // read 2 buffers: null buffer (optional) and data buffer
-            ArrayData::builder(data_type.clone())
-                .len(length)
-                .add_buffer(buffer)
-                .null_bit_buffer(null_buffer)
-                .build()?
-        }
-        Decimal256(_, _) => {
-            let buffer = get_aligned_buffer::<i256>(&buffers[1], length);
-
-            // read 2 buffers: null buffer (optional) and data buffer
-            ArrayData::builder(data_type.clone())
-                .len(length)
-                .add_buffer(buffer)
-                .null_bit_buffer(null_buffer)
-                .build()?
+                .build_aligned()?
         }
         t => unreachable!("Data type {:?} either unsupported or not primitive", t),
     };
@@ -286,28 +248,10 @@ fn create_primitive_array(
     Ok(make_array(array_data))
 }
 
-/// Checks if given `Buffer` is properly aligned with `T`.
-/// If not, copying the data and padded it for alignment.
-fn get_aligned_buffer<T>(buffer: &Buffer, length: usize) -> Buffer {
-    let ptr = buffer.as_ptr();
-    let align_req = std::mem::align_of::<T>();
-    let align_offset = ptr.align_offset(align_req);
-    // The buffer is not aligned properly. The writer might use a smaller alignment
-    // e.g. 8 bytes, but on some platform (e.g. ARM) i128 requires 16 bytes alignment.
-    // We need to copy the buffer as fallback.
-    if align_offset != 0 {
-        let len_in_bytes = (length * std::mem::size_of::<T>()).min(buffer.len());
-        let slice = &buffer.as_slice()[0..len_in_bytes];
-        Buffer::from_slice_ref(slice)
-    } else {
-        buffer.clone()
-    }
-}
-
 /// Reads the correct number of buffers based on list type and null_count, and creates a
 /// list array ref
 fn create_list_array(
-    field_node: &crate::FieldNode,
+    field_node: &FieldNode,
     data_type: &DataType,
     buffers: &[Buffer],
     child_array: ArrayRef,
@@ -329,13 +273,13 @@ fn create_list_array(
 
         _ => unreachable!("Cannot create list or map array from {:?}", data_type),
     };
-    Ok(make_array(builder.build()?))
+    Ok(make_array(builder.build_aligned()?))
 }
 
 /// Reads the correct number of buffers based on list type and null_count, and creates a
 /// list array ref
 fn create_dictionary_array(
-    field_node: &crate::FieldNode,
+    field_node: &FieldNode,
     data_type: &DataType,
     buffers: &[Buffer],
     value_array: ArrayRef,
@@ -348,7 +292,7 @@ fn create_dictionary_array(
             .add_child_data(value_array.into_data())
             .null_bit_buffer(null_buffer);
 
-        Ok(make_array(builder.build()?))
+        Ok(make_array(builder.build_aligned()?))
     } else {
         unreachable!("Cannot create dictionary array from {:?}", data_type)
     }
@@ -1097,10 +1041,11 @@ impl<R: Read> RecordBatchReader for StreamReader<R> {
 
 #[cfg(test)]
 mod tests {
-    use crate::writer::unslice_run_array;
+    use crate::writer::{unslice_run_array, DictionaryTracker, IpcDataGenerator};
 
     use super::*;
 
+    use crate::root_as_message;
     use arrow_array::builder::{PrimitiveRunBuilder, UnionBuilder};
     use arrow_array::types::*;
     use arrow_buffer::ArrowNativeType;
@@ -1357,8 +1302,7 @@ mod tests {
         writer.finish().unwrap();
         drop(writer);
 
-        let mut reader =
-            crate::reader::FileReader::try_new(std::io::Cursor::new(buf), None).unwrap();
+        let mut reader = FileReader::try_new(std::io::Cursor::new(buf), None).unwrap();
         reader.next().unwrap().unwrap()
     }
 
@@ -1704,4 +1648,40 @@ mod tests {
         let output_batch = roundtrip_ipc_stream(&input_batch);
         assert_eq!(input_batch, output_batch);
     }
+
+    #[test]
+    fn test_unaligned() {
+        let batch = RecordBatch::try_from_iter(vec![(
+            "i32",
+            Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _,
+        )])
+        .unwrap();
+
+        let gen = IpcDataGenerator {};
+        let mut dict_tracker = DictionaryTracker::new(false);
+        let (_, encoded) = gen
+            .encoded_batch(&batch, &mut dict_tracker, &Default::default())
+            .unwrap();
+
+        let message = root_as_message(&encoded.ipc_message).unwrap();
+
+        // Construct an unaligned buffer
+        let mut buffer = MutableBuffer::with_capacity(encoded.arrow_data.len() + 1);
+        buffer.push(0_u8);
+        buffer.extend_from_slice(&encoded.arrow_data);
+        let b = Buffer::from(buffer).slice(1);
+        assert_ne!(b.as_ptr().align_offset(8), 0);
+
+        let ipc_batch = message.header_as_record_batch().unwrap();
+        let roundtrip = read_record_batch(
+            &b,
+            ipc_batch,
+            batch.schema(),
+            &Default::default(),
+            None,
+            &message.version(),
+        )
+        .unwrap();
+        assert_eq!(batch, roundtrip);
+    }
 }
diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs
index 59657bc4be..1c56613d8f 100644
--- a/arrow-ipc/src/writer.rs
+++ b/arrow-ipc/src/writer.rs
@@ -1146,7 +1146,7 @@ fn buffer_need_truncate(
 #[inline]
 fn get_buffer_element_width(spec: &BufferSpec) -> usize {
     match spec {
-        BufferSpec::FixedWidth { byte_width } => *byte_width,
+        BufferSpec::FixedWidth { byte_width, .. } => *byte_width,
         _ => 0,
     }
 }
diff --git a/arrow/tests/array_validation.rs b/arrow/tests/array_validation.rs
index 0d3652a047..fa80db1860 100644
--- a/arrow/tests/array_validation.rs
+++ b/arrow/tests/array_validation.rs
@@ -56,7 +56,9 @@ fn test_bad_number_of_buffers() {
 }
 
 #[test]
-#[should_panic(expected = "integer overflow computing min buffer size")]
+#[should_panic(
+    expected = "Need at least 18446744073709551615 bytes in buffers[0] in array of type Int64, but got 8"
+)]
 fn test_fixed_width_overflow() {
     let buffer = Buffer::from_slice_ref([0i32, 2i32]);
     ArrayData::try_new(DataType::Int64, usize::MAX, None, 0, vec![buffer], vec![])

Reply via email to