Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
Omega359 commented on PR #15770: URL: https://github.com/apache/datafusion/pull/15770#issuecomment-3025461502 Awesome, thanks. Yes, I think a full blog post for this would be amazing but I'd still like to have it in the release blog post with a short summary and maybe a few stats. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] - To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
alamb commented on PR #15770: URL: https://github.com/apache/datafusion/pull/15770#issuecomment-3025443600 At a high level it is described in https://github.com/apache/datafusion/issues/15037
I think @adriangb is considering a writeup about it directly here:
- https://github.com/apache/datafusion/issues/15513
(I am standing by to help with both BTW)
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
Omega359 commented on PR #15770: URL: https://github.com/apache/datafusion/pull/15770#issuecomment-3025141986 Can someone (@adriangb maybe?) give me a high-level summary of what was improved in this PR for the [DF 49 blog post](https://github.com/apache/datafusion/issues/16347)? The performance improvement highlighted in https://github.com/apache/datafusion/pull/15770#issuecomment-2981184419 seems to make it a worthy candidate for highlighting in the performance improvement section :)
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
alamb commented on PR #15770: URL: https://github.com/apache/datafusion/pull/15770#issuecomment-2981852178 woohoo!
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
adriangb commented on PR #15770: URL: https://github.com/apache/datafusion/pull/15770#issuecomment-2981552270
> Perhaps we can compare against the current filter and only update the expression if it is greater / more selective?

Yeah, I think that would be good. For context (I had to remember for a sec): the filter itself is shared across partitions.
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
Dandandan commented on PR #15770: URL: https://github.com/apache/datafusion/pull/15770#issuecomment-2981524048 Hm @adriangb, another thing I wondered: `update_filter` seems to take only the heap of the current partition into account, as in TopK (currently at least) each partition has its own heap. Perhaps we can compare against the current filter and only update the expression if it is greater?
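To make the "only update if more selective" idea concrete, here is a minimal Rust sketch of a tighten-only update to a filter shared across partitions. It assumes an `ORDER BY col ASC LIMIT k` over `i64` values; `SharedThreshold` and `PartitionTopK` are hypothetical names, not DataFusion's TopK or dynamic filter API. Each partition keeps a max-heap of its k smallest values. Once full, the heap's max is a valid cutoff for the whole query (that partition already holds k values at or below it), so the shared threshold only ever moves to the smaller, more selective candidate.

```rust
use std::collections::BinaryHeap;
use std::sync::{Arc, Mutex};

/// Hypothetical shared filter for `ORDER BY col ASC LIMIT k` over i64 values.
/// Once set, rows with `col > threshold` cannot make it into the final top k.
#[derive(Debug, Default)]
struct SharedThreshold {
    threshold: Mutex<Option<i64>>,
}

impl SharedThreshold {
    /// Tighten-only update: accept `candidate` only if it is more selective
    /// (smaller, for an ascending sort) than the current threshold.
    /// Returns true if the shared filter changed.
    fn update_if_tighter(&self, candidate: i64) -> bool {
        let mut guard = self.threshold.lock().unwrap();
        let tighter = match *guard {
            Some(current) => candidate < current,
            None => true,
        };
        if tighter {
            *guard = Some(candidate);
        }
        tighter
    }

    fn current(&self) -> Option<i64> {
        *self.threshold.lock().unwrap()
    }
}

/// Hypothetical per-partition TopK state: a max-heap of the k smallest values seen.
struct PartitionTopK {
    k: usize,
    heap: BinaryHeap<i64>,
    filter: Arc<SharedThreshold>,
}

impl PartitionTopK {
    fn new(k: usize, filter: Arc<SharedThreshold>) -> Self {
        Self { k, heap: BinaryHeap::new(), filter }
    }

    fn insert(&mut self, value: i64) {
        if self.heap.len() < self.k {
            self.heap.push(value);
        } else if let Some(&max) = self.heap.peek() {
            if value < max {
                self.heap.pop();
                self.heap.push(value);
            }
        }
        // A full heap's max is this partition's k-th smallest value. It is a
        // valid cutoff for the whole query, but another partition may already
        // have published a tighter one, so only ever move the threshold down.
        if self.heap.len() == self.k {
            if let Some(&max) = self.heap.peek() {
                self.filter.update_if_tighter(max);
            }
        }
    }
}
```

With k = 2, a partition that sees 10, 3, 7 publishes 7; a second partition that sees 50, 40 proposes 50, which is looser and is ignored. For a single integer column, `AtomicI64::fetch_min` would give the same tighten-only behaviour without a lock; the `Mutex` version is used here because it generalizes to richer filter state.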
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
adriangb merged PR #15770: URL: https://github.com/apache/datafusion/pull/15770
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
Dandandan commented on PR #15770: URL: https://github.com/apache/datafusion/pull/15770#issuecomment-2981406974 https://github.com/user-attachments/assets/b96882b9-f3da-4d9a-8635-ba77bdf8fbf3 Well, it looks like for these benchmarks 95% of the time is now spent just scanning the data and only 5% elsewhere, so I guess we need to focus there :D
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
Dandandan commented on PR #15770: URL: https://github.com/apache/datafusion/pull/15770#issuecomment-2981292465 I'll also run some profiling on those topk benchmarks to see if there is any further low-hanging fruit.
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
adriangb commented on PR #15770: URL: https://github.com/apache/datafusion/pull/15770#issuecomment-2981275327
> I think the speed up changes a bit due to more partitions

And maybe https://github.com/apache/datafusion/pull/16424 will speed up the wide partitions case by stopping those scans early!
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
Dandandan commented on PR #15770: URL: https://github.com/apache/datafusion/pull/15770#issuecomment-2981263802
> > 🤖: Benchmark completed
> >
> > Details
>
> Very nice improvement even without filter pushdown!
>
> I'm going to merge this in the next couple of hours if there is no more feedback

This is super nice. I think the speedup changes a bit due to more partitions, as you shared earlier. But it's starting to look really nice.
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
adriangb commented on PR #15770: URL: https://github.com/apache/datafusion/pull/15770#issuecomment-2981197663
> 🤖: Benchmark completed
>
> Details

Very nice improvement even without filter pushdown! I'm going to merge this in the next couple of hours if there is no more feedback.
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
alamb commented on PR #15770: URL: https://github.com/apache/datafusion/pull/15770#issuecomment-2981184419 🤖: Benchmark completed

Details

```
Comparing HEAD and topk-dynamic-filters
Benchmark run_topk_tpch.json
| Query | HEAD      | topk-dynamic-filters | Change        |
|-------|-----------|----------------------|---------------|
| Q1    | 41.78 ms  | 28.62 ms             | +1.46x faster |
| Q2    | 40.50 ms  | 32.33 ms             | +1.25x faster |
| Q3    | 144.67 ms | 101.83 ms            | +1.42x faster |
| Q4    | 43.48 ms  | 36.12 ms             | +1.20x faster |
| Q5    | 31.85 ms  | 26.77 ms             | +1.19x faster |
| Q6    | 54.85 ms  | 47.29 ms             | +1.16x faster |
| Q7    | 129.61 ms | 138.38 ms            | 1.07x slower  |
| Q8    | 121.74 ms | 76.63 ms             | +1.59x faster |
| Q9    | 158.13 ms | 114.49 ms            | +1.38x faster |
| Q10   | 215.83 ms | 169.16 ms            | +1.28x faster |
| Q11   | 121.35 ms | 83.83 ms             | +1.45x faster |

| Benchmark Summary                   |           |
|-------------------------------------|-----------|
| Total Time (HEAD)                   | 1103.78ms |
| Total Time (topk-dynamic-filters)   | 855.46ms  |
| Average Time (HEAD)                 | 100.34ms  |
| Average Time (topk-dynamic-filters) | 77.77ms   |
| Queries Faster                      | 10        |
| Queries Slower                      | 1         |
| Queries with No Change              | 0         |
| Queries with Failure                | 0         |
```
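The "Change" column in these tables can be reproduced from the two timing columns as speedup = HEAD / branch. The sketch below assumes the comparison script treats anything within ±5% as "no change"; that threshold is an assumption inferred from the numbers in this thread (Q7's 6.8% slowdown above is flagged, while sort_tpch Q1's 4.3% slowdown is reported as "no change"), not something read from the script itself.

```rust
/// ASSUMPTION: changes within +/-5% are reported as "no change". This
/// threshold is inferred from the tables in this thread, not read from the
/// benchmark comparison script.
const NOISE_THRESHOLD: f64 = 0.05;

/// Label a query the way the "Change" column does: speedup = HEAD / branch.
fn classify(head_ms: f64, branch_ms: f64) -> String {
    let speedup = head_ms / branch_ms;
    if speedup > 1.0 + NOISE_THRESHOLD {
        format!("+{:.2}x faster", speedup)
    } else if speedup < 1.0 - NOISE_THRESHOLD {
        // Slowdowns are reported as branch / HEAD so the factor is >= 1.
        format!("{:.2}x slower", branch_ms / head_ms)
    } else {
        "no change".to_string()
    }
}
```

Applied to the summary row, `classify(1103.78, 855.46)` gives "+1.29x faster": total time drops from 1103.78 ms to 855.46 ms across the 11 queries.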
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
alamb commented on PR #15770: URL: https://github.com/apache/datafusion/pull/15770#issuecomment-2981154277 🤖 `./gh_compare_branch.sh` [Benchmark Script](https://github.com/alamb/datafusion-benchmarking/blob/main/gh_compare_branch.sh) Running Linux aal-dev 6.11.0-1015-gcp #15~24.04.1-Ubuntu SMP Thu Apr 24 20:41:05 UTC 2025 x86_64 x86_64 x86_64 GNU/Linux
Comparing topk-dynamic-filters (8e88bd9913f0e8b3d04c1a61e8e928d09f271dba) to dd936cb1b25cb685e0e146f297950eb00048c64c [diff](https://github.com/apache/datafusion/compare/dd936cb1b25cb685e0e146f297950eb00048c64c..8e88bd9913f0e8b3d04c1a61e8e928d09f271dba)
Benchmarks: topk_tpch
Results will be posted here when complete
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
alamb commented on PR #15770: URL: https://github.com/apache/datafusion/pull/15770#issuecomment-2981073603
> > 🤖: Benchmark completed
> >
> > Details
>
> > 🤖: Benchmark completed
> >
> > Details
>
> could you maybe confirm the topk benchmark results @alamb? `topk_tpch`?

will do
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
Dandandan commented on PR #15770: URL: https://github.com/apache/datafusion/pull/15770#issuecomment-2978979555
> 🤖: Benchmark completed
>
> Details

> 🤖: Benchmark completed
>
> Details

could you maybe confirm the topk benchmark results @alamb? `topk_tpch`?
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
adriangb commented on PR #15770: URL: https://github.com/apache/datafusion/pull/15770#issuecomment-2978663984 @alamb I think we're ready to merge this and keep chipping away in https://github.com/apache/datafusion/pull/16424 and other spots, right?
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
alamb commented on code in PR #15770:
URL: https://github.com/apache/datafusion/pull/15770#discussion_r2150940087
##
datafusion/physical-plan/src/execution_plan.rs:
##
@@ -509,8 +510,22 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
///
/// The default implementation bars all parent filters from being pushed down and adds no new filters.
/// This is the safest option, making filter pushdown opt-in on a per-node basis.
+///
+/// Since this may perform deep modifications to the plan tree it is called early in the optimization phase
+/// and is not expected to be called multiple times on the same plan.
+///
+/// A quick summary of the phases is below, see [`FilterPushdownPhase`] for more details:
+/// - [`FilterPushdownPhase::Pre`]: Filters get pushed down before most other optimizations are applied.
+/// At this stage the plan can be modified (e.g. when [`ExecutionPlan::handle_child_pushdown_result`] is called the plan may choose to return an entirely new plan tree)
+/// but subsequent optimizations may also rewrite the plan tree drastically, thus it is *not guaranteed* that a [`PhysicalExpr`] can hold on to a reference to the plan tree.
+/// During this phase static filters (such as `col = 1`) are pushed down.
+/// - [`FilterPushdownPhase::Post`]: Filters get pushed down after most other optimizations are applied.
+/// At this stage the plan tree is expected to be stable and not change drastically, and operators that do filter pushdown during this phase should also not change the plan tree.
Review Comment:
I think the requirement is that the plan nodes don't change. Given that
DynamicFilters can effectively hold pointers to existing ExecutionPlan
instances, if a pass changes / removes / rewrites an `ExecutionPlan` that added
a DynamicFilter, I am not sure what will happen 🤔
##
datafusion/common/src/config.rs:
##
@@ -614,6 +614,13 @@ config_namespace! {
/// during aggregations, if possible
pub enable_topk_aggregation: bool, default = true
+/// When set to true attempts to push down dynamic filters generated by operators into the file scan phase.
Review Comment:
Would the idea be to prune hash table state, for example, if we knew some of
the groups were no longer needed?
I do think implementing more "late materialization" (aka turn on
filter_pushdown) will help too
##
datafusion/physical-plan/src/execution_plan.rs:
##
@@ -548,10 +563,22 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
/// This can be used alongside [`FilterPushdownPropagation::with_filters`] and [`FilterPushdownPropagation::with_updated_node`]
/// to dynamically build a result with a mix of supported and unsupported filters.
///
+/// There are two different phases in filter pushdown, which some operators may handle the same and some differently.
Review Comment:
While I love documentation, I would personally suggest not duplicating the
docs here as duplicates can get out of sync, and instead leave a link to
`FilterPushdownPhase` and focus on getting that documentation to be as clear as
possible
##
datafusion/physical-plan/src/filter_pushdown.rs:
##
@@ -20,6 +20,39 @@ use std::vec::IntoIter;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
+#[derive(Debug, Clone, Copy)]
+pub enum FilterPushdownPhase {
+/// Pushdown that happens before most other optimizations.
+/// This pushdown allows static filters that do not reference any [`ExecutionPlan`]s to be pushed down.
+/// Filters that reference an [`ExecutionPlan`] cannot be pushed down at this stage since the whole plan tree may be rewritten
+/// by other optimizations.
+/// Implementers are however allowed to modify the execution plan themselves during this phase, for example by returning a completely
+/// different [`ExecutionPlan`] from [`ExecutionPlan::handle_child_pushdown_result`].
+///
+/// [`ExecutionPlan`]: crate::ExecutionPlan
+/// [`ExecutionPlan::handle_child_pushdown_result`]: crate::ExecutionPlan::handle_child_pushdown_result
+Pre,
+/// Pushdown that happens after most other optimizations.
+/// This pushdown allows filters that reference an [`ExecutionPlan`] to be pushed down.
+/// It is guaranteed that subsequent optimizations will not make large changes to the plan tree,
Review Comment:
As above, I think it would be good to make it more precise what "large
changes to the plan tree" means (basically I think it means don't remove
existing ExecutionPlans? 🤔)
##
datafusion/physical-optimizer/src/optimizer.rs:
##
@@ -131,6 +131,8 @@ impl PhysicalOptimizer {
// replacing operators with fetching variants, or adding limits
// past operators that support limit pushdown.
Arc::new(Limi
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
alamb commented on PR #15770: URL: https://github.com/apache/datafusion/pull/15770#issuecomment-2978219556 I queued up some benchmarks. Looking at naming now.
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
alamb commented on PR #15770: URL: https://github.com/apache/datafusion/pull/15770#issuecomment-2978416696 🤖: Benchmark completed

Details

```
Comparing HEAD and topk-dynamic-filters
Benchmark sort_tpch.json
| Query | HEAD       | topk-dynamic-filters | Change        |
|-------|------------|----------------------|---------------|
| Q1    | 322.61 ms  | 336.40 ms            | no change     |
| Q2    | 323.67 ms  | 279.65 ms            | +1.16x faster |
| Q3    | 1175.20 ms | 1152.85 ms           | no change     |
| Q4    | 415.50 ms  | 414.71 ms            | no change     |
| Q5    | 419.42 ms  | 429.43 ms            | no change     |
| Q6    | 460.61 ms  | 462.88 ms            | no change     |
| Q7    | 939.97 ms  | 928.19 ms            | no change     |
| Q8    | 793.68 ms  | 791.83 ms            | no change     |
| Q9    | 830.90 ms  | 832.78 ms            | no change     |
| Q10   | 1219.46 ms | 1237.03 ms           | no change     |
| Q11   | 718.13 ms  | 737.31 ms            | no change     |

| Benchmark Summary                   |           |
|-------------------------------------|-----------|
| Total Time (HEAD)                   | 7619.16ms |
| Total Time (topk-dynamic-filters)   | 7603.05ms |
| Average Time (HEAD)                 | 692.65ms  |
| Average Time (topk-dynamic-filters) | 691.19ms  |
| Queries Faster                      | 1         |
| Queries Slower                      | 0         |
| Queries with No Change              | 10        |
| Queries with Failure                | 0         |
```
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
alamb commented on PR #15770: URL: https://github.com/apache/datafusion/pull/15770#issuecomment-2978414721 🤖: Benchmark completed

Details

```
Comparing HEAD and topk-dynamic-filters
Benchmark clickbench_extended.json
| Query    | HEAD        | topk-dynamic-filters | Change    |
|----------|-------------|----------------------|-----------|
| QQuery 0 | 1932.37 ms  | 1879.32 ms           | no change |
| QQuery 1 | 700.00 ms   | 708.26 ms            | no change |
| QQuery 2 | 1327.98 ms  | 1357.73 ms           | no change |
| QQuery 3 | 675.15 ms   | 663.31 ms            | no change |
| QQuery 4 | 1358.70 ms  | 1371.20 ms           | no change |
| QQuery 5 | 14941.43 ms | 15114.86 ms          | no change |
| QQuery 6 | 2003.62 ms  | 1988.73 ms           | no change |
| QQuery 7 | 1974.89 ms  | 1889.14 ms           | no change |
| QQuery 8 | 804.64 ms   | 796.40 ms            | no change |

| Benchmark Summary                   |            |
|-------------------------------------|------------|
| Total Time (HEAD)                   | 25718.78ms |
| Total Time (topk-dynamic-filters)   | 25768.95ms |
| Average Time (HEAD)                 | 2857.64ms  |
| Average Time (topk-dynamic-filters) | 2863.22ms  |
| Queries Faster                      | 0          |
| Queries Slower                      | 0          |
| Queries with No Change              | 9          |
| Queries with Failure                | 0          |

Benchmark clickbench_partitioned.json
| Query     | HEAD        | topk-dynamic-filters | Change        |
|-----------|-------------|----------------------|---------------|
| QQuery 0  | 15.60 ms    | 15.34 ms             | no change     |
| QQuery 1  | 33.50 ms    | 32.81 ms             | no change     |
| QQuery 2  | 80.76 ms    | 80.51 ms             | no change     |
| QQuery 3  | 99.67 ms    | 95.83 ms             | no change     |
| QQuery 4  | 628.76 ms   | 577.08 ms            | +1.09x faster |
| QQuery 5  | 871.37 ms   | 852.85 ms            | no change     |
| QQuery 6  | 24.15 ms    | 23.11 ms             | no change     |
| QQuery 7  | 35.78 ms    | 35.47 ms             | no change     |
| QQuery 8  | 868.37 ms   | 880.50 ms            | no change     |
| QQuery 9  | 1149.06 ms  | 1167.61 ms           | no change     |
| QQuery 10 | 254.37 ms   | 252.96 ms            | no change     |
| QQuery 11 | 285.04 ms   | 280.14 ms            | no change     |
| QQuery 12 | 867.64 ms   | 856.84 ms            | no change     |
| QQuery 13 | 1276.06 ms  | 1264.78 ms           | no change     |
| QQuery 14 | 812.54 ms   | 791.25 ms            | no change     |
| QQuery 15 | 767.98 ms   | 778.42 ms            | no change     |
| QQuery 16 | 1628.17 ms  | 1592.90 ms           | no change     |
| QQuery 17 | 1597.69 ms  | 1583.41 ms           | no change     |
| QQuery 18 | 2883.08 ms  | 2848.60 ms           | no change     |
| QQuery 19 | 83.29 ms    | 85.79 ms             | no change     |
| QQuery 20 | 1102.75 ms  | 1125.53 ms           | no change     |
| QQuery 21 | 1243.71 ms  | 1264.80 ms           | no change     |
| QQuery 22 | 2065.35 ms  | 2062.86 ms           | no change     |
| QQuery 23 | 7575.06 ms  | 7234.54 ms           | no change     |
| QQuery 24 | 445.22 ms   | 429.24 ms            | no change     |
| QQuery 25 | 374.18 ms   | 300.39 ms            | +1.25x faster |
| QQuery 26 | 510.11 ms   | 424.15 ms            | +1.20x faster |
| QQuery 27 | 1514.36 ms  | 1528.90 ms           | no change     |
| QQuery 28 | 11858.75 ms | 12005.25 ms          | no change     |
| QQuery 29 | 532.06 ms   | 537.76 ms            | no change     |
| QQuery 30 | 760.11 ms   | 750.64 ms            | no change     |
| QQuery 31 | 793.67 ms   | 786.40 ms            | no change     |
| QQuery 32 | 2473.13 ms  | 2403.92 ms           | no change     |
| QQuery 33 | 3116.67 ms  | 3104.59 ms           | no change     |
| QQuery 34 | 3141.07 ms  | 3148.95 ms           | no change     |
| QQuery 35 | 1238.65 ms  | 1195.36 ms           | no change     |
| QQuery 36 | 124.11 ms   | 120.55 ms            | no change     |
| QQuery 37 | 57.10 ms    | 55.05 ms             | no change     |
| QQuery 38 | 118.93 ms   | 120.
```
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
alamb commented on PR #15770: URL: https://github.com/apache/datafusion/pull/15770#issuecomment-2978414775 🤖 `./gh_compare_branch.sh` [Benchmark Script](https://github.com/alamb/datafusion-benchmarking/blob/main/gh_compare_branch.sh) Running Linux aal-dev 6.11.0-1015-gcp #15~24.04.1-Ubuntu SMP Thu Apr 24 20:41:05 UTC 2025 x86_64 x86_64 x86_64 GNU/Linux
Comparing topk-dynamic-filters (cd56084f922cccd27a7c847fc18eb9cd213578c1) to dd936cb1b25cb685e0e146f297950eb00048c64c [diff](https://github.com/apache/datafusion/compare/dd936cb1b25cb685e0e146f297950eb00048c64c..cd56084f922cccd27a7c847fc18eb9cd213578c1)
Benchmarks: sort_tpch
Results will be posted here when complete
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
adriangb commented on code in PR #15770:
URL: https://github.com/apache/datafusion/pull/15770#discussion_r2150949273
##
datafusion/physical-plan/src/execution_plan.rs:
##
@@ -509,8 +510,22 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
///
/// The default implementation bars all parent filters from being pushed down and adds no new filters.
/// This is the safest option, making filter pushdown opt-in on a per-node basis.
+///
+/// Since this may perform deep modifications to the plan tree it is called early in the optimization phase
+/// and is not expected to be called multiple times on the same plan.
+///
+/// A quick summary of the phases is below, see [`FilterPushdownPhase`] for more details:
+/// - [`FilterPushdownPhase::Pre`]: Filters get pushed down before most other optimizations are applied.
+/// At this stage the plan can be modified (e.g. when [`ExecutionPlan::handle_child_pushdown_result`] is called the plan may choose to return an entirely new plan tree)
+/// but subsequent optimizations may also rewrite the plan tree drastically, thus it is *not guaranteed* that a [`PhysicalExpr`] can hold on to a reference to the plan tree.
+/// During this phase static filters (such as `col = 1`) are pushed down.
+/// - [`FilterPushdownPhase::Post`]: Filters get pushed down after most other optimizations are applied.
+/// At this stage the plan tree is expected to be stable and not change drastically, and operators that do filter pushdown during this phase should also not change the plan tree.
Review Comment:
Sort of. I am mincing my words in the comments because the reality is that
to push down filters into `DataSourceExec` a new `DataSourceExec` has to be
created and the whole tree has to be replaced "in place" to reference the new
children. But the structure of the plan does not change, and it's pretty much
guaranteed that `ExecutionPlan::with_new_children` does the right thing in
terms of preserving internal state that might be pointed to (unlike
`EnforceSorting`).
I'm not sure how to detail that in a comment, it's somewhat confusing.
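The contract described above can be illustrated with a toy model. This sketch assumes the dynamic filter is shared through an `Arc` between the operator that produces it and the scan that reads it; `DynamicThreshold`, `TopKNode`, `ScanNode`, and both rebuild methods are hypothetical names, not DataFusion types. Rebuilding a node while cloning the same `Arc` keeps the scan connected, which is roughly the guarantee relied on from `with_new_children`. A rewrite that recreates the state (the `EnforceSorting` case mentioned above) silently disconnects the two: in this toy model the scan simply stops seeing updates.

```rust
use std::sync::{Arc, RwLock};

/// Hypothetical stand-in for a dynamic filter: a threshold that a producer
/// (e.g. a TopK operator) updates at runtime and a scan reads.
#[derive(Debug, Default)]
struct DynamicThreshold(RwLock<Option<i64>>);

/// Hypothetical producer node that owns the filter it handed to the scan.
#[derive(Debug)]
struct TopKNode {
    filter: Arc<DynamicThreshold>,
}

impl TopKNode {
    fn new() -> Self {
        Self { filter: Arc::new(DynamicThreshold::default()) }
    }

    /// Rebuild (children omitted for brevity) while cloning the *same* Arc,
    /// so a scan that already holds the filter keeps seeing updates.
    fn rebuild_preserving_state(&self) -> Self {
        Self { filter: Arc::clone(&self.filter) }
    }

    /// Rebuild that recreates internal state: updates now go to a fresh
    /// filter that no scan was ever given.
    fn rebuild_from_scratch(&self) -> Self {
        Self::new()
    }

    /// Publish a new threshold (no tighten-only check in this toy).
    fn publish(&self, value: i64) {
        *self.filter.0.write().unwrap() = Some(value);
    }

    /// Hand the scan its own handle to the filter at planning time.
    fn filter_for_scan(&self) -> Arc<DynamicThreshold> {
        Arc::clone(&self.filter)
    }
}

/// Hypothetical scan that reads whatever filter it was handed at planning time.
struct ScanNode {
    filter: Arc<DynamicThreshold>,
}

impl ScanNode {
    fn current_threshold(&self) -> Option<i64> {
        *self.filter.0.read().unwrap()
    }
}
```

In the usage below, the preserved rebuild's update reaches the scan, while the from-scratch rebuild's update does not.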
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
alamb commented on PR #15770: URL: https://github.com/apache/datafusion/pull/15770#issuecomment-2978330938 🤖 `./gh_compare_branch.sh` [Benchmark Script](https://github.com/alamb/datafusion-benchmarking/blob/main/gh_compare_branch.sh) Running Linux aal-dev 6.11.0-1015-gcp #15~24.04.1-Ubuntu SMP Thu Apr 24 20:41:05 UTC 2025 x86_64 x86_64 x86_64 GNU/Linux
Comparing topk-dynamic-filters (cd56084f922cccd27a7c847fc18eb9cd213578c1) to dd936cb1b25cb685e0e146f297950eb00048c64c [diff](https://github.com/apache/datafusion/compare/dd936cb1b25cb685e0e146f297950eb00048c64c..cd56084f922cccd27a7c847fc18eb9cd213578c1)
Benchmarks: tpch_mem clickbench_partitioned clickbench_extended
Results will be posted here when complete
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
adriangb commented on code in PR #15770:
URL: https://github.com/apache/datafusion/pull/15770#discussion_r2150957763
##
datafusion/physical-optimizer/src/filter_pushdown.rs:
##
@@ -362,17 +363,25 @@ use itertools::izip;
/// [`ProjectionExec`]: datafusion_physical_plan::projection::ProjectionExec
/// [`AggregateExec`]: datafusion_physical_plan::aggregates::AggregateExec
#[derive(Debug)]
-pub struct FilterPushdown {}
+pub struct FilterPushdown {
+phase: FilterPushdownPhase,
+name: String,
+}
impl FilterPushdown {
-pub fn new() -> Self {
-Self {}
+fn new(phase: FilterPushdownPhase) -> Self {
+Self {
+phase,
+name: format!("FilterPushdown({phase})"),
Review Comment:
I feel that if we do that we should rename the enum to `Static/Dynamic`. I
think it's confusing to mix `Static/Dynamic` and `Pre/Post`.
I like `Pre/Post` because that's the actual reason we're splitting it into
two phases: one runs before other optimizations, one runs after.
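For reference, the constructor in this diff builds the rule name with `format!("FilterPushdown({phase})")`, which requires `FilterPushdownPhase` to implement `Display`. The excerpt does not show that impl, so the sketch below is an assumption of what it could look like. `new_post_optimization` is taken from the optimizer.rs excerpt; `new_pre_optimization`, `name()`, and `phase()` are hypothetical helpers added for illustration. Keeping `Pre`/`Post` as the variant names means the two passes are distinguishable by name as `FilterPushdown(Pre)` and `FilterPushdown(Post)`.

```rust
use std::fmt;

/// Sketch of the two-phase enum from the diff (doc comments abbreviated).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FilterPushdownPhase {
    /// Before most other optimizations: static filters only.
    Pre,
    /// After most other optimizations: filters may reference plan nodes.
    Post,
}

// ASSUMPTION: hypothetical Display impl; needed because the constructor
// interpolates `{phase}` into the rule name.
impl fmt::Display for FilterPushdownPhase {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            FilterPushdownPhase::Pre => write!(f, "Pre"),
            FilterPushdownPhase::Post => write!(f, "Post"),
        }
    }
}

#[derive(Debug)]
pub struct FilterPushdown {
    phase: FilterPushdownPhase,
    name: String,
}

impl FilterPushdown {
    fn new(phase: FilterPushdownPhase) -> Self {
        Self {
            phase,
            name: format!("FilterPushdown({phase})"),
        }
    }

    /// Hypothetical constructor for the early (static filter) pass.
    pub fn new_pre_optimization() -> Self {
        Self::new(FilterPushdownPhase::Pre)
    }

    /// Late pass for dynamic filters (name taken from the optimizer.rs excerpt).
    pub fn new_post_optimization() -> Self {
        Self::new(FilterPushdownPhase::Post)
    }

    /// Hypothetical accessor for the rule name.
    pub fn name(&self) -> &str {
        &self.name
    }

    /// Hypothetical accessor for the phase this instance runs in.
    pub fn phase(&self) -> FilterPushdownPhase {
        self.phase
    }
}
```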
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
adriangb commented on code in PR #15770:
URL: https://github.com/apache/datafusion/pull/15770#discussion_r2150951172
##
datafusion/physical-optimizer/src/optimizer.rs:
##
@@ -131,6 +131,8 @@ impl PhysicalOptimizer {
// replacing operators with fetching variants, or adding limits
// past operators that support limit pushdown.
Arc::new(LimitPushdown::new()),
+// This FilterPushdown handles dynamic filters that may have references to the source ExecutionPlan
+Arc::new(FilterPushdown::new_post_optimization()),
Review Comment:
I will move it lower and add a comment, with a reference to the enum with
larger docs.
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
adriangb commented on PR #15770: URL: https://github.com/apache/datafusion/pull/15770#issuecomment-2978256702 Thank you Andrew! I will do the renames, docs edits, etc., push those tonight and we can merge this tomorrow evening if there is no more feedback.
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
adriangb commented on PR #15770: URL: https://github.com/apache/datafusion/pull/15770#issuecomment-2977884089 I'd be curious to see perf impact after we merge https://github.com/apache/datafusion/pull/16424 as well
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
adriangb commented on PR #15770: URL: https://github.com/apache/datafusion/pull/15770#issuecomment-2977763190 > I think it's great! I added one suggestion to minimize the changes to the explain output and maybe phrase the "pre" and "post" more as "normal" filter pushdown and "dynamic" filter pushdown? I think that might explain a bit more of what might be happening. > > Maybe @alamb can verify the (increased) performance improvements once more? Thanks! I have no strong opinions about the naming, so I'll wait for Andrew to review to see if he has any thoughts; otherwise I'll go with your suggestion.
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
Dandandan commented on PR #15770: URL: https://github.com/apache/datafusion/pull/15770#issuecomment-2977736578 > > Sounds reasonable. Perhaps just keep it this way and see if we can somehow separate it into 2 different passes later. > > It seems like the branch [pydantic#30](https://github.com/pydantic/datafusion/pull/30) passes? > > Then since you think that's reasonable I'd like to merge it and go with that instead for now even if we're not 100% sold on either direction it will let us keep iterating. > > I'll wait for your approval before merging this PR into main. I think it's great! I added one suggestion to minimize the changes to the explain output and maybe phrase the "pre" and "post" more as "normal" filter pushdown and "dynamic" filter pushdown? I think that might explain a bit more of what might be happening. Maybe @alamb can verify the (increased) performance improvements once more?
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
Dandandan commented on code in PR #15770:
URL: https://github.com/apache/datafusion/pull/15770#discussion_r2150649026
##
datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs:
##
@@ -118,7 +131,7 @@ fn test_filter_collapse() {
let plan = Arc::new(FilterExec::try_new(predicate2, filter1).unwrap());
insta::assert_snapshot!(
-OptimizationTest::new(plan, FilterPushdown{}, true),
+OptimizationTest::new(plan, FilterPushdown::new_pre_optimization(), true),
Review Comment:
Maybe call it `FilterPushdown::new` and `FilterPushdown::new_dynamic` ?
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
Dandandan commented on code in PR #15770:
URL: https://github.com/apache/datafusion/pull/15770#discussion_r2150648173
##
datafusion/physical-optimizer/src/filter_pushdown.rs:
##
@@ -362,17 +363,25 @@ use itertools::izip;
/// [`ProjectionExec`]: datafusion_physical_plan::projection::ProjectionExec
/// [`AggregateExec`]: datafusion_physical_plan::aggregates::AggregateExec
#[derive(Debug)]
-pub struct FilterPushdown {}
+pub struct FilterPushdown {
+phase: FilterPushdownPhase,
+name: String,
+}
impl FilterPushdown {
-pub fn new() -> Self {
-Self {}
+fn new(phase: FilterPushdownPhase) -> Self {
+Self {
+phase,
+name: format!("FilterPushdown({phase})"),
Review Comment:
Perhaps we could keep formatting the original rule as `FilterPushdown` and
the new one as `FilterPushdown(Dynamic)`?
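A minimal sketch of the naming suggested in this thread: constructors `FilterPushdown::new` / `FilterPushdown::new_dynamic`, with rule names rendered as `FilterPushdown` and `FilterPushdown(Dynamic)` so existing explain output is unchanged. The `Pre`/`Post` variants, the private `with_phase` helper and the accessors are assumptions for illustration; the constructors actually shown in the diff are `new_pre_optimization()` / `new_post_optimization()`.

```rust
/// Hypothetical stand-in for the phase enum added in this PR.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FilterPushdownPhase {
    /// "Normal" pushdown: may rewrite the plan (e.g. remove a FilterExec).
    Pre,
    /// "Dynamic" pushdown: runs late, after EnforceSorting.
    Post,
}

#[derive(Debug)]
pub struct FilterPushdown {
    phase: FilterPushdownPhase,
    name: String,
}

impl FilterPushdown {
    fn with_phase(phase: FilterPushdownPhase) -> Self {
        // Keep the existing rule name for the normal phase so explain output
        // does not change; only the new phase gets a suffix.
        let name = match phase {
            FilterPushdownPhase::Pre => "FilterPushdown".to_string(),
            FilterPushdownPhase::Post => "FilterPushdown(Dynamic)".to_string(),
        };
        Self { phase, name }
    }

    /// Reviewer-suggested constructor names (hypothetical in this sketch).
    pub fn new() -> Self {
        Self::with_phase(FilterPushdownPhase::Pre)
    }

    pub fn new_dynamic() -> Self {
        Self::with_phase(FilterPushdownPhase::Post)
    }

    pub fn name(&self) -> &str {
        &self.name
    }

    pub fn phase(&self) -> FilterPushdownPhase {
        self.phase
    }
}
```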
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
adriangb commented on PR #15770: URL: https://github.com/apache/datafusion/pull/15770#issuecomment-2976187450 > Sounds reasonable. Perhaps just keep it this way and see if we can somehow separate it into 2 different passes later. > > > > It seems like the branch https://github.com/pydantic/datafusion/pull/30 passes? Yep, so if you think that's reasonable I'd like to merge it and go with that instead for now; even if we're not 100% sold on either direction, it will let us keep iterating.
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
Dandandan commented on PR #15770: URL: https://github.com/apache/datafusion/pull/15770#issuecomment-2975351398 > > I think two different phases sounds good. > > I don't really like complicating FilterPushdown with post / pre "phases". Why not create the `PushDownDynamicFilters` one you suggested earlier? > > My reasoning for using an enum parameter was that there's a lot of shared implementation both on the optimizer side and ExecutionPlan side. E.g. the transparent nodes like CoalesceBatchesExec don't really care which phase it is, but FilterExec needs to ignore the dynamic phase and SortExec needs to ignore the static filter pushdown phase. Sounds reasonable. Perhaps just keep it this way and see if we can somehow separate it into 2 different passes later. It seems like the branch https://github.com/pydantic/datafusion/pull/30 passes?
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
adriangb commented on PR #15770: URL: https://github.com/apache/datafusion/pull/15770#issuecomment-2974704013 > I think two different phases sounds good. > > I don't really like complicating FilterPushdown with post / pre "phases". Why not create the `PushDownDynamicFilters` one you suggested earlier? My reasoning for using a parameter was that there's a lot of shared implementation both on the optimizer side and ExecutionPlan side. E.g. the transparent nodes like CoalesceBatchesExec don't really care which phase it is, but FilterExec needs to ignore the dynamic phase and SortExec needs to ignore the static filter pushdown phase.
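A rough model of the argument for a single rule with a phase parameter: every operator forwards its parent's filters, and additionally decides per phase whether it contributes a filter of its own. The `Phase`, `Operator` and `Contribution` types are hypothetical, and reading "ignore the phase" as "only forward parent filters in that phase" is an assumption.

```rust
/// Hypothetical stand-ins for the two pushdown phases and a few operators.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Phase {
    Static,
    Dynamic,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Operator {
    CoalesceBatches,
    Filter,
    Sort,
}

/// What a node adds on top of forwarding its parent's filters.
#[derive(Debug, PartialEq, Eq)]
enum Contribution {
    /// Push its own predicate down (the node may then be removed from the plan).
    OwnPredicate,
    /// Push a reference to a dynamic filter that it keeps updating at runtime.
    DynamicFilter,
}

/// `None` means the node only forwards whatever its parent pushed.
fn contribution(op: Operator, phase: Phase) -> Option<Contribution> {
    match (op, phase) {
        (Operator::Filter, Phase::Static) => Some(Contribution::OwnPredicate),
        (Operator::Sort, Phase::Dynamic) => Some(Contribution::DynamicFilter),
        // CoalesceBatchesExec is transparent in both phases; FilterExec sits
        // out the dynamic phase and SortExec sits out the static one.
        _ => None,
    }
}
```

The shared forwarding logic is what makes one parameterized rule attractive; only the two `Some` arms differ between phases.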
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
Dandandan commented on PR #15770: URL: https://github.com/apache/datafusion/pull/15770#issuecomment-2974698767 > @Dandandan thank you so much for pushing this forward! Really appreciate the help and collaboration. > > Since you've been looking at the code do you have any thoughts on [#15770 (comment)](https://github.com/apache/datafusion/pull/15770#issuecomment-2971653072) / the discussion around implications for `EnforceSorting`, etc.? I think two different phases sounds good. I don't really like complicating FilterPushdown with post / pre "phases". Why not create the `PushDownDynamicFilters` one you suggested earlier?
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
Dandandan commented on PR #15770: URL: https://github.com/apache/datafusion/pull/15770#issuecomment-2974699280 Apart from that, I agree with the sentiment to try to eliminate it from `EnforceSorting`.
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
adriangb commented on PR #15770: URL: https://github.com/apache/datafusion/pull/15770#issuecomment-2973326482 @Dandandan thank you so much for pushing this forward! Really appreciate the help and collaboration. Since you've been looking at the code do you have any thoughts on https://github.com/apache/datafusion/pull/15770#issuecomment-2971653072 / the discussion around implications for `EnforceSorting`, etc.?
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
Dandandan commented on PR #15770: URL: https://github.com/apache/datafusion/pull/15770#issuecomment-2973096992 @alamb could you rerun the benchmarks? Maybe also run the topk benchmark (`topk_tpch`).
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
adriangb commented on PR #15770: URL: https://github.com/apache/datafusion/pull/15770#issuecomment-2972849206 Let's merge your PR into here :)
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
Dandandan commented on PR #15770: URL: https://github.com/apache/datafusion/pull/15770#issuecomment-2972769559 Combined with changes in my PR (https://github.com/apache/datafusion/pull/16408), this is looking sweet for the TopK benchmarks:
```
Benchmark run_topk_tpch.json

| Query | main      | topk-apply-filter | Change        |
|-------|-----------|-------------------|---------------|
| Q1    | 28.59 ms  | 16.69 ms          | +1.71x faster |
| Q2    | 30.36 ms  | 20.65 ms          | +1.47x faster |
| Q3    | 101.92 ms | 54.89 ms          | +1.86x faster |
| Q4    | 27.61 ms  | 22.37 ms          | +1.23x faster |
| Q5    | 19.31 ms  | 12.73 ms          | +1.52x faster |
| Q6    | 42.45 ms  | 26.39 ms          | +1.61x faster |
| Q7    | 85.18 ms  | 70.72 ms          | +1.20x faster |
| Q8    | 103.80 ms | 46.25 ms          | +2.24x faster |
| Q9    | 99.54 ms  | 63.53 ms          | +1.57x faster |
| Q10   | 141.00 ms | 100.62 ms         | +1.40x faster |
| Q11   | 78.28 ms  | 56.94 ms          | +1.37x faster |
```
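The "Change" column in the TopK benchmark is the `main` time divided by the `topk-apply-filter` time. A small, hypothetical helper that recomputes it from the numbers in that table; the geometric mean is an added way to summarize ratios across queries, not a figure reported in the thread.

```rust
/// (query, main ms, topk-apply-filter ms), copied from the benchmark table.
const RESULTS: [(&str, f64, f64); 11] = [
    ("Q1", 28.59, 16.69),
    ("Q2", 30.36, 20.65),
    ("Q3", 101.92, 54.89),
    ("Q4", 27.61, 22.37),
    ("Q5", 19.31, 12.73),
    ("Q6", 42.45, 26.39),
    ("Q7", 85.18, 70.72),
    ("Q8", 103.80, 46.25),
    ("Q9", 99.54, 63.53),
    ("Q10", 141.00, 100.62),
    ("Q11", 78.28, 56.94),
];

/// Baseline time divided by new time; a value above 1.0 means "faster".
fn speedup(main_ms: f64, branch_ms: f64) -> f64 {
    main_ms / branch_ms
}

/// Per-query speedups in table order.
fn speedups() -> Vec<f64> {
    RESULTS
        .iter()
        .map(|&(_, main, branch)| speedup(main, branch))
        .collect()
}

/// Geometric mean: the n-th root of the product, computed via logarithms.
fn geometric_mean(ratios: &[f64]) -> f64 {
    let log_sum: f64 = ratios.iter().map(|r| r.ln()).sum();
    (log_sum / ratios.len() as f64).exp()
}
```

Rounding each ratio to two decimals reproduces the "Change" column exactly.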
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
Dandandan commented on PR #15770: URL: https://github.com/apache/datafusion/pull/15770#issuecomment-2972642194 Results of applying this in TopK itself: https://github.com/apache/datafusion/pull/16408#issue-3145903557
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
Dandandan commented on code in PR #15770:
URL: https://github.com/apache/datafusion/pull/15770#discussion_r2146805677
##
datafusion/common/src/config.rs:
##
@@ -614,6 +614,13 @@ config_namespace! {
/// during aggregations, if possible
pub enable_topk_aggregation: bool, default = true
+/// When set to true attempts to push down dynamic filters generated by operators into the file scan phase.
Review Comment:
I think we could also extend non-scan operators with the dynamic filter
pushdown?
It would be interesting to extend filters / aggregates / joins / etc. with
the support of this as well...
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
Dandandan commented on PR #15770: URL: https://github.com/apache/datafusion/pull/15770#issuecomment-2972552039 I am reapplying my PR on top of this branch, I'll report my results.
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
adriangb commented on PR #15770: URL: https://github.com/apache/datafusion/pull/15770#issuecomment-2971653072 @alamb I opened https://github.com/pydantic/datafusion/pull/30 to explore the idea of having two pushdown phases. It's not complete (some failing tests, some TODOs) but I think it captures the idea.
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
alamb commented on PR #15770: URL: https://github.com/apache/datafusion/pull/15770#issuecomment-2971539472 > It could (and does currently) handle push down of dynamic filters, the issue is that it cannot be run after EnforceSorting because EnforceSorting and EnforceDistribution need to be run after e.g. a FilterExec is removed from the plan. I see. I will need to think if there is any better way, but I think someone would just have to try to make it work / detangle EnforceSorting more
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
adriangb commented on PR #15770: URL: https://github.com/apache/datafusion/pull/15770#issuecomment-2971532504 Okay then I'll file a ticket for the multi-column sort and the display. But I do think we should hash out https://github.com/apache/datafusion/pull/15770#issuecomment-2971441638 in this PR.
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
adriangb commented on PR #15770: URL: https://github.com/apache/datafusion/pull/15770#issuecomment-2971531318 > I don't understand why FilterPushdown can't also push down DynamicFilters if it was run after EnforceSorting but I vaguely remember it being discussed and rejected before It could (and does currently) handle push down of dynamic filters; the issue is that *it cannot be run after EnforceSorting* because EnforceSorting and EnforceDistribution need to be run after e.g. a `FilterExec` is removed from the plan.
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
alamb commented on PR #15770:
URL: https://github.com/apache/datafusion/pull/15770#issuecomment-2971510953
> 1. Multi-column order by not working. This seems like a bug / oversight.
Fundamentally I don't see any reason it shouldn't work, I'll have to
investigate why.
I suggest we file a ticket explaining what is going on so we don't lose the
context, but don't try and fix it in this PR
> 2. Interaction with `EnforceSorting` & co. After grokking the code for a
while I'm pretty convinced this is the only way to do things correctly.
`SortExec` needs to establish a link (via references) with the
`DataSourceExec`, and that needs to happen _before_ `EnforceSorting` runs
because it might modify the plan in the process. So we have no choice but to
teach `EnforceSorting` how to preserve this link. But that's the part I'm not
sure I did correctly since I don't fully understand how `EnforceSorting` works.
There may be other alternatives here. @berkaysynnada suggested that the right
rule ordering might "just work", but neither of us could get it to. We might be
able to split the filter pushdown into two steps: static (cannot make
assumptions about reference links but can modify the plan tree, e.g. for
`FilterExec`) and dynamic (can make reference links but cannot modify the plan
tree, e.g. `SortExec` and `HashJoin`). I think this would consist of adding a
new optimizer rule
`DynamicFilterPushdown` or something like that and maybe a
`Phase::{Dynamic,Static}` argument to the existing method on `ExecutionPlan`.
I just know we have also found EnforceSorting to be fragile (and there have
been quite a few bugs related to limits being lost, etc) so I agree with your
assessment that it is tough to know if it is correct.
I don't understand why `FilterPushdown` can't also push down DynamicFilters
if it was run after `EnforceSorting` but I vaguely remember it being discussed
and rejected before
> 3. Formatting in explain plans. I'm open to suggestions on this!
Likewise I think we should just file a follow on ticket. Maybe we can change
the `Display` impl to be just `` or something to make it clear
it is something special (not just `false` constant)
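The "link (via references)" between `SortExec` and `DataSourceExec` can be pictured as two nodes holding clones of one `Arc`. In this hypothetical sketch (`DynamicBound`, `SortNode`, `ScanNode` and `rebuild_sort_fresh` are illustrative names, not DataFusion types), an update made through the sort is visible to the scan, while a rewrite that rebuilds the sort with a fresh filter silently breaks the link. That is the risk when `EnforceSorting` removes and re-adds sorts.

```rust
use std::sync::{Arc, RwLock};

/// Hypothetical dynamic filter: a bound the TopK tightens while running and
/// the scan reads before each batch.
#[derive(Debug, Default)]
struct DynamicBound {
    value: RwLock<Option<i64>>,
}

impl DynamicBound {
    fn update(&self, v: i64) {
        *self.value.write().unwrap() = Some(v);
    }

    fn current(&self) -> Option<i64> {
        *self.value.read().unwrap()
    }
}

/// Simplified SortExec / DataSourceExec stand-ins that each hold the filter.
struct SortNode {
    filter: Arc<DynamicBound>,
}

struct ScanNode {
    filter: Arc<DynamicBound>,
}

/// Establishes the link: both nodes share one allocation.
fn link() -> (SortNode, ScanNode) {
    let filter = Arc::new(DynamicBound::default());
    (SortNode { filter: Arc::clone(&filter) }, ScanNode { filter })
}

/// What a plan rewrite does if it re-creates the sort from scratch:
/// the new node gets its own filter and the scan never sees its updates.
fn rebuild_sort_fresh(_old: &SortNode) -> SortNode {
    SortNode { filter: Arc::new(DynamicBound::default()) }
}
```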
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
adriangb commented on PR #15770: URL: https://github.com/apache/datafusion/pull/15770#issuecomment-2971441638 > We might be able to split the filter pushdown into two steps: static (cannot make assumptions about reference links but can modify the plan tree, e.g. for FilterExec) and dynamic (can make reference links but cannot modify the plan tree, e.g. SortExec and HashJoin). @alamb I had not thought of this before but it does seem interesting. We'd basically add a new (simpler) rule called `PushDownDynamicFilters` or something like that. We'd also have to add a new method, `ExecutionPlan::gather_dynamic_filters_for_pushdown`, and `SortExec` / `HashJoinExec` / any operator that wants to push down a filter that depends on the plan's structure / has references back to itself would use this method instead of the existing pushdown methods. Then this can be run after `EnforceSorting`. I think the main con to this would be if an operator wanted to push down a dynamic filter _and_ modify the plan structure. I don't think it will want to remove itself from the plan structure like `FilterExec` does, but I could see it wanting to add a node (e.g. if we implemented https://github.com/apache/datafusion/pull/15697 by having `SortExec` insert a `FilterExec` underneath itself). The big pro here is that I think it makes a clear distinction of capabilities and resolves the fragility of rewriting a plan while keeping track of references between nodes. What do you think of that approach vs. the approach we have here of teaching `EnforceSorting` how to pass around dynamic filters?
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
adriangb commented on code in PR #15770: URL: https://github.com/apache/datafusion/pull/15770#discussion_r2145924185 ## datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs: ## @@ -114,6 +118,18 @@ fn pushdown_sorts_helper( sort_push_down.data.fetch = fetch; sort_push_down.data.ordering_requirement = Some(OrderingRequirements::from(sort_ordering)); +let filter = plan +.as_any() +.downcast_ref::() +.and_then(|s| s.filter().clone()); Review Comment: I see what you mean now. It would be `ExecutionPlan::dynamic_filters(&self) -> Arc` which is fully typed. That sounds reasonable to me.
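The typed accessor discussed in that comment could look roughly like a trait method that returns the concrete filter type directly, so callers never downcast. This is a hypothetical sketch: the exact signature is elided in the comment, so `PlanNode`, `dynamic_filter`, `filter_ids` and the `Option<Arc<DynamicFilter>>` return type are assumptions.

```rust
use std::sync::Arc;

/// Hypothetical concrete filter type (standing in for DynamicFilterPhysicalExpr).
#[derive(Debug)]
struct DynamicFilter {
    id: u32,
}

/// Hypothetical plan-node trait with a typed accessor; most nodes keep the default.
trait PlanNode {
    fn name(&self) -> &str;

    /// Returns the node's dynamic filter already typed, so callers never downcast.
    fn dynamic_filter(&self) -> Option<Arc<DynamicFilter>> {
        None
    }
}

struct TopKSort {
    filter: Arc<DynamicFilter>,
}

struct Coalesce;

impl PlanNode for TopKSort {
    fn name(&self) -> &str {
        "SortExec"
    }

    fn dynamic_filter(&self) -> Option<Arc<DynamicFilter>> {
        Some(Arc::clone(&self.filter))
    }
}

impl PlanNode for Coalesce {
    fn name(&self) -> &str {
        "CoalesceBatchesExec"
    }
}

/// Collects the ids of all dynamic filters exposed by a list of nodes.
fn filter_ids(nodes: &[Box<dyn PlanNode>]) -> Vec<u32> {
    nodes
        .iter()
        .filter_map(|n| n.dynamic_filter())
        .map(|f| f.id)
        .collect()
}
```

The default method keeps transparent nodes free of boilerplate; only operators that own a dynamic filter override it.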
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
adriangb commented on code in PR #15770:
URL: https://github.com/apache/datafusion/pull/15770#discussion_r2145917211
##
datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs:
##
@@ -346,6 +359,137 @@ fn test_node_handles_child_pushdown_result() {
);
}
+#[tokio::test]
+async fn test_topk_dynamic_filter_pushdown() {
+// This test is a bit of a hack, but it shows that we can push down dynamic filters
+// into the DataSourceExec. The test is not perfect because we don't have a real
+// implementation of the dynamic filter yet, so we just use a static filter.
+let batches = vec![
+record_batch!(
+("a", Utf8, ["aa", "ab"]),
+("b", Utf8, ["bd", "bc"]),
+("c", Float64, [1.0, 2.0])
+)
+.unwrap(),
+record_batch!(
+("a", Utf8, ["ac", "ad"]),
+("b", Utf8, ["bb", "ba"]),
+("c", Float64, [2.0, 1.0])
+)
+.unwrap(),
+];
+let scan = TestScanBuilder::new(schema())
+.with_support(true)
+.with_batches(batches)
+.build();
+let plan = Arc::new(
+SortExec::new(
+LexOrdering::new(vec![PhysicalSortExpr::new(
+col("b", &schema()).unwrap(),
+SortOptions::new(true, false), // descending, nulls_first
+)])
+.unwrap(),
+Arc::clone(&scan),
+)
+.with_fetch(Some(1)),
+) as Arc;
+
+// expect the predicate to be pushed down into the DataSource
+insta::assert_snapshot!(
+OptimizationTest::new(Arc::clone(&plan), FilterPushdown{}, true),
+@r"
+OptimizationTest:
+ input:
+- SortExec: TopK(fetch=1), expr=[b@1 DESC NULLS LAST], preserve_partitioning=[false]
+- DataSourceExec: file_groups={1 group: [[test.paqruet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
Review Comment:
https://github.com/apache/datafusion/pull/16403
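The test above runs a `SortExec` with `fetch=1` ordered by `b DESC`. As a rough, hypothetical model of why a TopK can produce a useful dynamic filter (this is not DataFusion's `TopK` implementation; `TopK` and `run_topk` here are illustrative), the sketch keeps the k largest values in a min-heap. Once the heap is full, its smallest entry is a threshold, and a scan that checks `value > threshold` before each batch can skip rows that could never enter the result. The batch values are the `b` column from the test.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Hypothetical TopK for `ORDER BY b DESC LIMIT k`: a min-heap holding the
/// k largest values seen so far.
struct TopK {
    k: usize,
    heap: BinaryHeap<Reverse<String>>,
}

impl TopK {
    fn new(k: usize) -> Self {
        Self { k, heap: BinaryHeap::new() }
    }

    /// Once the heap is full, its smallest entry is the current threshold:
    /// only values strictly greater than it can change the result.
    fn threshold(&self) -> Option<&str> {
        if self.heap.len() == self.k {
            self.heap.peek().map(|Reverse(s)| s.as_str())
        } else {
            None
        }
    }

    fn insert(&mut self, v: &str) {
        self.heap.push(Reverse(v.to_string()));
        if self.heap.len() > self.k {
            self.heap.pop(); // evict the smallest of the k + 1 values
        }
    }

    fn into_sorted_desc(self) -> Vec<String> {
        // Ascending order of Reverse<String> is descending order of String.
        self.heap.into_sorted_vec().into_iter().map(|Reverse(s)| s).collect()
    }
}

/// Feeds batches through the TopK, applying the filter as it stood before
/// each batch (as a scan would). Returns the result and the rows skipped.
fn run_topk(batches: &[Vec<&str>], k: usize) -> (Vec<String>, usize) {
    let mut topk = TopK::new(k);
    let mut skipped = 0;
    for batch in batches {
        let threshold: Option<String> = topk.threshold().map(String::from);
        for &v in batch {
            let may_qualify = match &threshold {
                Some(t) => v > t.as_str(),
                None => true,
            };
            if may_qualify {
                topk.insert(v);
            } else {
                skipped += 1;
            }
        }
    }
    (topk.into_sorted_desc(), skipped)
}
```

With `k = 1`, the first batch sets the threshold to `"bd"`, so both rows of the second batch (`"bb"`, `"ba"`) are skipped.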
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
adriangb commented on PR #15770: URL: https://github.com/apache/datafusion/pull/15770#issuecomment-2971391046 Thanks for the review @alamb! I'll try to summarize the high level issues: 1. Multi-column order by not working. This seems like a bug / oversight. Fundamentally I don't see any reason it shouldn't work, I'll have to investigate why. 2. Interaction with `EnforceSorting` & co. After grokking the code for a while I'm pretty convinced this is the only way to do things correctly. `SortExec` needs to establish a link (via references) with the `DataSourceExec`, and that needs to happen _before_ `EnforceSorting` runs because it might modify the plan in the process. So we have no choice but to teach `EnforceSorting` how to preserve this link. But that's the part I'm not sure I did correctly since I don't fully understand how `EnforceSorting` works. There may be other alternatives here. @berkaysynnada suggested that the right rule ordering might "just work", but neither of us could get it to. We might be able to split the filter pushdown into two steps: static (cannot make assumptions about reference links but can modify the plan tree, e.g. for `FilterExec`) and dynamic (can make reference links but cannot modify the plan tree, e.g. `SortExec` and `HashJoin`). 3. Formatting in explain plans. I'm open to suggestions on this!
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
alamb commented on code in PR #15770:
URL: https://github.com/apache/datafusion/pull/15770#discussion_r2145903411
##
datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs:
##
@@ -70,6 +71,8 @@ pub fn assign_initial_requirements(sort_push_down: &mut SortPushDown) {
// If the parent has a fetch value, assign it to the children
// Or use the fetch value of the child.
fetch: child.plan.fetch(),
+// If the parent has a filter, assign it to the children
Review Comment:
Thank you -- this makes sense to me
Maybe we can clarify with some additional comments to help the future readers
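The `assign_initial_requirements` change carries the filter alongside `fetch`, so that a sort removed and re-created by sort pushdown keeps the same dynamic filter. A hypothetical, heavily simplified model (`Sort`, `Requirement`, `remove_sort` and `recreate_sort` are illustrative, not the real `SortPushDown` types): the requirement clones the `Arc`, so the re-created sort shares the original filter allocation rather than getting a new one.

```rust
use std::sync::Arc;

/// Hypothetical dynamic filter handle; only its identity matters here.
#[derive(Debug)]
struct DynamicFilter {
    id: u32,
}

/// Hypothetical simplified SortExec: its limit and the filter it publishes.
#[derive(Debug)]
struct Sort {
    fetch: Option<usize>,
    filter: Option<Arc<DynamicFilter>>,
}

/// What sort pushdown remembers about a SortExec it removes, so that an
/// equivalent node can be re-created elsewhere in the plan.
#[derive(Debug, Default)]
struct Requirement {
    fetch: Option<usize>,
    filter: Option<Arc<DynamicFilter>>,
}

fn remove_sort(sort: &Sort) -> Requirement {
    Requirement {
        fetch: sort.fetch,
        // Cloning the Arc shares the filter rather than copying it.
        filter: sort.filter.clone(),
    }
}

fn recreate_sort(req: &Requirement) -> Sort {
    Sort {
        fetch: req.fetch,
        filter: req.filter.clone(),
    }
}
```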
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
adriangb commented on code in PR #15770: URL: https://github.com/apache/datafusion/pull/15770#discussion_r2145900061 ## datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs: ## @@ -114,6 +118,18 @@ fn pushdown_sorts_helper( sort_push_down.data.fetch = fetch; sort_push_down.data.ordering_requirement = Some(OrderingRequirements::from(sort_ordering)); +let filter = plan +.as_any() +.downcast_ref::() +.and_then(|s| s.filter().clone()); Review Comment: We kind of already have that for the filter pushdown. The issue is that for `SortExec` / `TopK` to call `DynamicFilterPhysicalExpr::update`, they have to know it's a concrete `DynamicFilterPhysicalExpr` and not an arbitrary `Arc`, but the existing methods all deal with `Arc`. Fundamentally what makes all of this code weird / scary to me is that we are taking data from a typed thing (`SortExec`), moving it into an untyped world (a tree of `ExecutionPlan`) and then trying to re-create the typed thing, restoring all data (`filter/limit`).
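The typing problem described in that comment: plan-wide APIs hand around untyped trait objects, so code that needs the concrete type's `update` method has to recover it through `as_any()` and `downcast_ref`. A self-contained, hypothetical sketch (`Expr`, `DynamicFilterExpr`, `Literal` and `try_update` are illustrative stand-ins, not DataFusion's `PhysicalExpr` API):

```rust
use std::any::Any;
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for the untyped expression trait that plan-wide
/// APIs pass around.
trait Expr {
    fn as_any(&self) -> &dyn Any;
}

/// Hypothetical stand-in for DynamicFilterPhysicalExpr: only the concrete
/// type has `update`.
struct DynamicFilterExpr {
    current: Mutex<String>,
}

impl DynamicFilterExpr {
    fn update(&self, predicate: &str) {
        *self.current.lock().unwrap() = predicate.to_string();
    }
}

impl Expr for DynamicFilterExpr {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

struct Literal;

impl Expr for Literal {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

/// Recovers the concrete type from `Arc<dyn Expr>`; returns false when the
/// expression is not a dynamic filter.
fn try_update(expr: &Arc<dyn Expr>, predicate: &str) -> bool {
    match expr.as_any().downcast_ref::<DynamicFilterExpr>() {
        Some(dynamic) => {
            dynamic.update(predicate);
            true
        }
        None => false,
    }
}
```

The downcast fails silently (returns `None`) for any other type, which is why information lost on the way into the untyped tree is easy to drop without a compile-time error.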
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
adriangb commented on code in PR #15770:
URL: https://github.com/apache/datafusion/pull/15770#discussion_r2145895957
##
datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt:
##
@@ -246,38 +246,3 @@ physical_plan
02)--FilterExec: val@0 != part@1
03)RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
04)--DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=a/file.parquet]]},
projection=[val, part], file_type=parquet, predicate=val@0 != d AND val@0 !=
c, pruning_predicate=val_null_count@2 != row_count@3 AND (val_min@0 != d OR d
!= val_max@1) AND val_null_count@2 != row_count@3 AND (val_min@0 != c OR c !=
val_max@1), required_guarantees=[val not in (c, d)]
-
-# The order of filters should not matter
Review Comment:
Might be accidental / from merging. Will revert.
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
adriangb commented on code in PR #15770:
URL: https://github.com/apache/datafusion/pull/15770#discussion_r2145894014
##
datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs:
##
@@ -70,6 +71,8 @@ pub fn assign_initial_requirements(sort_push_down: &mut SortPushDown) {
// If the parent has a fetch value, assign it to the children
// Or use the fetch value of the child.
fetch: child.plan.fetch(),
+// If the parent has a filter, assign it to the children
Review Comment:
That's where the actual pushdown is happening! This part is just so that
that pushdown is not lost.
Basically: `EnforceSorting` wants to run last / late:
https://github.com/apache/datafusion/blob/61a2dfdc71508bd0b09abaefdc882d33a9e0d91e/datafusion/physical-optimizer/src/optimizer.rs#L111
Since `FilterPushdown` can absolutely modify the plan tree (e.g. by removing
a `FilterExec`), it needs to run _before_ this. (Maybe we just shouldn't remove
`FilterExec` and instead make it a no-op, but that might lose optimizations, e.g.
because `CoalesceBatchesExec` then can't be removed.)
But at the same time, `EnforceSorting` removes and re-adds `SortExec`s
multiple times.
So what we are doing here is passing around the filters that were already
pushed down, so that the connection between the `SortExec` and the `DataSourceExec`
is not lost. This is similar to what happens with `limit`, which is also
being passed around.
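A simplified sketch of carrying an already pushed-down filter alongside `fetch` while a sort is removed and later re-added. `Plan`, `SharedFilter`, `Requirements`, `remove_sort` and `add_sort` are hypothetical names for illustration, not the `EnforceSorting` code; the point is that the re-added sort gets back the *same* `Arc` that the scan holds, so the link between the two is preserved.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for a shared, already pushed-down dynamic filter.
#[derive(Debug)]
struct SharedFilter {
    predicate: String,
}

/// Hypothetical, heavily simplified plan tree (the real optimizer works on
/// `Arc<dyn ExecutionPlan>` nodes).
#[derive(Debug)]
enum Plan {
    Sort {
        fetch: Option<usize>,
        filter: Option<Arc<SharedFilter>>,
        input: Box<Plan>,
    },
    Scan {
        filter: Option<Arc<SharedFilter>>,
    },
}

/// State carried while the sort is temporarily taken out of the tree.
#[derive(Debug, Default)]
struct Requirements {
    fetch: Option<usize>,
    filter: Option<Arc<SharedFilter>>,
}

/// Remove a top-level sort, remembering its `fetch` and `filter`.
fn remove_sort(plan: Plan) -> (Plan, Requirements) {
    match plan {
        Plan::Sort { fetch, filter, input } => (*input, Requirements { fetch, filter }),
        other => (other, Requirements::default()),
    }
}

/// Re-add the sort, restoring `fetch` and the *same* filter handle.
fn add_sort(input: Plan, req: Requirements) -> Plan {
    Plan::Sort { fetch: req.fetch, filter: req.filter, input: Box::new(input) }
}

fn main() {
    let filter = Arc::new(SharedFilter { predicate: "col < 10".to_string() });
    let scan = Plan::Scan { filter: Some(Arc::clone(&filter)) };
    let plan = Plan::Sort {
        fetch: Some(10),
        filter: Some(Arc::clone(&filter)),
        input: Box::new(scan),
    };

    let (child, req) = remove_sort(plan);
    // ... other rewrites of `child` could happen here ...
    let plan = add_sort(child, req);
    println!("{plan:?}");
}
```

If `filter` were dropped from `Requirements` (as `fetch` would be without explicit handling), the re-created sort would have no way to update the filter the scan is still evaluating.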
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
alamb commented on code in PR #15770:
URL: https://github.com/apache/datafusion/pull/15770#discussion_r2145790872
##
datafusion/core/tests/fuzz_cases/topk_filter_pushdown.rs:
##
@@ -0,0 +1,387 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::sync::{Arc, LazyLock};
+
+use arrow::array::{Int32Array, StringArray, StringDictionaryBuilder};
+use arrow::datatypes::Int32Type;
+use arrow::record_batch::RecordBatch;
+use arrow::util::pretty::pretty_format_batches;
+use arrow_schema::{DataType, Field, Schema};
+use datafusion::datasource::listing::{ListingOptions, ListingTable, ListingTableConfig};
+use datafusion::prelude::{SessionConfig, SessionContext};
+use datafusion_datasource::ListingTableUrl;
+use datafusion_datasource_parquet::ParquetFormat;
+use datafusion_execution::object_store::ObjectStoreUrl;
+use itertools::Itertools;
+use object_store::memory::InMemory;
+use object_store::path::Path;
+use object_store::{ObjectStore, PutPayload};
+use parquet::arrow::ArrowWriter;
+use rand::rngs::StdRng;
+use rand::{Rng, SeedableRng};
+use tokio::sync::Mutex;
+use tokio::task::JoinSet;
+
+#[derive(Clone)]
+struct TestDataSet {
+store: Arc,
+schema: Arc,
+}
+
+/// List of in memory parquet files with UTF8 data
+// Use a mutex rather than LazyLock to allow for async initialization
+static TESTFILES: LazyLock>> =
+LazyLock::new(|| Mutex::new(vec![]));
+
+async fn test_files() -> Vec {
+let files_mutex = &TESTFILES;
+let mut files = files_mutex.lock().await;
+if !files.is_empty() {
+return (*files).clone();
+}
+
+let mut rng = StdRng::seed_from_u64(0);
+
+for nulls_in_ids in [false, true] {
+for nulls_in_names in [false, true] {
+for nulls_in_departments in [false, true] {
+let store = Arc::new(InMemory::new());
+
+let schema = Arc::new(Schema::new(vec![
+Field::new("id", DataType::Int32, nulls_in_ids),
+Field::new("name", DataType::Utf8, nulls_in_names),
+Field::new(
+"department",
+DataType::Dictionary(
+Box::new(DataType::Int32),
+Box::new(DataType::Utf8),
+),
+nulls_in_departments,
+),
+]));
+
+let name_choices = if nulls_in_names {
+[Some("Alice"), Some("Bob"), None, Some("David"), None]
+} else {
+[
+Some("Alice"),
+Some("Bob"),
+Some("Charlie"),
+Some("David"),
+Some("Eve"),
+]
+};
+
+let department_choices = if nulls_in_departments {
+[
+Some("Theater"),
+Some("Engineering"),
+None,
+Some("Arts"),
+None,
+]
+} else {
+[
+Some("Theater"),
+Some("Engineering"),
+Some("Healthcare"),
+Some("Arts"),
+Some("Music"),
+]
+};
+
+// Generate 5 files, some with overlapping or repeated ids, some without
+for i in 0..5 {
+let num_batches = rng.random_range(1..3);
+let mut batches = Vec::with_capacity(num_batches);
+for _ in 0..num_batches {
+let num_rows = 25;
+let ids = Int32Array::from_iter((0..num_rows).map(|file| {
+if nulls_in_ids {
+if rng.random_bool(1.0 / 10.0) {
+None
+} else {
+Some(rng.random_range(file..file + 5))
+}
+
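The fuzz test is cut off in this archive, so its exact checks are not visible here. As a hedged, std-only sketch of the general differential-testing idea (an assumption about the intent, not a reconstruction of the file): generate seeded random batches, compute the top k once by a full sort and once with a threshold-based heap, and assert that they agree. `Lcg`, `topk_by_sort` and `topk_with_threshold` are hypothetical names; a tiny LCG replaces the `rand` crate so the example has no dependencies.

```rust
use std::collections::BinaryHeap;

/// Tiny deterministic PRNG (64-bit LCG) so the sketch needs no `rand` crate.
struct Lcg(u64);

impl Lcg {
    /// Returns a value in `lo..hi` (requires `lo < hi`).
    fn next_range(&mut self, lo: i32, hi: i32) -> i32 {
        self.0 = self
            .0
            .wrapping_mul(6364136223846793005)
            .wrapping_add(1442695040888963407);
        lo + ((self.0 >> 33) % (hi - lo) as u64) as i32
    }
}

/// Reference answer: fully sort everything and keep the first `k` values.
fn topk_by_sort(batches: &[Vec<i32>], k: usize) -> Vec<i32> {
    let mut all: Vec<i32> = batches.concat();
    all.sort();
    all.truncate(k);
    all
}

/// Candidate answer: a max-heap of the `k` smallest values that skips any
/// value that cannot beat the current threshold (the heap's max).
fn topk_with_threshold(batches: &[Vec<i32>], k: usize) -> Vec<i32> {
    let mut heap = BinaryHeap::new();
    for batch in batches {
        for &v in batch {
            if heap.len() < k {
                heap.push(v);
            } else if let Some(&max) = heap.peek() {
                // Values >= threshold can never enter the top k.
                if v < max {
                    heap.pop();
                    heap.push(v);
                }
            }
        }
    }
    heap.into_sorted_vec()
}

fn main() {
    for seed in 0..100u64 {
        let mut rng = Lcg(seed);
        let num_batches = rng.next_range(1, 4) as usize;
        let batches: Vec<Vec<i32>> = (0..num_batches)
            .map(|_| (0..25).map(|_| rng.next_range(0, 50)).collect())
            .collect();
        let k = rng.next_range(0, 30) as usize;
        assert_eq!(
            topk_by_sort(&batches, k),
            topk_with_threshold(&batches, k),
            "seed {seed}"
        );
    }
    println!("all seeds agree");
}
```

Seeding the generator makes any failing case reproducible from its seed, which mirrors the `StdRng::seed_from_u64(0)` choice in the test file above.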
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
alamb commented on PR #15770:
URL: https://github.com/apache/datafusion/pull/15770#issuecomment-2971297789
🤖 `./gh_compare_branch.sh` [Benchmark Script](https://github.com/alamb/datafusion-benchmarking/blob/main/gh_compare_branch.sh) Running
Linux aal-dev 6.11.0-1015-gcp #15~24.04.1-Ubuntu SMP Thu Apr 24 20:41:05 UTC 2025 x86_64 x86_64 x86_64 GNU/Linux
Comparing topk-dynamic-filters (61a2dfdc71508bd0b09abaefdc882d33a9e0d91e) to 1d73c59b998a0b76ee2ebaacd22df682b31a540c [diff](https://github.com/apache/datafusion/compare/1d73c59b998a0b76ee2ebaacd22df682b31a540c..61a2dfdc71508bd0b09abaefdc882d33a9e0d91e)
Benchmarks: sort_tpch
Results will be posted here when complete
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
alamb commented on PR #15770:
URL: https://github.com/apache/datafusion/pull/15770#issuecomment-2971305067
🤖: Benchmark completed

```
Comparing HEAD and topk-dynamic-filters

Benchmark sort_tpch.json
| Query | HEAD       | topk-dynamic-filters | Change       |
|-------|------------|----------------------|--------------|
| Q1    | 350.14 ms  | 352.17 ms            | no change    |
| Q2    | 266.37 ms  | 302.58 ms            | 1.14x slower |
| Q3    | 1146.06 ms | 1164.19 ms           | no change    |
| Q4    | 425.71 ms  | 417.77 ms            | no change    |
| Q5    | 425.76 ms  | 430.96 ms            | no change    |
| Q6    | 467.68 ms  | 467.15 ms            | no change    |
| Q7    | 926.46 ms  | 921.51 ms            | no change    |
| Q8    | 781.20 ms  | 789.52 ms            | no change    |
| Q9    | 829.24 ms  | 825.37 ms            | no change    |
| Q10   | 1223.05 ms | 1208.09 ms           | no change    |
| Q11   | 719.15 ms  | 790.11 ms            | 1.10x slower |

| Benchmark Summary                   |           |
|-------------------------------------|-----------|
| Total Time (HEAD)                   | 7560.84ms |
| Total Time (topk-dynamic-filters)   | 7669.42ms |
| Average Time (HEAD)                 | 687.35ms  |
| Average Time (topk-dynamic-filters) | 697.22ms  |
| Queries Faster                      | 0         |
| Queries Slower                      | 2         |
| Queries with No Change              | 9         |
| Queries with Failure                | 0         |
```
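The "Change" column in these tables appears consistent with a ±5% noise band, where anything within 5% in either direction is reported as "no change". That rule is an inference from the numbers, not taken from the benchmark script itself. A hypothetical `classify` function built on that assumption reproduces the labels for the rows it is applied to:

```rust
/// Label a HEAD vs. branch timing pair, assuming (not confirmed) that
/// differences within 5% in either direction count as "no change".
fn classify(head_ms: f64, branch_ms: f64) -> String {
    let speedup = head_ms / branch_ms; // > 1.0 means the branch is faster
    let slowdown = branch_ms / head_ms; // > 1.0 means the branch is slower
    if speedup > 1.05 {
        format!("+{:.2}x faster", speedup)
    } else if slowdown > 1.05 {
        format!("{:.2}x slower", slowdown)
    } else {
        "no change".to_string()
    }
}

fn main() {
    // A few rows from the sort_tpch table: (query, HEAD ms, branch ms).
    let rows = [
        ("Q1", 350.14, 352.17),
        ("Q2", 266.37, 302.58),
        ("Q11", 719.15, 790.11),
    ];
    for (query, head, branch) in rows {
        println!("{query}: {}", classify(head, branch));
    }
}
```

With a band this wide, single-run differences of a few percent (for example Q3 at about 1.6%) are treated as noise, so only Q2 and Q11 are flagged here.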
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
alamb commented on PR #15770:
URL: https://github.com/apache/datafusion/pull/15770#issuecomment-2971198838
I also filed a ticket to add a metric that we can use to see when file pruning is working:
- https://github.com/apache/datafusion/issues/16402
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
alamb commented on PR #15770:
URL: https://github.com/apache/datafusion/pull/15770#issuecomment-2971173365
> > QQuery 25 │ 380.03 ms │ 279.23 ms │ +1.36x faster
>
> ```sql
> SELECT SearchPhrase FROM hits WHERE SearchPhrase <> '' ORDER BY SearchPhrase LIMIT 10;
> ```
>
> Looks like it might be real!
>
> And obviously we expect Q23 to get a huge bump once we enable filter pushdown...
>
> I think the only thing left here is for someone to review in detail

I can reproduce this improvement locally. I tested using

```shell
$ cat q25.sql
SELECT "SearchPhrase" FROM hits WHERE "SearchPhrase" <> '' ORDER BY "SearchPhrase" LIMIT 10;
$ ./datafusion-cli-topk-dynamic-filters -f q25.sql
...
```

main (merge-base)
```
Elapsed 0.209 seconds.
Elapsed 0.214 seconds.
Elapsed 0.210 seconds.
```

This branch
```
Elapsed 0.173 seconds.
Elapsed 0.182 seconds.
Elapsed 0.178 seconds.
```
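Averaging the three runs above gives the local speedup for this query. A quick check of the arithmetic (the `mean` helper exists only for this illustration):

```rust
/// Arithmetic mean of a slice of timings in seconds.
fn mean(xs: &[f64]) -> f64 {
    xs.iter().sum::<f64>() / xs.len() as f64
}

fn main() {
    // Timings copied from the comment above.
    let merge_base = [0.209, 0.214, 0.210];
    let this_branch = [0.173, 0.182, 0.178];
    let speedup = mean(&merge_base) / mean(&this_branch);
    println!(
        "merge-base {:.3}s, branch {:.3}s, speedup {:.2}x",
        mean(&merge_base),
        mean(&this_branch),
        speedup
    );
}
```

This works out to about 1.19x (0.211 s vs. roughly 0.178 s), a smaller gain than the 1.36x reported for QQuery 25 in the benchmark run, though still a clear improvement.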
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
adriangb commented on code in PR #15770:
URL: https://github.com/apache/datafusion/pull/15770#discussion_r2145547651
##
datafusion/physical-plan/src/sorts/sort.rs:
##
@@ -843,6 +846,8 @@ pub struct SortExec {
common_sort_prefix: Vec,
/// Cache holding plan properties like equivalences, output partitioning
etc.
cache: PlanProperties,
+/// Filter matching the state of the sort for dynamic filter pushdown
+filter: Option>,
Review Comment:
At the moment it will be created regardless. This could possibly be optimized, e.g. by
keeping track of refs, but I'm not too sure.
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
Dandandan commented on PR #15770:
URL: https://github.com/apache/datafusion/pull/15770#issuecomment-2971038192
I wonder if we see any improvements on the "sort tpch with limit" benchmark?
```shell
cargo run --release --bin dfbench -- sort-tpch --iterations 5 --path "${TPCH_DIR}" -o "${RESULTS_FILE}" --limit 100
```
https://github.com/apache/datafusion/pull/15697/files#diff-1769f5787dc11c8b1f1b48288cdf3c89d25a5b5cbc6be4740bfcc70a6313ba99R930
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
Dandandan commented on code in PR #15770:
URL: https://github.com/apache/datafusion/pull/15770#discussion_r2145572928
##
datafusion/physical-plan/src/sorts/sort.rs:
##
@@ -843,6 +846,8 @@ pub struct SortExec {
common_sort_prefix: Vec,
/// Cache holding plan properties like equivalences, output partitioning etc.
cache: PlanProperties,
+/// Filter matching the state of the sort for dynamic filter pushdown
+filter: Option>,
Review Comment:
That might make sense for now.
I think once we do https://github.com/apache/datafusion/pull/15697, it probably
won't be necessary to disable computing it.
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
adriangb commented on code in PR #15770:
URL: https://github.com/apache/datafusion/pull/15770#discussion_r2145548000
##
datafusion/physical-plan/src/sorts/sort.rs:
##
@@ -843,6 +846,8 @@ pub struct SortExec {
common_sort_prefix: Vec,
/// Cache holding plan properties like equivalences, output partitioning etc.
cache: PlanProperties,
+/// Filter matching the state of the sort for dynamic filter pushdown
+filter: Option>,
Review Comment:
The idea is that it _won't_ be created if there's no `limit` applied.
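A minimal sketch of the intent described in this thread, assuming the filter's existence depends only on whether the sort has a `fetch` (i.e. runs as a TopK), and not on whether pushdown later succeeds. `SortNode` and `DynamicFilter` are hypothetical simplified types, not the real `SortExec` field:

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for the shared dynamic filter (current threshold).
#[derive(Debug, Default)]
struct DynamicFilter {
    threshold: Mutex<Option<i64>>,
}

/// Hypothetical, simplified sort node: only a sort with a `fetch` (LIMIT)
/// runs as a TopK, so only then is a dynamic filter created.
struct SortNode {
    fetch: Option<usize>,
    filter: Option<Arc<DynamicFilter>>,
}

impl SortNode {
    fn new(fetch: Option<usize>) -> Self {
        // No limit: a full sort never produces a useful threshold, so skip it.
        let filter = fetch.map(|_| Arc::new(DynamicFilter::default()));
        SortNode { fetch, filter }
    }
}

fn main() {
    let full_sort = SortNode::new(None);
    let topk = SortNode::new(Some(10));
    println!("full sort has filter: {}", full_sort.filter.is_some());
    println!("topk has filter: {}", topk.filter.is_some());
    if let Some(f) = &topk.filter {
        // The TopK would tighten this as its heap fills up.
        *f.threshold.lock().unwrap() = Some(42);
    }
}
```

Under this scheme a plain `ORDER BY` without `LIMIT` pays nothing, while a TopK always allocates the (cheap) shared filter whether or not anything downstream ends up evaluating it.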
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
Dandandan commented on code in PR #15770:
URL: https://github.com/apache/datafusion/pull/15770#discussion_r2145545013
##
datafusion/physical-plan/src/sorts/sort.rs:
##
@@ -843,6 +846,8 @@ pub struct SortExec {
common_sort_prefix: Vec,
/// Cache holding plan properties like equivalences, output partitioning etc.
cache: PlanProperties,
+/// Filter matching the state of the sort for dynamic filter pushdown
+filter: Option>,
Review Comment:
What isn't really clear to me from this PR: will this create a filter here
regardless of whether it can be pushed down, or do we keep it as `None` if it
can't? There might be a small overhead to evaluating this filter.
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
Dandandan commented on code in PR #15770:
URL: https://github.com/apache/datafusion/pull/15770#discussion_r2145521489
##
datafusion/physical-plan/src/topk/mod.rs:
##
@@ -207,6 +221,7 @@ impl TopK {
// Idea: filter out rows >= self.heap.max() early (before passing to `RowConverter`)
// this avoids some work and also might be better vectorizable.
let mut batch_entry = self.heap.register_batch(batch.clone());
+let mut updated = false;
Review Comment:
A followup would be fine!
In my earlier results in https://github.com/apache/datafusion/pull/15697,
some queries showed a larger speedup on some of the benchmarks.
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
alamb commented on PR #15770:
URL: https://github.com/apache/datafusion/pull/15770#issuecomment-2970903681
> And obviously we expect Q23 to get a huge bump once we enable filter pushdown...
>
> I think the only thing left here is for someone to review in detail

I will do so later this afternoon
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
alamb commented on PR #15770:
URL: https://github.com/apache/datafusion/pull/15770#issuecomment-2970721901
🤖: Benchmark completed

```
Comparing HEAD and topk-dynamic-filters

Benchmark clickbench_extended.json
| Query    | HEAD        | topk-dynamic-filters | Change    |
|----------|-------------|----------------------|-----------|
| QQuery 0 | 1982.56 ms  | 1955.26 ms           | no change |
| QQuery 1 | 711.82 ms   | 678.72 ms            | no change |
| QQuery 2 | 1513.91 ms  | 1450.23 ms           | no change |
| QQuery 3 | 708.20 ms   | 693.14 ms            | no change |
| QQuery 4 | 1448.54 ms  | 1465.17 ms           | no change |
| QQuery 5 | 16141.33 ms | 16207.13 ms          | no change |
| QQuery 6 | 2038.97 ms  | 2054.36 ms           | no change |
| QQuery 7 | 2148.16 ms  | 2078.78 ms           | no change |
| QQuery 8 | 875.11 ms   | 850.36 ms            | no change |

| Benchmark Summary                   |            |
|-------------------------------------|------------|
| Total Time (HEAD)                   | 27568.61ms |
| Total Time (topk-dynamic-filters)   | 27433.15ms |
| Average Time (HEAD)                 | 3063.18ms  |
| Average Time (topk-dynamic-filters) | 3048.13ms  |
| Queries Faster                      | 0          |
| Queries Slower                      | 0          |
| Queries with No Change              | 9          |
| Queries with Failure                | 0          |

Benchmark clickbench_partitioned.json
| Query     | HEAD        | topk-dynamic-filters | Change        |
|-----------|-------------|----------------------|---------------|
| QQuery 0  | 14.73 ms    | 16.09 ms             | 1.09x slower  |
| QQuery 1  | 32.46 ms    | 34.15 ms             | 1.05x slower  |
| QQuery 2  | 82.16 ms    | 82.93 ms             | no change     |
| QQuery 3  | 99.44 ms    | 93.57 ms             | +1.06x faster |
| QQuery 4  | 586.83 ms   | 640.21 ms            | 1.09x slower  |
| QQuery 5  | 869.83 ms   | 895.81 ms            | no change     |
| QQuery 6  | 24.73 ms    | 23.35 ms             | +1.06x faster |
| QQuery 7  | 38.17 ms    | 39.22 ms             | no change     |
| QQuery 8  | 927.63 ms   | 926.34 ms            | no change     |
| QQuery 9  | 1189.27 ms  | 1241.04 ms           | no change     |
| QQuery 10 | 268.91 ms   | 265.63 ms            | no change     |
| QQuery 11 | 296.18 ms   | 297.87 ms            | no change     |
| QQuery 12 | 919.60 ms   | 916.36 ms            | no change     |
| QQuery 13 | 1367.05 ms  | 1375.43 ms           | no change     |
| QQuery 14 | 854.12 ms   | 866.26 ms            | no change     |
| QQuery 15 | 827.71 ms   | 829.16 ms            | no change     |
| QQuery 16 | 1760.76 ms  | 1734.60 ms           | no change     |
| QQuery 17 | 1640.40 ms  | 1595.59 ms           | no change     |
| QQuery 18 | 3152.52 ms  | 3106.21 ms           | no change     |
| QQuery 19 | 85.80 ms    | 89.23 ms             | no change     |
| QQuery 20 | 1139.03 ms  | 1165.68 ms           | no change     |
| QQuery 21 | 1327.33 ms  | 1383.27 ms           | no change     |
| QQuery 22 | 2174.39 ms  | 2303.49 ms           | 1.06x slower  |
| QQuery 23 | 7881.25 ms  | 7861.79 ms           | no change     |
| QQuery 24 | 469.80 ms   | 482.91 ms            | no change     |
| QQuery 25 | 380.03 ms   | 279.23 ms            | +1.36x faster |
| QQuery 26 | 533.18 ms   | 546.21 ms            | no change     |
| QQuery 27 | 1595.23 ms  | 1646.22 ms           | no change     |
| QQuery 28 | 12629.48 ms | 12542.27 ms          | no change     |
| QQuery 29 | 537.79 ms   | 542.54 ms            | no change     |
| QQuery 30 | 791.58 ms   | 814.30 ms            | no change     |
| QQuery 31 | 856.79 ms   | 864.66 ms            | no change     |
| QQuery 32 | 2646.01 ms  | 2693.32 ms           | no change     |
| QQuery 33 | 3339.13 ms  | 3374.26 ms           | no change     |
| QQuery 34 | 3352.60 ms  | 3397.57 ms           | no change     |
| QQuery 35 | 1269.19 ms  | 1270.10 ms           | no change     |
| QQuery 36 | 126.35 ms   | 128.61 ms            | no change     |
| QQuery 37 | 56.05 ms    | 57.52 ms             | no change     |
| QQuery 38 | 125.95 ms   | 128.
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
adriangb commented on PR #15770:
URL: https://github.com/apache/datafusion/pull/15770#issuecomment-2970734597
> QQuery 25 │ 380.03 ms │ 279.23 ms │ +1.36x faster

```sql
SELECT SearchPhrase FROM hits WHERE SearchPhrase <> '' ORDER BY SearchPhrase LIMIT 10;
```

Looks like it might be real!

And obviously we expect Q23 to get a huge bump once we enable filter pushdown...

I think the only thing left here is for someone to review in detail
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
adriangb commented on code in PR #15770:
URL: https://github.com/apache/datafusion/pull/15770#discussion_r2145280129
##
datafusion/physical-plan/src/topk/mod.rs:
##
@@ -207,6 +221,7 @@ impl TopK {
// Idea: filter out rows >= self.heap.max() early (before passing to `RowConverter`)
// this avoids some work and also might be better vectorizable.
let mut batch_entry = self.heap.register_batch(batch.clone());
+let mut updated = false;
Review Comment:
Probably! Do you think we should add it in this PR or a followup?
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
Dandandan commented on code in PR #15770:
URL: https://github.com/apache/datafusion/pull/15770#discussion_r2145258417
##
datafusion/physical-plan/src/topk/mod.rs:
##
@@ -207,6 +221,7 @@ impl TopK {
// Idea: filter out rows >= self.heap.max() early (before passing to `RowConverter`)
// this avoids some work and also might be better vectorizable.
let mut batch_entry = self.heap.register_batch(batch.clone());
+let mut updated = false;
Review Comment:
This doesn't do the filtering here, right
(https://github.com/apache/datafusion/pull/15697)? Would there be any benefit to doing
that as well?
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
alamb commented on PR #15770:
URL: https://github.com/apache/datafusion/pull/15770#issuecomment-2970575427
🤖 `./gh_compare_branch.sh` [Benchmark Script](https://github.com/alamb/datafusion-benchmarking/blob/main/gh_compare_branch.sh) Running
Linux aal-dev 6.11.0-1015-gcp #15~24.04.1-Ubuntu SMP Thu Apr 24 20:41:05 UTC 2025 x86_64 x86_64 x86_64 GNU/Linux
Comparing topk-dynamic-filters (61a2dfdc71508bd0b09abaefdc882d33a9e0d91e) to 1d73c59b998a0b76ee2ebaacd22df682b31a540c [diff](https://github.com/apache/datafusion/compare/1d73c59b998a0b76ee2ebaacd22df682b31a540c..61a2dfdc71508bd0b09abaefdc882d33a9e0d91e)
Benchmarks: tpch_mem clickbench_partitioned clickbench_extended
Results will be posted here when complete
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
alamb commented on PR #15770:
URL: https://github.com/apache/datafusion/pull/15770#issuecomment-2970437633
> The interesting bit is that this is now faster even with predicate pushdown turned off thanks to the late partition / stats based pruning @alamb !! For the case of a single partition it's 14x faster:

That sounds pretty sweet. I looked at the numbers and it seems like you have achieved the "at least as good performance and sometimes better" test that I think is needed for a feature like this.

Hopefully we can get an additional confirmation and then we can merge this PR. That would be epic
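A simplified model of statistics-based pruning driven by a TopK threshold, illustrating why it can help even with predicate pushdown off. For `ORDER BY col ASC LIMIT k`, once k rows are held, any file (or row group) whose minimum is at least the current k-th value cannot contribute and can be skipped without being read. `ColumnStats`, `can_skip` and `scan` are hypothetical; DataFusion's actual pruning logic is more general than this.

```rust
/// Hypothetical per-file (or per-row-group) statistics for the sort column.
/// Only `min` matters for an ascending sort; a descending sort would use `max`.
#[derive(Debug, Clone, Copy)]
struct ColumnStats {
    min: i64,
}

/// For `ORDER BY col ASC LIMIT k`: once the TopK holds k rows whose largest
/// value is `t`, a file whose minimum is >= t cannot contribute a row.
fn can_skip(stats: ColumnStats, threshold: Option<i64>) -> bool {
    matches!(threshold, Some(t) if stats.min >= t)
}

/// Visit files in order, tightening the threshold after each file read.
/// Returns the top-k values and the indices of the files actually read.
fn scan(files: &[(ColumnStats, Vec<i64>)], k: usize) -> (Vec<i64>, Vec<usize>) {
    let mut best: Vec<i64> = Vec::new(); // current k smallest values, sorted
    let mut read = Vec::new();
    for (i, (stats, values)) in files.iter().enumerate() {
        let threshold = if best.len() == k { best.last().copied() } else { None };
        if can_skip(*stats, threshold) {
            continue; // pruned from statistics alone; the file is never opened
        }
        read.push(i);
        best.extend(values.iter().copied());
        best.sort_unstable();
        best.truncate(k);
    }
    (best, read)
}

fn main() {
    let files = vec![
        (ColumnStats { min: 1 }, vec![1, 3, 5]),
        (ColumnStats { min: 10 }, vec![10, 20]),
        (ColumnStats { min: 0 }, vec![0, 2]),
    ];
    let (top, read) = scan(&files, 2);
    // prints "top = [0, 1], files read = [0, 2]"
    println!("top = {top:?}, files read = {read:?}");
}
```

Because the threshold only becomes available after the first rows arrive, pruning of this kind happens "late", at the point each file or partition is about to be opened, rather than at planning time.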
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
alamb commented on PR #15770:
URL: https://github.com/apache/datafusion/pull/15770#issuecomment-2970425995
> @alamb could you kick off your benchmarks run on this branch?

Sorry -- I missed this. I have queued them up and they should be running shortly
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
berkaysynnada commented on PR #15770:
URL: https://github.com/apache/datafusion/pull/15770#issuecomment-2965256342
I couldn't find much time on upstream issues lately @adriangb, sorry for that. But I'll certainly take a look at this asap
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
adriangb commented on code in PR #15770:
URL: https://github.com/apache/datafusion/pull/15770#discussion_r2138485861
##
datafusion/core/tests/fuzz_cases/topk_filter_pushdown.rs:
##
@@ -0,0 +1,354 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::sync::{Arc, LazyLock};
+
+use arrow::array::{Int32Array, StringArray, StringDictionaryBuilder};
+use arrow::datatypes::Int32Type;
+use arrow::record_batch::RecordBatch;
+use arrow::util::pretty::pretty_format_batches;
+use arrow_schema::{DataType, Field, Schema};
+use datafusion::datasource::listing::{ListingOptions, ListingTable, ListingTableConfig};
+use datafusion::prelude::{SessionConfig, SessionContext};
+use datafusion_datasource::ListingTableUrl;
+use datafusion_datasource_parquet::ParquetFormat;
+use datafusion_execution::object_store::ObjectStoreUrl;
+use itertools::Itertools;
+use object_store::memory::InMemory;
+use object_store::path::Path;
+use object_store::{ObjectStore, PutPayload};
+use parquet::arrow::ArrowWriter;
+use rand::rngs::StdRng;
+use rand::{Rng, SeedableRng};
+use tokio::sync::Mutex;
+use tokio::task::JoinSet;
+
+#[derive(Clone)]
+struct TestDataSet {
+store: Arc,
+schema: Arc,
+}
+
+/// List of in memory parquet files with UTF8 data
+// Use a mutex rather than LazyLock to allow for async initialization
+static TESTFILES: LazyLock>> =
+LazyLock::new(|| Mutex::new(vec![]));
+
+async fn test_files() -> Vec {
+let files_mutex = &TESTFILES;
+let mut files = files_mutex.lock().await;
+if !files.is_empty() {
+return (*files).clone();
+}
+
+let mut rng = StdRng::seed_from_u64(0);
+
+for nulls_in_ids in [false, true] {
+for nulls_in_names in [false, true] {
+for nulls_in_departments in [false, true] {
+let store = Arc::new(InMemory::new());
+
+let schema = Arc::new(Schema::new(vec![
+Field::new("id", DataType::Int32, nulls_in_ids),
+Field::new("name", DataType::Utf8, nulls_in_names),
+Field::new(
+"department",
+DataType::Dictionary(
+Box::new(DataType::Int32),
+Box::new(DataType::Utf8),
+),
+nulls_in_departments,
+),
+]));
+
+let name_choices = if nulls_in_names {
+[Some("Alice"), Some("Bob"), None, Some("David"), None]
+} else {
+[
+Some("Alice"),
+Some("Bob"),
+Some("Charlie"),
+Some("David"),
+Some("Eve"),
+]
+};
+
+let department_choices = if nulls_in_departments {
+[
+Some("Theater"),
+Some("Engineering"),
+None,
+Some("Arts"),
+None,
+]
+} else {
+[
+Some("Theater"),
+Some("Engineering"),
+Some("Healthcare"),
+Some("Arts"),
+Some("Music"),
+]
+};
+
+// Generate 5 files, some with overlapping or repeated ids, some without
+for i in 0..5 {
+let num_batches = rng.gen_range(1..3);
+let mut batches = Vec::with_capacity(num_batches);
+for _ in 0..num_batches {
+let num_rows = 25;
+let ids = Int32Array::from_iter((0..num_rows).map(|file| {
+if nulls_in_ids {
+if rng.gen_bool(1.0 / 10.0) {
+None
+} else {
+Some(rng.gen_range(file..file + 5))
+}
+} e
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
adriangb commented on PR #15770:
URL: https://github.com/apache/datafusion/pull/15770#issuecomment-2960041995
@alamb could you kick off your benchmarks run on this branch?
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
adriangb commented on PR #15770: URL: https://github.com/apache/datafusion/pull/15770#issuecomment-2959969124

Here are the current Q23 benchmarks on my local machine on this branch, using the following test script:

```sql
-- Current status quo before this PR: filter pushdown on, no dynamic filter pushdown, default target partitions.
SET datafusion.execution.parquet.pushdown_filters = true;
SET datafusion.optimizer.enable_dynamic_filter_pushdown = false;
SET datafusion.execution.target_partitions = 0;
explain analyze SELECT "EventTime" FROM 'benchmarks/data/hits_partitioned/' ORDER BY "EventTime" LIMIT 10;

-- With dynamic filters turned on
SET datafusion.execution.parquet.pushdown_filters = true;
SET datafusion.optimizer.enable_dynamic_filter_pushdown = true;
SET datafusion.execution.target_partitions = 0;
explain analyze SELECT "EventTime" FROM 'benchmarks/data/hits_partitioned/' ORDER BY "EventTime" LIMIT 10;

-- With filter pushdown off, dynamic filters on
SET datafusion.execution.parquet.pushdown_filters = false;
SET datafusion.optimizer.enable_dynamic_filter_pushdown = true;
SET datafusion.execution.target_partitions = 0;
SELECT "EventTime" FROM 'benchmarks/data/hits_partitioned/' ORDER BY "EventTime" LIMIT 10;

-- With dynamic filters off and target partitions set to 1
SET datafusion.execution.parquet.pushdown_filters = true;
SET datafusion.optimizer.enable_dynamic_filter_pushdown = false;
SET datafusion.execution.target_partitions = 1;
SELECT "EventTime" FROM 'benchmarks/data/hits_partitioned/' ORDER BY "EventTime" LIMIT 10;

-- With dynamic filters on and target partitions set to 1
SET datafusion.execution.parquet.pushdown_filters = true;
SET datafusion.optimizer.enable_dynamic_filter_pushdown = true;
SET datafusion.execution.target_partitions = 1;
SELECT "EventTime" FROM 'benchmarks/data/hits_partitioned/' ORDER BY "EventTime" LIMIT 10;
```

```
DataFusion CLI v48.0.0
> -- Current status quo before this PR: filter pushdown on, no dynamic filter pushdown, default target partitions.
SET datafusion.execution.parquet.pushdown_filters = true;
SET datafusion.optimizer.enable_dynamic_filter_pushdown = false;
SET datafusion.execution.target_partitions = 0;
SELECT "EventTime" FROM 'benchmarks/data/hits_partitioned/' ORDER BY "EventTime" LIMIT 10;
0 row(s) fetched.
Elapsed 0.003 seconds.

0 row(s) fetched.
Elapsed 0.000 seconds.

0 row(s) fetched.
Elapsed 0.000 seconds.

+------------+
| EventTime  |
+------------+
| 1372708800 |
| 1372708800 |
| 1372708800 |
| 1372708800 |
| 1372708800 |
| 1372708800 |
| 1372708800 |
| 1372708800 |
| 1372708800 |
| 1372708800 |
+------------+
10 row(s) fetched.
Elapsed 0.214 seconds.

> -- With dynamic filters turned on
SET datafusion.execution.parquet.pushdown_filters = true;
SET datafusion.optimizer.enable_dynamic_filter_pushdown = true;
SET datafusion.execution.target_partitions = 0;
SELECT "EventTime" FROM 'benchmarks/data/hits_partitioned/' ORDER BY "EventTime" LIMIT 10;
0 row(s) fetched.
Elapsed 0.000 seconds.

0 row(s) fetched.
Elapsed 0.000 seconds.

0 row(s) fetched.
Elapsed 0.000 seconds.

+------------+
| EventTime  |
+------------+
| 1372708800 |
| 1372708800 |
| 1372708800 |
| 1372708800 |
| 1372708800 |
| 1372708800 |
| 1372708800 |
| 1372708800 |
| 1372708800 |
| 1372708800 |
+------------+
10 row(s) fetched.
Elapsed 0.091 seconds.

> -- With filter pushdown off, dynamic filters on
SET datafusion.execution.parquet.pushdown_filters = false;
SET datafusion.optimizer.enable_dynamic_filter_pushdown = true;
SET datafusion.execution.target_partitions = 0;
SELECT "EventTime" FROM 'benchmarks/data/hits_partitioned/' ORDER BY "EventTime" LIMIT 10;
0 row(s) fetched.
Elapsed 0.000 seconds.

0 row(s) fetched.
Elapsed 0.000 seconds.

0 row(s) fetched.
Elapsed 0.000 seconds.

+------------+
| EventTime  |
+------------+
| 1372708800 |
| 1372708800 |
| 1372708800 |
| 1372708800 |
| 1372708800 |
| 1372708800 |
| 1372708800 |
| 1372708800 |
| 1372708800 |
| 1372708800 |
+------------+
10 row(s) fetched.
Elapsed 0.104 seconds.

> -- With dynamic filters off and target partitions set to 1
SET datafusion.execution.parquet.pushdown_filters = true;
SET datafusion.optimizer.enable_dynamic_filter_pushdown = false;
SET datafusion.execution.target_partitions = 1;
SELECT "EventTime" FROM 'benchmarks/data/hits_partitioned/' ORDER BY "EventTime" LIMIT 10;
0 row(s) fetched.
Elapsed 0.000 seconds.

0 row(s) fetched.
Elapsed 0.000 seconds.

0 row(s) fetched.
Elapsed 0.000 seconds.

+------------+
| EventTime  |
+------------+
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
adriangb commented on PR #15770: URL: https://github.com/apache/datafusion/pull/15770#issuecomment-2957661232 I actually seem to have been able to get the tests passing; it wasn't as complex a bug as I thought. I'd still appreciate a review, since these are complex bits of logic that are new to me.
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
adriangb commented on PR #15770: URL: https://github.com/apache/datafusion/pull/15770#issuecomment-2957654244 @berkaysynnada I think I made some progress. I was able to get the TopK pushdown tests passing by passing the filter around in the `EnforceSorting` rule, as is already done for the `fetch` parameter. I struggled to wrap my head around this (new to me) bit of code and was mostly taking stabs in the dark to understand it and get it to work, so, as the tests reflect, I fear I may have broken some invariant. Could you take a look at 16fd477? If you think that's a reasonable approach in the right ballpark, could we schedule a call to fix it up together?
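To illustrate why the filter has to travel with `fetch`: when a sort is moved further down the plan, the rebuilt sort must carry everything the original one had, or the relocated TopK silently loses its dynamic filter. This is a minimal sketch on a toy plan; `Plan`, `SortRequirement` and `push_down_sort` are hypothetical names and do not reflect the actual `EnforceSorting` code.

```rust
/// Toy plan nodes (hypothetical, for illustration only).
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    /// A sort with an optional limit (a TopK when `fetch` is set) and an
    /// optional handle to the dynamic filter it publishes.
    Sort { fetch: Option<usize>, filter: Option<String>, input: Box<Plan> },
    /// An order-preserving node the sort may be pushed below.
    Projection { input: Box<Plan> },
    Scan,
}

/// Requirements carried downward while the sort is being relocated.
#[derive(Debug, Clone)]
struct SortRequirement {
    fetch: Option<usize>,
    filter: Option<String>,
}

/// Move a sort below projections. The rebuilt sort keeps both `fetch` and
/// `filter`; dropping `filter` here is the kind of bug the comment describes.
fn push_down_sort(plan: Plan, req: Option<SortRequirement>) -> Plan {
    match (plan, req) {
        // Remove the sort here and remember what it required.
        (Plan::Sort { fetch, filter, input }, None) => {
            push_down_sort(*input, Some(SortRequirement { fetch, filter }))
        }
        // Keep descending through order-preserving nodes.
        (Plan::Projection { input }, req) => Plan::Projection {
            input: Box::new(push_down_sort(*input, req)),
        },
        // Re-create the sort above this node with the carried state.
        (node, Some(req)) => Plan::Sort {
            fetch: req.fetch,
            filter: req.filter,
            input: Box::new(node),
        },
        (other, None) => other,
    }
}
```

With `Sort(fetch=10, filter) -> Projection -> Scan` as input, the result is `Projection -> Sort(fetch=10, filter) -> Scan`.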
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
adriangb commented on code in PR #15770:
URL: https://github.com/apache/datafusion/pull/15770#discussion_r2136896235
##
datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs:
##
@@ -114,6 +118,18 @@ fn pushdown_sorts_helper(
sort_push_down.data.fetch = fetch;
sort_push_down.data.ordering_requirement = Some(OrderingRequirements::from(sort_ordering));
+let filter = plan
+.as_any()
+.downcast_ref::()
+.and_then(|s| s.filter().clone());
Review Comment:
This is... not the prettiest. And it's a smell that it will be hard for third-party plans to implement this.
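One common way around the third-party concern raised about the `downcast_ref` pattern is to expose the filter through a trait method with a default implementation, so any node can opt in without the optimizer knowing its concrete type. A minimal sketch with hypothetical names (`PlanNode`, `DynFilter`, `dynamic_filter`); this is not DataFusion's `ExecutionPlan` trait.

```rust
use std::any::Any;
use std::sync::Arc;

/// Hypothetical stand-in for a dynamic filter handle.
#[derive(Debug, PartialEq)]
struct DynFilter(String);

/// Hypothetical plan trait.
trait PlanNode {
    fn as_any(&self) -> &dyn Any;
    /// Default: no filter. Built-in and third-party nodes can override it.
    fn dynamic_filter(&self) -> Option<Arc<DynFilter>> {
        None
    }
}

struct BuiltinSort {
    filter: Option<Arc<DynFilter>>,
}

struct ThirdPartyTopK {
    filter: Arc<DynFilter>,
}

impl PlanNode for BuiltinSort {
    fn as_any(&self) -> &dyn Any {
        self
    }
    fn dynamic_filter(&self) -> Option<Arc<DynFilter>> {
        self.filter.clone()
    }
}

impl PlanNode for ThirdPartyTopK {
    fn as_any(&self) -> &dyn Any {
        self
    }
    fn dynamic_filter(&self) -> Option<Arc<DynFilter>> {
        Some(Arc::clone(&self.filter))
    }
}

/// Downcast-based lookup: only recognizes the concrete built-in type.
fn filter_via_downcast(plan: &dyn PlanNode) -> Option<Arc<DynFilter>> {
    plan.as_any()
        .downcast_ref::<BuiltinSort>()
        .and_then(|s| s.filter.clone())
}

/// Trait-based lookup: works for any node that implements the method.
fn filter_via_trait(plan: &dyn PlanNode) -> Option<Arc<DynFilter>> {
    plan.dynamic_filter()
}
```

The downcast version returns `None` for `ThirdPartyTopK` even though it has a filter; the trait version finds it.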
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
adriangb commented on PR #15770: URL: https://github.com/apache/datafusion/pull/15770#issuecomment-2915135838 No worries, I will try to take another look this week. I mainly wanted to make sure we didn't double up.
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
berkaysynnada commented on PR #15770: URL: https://github.com/apache/datafusion/pull/15770#issuecomment-2915043264 > hi @berkaysynnada any luck getting this last little bit through? anything I can do to help get this across the line? Hi @adriangb. I had to shift my focus to another area and haven't been able to find the time to finish this yet. If you or anyone else has time to investigate the issue, I'd really appreciate it. Otherwise, I'll try to wrap it up this weekend. Thanks for your patience and understanding :(
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
adriangb commented on PR #15770: URL: https://github.com/apache/datafusion/pull/15770#issuecomment-2913206532 Hi @berkaysynnada, any luck getting this last little bit through? Is there anything I can do to help get this across the line?
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
berkaysynnada commented on code in PR #15770:
URL: https://github.com/apache/datafusion/pull/15770#discussion_r2083518706
##
datafusion/sqllogictest/test_files/prepare.slt:
##
@@ -264,16 +264,19 @@ WHERE run_id = 'foo'
ORDER BY random()
LIMIT $1
-query I
+query error
EXECUTE get_N_rand_ints_from_last_run(1);
-1
+DataFusion error: Internal error: PhysicalOptimizer rule 'LimitPushdown'
failed. Schema mismatch. Expected original schema: Schema { fields: [Field {
name: "id", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered:
false, metadata: {} }], metadata: {} }, got new schema: Schema { fields: [Field
{ name: "id", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered:
false, metadata: {} }, Field { name: "run_id", data_type: Utf8, nullable: true,
dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }.
Review Comment:
Perhaps @jonahgao can make a quick diagnosis here, as I cannot debug these
EXECUTEs via `EXPLAIN` or `EXPLAIN VERBOSE`. I'm also open to ideas if anyone
knows how these changes break the EXECUTE feature.
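The error message indicates a sanity check that a physical optimizer rule must not change the plan's output schema; here the rewritten plan appears to expose the extra `run_id` column that the query only uses in its `WHERE` clause. A minimal sketch of such an invariant check, using a hypothetical simplified `Field` type rather than Arrow's `Schema`:

```rust
/// Simplified stand-in for an Arrow field (hypothetical).
#[derive(Debug, Clone, PartialEq)]
struct Field {
    name: String,
    data_type: String,
    nullable: bool,
}

fn field(name: &str, data_type: &str) -> Field {
    Field { name: name.to_string(), data_type: data_type.to_string(), nullable: true }
}

/// Reject a rewrite whose output schema differs from the original plan's.
fn check_rule_output(rule: &str, before: &[Field], after: &[Field]) -> Result<(), String> {
    if before == after {
        return Ok(());
    }
    // Report only the column names to keep the message short.
    let names = |fs: &[Field]| -> Vec<String> { fs.iter().map(|f| f.name.clone()).collect() };
    Err(format!(
        "PhysicalOptimizer rule '{}' failed. Schema mismatch. Expected {:?}, got {:?}",
        rule,
        names(before),
        names(after)
    ))
}
```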
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
berkaysynnada commented on code in PR #15770:
URL: https://github.com/apache/datafusion/pull/15770#discussion_r2083517075
##
datafusion/physical-optimizer/src/optimizer.rs:
##
@@ -126,6 +119,13 @@ impl PhysicalOptimizer {
// into an `order by max(x) limit y`. In this case it will copy the limit value down
// to the aggregation, allowing it to use only y number of accumulators.
Arc::new(TopKAggregation::new()),
+// The FilterPushdown rule tries to push down filters as far as it can.
+// For example, it will push down filtering from a `FilterExec` to
+// a `DataSourceExec`, or from a `TopK`'s current state to a `DataSourceExec`.
+Arc::new(FilterPushdown::new()),
Review Comment:
Should we skip this step if the pushdown config is set to false?
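If the rule were gated on configuration, the optimizer's rule list could be built conditionally. A minimal sketch with a hypothetical `OptimizerConfig` and rule-name strings in an abbreviated, illustrative order (not DataFusion's actual rule list); the field names mirror the settings used in the benchmark script. Because `FilterPushdown` also pushes static `FilterExec` predicates, which flag should gate it is a judgment call; this sketch skips it only when both are off.

```rust
/// Hypothetical subset of optimizer settings.
struct OptimizerConfig {
    enable_dynamic_filter_pushdown: bool,
    parquet_pushdown_filters: bool,
}

/// Build an (abbreviated, illustrative) ordered rule list.
fn physical_rules(cfg: &OptimizerConfig) -> Vec<&'static str> {
    let mut rules = vec!["EnforceDistribution", "EnforceSorting", "TopKAggregation"];
    // Only pay for the extra pass if some form of filter pushdown is enabled.
    if cfg.enable_dynamic_filter_pushdown || cfg.parquet_pushdown_filters {
        rules.push("FilterPushdown");
    }
    rules.extend(["LimitPushdown", "CoalesceBatches"]);
    rules
}
```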
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
adriangb commented on PR #15770: URL: https://github.com/apache/datafusion/pull/15770#issuecomment-2869838330 > The rule issue is not very trivial because we cannot just track and eliminate some hardcoded patterns, since we also need to be aware of upper parts of the plan, and new patterns may emerge as new tests are added, it's not limited to some specific cases. Yes, this is unfortunate. Sadly, I don't think that's even specific to this change: the way rules work and are applied iteratively means there are bound to be conflicts. IMO each rule should have an apply step and a check step where, e.g., it inserts repartitioning and then, on the way back up, checks whether it's still needed and removes it if not. That way at least the logic is self-contained within a rule.
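The apply-and-check idea can be sketched on a toy plan tree: the apply pass optimistically inserts a repartition above every scan, and the check pass walks back up and removes any repartition whose parent does not benefit from it. `Node`, `apply`, `check`, `prune`, and the "only filters need parallel input" policy are all hypothetical.

```rust
#[derive(Debug, Clone, PartialEq)]
enum Node {
    Scan,
    Repartition(Box<Node>),
    /// Assumed (for this sketch) to benefit from parallel input.
    Filter(Box<Node>),
    /// Assumed (for this sketch) to gain nothing from extra partitions.
    Limit(Box<Node>),
}

/// Apply: optimistically insert a repartition above every scan.
fn apply(node: Node) -> Node {
    match node {
        Node::Scan => Node::Repartition(Box::new(Node::Scan)),
        Node::Repartition(c) => Node::Repartition(Box::new(apply(*c))),
        Node::Filter(c) => Node::Filter(Box::new(apply(*c))),
        Node::Limit(c) => Node::Limit(Box::new(apply(*c))),
    }
}

/// Drop a repartition child if its parent does not need parallel input.
fn prune(parent_needs_parallel: bool, child: Node) -> Node {
    match child {
        Node::Repartition(inner) if !parent_needs_parallel => *inner,
        other => other,
    }
}

/// Check: bottom-up, keep only repartitions that a parent actually uses.
fn check(node: Node) -> Node {
    match node {
        Node::Filter(c) => Node::Filter(Box::new(prune(true, check(*c)))),
        Node::Limit(c) => Node::Limit(Box::new(prune(false, check(*c)))),
        Node::Repartition(c) => Node::Repartition(Box::new(check(*c))),
        Node::Scan => Node::Scan,
    }
}
```

Because both passes live in one rule, the rule that added the repartition is also the one responsible for taking it out again.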
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
adriangb commented on PR #15770: URL: https://github.com/apache/datafusion/pull/15770#issuecomment-2869838533 @berkaysynnada you might need to resolve the conflicts for CI to run
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
berkaysynnada commented on code in PR #15770:
URL: https://github.com/apache/datafusion/pull/15770#discussion_r2083516712
##
datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt:
##
@@ -229,7 +232,9 @@ EXPLAIN select * from t_pushdown where val != 'c';
logical_plan
01)Filter: t_pushdown.val != Utf8("c")
02)--TableScan: t_pushdown projection=[val, part],
partial_filters=[t_pushdown.val != Utf8("c")]
-physical_plan DataSourceExec: file_groups={3 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=a/file.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=b/file.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=c/file.parquet]]},
projection=[val, part], file_type=parquet, predicate=val@0 != c,
pruning_predicate=val_null_count@2 != row_count@3 AND (val_min@0 != c OR c !=
val_max@1), required_guarantees=[val not in (c)]
+physical_plan
+01)RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=3
Review Comment:
This is the last remaining issue. I think we eventually need to be able to run
the FilterPushdown rule before EnforceSorting.
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
berkaysynnada commented on code in PR #15770:
URL: https://github.com/apache/datafusion/pull/15770#discussion_r2083516280
##
datafusion/sqllogictest/test_files/prepare.slt:
##
@@ -264,16 +264,19 @@ WHERE run_id = 'foo'
ORDER BY random()
LIMIT $1
-query I
+query error
EXECUTE get_N_rand_ints_from_last_run(1);
-1
+DataFusion error: Internal error: PhysicalOptimizer rule 'LimitPushdown'
failed. Schema mismatch. Expected original schema: Schema { fields: [Field {
name: "id", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered:
false, metadata: {} }], metadata: {} }, got new schema: Schema { fields: [Field
{ name: "id", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered:
false, metadata: {} }, Field { name: "run_id", data_type: Utf8, nullable: true,
dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }.
Review Comment:
I'm dealing with these 2 now
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
adriangb commented on code in PR #15770:
URL: https://github.com/apache/datafusion/pull/15770#discussion_r2083516211
##
datafusion/physical-plan/src/topk/mod.rs:
##
@@ -570,6 +680,47 @@ impl TopKHeap {
+ self.store.size()
+ self.owned_bytes
}
+
+fn get_threshold_values(
+&self,
+sort_exprs: &[PhysicalSortExpr],
+) -> Result>> {
+// If the heap doesn't have k elements yet, we can't create thresholds
+let max_row = match self.max() {
+Some(row) => row,
+None => return Ok(None),
+};
+
+// Get the batch that contains the max row
+let batch_entry = match self.store.get(max_row.batch_id) {
+Some(entry) => entry,
+None => return internal_err!("Invalid batch ID in TopKRow"),
+};
+
+// Extract threshold values for each sort expression
+let mut scalar_values = Vec::with_capacity(sort_exprs.len());
+for sort_expr in sort_exprs {
Review Comment:
Maybe? We should measure the overhead/execution time for a large case first.
If it's under a couple hundred ms, it's probably not worth the overhead. If we
do parallelize, I assume we can just pipe the asyncness down to here and use
Tokio tasks?
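For context on what `get_threshold_values` computes: once the heap holds k rows, its maximum (the worst retained row) is a bound, and any incoming row that does not sort strictly before it cannot enter the top k. For a multi-column `ORDER BY` that bound becomes a lexicographic predicate. A minimal sketch, assuming integer sort keys and ignoring NULL ordering; `Dir`, `TopK`, and `may_enter` are hypothetical names, and DataFusion builds a physical filter expression rather than a Rust function.

```rust
use std::collections::BinaryHeap;

/// Direction of one ORDER BY key.
#[derive(Clone, Copy, Debug)]
enum Dir {
    Asc,
    Desc,
}

/// Map a row to a key whose ascending order matches the ORDER BY,
/// so the max-heap keeps the *worst* retained row on top.
fn norm_key(row: &[i64], dirs: &[Dir]) -> Vec<i128> {
    row.iter()
        .zip(dirs)
        .map(|(&v, d)| match d {
            Dir::Asc => v as i128,
            Dir::Desc => -(v as i128),
        })
        .collect()
}

struct TopK {
    k: usize,
    dirs: Vec<Dir>,
    heap: BinaryHeap<(Vec<i128>, Vec<i64>)>,
}

impl TopK {
    fn new(k: usize, dirs: Vec<Dir>) -> Self {
        TopK { k, dirs, heap: BinaryHeap::new() }
    }

    fn insert(&mut self, row: Vec<i64>) {
        if self.k == 0 {
            return;
        }
        let key = norm_key(&row, &self.dirs);
        if self.heap.len() < self.k {
            self.heap.push((key, row));
        } else if self.heap.peek().map_or(false, |(top, _)| key < *top) {
            // Strictly better than the current worst row: replace it.
            self.heap.pop();
            self.heap.push((key, row));
        }
    }

    /// Analogue of `get_threshold_values`: `None` until k rows are held,
    /// then the sort-key values of the worst retained row.
    fn threshold(&self) -> Option<Vec<i64>> {
        if self.k == 0 || self.heap.len() < self.k {
            return None;
        }
        self.heap.peek().map(|(_, row)| row.clone())
    }
}

/// The pushed-down predicate, e.g. `a < t_a OR (a = t_a AND b > t_b)` for
/// `ORDER BY a ASC, b DESC`: rows where it is false can be skipped.
fn may_enter(row: &[i64], threshold: &[i64], dirs: &[Dir]) -> bool {
    for ((v, t), d) in row.iter().zip(threshold).zip(dirs) {
        if v != t {
            return match d {
                Dir::Asc => v < t,
                Dir::Desc => v > t,
            };
        }
    }
    false // equal on every key: cannot displace the current k-th row
}
```

Evaluating the threshold is one pass over the sort expressions for a single row, which is why measuring before parallelizing it seems sensible.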
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
adriangb commented on code in PR #15770:
URL: https://github.com/apache/datafusion/pull/15770#discussion_r2083515926
##
datafusion/core/tests/fuzz_cases/topk_filter_pushdown.rs:
##
@@ -0,0 +1,354 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::sync::{Arc, LazyLock};
+
+use arrow::array::{Int32Array, StringArray, StringDictionaryBuilder};
+use arrow::datatypes::Int32Type;
+use arrow::record_batch::RecordBatch;
+use arrow::util::pretty::pretty_format_batches;
+use arrow_schema::{DataType, Field, Schema};
+use datafusion::datasource::listing::{ListingOptions, ListingTable,
ListingTableConfig};
+use datafusion::prelude::{SessionConfig, SessionContext};
+use datafusion_datasource::ListingTableUrl;
+use datafusion_datasource_parquet::ParquetFormat;
+use datafusion_execution::object_store::ObjectStoreUrl;
+use itertools::Itertools;
+use object_store::memory::InMemory;
+use object_store::path::Path;
+use object_store::{ObjectStore, PutPayload};
+use parquet::arrow::ArrowWriter;
+use rand::rngs::StdRng;
+use rand::{Rng, SeedableRng};
+use tokio::sync::Mutex;
+use tokio::task::JoinSet;
+
+#[derive(Clone)]
+struct TestDataSet {
+store: Arc,
+schema: Arc,
+}
+
+/// List of in memory parquet files with UTF8 data
+// Use a mutex rather than LazyLock to allow for async initialization
+static TESTFILES: LazyLock>> =
+LazyLock::new(|| Mutex::new(vec![]));
+
+async fn test_files() -> Vec {
+let files_mutex = &TESTFILES;
+let mut files = files_mutex.lock().await;
+if !files.is_empty() {
+return (*files).clone();
+}
+
+let mut rng = StdRng::seed_from_u64(0);
+
+for nulls_in_ids in [false, true] {
+for nulls_in_names in [false, true] {
+for nulls_in_departments in [false, true] {
+let store = Arc::new(InMemory::new());
+
+let schema = Arc::new(Schema::new(vec![
+Field::new("id", DataType::Int32, nulls_in_ids),
+Field::new("name", DataType::Utf8, nulls_in_names),
+Field::new(
+"department",
+DataType::Dictionary(
+Box::new(DataType::Int32),
+Box::new(DataType::Utf8),
+),
+nulls_in_departments,
+),
+]));
+
+let name_choices = if nulls_in_names {
+[Some("Alice"), Some("Bob"), None, Some("David"), None]
+} else {
+[
+Some("Alice"),
+Some("Bob"),
+Some("Charlie"),
+Some("David"),
+Some("Eve"),
+]
+};
+
+let department_choices = if nulls_in_departments {
+[
+Some("Theater"),
+Some("Engineering"),
+None,
+Some("Arts"),
+None,
+]
+} else {
+[
+Some("Theater"),
+Some("Engineering"),
+Some("Healthcare"),
+Some("Arts"),
+Some("Music"),
+]
+};
+
+// Generate 5 files, some with overlapping or repeated ids, some without
+for i in 0..5 {
+let num_batches = rng.gen_range(1..3);
+let mut batches = Vec::with_capacity(num_batches);
+for _ in 0..num_batches {
+let num_rows = 25;
+let ids =
Int32Array::from_iter((0..num_rows).map(|file| {
+if nulls_in_ids {
+if rng.gen_bool(1.0 / 10.0) {
+None
+} else {
+Some(rng.gen_range(file..file + 5))
+}
+} e
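The core property such a fuzz test checks can be stated independently of DataFusion: skipping rows with the dynamic filter while they stream in must never change the TopK result. A minimal, self-contained sketch of that property check, using a small xorshift generator instead of the `rand` crate and a single ascending `i32` column without NULLs (the quoted test also covers NULLs, strings and dictionary columns). `xorshift`, `topk_by_sort`, and `topk_with_filter` are hypothetical helpers.

```rust
use std::collections::BinaryHeap;

/// Tiny deterministic PRNG (xorshift64) so the sketch needs only std.
fn xorshift(state: &mut u64) -> u64 {
    let mut x = *state;
    x ^= x << 13;
    x ^= x >> 7;
    x ^= x << 17;
    *state = x;
    x
}

/// Reference answer: full sort, then keep the first k values.
fn topk_by_sort(data: &[i32], k: usize) -> Vec<i32> {
    let mut v = data.to_vec();
    v.sort();
    v.truncate(k);
    v
}

/// Streaming TopK (ascending) that, once full, skips rows failing the
/// dynamic filter `value < threshold`. Returns (result, rows skipped).
fn topk_with_filter(data: &[i32], k: usize) -> (Vec<i32>, usize) {
    let mut heap = BinaryHeap::new(); // max-heap: worst retained value on top
    let mut skipped = 0;
    for &v in data {
        if heap.len() < k {
            heap.push(v);
            continue;
        }
        // The pushed-down filter: only values strictly below the threshold pass.
        let passes = heap.peek().map_or(false, |&t| v < t);
        if passes {
            heap.pop();
            heap.push(v);
        } else {
            skipped += 1;
        }
    }
    let mut out = heap.into_vec();
    out.sort();
    (out, skipped)
}
```

Running many random inputs through both functions and asserting equal results mirrors the structure of the fuzz test in miniature.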
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
berkaysynnada commented on code in PR #15770:
URL: https://github.com/apache/datafusion/pull/15770#discussion_r2083513975
##
datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt:
##
@@ -86,7 +86,8 @@ logical_plan
physical_plan
01)SortPreservingMergeExec: [a@0 ASC NULLS LAST]
02)--SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]
-03)DataSourceExec: file_groups={2 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]},
projection=[a], file_type=parquet, predicate=b@1 > 2,
pruning_predicate=b_null_count@1 != row_count@2 AND b_max@0 > 2,
required_guarantees=[]
+03)RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2
Review Comment:
I think this is an improvement
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
berkaysynnada commented on PR #15770: URL: https://github.com/apache/datafusion/pull/15770#issuecomment-2869830929 The PR looks really nice BTW, and every line is clearly understandable. Of course there are some possible optimizations (you've noticed some of them yourself, like the TODO during batch insertion), but this initial version is more than enough IMO. The rule issue is not trivial, because we cannot just track and eliminate some hardcoded patterns: we also need to be aware of the upper parts of the plan, and new patterns may emerge as new tests are added, so it's not limited to specific cases. As I mentioned, I might have found another solution; if it works, I'll ping you and we can merge this.
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
berkaysynnada commented on code in PR #15770:
URL: https://github.com/apache/datafusion/pull/15770#discussion_r2083506643
##
datafusion/physical-plan/src/topk/mod.rs:
##
@@ -570,6 +680,47 @@ impl TopKHeap {
+ self.store.size()
+ self.owned_bytes
}
+
+fn get_threshold_values(
+&self,
+sort_exprs: &[PhysicalSortExpr],
+) -> Result>> {
+// If the heap doesn't have k elements yet, we can't create thresholds
+let max_row = match self.max() {
+Some(row) => row,
+None => return Ok(None),
+};
+
+// Get the batch that contains the max row
+let batch_entry = match self.store.get(max_row.batch_id) {
+Some(entry) => entry,
+None => return internal_err!("Invalid batch ID in TopKRow"),
+};
+
+// Extract threshold values for each sort expression
+let mut scalar_values = Vec::with_capacity(sort_exprs.len());
+for sort_expr in sort_exprs {
Review Comment:
I wonder if it is worth evaluating this in parallel.
##
datafusion/core/tests/fuzz_cases/topk_filter_pushdown.rs:
##
@@ -0,0 +1,354 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::sync::{Arc, LazyLock};
+
+use arrow::array::{Int32Array, StringArray, StringDictionaryBuilder};
+use arrow::datatypes::Int32Type;
+use arrow::record_batch::RecordBatch;
+use arrow::util::pretty::pretty_format_batches;
+use arrow_schema::{DataType, Field, Schema};
+use datafusion::datasource::listing::{ListingOptions, ListingTable,
ListingTableConfig};
+use datafusion::prelude::{SessionConfig, SessionContext};
+use datafusion_datasource::ListingTableUrl;
+use datafusion_datasource_parquet::ParquetFormat;
+use datafusion_execution::object_store::ObjectStoreUrl;
+use itertools::Itertools;
+use object_store::memory::InMemory;
+use object_store::path::Path;
+use object_store::{ObjectStore, PutPayload};
+use parquet::arrow::ArrowWriter;
+use rand::rngs::StdRng;
+use rand::{Rng, SeedableRng};
+use tokio::sync::Mutex;
+use tokio::task::JoinSet;
+
+#[derive(Clone)]
+struct TestDataSet {
+store: Arc,
+schema: Arc,
+}
+
+/// List of in memory parquet files with UTF8 data
+// Use a mutex rather than LazyLock to allow for async initialization
+static TESTFILES: LazyLock>> =
+LazyLock::new(|| Mutex::new(vec![]));
+
+async fn test_files() -> Vec {
+let files_mutex = &TESTFILES;
+let mut files = files_mutex.lock().await;
+if !files.is_empty() {
+return (*files).clone();
+}
+
+let mut rng = StdRng::seed_from_u64(0);
+
+for nulls_in_ids in [false, true] {
+for nulls_in_names in [false, true] {
+for nulls_in_departments in [false, true] {
+let store = Arc::new(InMemory::new());
+
+let schema = Arc::new(Schema::new(vec![
+Field::new("id", DataType::Int32, nulls_in_ids),
+Field::new("name", DataType::Utf8, nulls_in_names),
+Field::new(
+"department",
+DataType::Dictionary(
+Box::new(DataType::Int32),
+Box::new(DataType::Utf8),
+),
+nulls_in_departments,
+),
+]));
+
+let name_choices = if nulls_in_names {
+[Some("Alice"), Some("Bob"), None, Some("David"), None]
+} else {
+[
+Some("Alice"),
+Some("Bob"),
+Some("Charlie"),
+Some("David"),
+Some("Eve"),
+]
+};
+
+let department_choices = if nulls_in_departments {
+[
+Some("Theater"),
+Some("Engineering"),
+None,
+Some("Arts"),
+None,
+]
+} else {
+[
+
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
berkaysynnada commented on PR #15770: URL: https://github.com/apache/datafusion/pull/15770#issuecomment-2865349477 Update: I tried to reorder the rules as follows:
1) EnforceSorting
2) FilterPushdown
3) EnforceDistribution
4) CombinePartialFinalAggregate
...
5) CoalesceBatches

The issue here is fixed, but some others came up (related to swapping EnforceSorting and EnforceDistribution). I will debug that issue further. EnforceSorting and EnforceDistribution were initially designed to operate orthogonally, and there are many tests for that, but it seems we somehow broke it. Unless I can easily figure out how to recover that orthogonality, I'll make the FilterPushdown rule manually eliminate
```
CoalesceBatches
--Repartition
```
patterns.
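Manually eliminating that pattern could look like the following bottom-up rewrite on a toy plan, which replaces every `CoalesceBatches` directly above a `Repartition` with the repartition's input. The `Plan` variants and `eliminate_pair` are hypothetical. The rewrite is purely local, which illustrates the concern raised elsewhere in this thread: it cannot tell whether an operator higher up relied on the extra partitions.

```rust
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Scan,
    Sort(Box<Plan>),
    CoalesceBatches(Box<Plan>),
    Repartition(Box<Plan>),
}

/// Bottom-up rewrite that removes every `CoalesceBatches -> Repartition` pair.
fn eliminate_pair(plan: Plan) -> Plan {
    match plan {
        // Rewrite the child first, then check whether the pair is present.
        Plan::CoalesceBatches(child) => match eliminate_pair(*child) {
            Plan::Repartition(grandchild) => *grandchild,
            other => Plan::CoalesceBatches(Box::new(other)),
        },
        Plan::Repartition(c) => Plan::Repartition(Box::new(eliminate_pair(*c))),
        Plan::Sort(c) => Plan::Sort(Box::new(eliminate_pair(*c))),
        Plan::Scan => Plan::Scan,
    }
}
```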
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
berkaysynnada commented on PR #15770: URL: https://github.com/apache/datafusion/pull/15770#issuecomment-2865382987 > Maybe worth confirming that it is broken independently of this PR (on main)? Yes, it was broken before. One can move EnforceSorting above EnforceDistribution and see the errors.
Re: [PR] TopK dynamic filter pushdown attempt 2 [datafusion]
adriangb commented on PR #15770: URL: https://github.com/apache/datafusion/pull/15770#issuecomment-2865358754 > EnforceSorting and EnforceDistribution are initially designed to operate orthogonally, and there are many tests for that. But it seems somehow we broke it. Maybe worth confirming that it is broken independently of this PR (on main)? > Unless I can figure out easily to recover that orthogonality feature, I'll manually implement FilterPushdown rule to eliminate Sounds good!
