alamb commented on code in PR #6690:
URL: https://github.com/apache/arrow-rs/pull/6690#discussion_r1834572196


##########
arrow-flight/src/encode.rs:
##########
@@ -327,6 +327,10 @@ impl FlightDataEncoder {
 
     /// Encodes batch into one or more `FlightData` messages in self.queue
     fn encode_batch(&mut self, batch: RecordBatch) -> Result<()> {
+        if batch.num_rows() == 0 {

Review Comment:
   What is the purpose of this optimization?
   
   I vaguely remember that at least one use case for encoding empty batches is to send
   the schema information. With this change, it seems empty batches can no longer
   be encoded.
   
   Unless there is a reason to prevent empty batches, I would suggest we remove
   this check here; callers of the encoder can instead decide to filter out
   empty batches if they want.
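
A minimal sketch of the caller-side filtering suggested above. `Batch` is a hypothetical stand-in for `RecordBatch` (only the row count is modelled) and `skip_empty` is an illustrative name, not an arrow-flight API; a real caller would apply the same predicate to the stream of batches handed to the encoder.

```rust
// Hypothetical stand-in for `RecordBatch`; only the row count matters here.
#[derive(Debug, Clone, PartialEq)]
struct Batch {
    num_rows: usize,
}

/// Drops empty batches on the caller side, so the encoder itself can keep
/// accepting empty batches from callers that do want to send them.
fn skip_empty<I>(batches: I) -> impl Iterator<Item = Batch>
where
    I: IntoIterator<Item = Batch>,
{
    batches.into_iter().filter(|b| b.num_rows > 0)
}
```

With this shape the decision stays with each caller: one that relies on empty batches simply does not apply the filter.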



##########
arrow-flight/src/encode.rs:
##########
@@ -1726,59 +1680,49 @@ mod tests {
 
     /// Coverage for <https://github.com/apache/arrow-rs/issues/3478>
     ///
-    /// Encodes the specified batch using several values of
-    /// `max_flight_data_size` between 1K to 5K and ensures that the
-    /// resulting size of the flight data stays within the limit
-    /// + `allowed_overage`
-    ///
-    /// `allowed_overage` is how far off the actual data encoding is
-    /// from the target limit that was set. It is an improvement when
-    /// the allowed_overage decreses.
-    ///
-    /// Note this overhead will likely always be greater than zero to
-    /// account for encoding overhead such as IPC headers and padding.
+    /// Encodes the specified batch using several values of `max_flight_data_size` between 1K to 5K
+    /// and ensures that the resulting size of the flight data stays within the limit, except for
+    /// in cases where only 1 row is sent - if only 1 row is sent, then we know that there was no
+    /// way to keep the data within the limit (since the minimum possible amount of data was sent),
+    /// so we allow it to go over.
     ///
-    ///
-    async fn verify_encoded_split(batch: RecordBatch, allowed_overage: usize) {
+    async fn verify_encoded_split_no_overage(batch: RecordBatch) {
         let num_rows = batch.num_rows();
 
-        // Track the overall required maximum overage
-        let mut max_overage_seen = 0;
-
         for max_flight_data_size in [1024, 2021, 5000] {
             println!("Encoding {num_rows} with a maximum size of {max_flight_data_size}");
 
-            let mut stream = FlightDataEncoderBuilder::new()
+            let stream = FlightDataEncoderBuilder::new()
                 .with_max_flight_data_size(max_flight_data_size)
                 // use 8-byte alignment - default alignment is 64 which produces bigger ipc data
                 .with_options(IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap())
                 .build(futures::stream::iter([Ok(batch.clone())]));
 
+            let mut stream = FlightDataDecoder::new(stream);
+
             let mut i = 0;
             while let Some(data) = stream.next().await.transpose().unwrap() {
-                let actual_data_size = flight_data_size(&data);
+                let actual_data_size = flight_data_size(&data.inner);
 
                 let actual_overage = actual_data_size.saturating_sub(max_flight_data_size);
 
-                assert!(
-                    actual_overage <= allowed_overage,
-                    "encoded data[{i}]: actual size {actual_data_size}, \
-                         actual_overage: {actual_overage} \
-                         allowed_overage: {allowed_overage}"
-                );
+                let is_1_row =
+                    matches!(data.payload, DecodedPayload::RecordBatch(rb) if rb.num_rows() == 1);
+
+                // If only 1 row was sent over via this recordBatch, there was no way to avoid
+                // going over the limit. There's currently no mechanism for splitting a single row
+                // of results over multiple messages, so we allow going over the limit if it's the

Review Comment:
   This makes sense -- could you also put this caveat in the documentation for
   `max_encoding_message_size`? (We can do it as a follow-on PR too.)
   
   
https://docs.rs/arrow-flight/latest/arrow_flight/flight_service_client/struct.FlightServiceClient.html#method.max_encoding_message_size
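
To illustrate the caveat, here is a simplified model of size-limited splitting. It is only a sketch: `split_rows` is a hypothetical helper, it works on assumed per-row byte sizes, and it ignores IPC headers and padding, so it is not the encoder's actual algorithm.

```rust
/// Greedily groups consecutive rows so that each group's total size stays
/// within `max_size`. Returns the number of rows in each group.
///
/// A row can never be split, so a single row larger than `max_size` still
/// ends up in a group of its own that exceeds the limit.
fn split_rows(row_sizes: &[usize], max_size: usize) -> Vec<usize> {
    let mut groups = Vec::new();
    let mut rows = 0; // rows in the group being built
    let mut bytes = 0; // bytes in the group being built
    for &size in row_sizes {
        // Close the current group if adding this row would exceed the limit.
        if rows > 0 && bytes + size > max_size {
            groups.push(rows);
            rows = 0;
            bytes = 0;
        }
        rows += 1;
        bytes += size;
    }
    if rows > 0 {
        groups.push(rows);
    }
    groups
}
```

For row sizes `[400, 400, 400, 2000, 100]` and a limit of 1024, the 2000-byte row is placed alone in its own group, and that group is the only one over the limit.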



##########
arrow-flight/src/encode.rs:
##########
@@ -700,6 +682,9 @@ mod tests {
     use super::*;
 
     #[test]
+    // flight_data_from_arrow_batch is deprecated but does exactly what we need. Would probably be

Review Comment:
   Why not continue to use `make_flight_data`? It seems to be exactly what
   this comment proposes: a copy of `flight_data_from_arrow_batch`.
   
   This change seems like it would only make the final removal of
   `flight_data_from_arrow_batch` harder (we would have to re-create
   `make_flight_data`).
   
   



##########
arrow-flight/src/encode.rs:
##########
@@ -1485,93 +1481,62 @@ mod tests {
         hydrate_dictionaries(&batch, batch.schema()).expect("failed to optimize");
     }
 
-    pub fn make_flight_data(
-        batch: &RecordBatch,
-        options: &IpcWriteOptions,
-    ) -> (Vec<FlightData>, FlightData) {
-        let data_gen = IpcDataGenerator::default();
-        let mut dictionary_tracker = DictionaryTracker::new_with_preserve_dict_id(false, true);
-
-        let (encoded_dictionaries, encoded_batch) = data_gen
-            .encoded_batch(batch, &mut dictionary_tracker, options)
-            .expect("DictionaryTracker configured above to not error on replacement");
-
-        let flight_dictionaries = encoded_dictionaries.into_iter().map(Into::into).collect();
-        let flight_batch = encoded_batch.into();
-
-        (flight_dictionaries, flight_batch)
-    }
+    #[tokio::test]
+    async fn test_split_batch_for_grpc_response() {
+        async fn get_decoded(schema: SchemaRef, encoded: Vec<EncodedData>) -> Vec<RecordBatch> {
+            FlightDataDecoder::new(futures::stream::iter(
+                std::iter::once(SchemaAsIpc::new(&schema, &IpcWriteOptions::default()).into())
+                    .chain(encoded.into_iter().map(FlightData::from))
+                    .map(Ok),
+            ))
+            .collect::<Vec<Result<_>>>()
+            .await
+            .into_iter()
+            .map(|r| r.unwrap())
+            .filter_map(|data| match data.payload {
+                DecodedPayload::RecordBatch(rb) => Some(rb),
+                _ => None,
+            })
+            .collect()
+        }
 
-    #[test]
-    fn test_split_batch_for_grpc_response() {
         let max_flight_data_size = 1024;
+        let write_opts = IpcWriteOptions::default();
+        let mut dict_tracker = DictionaryTracker::new(false);
+        let gen = IpcDataGenerator {};
 
         // no split
         let c = UInt32Array::from(vec![1, 2, 3, 4, 5, 6]);
         let batch = RecordBatch::try_from_iter(vec![("a", Arc::new(c) as ArrayRef)])
             .expect("cannot create record batch");
-        let split = split_batch_for_grpc_response(batch.clone(), max_flight_data_size);
+        let split = gen
+            .encoded_batch_with_size(&batch, &mut dict_tracker, &write_opts, max_flight_data_size)
+            .unwrap()
+            .1;
         assert_eq!(split.len(), 1);
-        assert_eq!(batch, split[0]);
+        assert_eq!(batch, get_decoded(batch.schema(), split).await[0]);
 
         // split once
         let n_rows = max_flight_data_size + 1;
         assert!(n_rows % 2 == 1, "should be an odd number");
         let c = UInt8Array::from((0..n_rows).map(|i| (i % 256) as u8).collect::<Vec<_>>());
         let batch = RecordBatch::try_from_iter(vec![("a", Arc::new(c) as ArrayRef)])
             .expect("cannot create record batch");
-        let split = split_batch_for_grpc_response(batch.clone(), max_flight_data_size);
-        assert_eq!(split.len(), 3);
+        let split = gen
+            .encoded_batch_with_size(&batch, &mut dict_tracker, &write_opts, max_flight_data_size)
+            .unwrap()
+            .1;
+        assert_eq!(split.len(), 2);
+        let batches = get_decoded(batch.schema(), split).await;
         assert_eq!(
-            split.iter().map(|batch| batch.num_rows()).sum::<usize>(),
+            batches.iter().map(RecordBatch::num_rows).sum::<usize>(),
             n_rows
         );
-        let a = pretty_format_batches(&split).unwrap().to_string();
+        let a = pretty_format_batches(&batches).unwrap().to_string();
         let b = pretty_format_batches(&[batch]).unwrap().to_string();
         assert_eq!(a, b);
     }
 
-    #[test]

Review Comment:
   Can you please explain why this test was removed?



##########
arrow-integration-testing/src/flight_client_scenarios/integration_test.rs:
##########
@@ -125,16 +125,9 @@ async fn send_batch(
     batch: &RecordBatch,
     options: &writer::IpcWriteOptions,
 ) -> Result {
-    let data_gen = writer::IpcDataGenerator::default();
-    let mut dictionary_tracker = writer::DictionaryTracker::new_with_preserve_dict_id(false, true);
-
-    let (encoded_dictionaries, encoded_batch) = data_gen
-        .encoded_batch(batch, &mut dictionary_tracker, options)
-        .expect("DictionaryTracker configured above to not error on replacement");
-
-    let dictionary_flight_data: Vec<FlightData> =
-        encoded_dictionaries.into_iter().map(Into::into).collect();
-    let mut batch_flight_data: FlightData = encoded_batch.into();
+    #[allow(deprecated)]

Review Comment:
   It seems unfortunate that we are reverting back to the deprecated method.
   Maybe we can figure out some non-deprecated API 🤔



##########
arrow-flight/src/encode.rs:
##########
@@ -1679,9 +1637,7 @@ mod tests {
 
         let batch = RecordBatch::try_from_iter(vec![("a1", Arc::new(array) as _)]).unwrap();
 
-        // overage is much higher than ideal
-        // https://github.com/apache/arrow-rs/issues/3478

Review Comment:
   🎉 



##########
arrow-buffer/src/buffer/immutable.rs:
##########
@@ -261,11 +261,11 @@ impl Buffer {
     }
 
     /// Returns a slice of this buffer starting at a certain bit offset.
-    /// If the offset is byte-aligned the returned buffer is a shallow clone,
+    /// If the offset and length are byte-aligned the returned buffer is a shallow clone,
     /// otherwise a new buffer is allocated and filled with a copy of the bits in the range.
     pub fn bit_slice(&self, offset: usize, len: usize) -> Self {
-        if offset % 8 == 0 {
-            return self.slice(offset / 8);
+        if offset % 8 == 0 && len % 8 == 0 {

Review Comment:
   It seems like it may fix a bug where the length is incorrectly set after
   slicing if the offset is zero 👍
   
   Can you please also add a unit test showing this bug fix (i.e. a unit test
   in immutable.rs)?
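
A rough sketch of what such a test could check, using simplified stand-ins rather than the real `Buffer` type. Both functions are hypothetical models: `old_fast_path` mimics the removed `self.slice(offset / 8)` branch, and `bit_slice_model` copies exactly the requested bits (least-significant bit first).

```rust
/// Model of the removed fast path: when the offset is byte-aligned, return
/// everything from that byte onwards. The requested `len` is ignored.
fn old_fast_path(bytes: &[u8], offset: usize) -> Vec<u8> {
    bytes[offset / 8..].to_vec()
}

/// Model of a length-aware bit slice: copies `len` bits starting at bit
/// `offset` into a fresh buffer of ceil(len / 8) bytes.
fn bit_slice_model(bytes: &[u8], offset: usize, len: usize) -> Vec<u8> {
    let mut out = vec![0u8; (len + 7) / 8];
    for i in 0..len {
        let src = offset + i;
        // Copy bit `src` of the input to bit `i` of the output.
        if bytes[src / 8] & (1 << (src % 8)) != 0 {
            out[i / 8] |= 1 << (i % 8);
        }
    }
    out
}
```

Slicing 10 bits at offset 0 out of four `0xFF` bytes yields two bytes in the length-aware model, while the old fast path hands back all four; a unit test in immutable.rs could assert the equivalent property on `Buffer::bit_slice`.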
   
   I verified that the tests in this PR fail after this change:
   ```
   Encoding 1023 with a maximum size of 1024
   test encode::tests::flight_data_size_string_dictionary ... FAILED
   
   failures:
   
   failures:
       encode::tests::flight_data_size_string_dictionary
   
   test result: FAILED. 0 passed; 1 failed; 0 ignored; 0 measured; 28 filtered out; finished in 0.00s
   
   
   --- STDERR:              arrow-flight encode::tests::flight_data_size_string_dictionary ---
   thread 'encode::tests::flight_data_size_string_dictionary' panicked at arrow-flight/src/encode.rs:1717:21:
   assertion `left == right` failed: encoded data[1]: actual size 1136, actual_overage: 112
     left: 112
    right: 0
   ```
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
