jecsand838 commented on code in PR #9397:
URL: https://github.com/apache/arrow-rs/pull/9397#discussion_r2806805039


##########
arrow-avro/src/reader/vlq.rs:
##########
@@ -97,6 +97,34 @@ fn read_varint_slow(buf: &[u8]) -> Option<(u64, usize)> {
     None
 }
 
+pub(crate) fn skip_varint(buf: &[u8]) -> Option<usize> {
+    if let Some(array) = buf.get(..10) {
+        return skip_varint_array(array.try_into().unwrap());
+    }
+    skip_varint_slow(buf)
+}
+
+fn skip_varint_array(buf: [u8; 10]) -> Option<usize> {
+    // Using buf.into_iter().enumerate() regresses performance by 1% on x86-64
+    #[allow(clippy::needless_range_loop)]
+    for idx in 0..10 {
+        if buf[idx] < 0x80 {
+            return Some(idx + 1);
+        }
+    }
+    None
+}

Review Comment:
   I think this introduces a regression: `skip_varint_array` accepts overflowing 10-byte varints, because it returns at the first terminating byte without the overflow guard that `read_varint` applies (`count != 9 || byte < 2`).
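   
   For illustration, carrying that same guard into the skip path might look something like this (a sketch, not necessarily the final shape of the fix):
   
   ```rust
   fn skip_varint_array(buf: [u8; 10]) -> Option<usize> {
       // Using buf.into_iter().enumerate() regresses performance by 1% on x86-64
       #[allow(clippy::needless_range_loop)]
       for idx in 0..10 {
           let byte = buf[idx];
           if byte < 0x80 {
               // Mirror read_varint's overflow guard: the 10th byte can only
               // contribute the 64th bit of a u64, so any terminator >= 2 overflows.
               if idx != 9 || byte < 2 {
                   return Some(idx + 1);
               }
               return None;
           }
       }
       None
   }
   ```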
   
   I added this regression test to `arrow-avro/src/reader/mod.rs` to verify:
   
   ```rust
       fn corrupt_first_block_payload_byte(
           mut bytes: Vec<u8>,
           field_offset: usize,
           expected_original: u8,
           replacement: u8,
       ) -> Vec<u8> {
           let mut header_decoder = HeaderDecoder::default();
           let header_len = header_decoder.decode(&bytes).expect("decode header");
           assert!(header_decoder.flush().is_some(), "decode complete header");
   
           let mut cursor = &bytes[header_len..];
           let (_, count_len) = crate::reader::vlq::read_varint(cursor).expect("decode block count");
           cursor = &cursor[count_len..];
           let (_, size_len) = crate::reader::vlq::read_varint(cursor).expect("decode block size");
           let data_start = header_len + count_len + size_len;
           let target = data_start + field_offset;
   
           assert!(
               target < bytes.len(),
               "target byte offset {target} out of bounds for input length {}",
               bytes.len()
           );
           assert_eq!(
               bytes[target], expected_original,
               "unexpected original byte at payload offset {field_offset}"
           );
           bytes[target] = replacement;
           bytes
       }
   
       #[test]
       fn ocf_projection_rejects_overflowing_varint_in_skipped_long_field() {
           // Writer row payload is [bad_long=i64::MIN][keep=7]. The first field is
           // encoded as a 10-byte VLQ ending in 0x01. Flipping that terminator to
           // 0x02 creates an overflowing varint that must fail.
           let writer_schema = Schema::new(vec![
               Field::new("bad_long", DataType::Int64, false),
               Field::new("keep", DataType::Int32, false),
           ]);
           let batch = RecordBatch::try_new(
               Arc::new(writer_schema.clone()),
               vec![
                   Arc::new(Int64Array::from(vec![i64::MIN])) as ArrayRef,
                   Arc::new(Int32Array::from(vec![7])) as ArrayRef,
               ],
           )
           .expect("build writer batch");
           let bytes = write_ocf(&writer_schema, &[batch]);
           let mutated = corrupt_first_block_payload_byte(bytes, 9, 0x01, 0x02);
   
           let err = ReaderBuilder::new()
               .build(Cursor::new(mutated.clone()))
               .expect("build full reader")
               .collect::<Result<Vec<_>, _>>()
               .expect_err("full decode should reject malformed varint");
           assert!(matches!(err, ArrowError::AvroError(_)));
           assert!(err.to_string().contains("bad varint"));
   
           let err = ReaderBuilder::new()
               .with_projection(vec![1])
               .build(Cursor::new(mutated))
               .expect("build projected reader")
               .collect::<Result<Vec<_>, _>>()
               .expect_err("projection must also reject malformed skipped varint");
           assert!(matches!(err, ArrowError::AvroError(_)));
           assert!(err.to_string().contains("bad varint"));
       }
   ```
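   
   As a sanity check on the offset the test flips: zig-zag encoding maps `i64::MIN` to `u64::MAX`, whose VLQ form is nine `0xff` bytes followed by the terminator `0x01`, so index 9 is the byte being corrupted. A standalone sketch of that arithmetic (`zigzag` and `vlq` below are illustrative helpers, not crate APIs):
   
   ```rust
   // Zig-zag encode an i64 the way Avro does: (n << 1) ^ (n >> 63).
   fn zigzag(n: i64) -> u64 {
       ((n << 1) ^ (n >> 63)) as u64
   }
   
   // Encode a u64 as a little-endian base-128 varint (VLQ).
   fn vlq(mut v: u64) -> Vec<u8> {
       let mut out = Vec::new();
       while v >= 0x80 {
           out.push((v as u8) | 0x80);
           v >>= 7;
       }
       out.push(v as u8);
       out
   }
   
   fn main() {
       let bytes = vlq(zigzag(i64::MIN));
       let mut expected = vec![0xffu8; 9];
       expected.push(0x01);
       // Ten bytes: nine 0xff continuation bytes, then the 0x01 terminator.
       assert_eq!(bytes, expected);
       // Flipping that 0x01 to 0x02 would require a 65th bit, so decoding must fail.
   }
   ```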



##########
arrow-avro/src/reader/record.rs:
##########
@@ -2054,15 +2054,15 @@ impl Skipper {
                 Ok(())
             }
             Self::Int32 => {
-                buf.get_int()?;
+                buf.skip_vlq()?;
                 Ok(())
             }

Review Comment:
   Switching skipped `Int32` values from `get_int()` to `skip_vlq()` drops the `i32` range validation (the `u64` -> `u32` overflow check). That means projected-out int-encoded fields can silently accept out-of-range varints that a full `decode` still rejects with a varint overflow error. I think it may be worth preserving the `Int32` overflow validation in the skip path so projection behavior matches full decode, though I'm 100% open to a better idea or any feedback in case I'm missing something.
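   
   The simplest version of that might be to keep reading the value on the skip path (a sketch; there may well be a cheaper way to validate without materializing the value):
   
   ```rust
               Self::Int32 => {
                   // Keep decode's u64 -> u32 overflow check by reading and
                   // discarding the value rather than skipping its bytes unchecked.
                   buf.get_int()?;
                   Ok(())
               }
   ```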
   
   I added a regression test for this to `arrow-avro/src/reader/mod.rs` as well:
   
   ```rust
       fn corrupt_first_block_payload_byte(
           mut bytes: Vec<u8>,
           field_offset: usize,
           expected_original: u8,
           replacement: u8,
       ) -> Vec<u8> {
           let mut header_decoder = HeaderDecoder::default();
           let header_len = header_decoder.decode(&bytes).expect("decode header");
           assert!(header_decoder.flush().is_some(), "decode complete header");
   
           let mut cursor = &bytes[header_len..];
           let (_, count_len) = crate::reader::vlq::read_varint(cursor).expect("decode block count");
           cursor = &cursor[count_len..];
           let (_, size_len) = crate::reader::vlq::read_varint(cursor).expect("decode block size");
           let data_start = header_len + count_len + size_len;
           let target = data_start + field_offset;
   
           assert!(
               target < bytes.len(),
               "target byte offset {target} out of bounds for input length {}",
               bytes.len()
           );
           assert_eq!(
               bytes[target], expected_original,
               "unexpected original byte at payload offset {field_offset}"
           );
           bytes[target] = replacement;
           bytes
       }
   
       #[test]
       fn ocf_projection_rejects_i32_overflow_in_skipped_int_field() {
           // Writer row payload is [bad_int=i32::MIN][keep=11]. The first field
           // encodes to ff ff ff ff 0f. Flipping 0x0f -> 0x10 keeps a syntactically
           // valid varint, but now its value exceeds u32::MAX and must fail Int32
           // validation even when projected out.
           let writer_schema = Schema::new(vec![
               Field::new("bad_int", DataType::Int32, false),
               Field::new("keep", DataType::Int64, false),
           ]);
           let batch = RecordBatch::try_new(
               Arc::new(writer_schema.clone()),
               vec![
                   Arc::new(Int32Array::from(vec![i32::MIN])) as ArrayRef,
                   Arc::new(Int64Array::from(vec![11])) as ArrayRef,
               ],
           )
           .expect("build writer batch");
           let bytes = write_ocf(&writer_schema, &[batch]);
           let mutated = corrupt_first_block_payload_byte(bytes, 4, 0x0f, 0x10);
   
           let err = ReaderBuilder::new()
               .build(Cursor::new(mutated.clone()))
               .expect("build full reader")
               .collect::<Result<Vec<_>, _>>()
               .expect_err("full decode should reject int overflow");
           assert!(matches!(err, ArrowError::AvroError(_)));
           assert!(err.to_string().contains("varint overflow"));
   
           let err = ReaderBuilder::new()
               .with_projection(vec![1])
               .build(Cursor::new(mutated))
               .expect("build projected reader")
               .collect::<Result<Vec<_>, _>>()
               .expect_err("projection must also reject skipped int overflow");
           assert!(matches!(err, ArrowError::AvroError(_)));
           assert!(err.to_string().contains("varint overflow"));
       }
   ```
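   
   And a quick standalone check (my arithmetic, not crate code) of why offset 4 is the terminator here and why the flipped byte overflows:
   
   ```rust
   fn main() {
       // zig-zag(i32::MIN) = u32::MAX, which VLQ-encodes to
       // [0xff, 0xff, 0xff, 0xff, 0x0f]; index 4 holds the 0x0f terminator.
       // After flipping 0x0f -> 0x10, the decoded value becomes:
       let decoded: u64 = 0x0FFF_FFFF + (0x10u64 << 28);
       assert_eq!(decoded, 0x1_0FFF_FFFF);
       // ...which no longer fits in a u32, hence the expected "varint overflow".
       assert!(decoded > u32::MAX as u64);
   }
   ```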



##########
arrow-avro/benches/project_record.rs:
##########
@@ -0,0 +1,239 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use apache_avro::types::Value;
+use apache_avro::{Schema as ApacheSchema, to_avro_datum};

Review Comment:
   @mzabaluev Is it possible to remove `apache_avro`? I had planned to remove it from the crate, but haven't gotten around to it yet. Refer to: https://github.com/apache/arrow-rs/pull/8079#issuecomment-3164861808


