alamb commented on code in PR #17162: URL: https://github.com/apache/datafusion/pull/17162#discussion_r2274470625
########## datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs: ########## @@ -736,6 +737,101 @@ async fn test_topk_dynamic_filter_pushdown() { ); } +#[tokio::test] +async fn test_topk_dynamic_filter_pushdown_multi_column_sort() { + let batches = vec![ + // We are going to do ORDER BY b ASC NULLS LAST, a DESC + // And we put the values in such a way that the first batch will fill the TopK + // and we skip the second batch. + record_batch!( + ("a", Utf8, ["ac", "ad"]), + ("b", Utf8, ["bb", "ba"]), + ("c", Float64, [2.0, 1.0]) + ) + .unwrap(), + record_batch!( + ("a", Utf8, ["aa", "ab"]), + ("b", Utf8, ["bc", "bd"]), + ("c", Float64, [1.0, 2.0]) + ) + .unwrap(), + ]; + let scan = TestScanBuilder::new(schema()) + .with_support(true) + .with_batches(batches) + .build(); + let plan = Arc::new( + SortExec::new( + LexOrdering::new(vec![ + PhysicalSortExpr::new( + col("b", &schema()).unwrap(), + SortOptions::new(false, false), // ascending, nulls_last + ), + PhysicalSortExpr::new( + col("a", &schema()).unwrap(), + SortOptions::new(true, true), // descending, nulls_first + ), + ]) + .unwrap(), + Arc::clone(&scan), + ) + .with_fetch(Some(2)), + ) as Arc<dyn ExecutionPlan>; + + // expect the predicate to be pushed down into the DataSource + insta::assert_snapshot!( + OptimizationTest::new(Arc::clone(&plan), FilterPushdown::new_post_optimization(), true), + @r" + OptimizationTest: + input: + - SortExec: TopK(fetch=2), expr=[b@1 ASC NULLS LAST, a@0 DESC], preserve_partitioning=[false] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true + output: + Ok: + - SortExec: TopK(fetch=2), expr=[b@1 ASC NULLS LAST, a@0 DESC], preserve_partitioning=[false] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ true ] + " + ); + + // Actually apply the optimization to the plan and put some data through 
it to check that the filter is updated to reflect the TopK state + let mut config = ConfigOptions::default(); + config.execution.parquet.pushdown_filters = true; + let plan = FilterPushdown::new_post_optimization() + .optimize(plan, &config) + .unwrap(); + let config = SessionConfig::new().with_batch_size(2); + let session_ctx = SessionContext::new_with_config(config); + session_ctx.register_object_store( + ObjectStoreUrl::parse("test://").unwrap().as_ref(), + Arc::new(InMemory::new()), + ); + let state = session_ctx.state(); + let task_ctx = state.task_ctx(); + let mut stream = plan.execute(0, Arc::clone(&task_ctx)).unwrap(); + // Iterate one batch + let res = stream.next().await.unwrap().unwrap(); + #[rustfmt::skip] + let expected = [ + "+----+----+-----+", + "| a | b | c |", + "+----+----+-----+", + "| ad | ba | 1.0 |", + "| ac | bb | 2.0 |", + "+----+----+-----+", + ]; + assert_batches_eq!(expected, &[res]); + // Now check what our filter looks like + insta::assert_snapshot!( Review Comment: this is pretty cool (that you can see the predicate after the plan runs) ########## datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs: ########## @@ -736,6 +737,101 @@ async fn test_topk_dynamic_filter_pushdown() { ); } +#[tokio::test] +async fn test_topk_dynamic_filter_pushdown_multi_column_sort() { + let batches = vec![ + // We are going to do ORDER BY b ASC NULLS LAST, a DESC + // And we put the values in such a way that the first batch will fill the TopK + // and we skip the second batch. 
+ record_batch!( + ("a", Utf8, ["ac", "ad"]), + ("b", Utf8, ["bb", "ba"]), + ("c", Float64, [2.0, 1.0]) + ) + .unwrap(), + record_batch!( + ("a", Utf8, ["aa", "ab"]), + ("b", Utf8, ["bc", "bd"]), + ("c", Float64, [1.0, 2.0]) + ) + .unwrap(), + ]; + let scan = TestScanBuilder::new(schema()) + .with_support(true) + .with_batches(batches) + .build(); + let plan = Arc::new( + SortExec::new( + LexOrdering::new(vec![ + PhysicalSortExpr::new( + col("b", &schema()).unwrap(), + SortOptions::new(false, false), // ascending, nulls_last Review Comment: FWIW you can use the builder style API here to make the code more self documenting ```suggestion SortOptions::default().asc().nulls_last(), ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org