xudong963 commented on code in PR #22024:
URL: https://github.com/apache/datafusion/pull/22024#discussion_r3224060302
##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -2714,4 +2756,112 @@ mod test {
"without page index all rows are returned"
);
}
+
+ /// End-to-end: a parquet file with 4 row groups, scanned with
+ /// `row_group_fraction = 0.5`, should return rows from exactly 2
+ /// of the 4 row groups.
+ #[tokio::test]
+ async fn row_group_sampling_end_to_end() {
+ let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+
+ // 4 row groups of 3 rows each = 12 rows total.
+ let batches = (0..4)
+ .map(|g| {
+ record_batch!((
+ "a",
+ Int32,
+ vec![Some(g * 10 + 1), Some(g * 10 + 2), Some(g * 10 + 3),]
+ ))
+ .unwrap()
+ })
+ .collect::<Vec<_>>();
+ let schema = batches[0].schema();
+ let props = WriterProperties::builder()
+ .set_max_row_group_row_count(Some(3))
+ .build();
+
+ let data_len = write_parquet_batches(
+ Arc::clone(&store),
+ "rg_sampled.parquet",
+ batches,
+ Some(props),
+ )
+ .await;
+
+ let file = PartitionedFile::new(
+ "rg_sampled.parquet".to_string(),
+ u64::try_from(data_len).unwrap(),
+ );
+
+ let sampling = crate::sampling::ParquetSampling {
+ row_group_fraction: Some(0.5),
+ ..Default::default()
+ };
+
+ let opener = ParquetMorselizerBuilder::new()
+ .with_store(Arc::clone(&store))
+ .with_schema(Arc::clone(&schema))
+ .with_projection_indices(&[0])
+ .with_sampling(sampling)
+ .build();
+
+ let stream = open_file(&opener, file).await.unwrap();
+ let (_num_batches, num_rows) = count_batches_and_rows(stream).await;
+
+ // ceil(4 * 0.5) = 2 row groups kept, each with 3 rows.
+ assert_eq!(
+ num_rows, 6,
            "row_group_fraction=0.5 over 4 row groups should yield 2 row groups × 3 rows"
+ );
+ }
+
+ /// End-to-end: a single row group of 100 rows scanned with
+ /// `row_fraction = 0.1` and the default cluster size should yield
+ /// roughly 10 rows. The exact count depends on `ceil(100 * 0.1) =
+ /// 10` plus how the windows pack — we assert the count is in the
+ /// expected range and significantly less than 100.
+ #[tokio::test]
+ async fn row_fraction_end_to_end() {
+ let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+
+ // One row group of 100 rows so we exercise the per-row-group
+ // RowSelection, not the row-group-level skip.
+ let values: Vec<Option<i32>> = (0..100).map(Some).collect();
+ let batch = record_batch!(("a", Int32, values)).unwrap();
+ let schema = batch.schema();
+        let data_len =
+            write_parquet(Arc::clone(&store), "rf.parquet", batch.clone()).await;
+ let file = PartitionedFile::new(
+ "rf.parquet".to_string(),
+ u64::try_from(data_len).unwrap(),
+ );
+
+ let sampling = crate::sampling::ParquetSampling {
+ row_fraction: Some(0.1),
+ row_cluster_size: 4, // small cluster -> several windows
+ ..Default::default()
+ };
+
+ let opener = ParquetMorselizerBuilder::new()
+ .with_store(Arc::clone(&store))
+ .with_schema(Arc::clone(&schema))
+ .with_projection_indices(&[0])
+ .with_sampling(sampling)
+ .build();
+
+ let stream = open_file(&opener, file).await.unwrap();
+ let (_num_batches, num_rows) = count_batches_and_rows(stream).await;
+
+ // We asked for ~10% of 100 rows. ceil(10 / cluster=4) = 3
+ // windows of ceil(10/3)=4 rows each, capped at the total ->
+ // up to 12 rows in practice. Assert the bounds.
+ assert!(
+ (1..100).contains(&num_rows),
            "row_fraction=0.1 should drop the vast majority of rows; got {num_rows}"
+ );
+ assert!(
+ num_rows <= 16,
Review Comment:
Will it be flaky?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]