alamb commented on code in PR #5260: URL: https://github.com/apache/arrow-datafusion/pull/5260#discussion_r1103787835
########## datafusion/core/src/physical_plan/file_format/parquet.rs: ########## @@ -1121,8 +1158,12 @@ mod tests { let filter = col("c2").eq(lit(2_i64)).or(col("c2").eq(lit(1_i64))); // read/write them files: - let rt = - round_trip(vec![batch1, batch2], None, None, Some(filter), true, false).await; + let rt = RoundTrip::new() Review Comment: I think this construction is clearer about what combination of options are being tested ########## datafusion/core/src/physical_plan/file_format/parquet.rs: ########## @@ -834,82 +834,116 @@ mod tests { parquet_exec: Arc<ParquetExec>, } - /// writes each RecordBatch as an individual parquet file and re-reads - /// the data back. Returns the data as [RecordBatch]es - async fn round_trip_to_parquet( - batches: Vec<RecordBatch>, + /// round-trip record batches by writing each individual RecordBatch to + /// a parquet file and then reading that parquet file with the specified + /// options + #[derive(Debug, Default)] + struct RoundTrip { projection: Option<Vec<usize>>, schema: Option<SchemaRef>, predicate: Option<Expr>, pushdown_predicate: bool, - ) -> Result<Vec<RecordBatch>> { - round_trip( - batches, - projection, - schema, - predicate, - pushdown_predicate, - false, - ) - .await - .batches + page_index_predicate: bool, } - /// Writes each RecordBatch as an individual parquet file and then - /// reads them back. Returns the parquet exec as well as the data - /// as [RecordBatch]es - async fn round_trip( - batches: Vec<RecordBatch>, - projection: Option<Vec<usize>>, - schema: Option<SchemaRef>, - predicate: Option<Expr>, - pushdown_predicate: bool, - page_index_predicate: bool, - ) -> RoundTripResult { - let file_schema = match schema { - Some(schema) => schema, - None => Arc::new( - Schema::try_merge(batches.iter().map(|b| b.schema().as_ref().clone())) - .unwrap(), - ), - }; + impl RoundTrip { + fn new() -> Self { + Default::default() + } - let (meta, _files) = store_parquet(batches, page_index_predicate).await.unwrap(); - let file_groups = meta.into_iter().map(Into::into).collect(); + fn with_projection(mut self, projection: Vec<usize>) -> Self { + self.projection = Some(projection); + self + } - // prepare the scan - let mut parquet_exec = ParquetExec::new( - FileScanConfig { - object_store_url: ObjectStoreUrl::local_filesystem(), - file_groups: vec![file_groups], - file_schema, - statistics: Statistics::default(), - projection, - limit: None, - table_partition_cols: vec![], - output_ordering: None, - infinite_source: false, - }, - predicate, - None, - ); + fn with_schema(mut self, schema: SchemaRef) -> Self { + self.schema = Some(schema); + self + } - if pushdown_predicate { - parquet_exec = parquet_exec - .with_pushdown_filters(true) - .with_reorder_filters(true); + fn with_predicate(mut self, predicate: Expr) -> Self { + self.predicate = Some(predicate); + self } - if page_index_predicate { - parquet_exec = parquet_exec.with_enable_page_index(true); + fn with_pushdown_predicate(mut self) -> Self { + self.pushdown_predicate = true; + self } - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); - let parquet_exec = Arc::new(parquet_exec); - RoundTripResult { - batches: collect(parquet_exec.clone(), task_ctx).await, - parquet_exec, + fn with_page_index_predicate(mut self) -> Self { + self.page_index_predicate = true; + self + } + + /// run the test, returning only the resulting RecordBatches + async fn round_trip_to_batches( + self, + batches: Vec<RecordBatch>, + ) -> Result<Vec<RecordBatch>> { + self.round_trip(batches).await.batches + } + + /// run the test, returning the `RoundTripResult` + async fn round_trip(self, batches: Vec<RecordBatch>) -> RoundTripResult { Review Comment: I think whitespace blind diff shows the changes more clearly: https://github.com/apache/arrow-datafusion/pull/5260/files?w=1 ########## datafusion/core/src/physical_plan/file_format/parquet.rs: ########## @@ -834,82 +834,116 @@ mod tests { parquet_exec: Arc<ParquetExec>, } - /// writes each RecordBatch as an individual parquet file and re-reads - /// the data back. Returns the data as [RecordBatch]es - async fn round_trip_to_parquet( - batches: Vec<RecordBatch>, + /// round-trip record batches by writing each individual RecordBatch to + /// a parquet file and then reading that parquet file with the specified + /// options + #[derive(Debug, Default)] + struct RoundTrip { projection: Option<Vec<usize>>, schema: Option<SchemaRef>, predicate: Option<Expr>, pushdown_predicate: bool, - ) -> Result<Vec<RecordBatch>> { - round_trip( - batches, - projection, - schema, - predicate, - pushdown_predicate, - false, - ) - .await - .batches + page_index_predicate: bool, } - /// Writes each RecordBatch as an individual parquet file and then - /// reads them back. Returns the parquet exec as well as the data - /// as [RecordBatch]es - async fn round_trip( - batches: Vec<RecordBatch>, - projection: Option<Vec<usize>>, - schema: Option<SchemaRef>, - predicate: Option<Expr>, - pushdown_predicate: bool, - page_index_predicate: bool, - ) -> RoundTripResult { - let file_schema = match schema { - Some(schema) => schema, - None => Arc::new( - Schema::try_merge(batches.iter().map(|b| b.schema().as_ref().clone())) - .unwrap(), - ), - }; + impl RoundTrip { + fn new() -> Self { + Default::default() + } - let (meta, _files) = store_parquet(batches, page_index_predicate).await.unwrap(); - let file_groups = meta.into_iter().map(Into::into).collect(); + fn with_projection(mut self, projection: Vec<usize>) -> Self { + self.projection = Some(projection); + self + } - // prepare the scan - let mut parquet_exec = ParquetExec::new( - FileScanConfig { - object_store_url: ObjectStoreUrl::local_filesystem(), - file_groups: vec![file_groups], - file_schema, - statistics: Statistics::default(), - projection, - limit: None, - table_partition_cols: vec![], - output_ordering: None, - infinite_source: false, - }, - predicate, - None, - ); + fn with_schema(mut self, schema: SchemaRef) -> Self { + self.schema = Some(schema); + self + } - if pushdown_predicate { - parquet_exec = parquet_exec - .with_pushdown_filters(true) - .with_reorder_filters(true); + fn with_predicate(mut self, predicate: Expr) -> Self { + self.predicate = Some(predicate); + self } - if page_index_predicate { - parquet_exec = parquet_exec.with_enable_page_index(true); + fn with_pushdown_predicate(mut self) -> Self { + self.pushdown_predicate = true; + self } - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); - let parquet_exec = Arc::new(parquet_exec); - RoundTripResult { - batches: collect(parquet_exec.clone(), task_ctx).await, - parquet_exec, + fn with_page_index_predicate(mut self) -> Self { + self.page_index_predicate = true; + self + } + + /// run the test, returning only the resulting RecordBatches + async fn round_trip_to_batches( + self, + batches: Vec<RecordBatch>, + ) -> Result<Vec<RecordBatch>> { + self.round_trip(batches).await.batches + } + + /// run the test, returning the `RoundTripResult` + async fn round_trip(self, batches: Vec<RecordBatch>) -> RoundTripResult { + let Self { + projection, + schema, + predicate, + pushdown_predicate, + page_index_predicate, + } = self; + + let file_schema = match schema { + Some(schema) => schema, + None => Arc::new( + Schema::try_merge( + batches.iter().map(|b| b.schema().as_ref().clone()), + ) + .unwrap(), + ), + }; + + // If testing with page_index_predicate, write parquet + // files with multiple pages + let multi_page = page_index_predicate; Review Comment: I found this logic slightly confusing so I added some comments and tried to make it clearer -- no functional change is intended -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org