alamb commented on code in PR #5554:
URL: https://github.com/apache/arrow-rs/pull/5554#discussion_r1546746985


##########
arrow-ipc/src/reader.rs:
##########
@@ -628,6 +688,19 @@ impl FileDecoder {
         self
     }
 
+    /// Specify whether to enforce zero copy, which means the array data 
inside the record batches
+    /// produced by this decoder will always reference the input buffer.

Review Comment:
   What do you think about calling this concept / field `require_alignment` 
which describes what is required (rather than how it is enforced)?
   
   It might also help make the intent clearer to users (that the IPC decoder 
will error if the buffers are not aligned properly)
   
   I don't feel super strongly about this, but I was a little confused about 
what `enforce_zero_copy` meant until I found this comment 



##########
arrow-ipc/src/reader.rs:
##########
@@ -1801,11 +1878,56 @@ mod tests {
             &Default::default(),
             None,
             &message.version(),
+            false,
         )
         .unwrap();
         assert_eq!(batch, roundtrip);
     }
 
+    #[test]

Review Comment:
   ❤️ 



##########
arrow-ipc/src/reader.rs:
##########
@@ -396,6 +451,7 @@ pub fn read_record_batch(
     dictionaries_by_id: &HashMap<i64, ArrayRef>,
     projection: Option<&[usize]>,
     metadata: &MetadataVersion,
+    enforce_zero_copy: bool,

Review Comment:
   Since `read_record_batch` is `pub`, I think we should document the 
parameter here as well (maybe link back to the array builder's docs)
   
   https://docs.rs/arrow-ipc/latest/arrow_ipc/reader/fn.read_record_batch.html



##########
arrow-ipc/src/reader.rs:
##########
@@ -1801,11 +1878,56 @@ mod tests {
             &Default::default(),
             None,
             &message.version(),
+            false,

Review Comment:
   does this fail if you pass `true`?
   



##########
arrow-ipc/src/reader.rs:
##########
@@ -1801,11 +1878,56 @@ mod tests {
             &Default::default(),
             None,
             &message.version(),
+            false,
         )
         .unwrap();
         assert_eq!(batch, roundtrip);
     }
 
+    #[test]
+    fn test_unaligned_throws_error_with_enforce_zero_copy() {
+        let batch = RecordBatch::try_from_iter(vec![(
+            "i32",
+            Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _,
+        )])
+        .unwrap();
+
+        let gen = IpcDataGenerator {};
+        let mut dict_tracker = DictionaryTracker::new(false);
+        let (_, encoded) = gen
+            .encoded_batch(&batch, &mut dict_tracker, &Default::default())
+            .unwrap();
+
+        let message = root_as_message(&encoded.ipc_message).unwrap();
+
+        // Construct an unaligned buffer
+        let mut buffer = MutableBuffer::with_capacity(encoded.arrow_data.len() 
+ 1);
+        buffer.push(0_u8);
+        buffer.extend_from_slice(&encoded.arrow_data);
+        let b = Buffer::from(buffer).slice(1);
+        assert_ne!(b.as_ptr().align_offset(8), 0);
+
+        let ipc_batch = message.header_as_record_batch().unwrap();
+        let result = read_record_batch(
+            &b,
+            ipc_batch,
+            batch.schema(),
+            &Default::default(),
+            None,
+            &message.version(),
+            true,
+        );
+
+        let error = result.unwrap_err();
+        match error {
+            ArrowError::InvalidArgumentError(e) => {
+                assert!(e.contains("Misaligned"));
+                assert!(e.contains("offset from expected alignment of"));
+            }
+            _ => panic!("Expected InvalidArgumentError"),
+        }

Review Comment:
   Another way to test this is to do something like 
   ```rust
   let err = result.unwrap_err();
   assert_eq!(err.to_string(), "expected error string")
   ```
   This also works I just figured I would pass it along



##########
arrow-ipc/src/writer.rs:
##########
@@ -2234,4 +2270,128 @@ mod tests {
         let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
         roundtrip_ensure_sliced_smaller(in_batch, 1000);
     }
+
+    #[test]
+    fn test_decimal128_alignment16() {
+        const IPC_ALIGNMENT: u8 = 16;
+
+        // Test a bunch of different dimensions to ensure alignment is never 
an issue.
+        // For example, if we only test `num_cols = 1` then even with 
alignment 8 this
+        // test would _happen_ to pass, even though for different dimensions 
like
+        // `num_cols = 2` it would fail.
+        for num_cols in 1..100 {
+            let num_rows = (num_cols * 7 + 11) % 100; // Deterministic swizzle
+
+            let mut fields = Vec::new();
+            let mut arrays = Vec::new();
+            for i in 0..num_cols {
+                let field = Field::new(&format!("col_{}", i), 
DataType::Decimal128(38, 10), true);
+                let array = Decimal128Array::from(vec![num_cols as i128; 
num_rows]);
+                fields.push(field);
+                arrays.push(Arc::new(array) as Arc<dyn Array>);
+            }
+            let schema = Schema::new(fields);
+            let batch = RecordBatch::try_new(Arc::new(schema), 
arrays).unwrap();
+
+            let mut writer = FileWriter::try_new_with_options(
+                Vec::new(),
+                batch.schema_ref(),
+                IpcWriteOptions::try_new(IPC_ALIGNMENT, false, 
MetadataVersion::V5).unwrap(),
+            )
+            .unwrap();
+            writer.write(&batch).unwrap();
+            writer.finish().unwrap();
+
+            let out: Vec<u8> = writer.into_inner().unwrap();
+
+            let buffer = Buffer::from_vec(out);
+            let trailer_start = buffer.len() - 10;
+            let footer_len =
+                
read_footer_length(buffer[trailer_start..].try_into().unwrap()).unwrap();
+            let footer =
+                root_as_footer(&buffer[trailer_start - 
footer_len..trailer_start]).unwrap();
+
+            let schema = fb_to_schema(footer.schema().unwrap());
+            assert_eq!(&schema, batch.schema().as_ref());
+
+            let decoder =

Review Comment:
   Maybe worth a comment here noting that the point of this test is that the 
validation happens at this call



##########
arrow-ipc/src/reader.rs:
##########
@@ -1801,11 +1878,56 @@ mod tests {
             &Default::default(),
             None,
             &message.version(),
+            false,
         )
         .unwrap();
         assert_eq!(batch, roundtrip);
     }
 
+    #[test]
+    fn test_unaligned_throws_error_with_enforce_zero_copy() {
+        let batch = RecordBatch::try_from_iter(vec![(
+            "i32",
+            Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _,
+        )])
+        .unwrap();
+
+        let gen = IpcDataGenerator {};
+        let mut dict_tracker = DictionaryTracker::new(false);
+        let (_, encoded) = gen
+            .encoded_batch(&batch, &mut dict_tracker, &Default::default())
+            .unwrap();
+
+        let message = root_as_message(&encoded.ipc_message).unwrap();
+
+        // Construct an unaligned buffer
+        let mut buffer = MutableBuffer::with_capacity(encoded.arrow_data.len() 
+ 1);
+        buffer.push(0_u8);

Review Comment:
   👍 



##########
arrow-ipc/src/writer.rs:
##########
@@ -83,13 +84,13 @@ impl IpcWriteOptions {
     }
     /// Try create IpcWriteOptions, checking for incompatible settings
     pub fn try_new(
-        alignment: usize,
+        alignment: u8,

Review Comment:
   this is a breaking API change. Since it is fallible anyway, could you 
perhaps leave the argument a `usize` and return an error if it is out of range 
(aka `let alignment = u8::try_from(alignment).map_err(|e| ...)`)?



##########
arrow-integration-testing/src/flight_server_scenarios/integration_test.rs:
##########
@@ -308,6 +308,7 @@ async fn record_batch_from_message(
         dictionaries_by_id,
         None,
         &message.version(),
+        false,

Review Comment:
   if these are true, do the tests fail? 



##########
arrow-ipc/src/writer.rs:
##########
@@ -2234,4 +2270,128 @@ mod tests {
         let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
         roundtrip_ensure_sliced_smaller(in_batch, 1000);
     }
+
+    #[test]
+    fn test_decimal128_alignment16() {
+        const IPC_ALIGNMENT: u8 = 16;
+
+        // Test a bunch of different dimensions to ensure alignment is never 
an issue.
+        // For example, if we only test `num_cols = 1` then even with 
alignment 8 this
+        // test would _happen_ to pass, even though for different dimensions 
like
+        // `num_cols = 2` it would fail.
+        for num_cols in 1..100 {

Review Comment:
   do we really need to test 100 columns? That seems overly compute-intensive 
for relatively small additional coverage. Maybe we could use a few 
distinct values like `1, 17, 23, 73`?



##########
arrow-ipc/src/reader.rs:
##########
@@ -456,6 +512,7 @@ pub fn read_dictionary(
     schema: &Schema,
     dictionaries_by_id: &mut HashMap<i64, ArrayRef>,
     metadata: &MetadataVersion,
+    enforce_zero_copy: bool,

Review Comment:
   likewise, this is a public function, so I think the parameter should 
be documented



##########
arrow-ipc/src/reader.rs:
##########
@@ -628,6 +688,19 @@ impl FileDecoder {
         self
     }
 
+    /// Specify whether to enforce zero copy, which means the array data 
inside the record batches
+    /// produced by this decoder will always reference the input buffer.
+    ///
+    /// In particular, this means that if there is data that is not aligned 
properly in the
+    /// input buffer, the decoder will throw rather than copy the data to an 
aligned buffer.

Review Comment:
   ```suggestion
       /// input buffer, the decoder will return an error rather than copy the 
data to an aligned buffer.
   ```



##########
arrow-ipc/src/writer.rs:
##########
@@ -2234,4 +2270,128 @@ mod tests {
         let in_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
         roundtrip_ensure_sliced_smaller(in_batch, 1000);
     }
+
+    #[test]
+    fn test_decimal128_alignment16() {
+        const IPC_ALIGNMENT: u8 = 16;
+
+        // Test a bunch of different dimensions to ensure alignment is never 
an issue.
+        // For example, if we only test `num_cols = 1` then even with 
alignment 8 this
+        // test would _happen_ to pass, even though for different dimensions 
like
+        // `num_cols = 2` it would fail.
+        for num_cols in 1..100 {
+            let num_rows = (num_cols * 7 + 11) % 100; // Deterministic swizzle
+
+            let mut fields = Vec::new();
+            let mut arrays = Vec::new();
+            for i in 0..num_cols {
+                let field = Field::new(&format!("col_{}", i), 
DataType::Decimal128(38, 10), true);
+                let array = Decimal128Array::from(vec![num_cols as i128; 
num_rows]);
+                fields.push(field);
+                arrays.push(Arc::new(array) as Arc<dyn Array>);
+            }
+            let schema = Schema::new(fields);
+            let batch = RecordBatch::try_new(Arc::new(schema), 
arrays).unwrap();
+
+            let mut writer = FileWriter::try_new_with_options(
+                Vec::new(),
+                batch.schema_ref(),
+                IpcWriteOptions::try_new(IPC_ALIGNMENT, false, 
MetadataVersion::V5).unwrap(),
+            )
+            .unwrap();
+            writer.write(&batch).unwrap();
+            writer.finish().unwrap();
+
+            let out: Vec<u8> = writer.into_inner().unwrap();
+
+            let buffer = Buffer::from_vec(out);
+            let trailer_start = buffer.len() - 10;
+            let footer_len =
+                
read_footer_length(buffer[trailer_start..].try_into().unwrap()).unwrap();
+            let footer =
+                root_as_footer(&buffer[trailer_start - 
footer_len..trailer_start]).unwrap();
+
+            let schema = fb_to_schema(footer.schema().unwrap());
+            assert_eq!(&schema, batch.schema().as_ref());

Review Comment:
   I am not sure how much value these checks add -- they are redundant with 
other code I think and somewhat obscure the point of the test (to set 
with_enforce_zero_copy and read the data back without error)



##########
arrow-ipc/src/reader.rs:
##########
@@ -71,7 +71,11 @@ fn read_buffer(
 ///     - check if the bit width of non-64-bit numbers is 64, and
 ///     - read the buffer as 64-bit (signed integer or float), and
 ///     - cast the 64-bit array to the appropriate data type
-fn create_array(reader: &mut ArrayReader, field: &Field) -> Result<ArrayRef, 
ArrowError> {
+fn create_array(

Review Comment:
   Can you please add some documentation on what `enforce_zero_copy` does to 
the docstrings?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to