itsjunetime commented on code in PR #6690:
URL: https://github.com/apache/arrow-rs/pull/6690#discussion_r1835109766
##########
arrow-flight/src/encode.rs:
##########
@@ -1485,93 +1481,62 @@ mod tests {
         hydrate_dictionaries(&batch, batch.schema()).expect("failed to optimize");
     }
 
-    pub fn make_flight_data(
-        batch: &RecordBatch,
-        options: &IpcWriteOptions,
-    ) -> (Vec<FlightData>, FlightData) {
-        let data_gen = IpcDataGenerator::default();
-        let mut dictionary_tracker = DictionaryTracker::new_with_preserve_dict_id(false, true);
-
-        let (encoded_dictionaries, encoded_batch) = data_gen
-            .encoded_batch(batch, &mut dictionary_tracker, options)
-            .expect("DictionaryTracker configured above to not error on replacement");
-
-        let flight_dictionaries = encoded_dictionaries.into_iter().map(Into::into).collect();
-        let flight_batch = encoded_batch.into();
-
-        (flight_dictionaries, flight_batch)
-    }
+    #[tokio::test]
+    async fn test_split_batch_for_grpc_response() {
+        async fn get_decoded(schema: SchemaRef, encoded: Vec<EncodedData>) -> Vec<RecordBatch> {
+            FlightDataDecoder::new(futures::stream::iter(
+                std::iter::once(SchemaAsIpc::new(&schema, &IpcWriteOptions::default()).into())
+                    .chain(encoded.into_iter().map(FlightData::from))
+                    .map(Ok),
+            ))
+            .collect::<Vec<Result<_>>>()
+            .await
+            .into_iter()
+            .map(|r| r.unwrap())
+            .filter_map(|data| match data.payload {
+                DecodedPayload::RecordBatch(rb) => Some(rb),
+                _ => None,
+            })
+            .collect()
+        }
 
-    #[test]
-    fn test_split_batch_for_grpc_response() {
         let max_flight_data_size = 1024;
+        let write_opts = IpcWriteOptions::default();
+        let mut dict_tracker = DictionaryTracker::new(false);
+        let gen = IpcDataGenerator {};
 
         // no split
         let c = UInt32Array::from(vec![1, 2, 3, 4, 5, 6]);
         let batch = RecordBatch::try_from_iter(vec![("a", Arc::new(c) as ArrayRef)])
             .expect("cannot create record batch");
-        let split = split_batch_for_grpc_response(batch.clone(), max_flight_data_size);
+        let split = gen
+            .encoded_batch_with_size(&batch, &mut dict_tracker, &write_opts, max_flight_data_size)
+            .unwrap()
+            .1;
         assert_eq!(split.len(), 1);
-        assert_eq!(batch, split[0]);
+        assert_eq!(batch, get_decoded(batch.schema(), split).await[0]);
 
         // split once
         let n_rows = max_flight_data_size + 1;
         assert!(n_rows % 2 == 1, "should be an odd number");
         let c = UInt8Array::from((0..n_rows).map(|i| (i % 256) as u8).collect::<Vec<_>>());
         let batch = RecordBatch::try_from_iter(vec![("a", Arc::new(c) as ArrayRef)])
             .expect("cannot create record batch");
-        let split = split_batch_for_grpc_response(batch.clone(), max_flight_data_size);
-        assert_eq!(split.len(), 3);
+        let split = gen
+            .encoded_batch_with_size(&batch, &mut dict_tracker, &write_opts, max_flight_data_size)
+            .unwrap()
+            .1;
+        assert_eq!(split.len(), 2);
+        let batches = get_decoded(batch.schema(), split).await;
         assert_eq!(
-            split.iter().map(|batch| batch.num_rows()).sum::<usize>(),
+            batches.iter().map(RecordBatch::num_rows).sum::<usize>(),
             n_rows
         );
-        let a = pretty_format_batches(&split).unwrap().to_string();
+        let a = pretty_format_batches(&batches).unwrap().to_string();
         let b = pretty_format_batches(&[batch]).unwrap().to_string();
         assert_eq!(a, b);
     }
 
-    #[test]
Review Comment:
This test checked something that isn't applicable anymore: it assumed we can split a single RecordBatch into smaller RecordBatches with fewer rows, each saturating the given size. We no longer want to verify that, because we don't want to provide any way to split RecordBatches without also accommodating the size of the IPC headers (and the rest of the functions in this mod already do that).
This test mainly existed to cover the `split_batch_for_grpc_response` function, which is no longer part of the design.
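For anyone skimming this thread, here's a minimal sketch of what the size-aware path looks like from a caller's perspective, assuming the `encoded_batch_with_size` signature exercised in the test above; the helper name and the size check below are illustrative only, not part of this PR:
```rust
use arrow_array::RecordBatch;
use arrow_ipc::writer::{DictionaryTracker, IpcDataGenerator, IpcWriteOptions};

/// Illustrative helper (not part of this PR): encode `batch` into IPC
/// messages that each respect `max_flight_data_size`, headers included.
fn encode_size_bounded(batch: &RecordBatch, max_flight_data_size: usize) {
    let gen = IpcDataGenerator {};
    let mut dict_tracker = DictionaryTracker::new(false);
    let write_opts = IpcWriteOptions::default();

    // Assumed signature, mirroring the test above: returns the encoded
    // dictionaries plus a Vec<EncodedData> of size-bounded batch messages.
    let (_dicts, messages) = gen
        .encoded_batch_with_size(batch, &mut dict_tracker, &write_opts, max_flight_data_size)
        .expect("encoding should not fail for a well-formed batch");

    for m in messages {
        // The point of the new API: the limit covers the full IPC payload
        // (flatbuffer header + body), not just the raw row data. (A single
        // row larger than the limit may still yield one oversized message.)
        assert!(m.ipc_message.len() + m.arrow_data.len() <= max_flight_data_size);
    }
}
```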
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]