alamb commented on code in PR #17162:
URL: https://github.com/apache/datafusion/pull/17162#discussion_r2274470625


##########
datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs:
##########
@@ -736,6 +737,101 @@ async fn test_topk_dynamic_filter_pushdown() {
     );
 }
 
+#[tokio::test]
+async fn test_topk_dynamic_filter_pushdown_multi_column_sort() {
+    let batches = vec![
+        // We are going to do ORDER BY b ASC NULLS LAST, a DESC
+        // And we put the values in such a way that the first batch will fill 
the TopK
+        // and we skip the second batch.
+        record_batch!(
+            ("a", Utf8, ["ac", "ad"]),
+            ("b", Utf8, ["bb", "ba"]),
+            ("c", Float64, [2.0, 1.0])
+        )
+        .unwrap(),
+        record_batch!(
+            ("a", Utf8, ["aa", "ab"]),
+            ("b", Utf8, ["bc", "bd"]),
+            ("c", Float64, [1.0, 2.0])
+        )
+        .unwrap(),
+    ];
+    let scan = TestScanBuilder::new(schema())
+        .with_support(true)
+        .with_batches(batches)
+        .build();
+    let plan = Arc::new(
+        SortExec::new(
+            LexOrdering::new(vec![
+                PhysicalSortExpr::new(
+                    col("b", &schema()).unwrap(),
+                    SortOptions::new(false, false), // ascending, nulls_first
+                ),
+                PhysicalSortExpr::new(
+                    col("a", &schema()).unwrap(),
+                    SortOptions::new(true, true), // ascending, nulls_last
+                ),
+            ])
+            .unwrap(),
+            Arc::clone(&scan),
+        )
+        .with_fetch(Some(2)),
+    ) as Arc<dyn ExecutionPlan>;
+
+    // expect the predicate to be pushed down into the DataSource
+    insta::assert_snapshot!(
+        OptimizationTest::new(Arc::clone(&plan), 
FilterPushdown::new_post_optimization(), true),
+        @r"
+    OptimizationTest:
+      input:
+        - SortExec: TopK(fetch=2), expr=[b@1 ASC NULLS LAST, a@0 DESC], 
preserve_partitioning=[false]
+        -   DataSourceExec: file_groups={1 group: [[test.parquet]]}, 
projection=[a, b, c], file_type=test, pushdown_supported=true
+      output:
+        Ok:
+          - SortExec: TopK(fetch=2), expr=[b@1 ASC NULLS LAST, a@0 DESC], 
preserve_partitioning=[false]
+          -   DataSourceExec: file_groups={1 group: [[test.parquet]]}, 
projection=[a, b, c], file_type=test, pushdown_supported=true, 
predicate=DynamicFilterPhysicalExpr [ true ]
+    "
+    );
+
+    // Actually apply the optimization to the plan and put some data through 
it to check that the filter is updated to reflect the TopK state
+    let mut config = ConfigOptions::default();
+    config.execution.parquet.pushdown_filters = true;
+    let plan = FilterPushdown::new_post_optimization()
+        .optimize(plan, &config)
+        .unwrap();
+    let config = SessionConfig::new().with_batch_size(2);
+    let session_ctx = SessionContext::new_with_config(config);
+    session_ctx.register_object_store(
+        ObjectStoreUrl::parse("test://").unwrap().as_ref(),
+        Arc::new(InMemory::new()),
+    );
+    let state = session_ctx.state();
+    let task_ctx = state.task_ctx();
+    let mut stream = plan.execute(0, Arc::clone(&task_ctx)).unwrap();
+    // Iterate one batch
+    let res = stream.next().await.unwrap().unwrap();
+    #[rustfmt::skip]
+    let expected = [
+        "+----+----+-----+",
+        "| a  | b  | c   |",
+        "+----+----+-----+",
+        "| ad | ba | 1.0 |",
+        "| ac | bb | 2.0 |",
+        "+----+----+-----+",
+    ];
+    assert_batches_eq!(expected, &[res]);
+    // Now check what our filter looks like
+    insta::assert_snapshot!(

Review Comment:
   This is pretty cool (that you can see the predicate after the plan runs).



##########
datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs:
##########
@@ -736,6 +737,101 @@ async fn test_topk_dynamic_filter_pushdown() {
     );
 }
 
+#[tokio::test]
+async fn test_topk_dynamic_filter_pushdown_multi_column_sort() {
+    let batches = vec![
+        // We are going to do ORDER BY b ASC NULLS LAST, a DESC
+        // And we put the values in such a way that the first batch will fill 
the TopK
+        // and we skip the second batch.
+        record_batch!(
+            ("a", Utf8, ["ac", "ad"]),
+            ("b", Utf8, ["bb", "ba"]),
+            ("c", Float64, [2.0, 1.0])
+        )
+        .unwrap(),
+        record_batch!(
+            ("a", Utf8, ["aa", "ab"]),
+            ("b", Utf8, ["bc", "bd"]),
+            ("c", Float64, [1.0, 2.0])
+        )
+        .unwrap(),
+    ];
+    let scan = TestScanBuilder::new(schema())
+        .with_support(true)
+        .with_batches(batches)
+        .build();
+    let plan = Arc::new(
+        SortExec::new(
+            LexOrdering::new(vec![
+                PhysicalSortExpr::new(
+                    col("b", &schema()).unwrap(),
+                    SortOptions::new(false, false), // ascending, nulls_first

Review Comment:
   FWIW, you can use the builder-style API here to make the code more
   self-documenting:
   
   ```suggestion
                       SortOptions::default().asc().nulls_last(),
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to