Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]

2025-08-22 Thread via GitHub


adriangb merged PR #16433:
URL: https://github.com/apache/datafusion/pull/16433


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]



Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]

2025-08-22 Thread via GitHub


adriangb commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-3215218818

   Benchmarks look good: several queries are faster and none are meaningfully 
slower. I'll merge this in the next couple of hours if you don't first, Andrew.





Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]

2025-08-22 Thread via GitHub


alamb commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-3215208209

   🤖: Benchmark completed
   
   ```
   Comparing HEAD and topk-filters
   
   Benchmark clickbench_extended.json
   
   ┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
   ┃ Query        ┃        HEAD ┃ topk-filters ┃        Change ┃
   ┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
   │ QQuery 0     │  2639.26 ms │   2604.08 ms │     no change │
   │ QQuery 1     │  1219.97 ms │   1240.23 ms │     no change │
   │ QQuery 2     │  2324.31 ms │   2392.51 ms │     no change │
   │ QQuery 3     │  1207.99 ms │   1172.77 ms │     no change │
   │ QQuery 4     │  2236.03 ms │   2231.24 ms │     no change │
   │ QQuery 5     │ 27457.81 ms │  27345.17 ms │     no change │
   │ QQuery 6     │  4125.85 ms │   4209.18 ms │     no change │
   │ QQuery 7     │  3599.32 ms │   3503.08 ms │     no change │
   └──────────────┴─────────────┴──────────────┴───────────────┘
   ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
   ┃ Benchmark Summary           ┃            ┃
   ┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
   │ Total Time (HEAD)           │ 44810.53ms │
   │ Total Time (topk-filters)   │ 44698.26ms │
   │ Average Time (HEAD)         │  5601.32ms │
   │ Average Time (topk-filters) │  5587.28ms │
   │ Queries Faster              │          0 │
   │ Queries Slower              │          0 │
   │ Queries with No Change      │          8 │
   │ Queries with Failure        │          0 │
   └─────────────────────────────┴────────────┘
   
   Benchmark clickbench_partitioned.json
   
   ┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
   ┃ Query        ┃        HEAD ┃ topk-filters ┃        Change ┃
   ┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
   │ QQuery 0     │     2.12 ms │      2.15 ms │     no change │
   │ QQuery 1     │    50.61 ms │     51.14 ms │     no change │
   │ QQuery 2     │   135.87 ms │    137.82 ms │     no change │
   │ QQuery 3     │   162.54 ms │    162.41 ms │     no change │
   │ QQuery 4     │  1055.31 ms │   1027.29 ms │     no change │
   │ QQuery 5     │  1475.94 ms │   1472.47 ms │     no change │
   │ QQuery 6     │     2.13 ms │      2.17 ms │     no change │
   │ QQuery 7     │    54.79 ms │     54.63 ms │     no change │
   │ QQuery 8     │  1472.25 ms │   1422.87 ms │     no change │
   │ QQuery 9     │  1837.78 ms │   1812.93 ms │     no change │
   │ QQuery 10    │   369.82 ms │    385.13 ms │     no change │
   │ QQuery 11    │   428.04 ms │    435.19 ms │     no change │
   │ QQuery 12    │  1347.17 ms │   1365.75 ms │     no change │
   │ QQuery 13    │  2098.07 ms │   2115.15 ms │     no change │
   │ QQuery 14    │  1237.31 ms │   1239.03 ms │     no change │
   │ QQuery 15    │  1173.37 ms │   1173.14 ms │     no change │
   │ QQuery 16    │  2612.56 ms │   2607.94 ms │     no change │
   │ QQuery 17    │  2597.59 ms │   2607.24 ms │     no change │
   │ QQuery 18    │  5156.33 ms │   4834.22 ms │ +1.07x faster │
   │ QQuery 19    │   126.62 ms │    128.06 ms │     no change │
   │ QQuery 20    │  1961.14 ms │   2007.25 ms │     no change │
   │ QQuery 21    │  2279.44 ms │   2322.19 ms │     no change │
   │ QQuery 22    │  3932.07 ms │   3939.04 ms │     no change │
   │ QQuery 23    │ 15969.33 ms │  12635.10 ms │ +1.26x faster │
   │ QQuery 24    │   265.46 ms │    208.07 ms │ +1.28x faster │
   │ QQuery 25    │   492.34 ms │    505.17 ms │     no change │
   │ QQuery 26    │   259.62 ms │    229.17 ms │ +1.13x faster │
   │ QQuery 27    │  2872.01 ms │   2888.04 ms │     no change │
   │ QQuery 28    │ 23296.75 ms │  23107.00 ms │     no change │
   │ QQuery 29    │   984.75 ms │   1005.99 ms │     no change │
   │ QQuery 30    │  1328.94 ms │   1289.57 ms │     no change │
   │ QQuery 31    │  1326.20 ms │   1330.12 ms │     no change │
   │ QQuery 32    │  4487.03 ms │   4331.85 ms │     no change │
   │ QQuery 33    │  5750.40 ms │   5628.25 ms │     no change │
   │ QQuery 34    │  5811.68 ms │   5931.34 ms │     no change │
   │ QQuery 35    │  2014.76 ms │   2008.45 ms │     no change │
   │ QQuery 36    │   122.81 ms │    120.62 ms │     no change │
   │ QQuery 37    │    53.34 ms │     53.77 ms │     no change │
   │ QQuery 38    │   122.13 ms │    123.16 ms │     no change │
   │ QQuery 39    │   194.75 ms │    197.72 ms │     no change │
   │ QQuery 40    │    43.97 ms │     43.22 ms │     no change │
   │ QQuery 41    │    40.40 ms │     39.76 ms │     no change │
   │ QQuery 42    │    32.38 ms │     31.56 ms │     no change │
   └──────────────┴─────────────┴──────────────┴───────────────┘
   ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
   ┃ Benchmark Summary           ┃            ┃
   ┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
   │ Total Time (HEAD)           │ 97037.93ms │
   │ Total Time (topk-filters)   │ 93013.16ms │
   │ Average Tim
   ```

Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]

2025-08-22 Thread via GitHub


alamb commented on code in PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#discussion_r2294333805


##
datafusion/datasource-parquet/src/opener.rs:
##
@@ -409,12 +409,58 @@ impl FileOpener for ParquetOpener {
 .with_row_groups(row_group_indexes)
 .build()?;
 
-let adapted = stream
-.map_err(|e| ArrowError::ExternalError(Box::new(e)))
-.map(move |maybe_batch| {
-maybe_batch
-.and_then(|b| schema_mapping.map_batch(b).map_err(Into::into))
-});
+// Create a stateful stream that can check pruning after each batch
+let adapted = {

Review Comment:
   - https://github.com/apache/datafusion/pull/17293






Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]

2025-08-22 Thread via GitHub


alamb commented on code in PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#discussion_r2294236933


##
datafusion/physical-plan/src/topk/mod.rs:
##
@@ -319,19 +343,89 @@ impl TopK {
 /// (a > 2 OR (a = 2 AND b < 3))
 /// ```
 fn update_filter(&mut self) -> Result<()> {
-let Some(filter) = &self.filter else {
+// If the heap doesn't have k elements yet, we can't create thresholds
+let Some(max_row) = self.heap.max() else {
 return Ok(());
 };
-let Some(thresholds) = self.heap.get_threshold_values(&self.expr)? else {
+
+let new_threshold_row = &max_row.row;
+
+// Extract scalar values BEFORE acquiring lock to reduce critical section
+let thresholds = match self.heap.get_threshold_values(&self.expr)? {

Review Comment:
   I think we can move this down after the check for update too:
   - https://github.com/pydantic/datafusion/pull/37
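The reordering suggested above (return early while the heap is not yet full, do the cheap "is this more selective?" comparison next, and only then pay for extracting thresholds and building the predicate) can be sketched as a toy model. This is not DataFusion's `TopK`: it assumes a single ascending `u64` sort key, and `ToyTopK`, its `threshold` field, and the string `predicate` are hypothetical stand-ins for the heap, the shared threshold row, and the dynamic filter expression.

```rust
use std::collections::BinaryHeap;

/// Toy model of a TopK operator for `ORDER BY key ASC LIMIT k`
/// (hypothetical, not DataFusion's API). The max-heap keeps the k smallest keys.
pub struct ToyTopK {
    k: usize,
    heap: BinaryHeap<u64>,
    /// Threshold most recently published to the filter, if any.
    threshold: Option<u64>,
    /// Stand-in for the dynamic filter expression.
    predicate: Option<String>,
}

impl ToyTopK {
    pub fn new(k: usize) -> Self {
        Self { k, heap: BinaryHeap::new(), threshold: None, predicate: None }
    }

    pub fn insert(&mut self, key: u64) {
        if self.heap.len() < self.k {
            self.heap.push(key);
        } else if let Some(&max) = self.heap.peek() {
            // Replace the current k-th smallest key if the new one beats it.
            if key < max {
                self.heap.pop();
                self.heap.push(key);
            }
        }
    }

    pub fn update_filter(&mut self) {
        // 1. Until the heap holds k rows there is no threshold to publish.
        if self.heap.len() < self.k {
            return;
        }
        let Some(&max) = self.heap.peek() else {
            return; // only reachable when k == 0
        };
        // 2. Cheap comparison first: skip unless strictly more selective.
        if matches!(self.threshold, Some(current) if max >= current) {
            return;
        }
        // 3. Only now do the (in real code, expensive) predicate construction.
        self.threshold = Some(max);
        self.predicate = Some(format!("key < {max}"));
    }
}
```

In this sketch the early returns in steps 1 and 2 keep the common path cheap: once the heap is full, a batch that does not tighten the threshold never reaches the predicate construction.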



##
datafusion/datasource-parquet/src/opener.rs:
##
@@ -409,12 +409,58 @@ impl FileOpener for ParquetOpener {
 .with_row_groups(row_group_indexes)
 .build()?;
 
-let adapted = stream
-.map_err(|e| ArrowError::ExternalError(Box::new(e)))
-.map(move |maybe_batch| {
-maybe_batch
-.and_then(|b| schema_mapping.map_batch(b).map_err(Into::into))
-});
+// Create a stateful stream that can check pruning after each batch
+let adapted = {

Review Comment:
   I found this code somewhat 🤯 (and this function is already hundreds of lines 
long). I spent some time refactoring it into its own stream for readability, and 
I now understand it better. I'll put up a follow-on PR to extract this logic; 
there's no need to do it in this one.
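The idea of pulling the per-batch logic out into its own stream type can be illustrated with a synchronous analogue. This is a sketch under stated assumptions, not the code from the follow-on PR: it uses `std::iter::Iterator` instead of an async `Stream` so it needs no external crates, a `Vec<i64>` stands in for a `RecordBatch`, and `EarlyStoppingBatches` and its shared `prune` flag are hypothetical names. The adapter maps each batch (the role `schema_mapping.map_batch` plays in the diff) and, between batches, checks whether the rest of the file can be skipped.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for an Arrow `RecordBatch`.
pub type Batch = Vec<i64>;

/// Iterator adapter (a synchronous analogue of a stream) that maps every
/// batch and, between batches, checks a shared "prune the rest" flag.
pub struct EarlyStoppingBatches<I, F> {
    inner: I,
    map_batch: F,
    /// Set elsewhere (e.g. by another partition) once no remaining row can pass the filter.
    prune: Arc<AtomicBool>,
    done: bool,
}

impl<I, F> EarlyStoppingBatches<I, F>
where
    I: Iterator<Item = Batch>,
    F: FnMut(Batch) -> Result<Batch, String>,
{
    pub fn new(inner: I, map_batch: F, prune: Arc<AtomicBool>) -> Self {
        Self { inner, map_batch, prune, done: false }
    }
}

impl<I, F> Iterator for EarlyStoppingBatches<I, F>
where
    I: Iterator<Item = Batch>,
    F: FnMut(Batch) -> Result<Batch, String>,
{
    type Item = Result<Batch, String>;

    fn next(&mut self) -> Option<Self::Item> {
        // Re-check the pruning signal after the previous batch was handed out.
        if self.done || self.prune.load(Ordering::Acquire) {
            self.done = true;
            return None;
        }
        let Some(batch) = self.inner.next() else {
            self.done = true;
            return None;
        };
        let mapped = (self.map_batch)(batch);
        // Stop after the first error, like a fused fallible stream.
        self.done = mapped.is_err();
        Some(mapped)
    }
}
```

Keeping the state (`done`, the shared flag) in a named type with a single `next` method, rather than in closures captured inside an already long function, is in line with the readability motivation above.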






Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]

2025-08-22 Thread via GitHub


alamb commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-3215116380

   🤖: Benchmark completed
   
   ```
   Comparing HEAD and topk-filters
   
   Benchmark run_topk_tpch.json
   
   ┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
   ┃ Query        ┃        HEAD ┃ topk-filters ┃        Change ┃
   ┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
   │ Q1           │    36.60 ms │      8.76 ms │ +4.18x faster │
   │ Q2           │    45.45 ms │     45.55 ms │     no change │
   │ Q3           │   124.01 ms │    125.24 ms │     no change │
   │ Q4           │    51.42 ms │     48.09 ms │ +1.07x faster │
   │ Q5           │    35.31 ms │     38.12 ms │  1.08x slower │
   │ Q6           │    66.22 ms │     67.99 ms │     no change │
   │ Q7           │   162.63 ms │    166.41 ms │     no change │
   │ Q8           │   106.50 ms │    108.96 ms │     no change │
   │ Q9           │   135.16 ms │    135.68 ms │     no change │
   │ Q10          │   223.96 ms │    221.17 ms │     no change │
   │ Q11          │   113.44 ms │    117.74 ms │     no change │
   └──────────────┴─────────────┴──────────────┴───────────────┘
   ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
   ┃ Benchmark Summary           ┃            ┃
   ┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
   │ Total Time (HEAD)           │  1100.71ms │
   │ Total Time (topk-filters)   │  1083.72ms │
   │ Average Time (HEAD)         │   100.06ms │
   │ Average Time (topk-filters) │    98.52ms │
   │ Queries Faster              │          2 │
   │ Queries Slower              │          1 │
   │ Queries with No Change      │          8 │
   │ Queries with Failure        │          0 │
   └─────────────────────────────┴────────────┘
   ```
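The `Change` column can be read as a ratio of the two timings: for Q1 above, 36.60 ms / 8.76 ms ≈ 4.18, reported as `+4.18x faster`. The sketch below reproduces that column under an assumption: ratios within a 5% noise band are labelled `no change`. That band is a guess that happens to agree with every row shown in this thread; it is not read from `gh_compare_branch.sh`, and `classify`, `render`, and `NOISE` are hypothetical names.

```rust
/// How one benchmark row's two timings compare (hypothetical model of the
/// "Change" column; not taken from the benchmarking scripts).
#[derive(Debug, PartialEq)]
pub enum Change {
    Faster(f64),
    Slower(f64),
    NoChange,
}

/// Assumed noise band: a ratio of at most 1.05 in either direction is "no change".
pub const NOISE: f64 = 1.05;

/// Compare the baseline (HEAD) time with the candidate (topk-filters) time.
pub fn classify(head_ms: f64, candidate_ms: f64) -> Change {
    let speedup = head_ms / candidate_ms;
    let slowdown = candidate_ms / head_ms;
    if speedup > NOISE {
        Change::Faster(speedup)
    } else if slowdown > NOISE {
        Change::Slower(slowdown)
    } else {
        Change::NoChange
    }
}

/// Format a classification the way the tables print it.
pub fn render(change: &Change) -> String {
    match change {
        Change::Faster(r) => format!("+{r:.2}x faster"),
        Change::Slower(r) => format!("{r:.2}x slower"),
        Change::NoChange => "no change".to_string(),
    }
}
```

For example, `render(&classify(35.31, 38.12))` yields `1.08x slower`, matching Q5 above.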
   
   
   
   
   
   





Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]

2025-08-22 Thread via GitHub


alamb commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-3215116778

   🤖 `./gh_compare_branch.sh` [Benchmark Script](https://github.com/alamb/datafusion-benchmarking/blob/main/gh_compare_branch.sh) Running
   Linux aal-dev 6.14.0-1014-gcp #15~24.04.1-Ubuntu SMP Fri Jul 25 23:26:08 UTC 
2025 x86_64 x86_64 x86_64 GNU/Linux
   Comparing topk-filters (d4e2312d2c3b28db15d194bb279577f495f84029) to 
f363e382661a4f45dad2912e9988f1703e46939b 
[diff](https://github.com/apache/datafusion/compare/f363e382661a4f45dad2912e9988f1703e46939b..d4e2312d2c3b28db15d194bb279577f495f84029)
 using:  tpch_mem clickbench_partitioned clickbench_extended
   Results will be posted here when complete
   





Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]

2025-08-22 Thread via GitHub


alamb commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-3215086665

   🤖 `./gh_compare_branch.sh` [Benchmark Script](https://github.com/alamb/datafusion-benchmarking/blob/main/gh_compare_branch.sh) Running
   Linux aal-dev 6.14.0-1014-gcp #15~24.04.1-Ubuntu SMP Fri Jul 25 23:26:08 UTC 
2025 x86_64 x86_64 x86_64 GNU/Linux
   Comparing topk-filters (d4e2312d2c3b28db15d194bb279577f495f84029) to 
f363e382661a4f45dad2912e9988f1703e46939b 
[diff](https://github.com/apache/datafusion/compare/f363e382661a4f45dad2912e9988f1703e46939b..d4e2312d2c3b28db15d194bb279577f495f84029)
 using:  topk_tpch
   Results will be posted here when complete
   





Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]

2025-08-22 Thread via GitHub


alamb commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-3215086584

   🤖: Benchmark completed
   
   ```
   Comparing HEAD and topk-filters
   
   Benchmark tpch_mem_sf1.json
   
   ┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
   ┃ Query        ┃        HEAD ┃ topk-filters ┃        Change ┃
   ┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
   │ QQuery 1     │   169.63 ms │    165.07 ms │     no change │
   │ QQuery 2     │    27.55 ms │     27.70 ms │     no change │
   │ QQuery 3     │    44.86 ms │     45.11 ms │     no change │
   │ QQuery 4     │    26.77 ms │     26.41 ms │     no change │
   │ QQuery 5     │    72.18 ms │     73.65 ms │     no change │
   │ QQuery 6     │    19.67 ms │     19.50 ms │     no change │
   │ QQuery 7     │   144.25 ms │    139.83 ms │     no change │
   │ QQuery 8     │    30.48 ms │     33.56 ms │  1.10x slower │
   │ QQuery 9     │    83.26 ms │     82.54 ms │     no change │
   │ QQuery 10    │    57.63 ms │     58.02 ms │     no change │
   │ QQuery 11    │    42.63 ms │     40.83 ms │     no change │
   │ QQuery 12    │    49.97 ms │     50.12 ms │     no change │
   │ QQuery 13    │    44.87 ms │     46.68 ms │     no change │
   │ QQuery 14    │    13.69 ms │     13.90 ms │     no change │
   │ QQuery 15    │    23.59 ms │     23.69 ms │     no change │
   │ QQuery 16    │    23.76 ms │     23.45 ms │     no change │
   │ QQuery 17    │   141.23 ms │    146.02 ms │     no change │
   │ QQuery 18    │   318.80 ms │    313.81 ms │     no change │
   │ QQuery 19    │    48.11 ms │     34.80 ms │ +1.38x faster │
   │ QQuery 20    │    60.59 ms │     47.85 ms │ +1.27x faster │
   │ QQuery 21    │   224.40 ms │    216.34 ms │     no change │
   │ QQuery 22    │    20.00 ms │     19.36 ms │     no change │
   └──────────────┴─────────────┴──────────────┴───────────────┘
   ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
   ┃ Benchmark Summary           ┃            ┃
   ┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
   │ Total Time (HEAD)           │  1687.90ms │
   │ Total Time (topk-filters)   │  1648.24ms │
   │ Average Time (HEAD)         │    76.72ms │
   │ Average Time (topk-filters) │    74.92ms │
   │ Queries Faster              │          2 │
   │ Queries Slower              │          1 │
   │ Queries with No Change      │         19 │
   │ Queries with Failure        │          0 │
   └─────────────────────────────┴────────────┘
   ```
   
   
   
   
   
   





Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]

2025-08-22 Thread via GitHub


alamb commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-3215012818

   Looking at it now. I kicked off the benchmarks again after making some 
changes to my GCP machine that will hopefully make the results more consistent.





Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]

2025-08-22 Thread via GitHub


alamb commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-3215011892

   🤖 `./gh_compare_branch.sh` [Benchmark Script](https://github.com/alamb/datafusion-benchmarking/blob/main/gh_compare_branch.sh) Running
   Linux aal-dev 6.14.0-1014-gcp #15~24.04.1-Ubuntu SMP Fri Jul 25 23:26:08 UTC 
2025 x86_64 x86_64 x86_64 GNU/Linux
   Comparing topk-filters (0b1cb583b781e99e0163c6caa62b8f735bafdcf8) to 
f363e382661a4f45dad2912e9988f1703e46939b 
[diff](https://github.com/apache/datafusion/compare/f363e382661a4f45dad2912e9988f1703e46939b..0b1cb583b781e99e0163c6caa62b8f735bafdcf8)
 using:  tpch_mem
   Results will be posted here when complete
   





Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]

2025-08-22 Thread via GitHub


adriangb commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-3214807633

   @alamb It looks like you reviewed but didn't approve; I thought you had 
approved. Can you take another look at this PR when you get a chance? I think 
I've addressed all of the feedback.





Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]

2025-08-21 Thread via GitHub


adriangb commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-3210820667

   I've rebased this on `main` and plan to merge it once CI passes





Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]

2025-08-19 Thread via GitHub


adriangb commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-3203042277

   @alamb I think most of the feedback has been addressed. Thank you for your 
review and improvements!





Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]

2025-08-19 Thread via GitHub


alamb commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-3202553698

   🤖: Benchmark completed
   
   ```
   Comparing HEAD and topk-filters
   
   Benchmark clickbench_extended.json
   
   ┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
   ┃ Query        ┃        HEAD ┃ topk-filters ┃        Change ┃
   ┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
   │ QQuery 0     │  1941.87 ms │   1911.66 ms │     no change │
   │ QQuery 1     │   745.79 ms │    645.87 ms │ +1.15x faster │
   │ QQuery 2     │  1448.99 ms │   1286.31 ms │ +1.13x faster │
   │ QQuery 3     │   620.31 ms │    614.76 ms │     no change │
   │ QQuery 4     │  1293.11 ms │   1282.47 ms │     no change │
   │ QQuery 5     │ 13859.62 ms │  13799.54 ms │     no change │
   │ QQuery 6     │  2359.44 ms │   2349.22 ms │     no change │
   │ QQuery 7     │  1800.51 ms │   1811.91 ms │     no change │
   └──────────────┴─────────────┴──────────────┴───────────────┘
   ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
   ┃ Benchmark Summary           ┃            ┃
   ┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
   │ Total Time (HEAD)           │ 24069.65ms │
   │ Total Time (topk-filters)   │ 23701.73ms │
   │ Average Time (HEAD)         │  3008.71ms │
   │ Average Time (topk-filters) │  2962.72ms │
   │ Queries Faster              │          2 │
   │ Queries Slower              │          0 │
   │ Queries with No Change      │          6 │
   │ Queries with Failure        │          0 │
   └─────────────────────────────┴────────────┘
   
   Benchmark clickbench_partitioned.json
   
   ┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
   ┃ Query        ┃        HEAD ┃ topk-filters ┃        Change ┃
   ┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
   │ QQuery 0     │     2.47 ms │      2.23 ms │ +1.11x faster │
   │ QQuery 1     │    29.14 ms │     29.27 ms │     no change │
   │ QQuery 2     │    75.23 ms │     76.03 ms │     no change │
   │ QQuery 3     │    95.06 ms │     89.60 ms │ +1.06x faster │
   │ QQuery 4     │   595.70 ms │    594.73 ms │     no change │
   │ QQuery 5     │   862.32 ms │    877.24 ms │     no change │
   │ QQuery 6     │     2.31 ms │      2.18 ms │ +1.06x faster │
   │ QQuery 7     │    32.52 ms │     33.61 ms │     no change │
   │ QQuery 8     │   848.36 ms │    863.85 ms │     no change │
   │ QQuery 9     │  1168.82 ms │   1152.74 ms │     no change │
   │ QQuery 10    │   229.56 ms │    225.21 ms │     no change │
   │ QQuery 11    │   254.54 ms │    249.47 ms │     no change │
   │ QQuery 12    │   882.94 ms │    836.94 ms │ +1.05x faster │
   │ QQuery 13    │  1234.12 ms │   1189.38 ms │     no change │
   │ QQuery 14    │   805.61 ms │    789.22 ms │     no change │
   │ QQuery 15    │   783.32 ms │    783.07 ms │     no change │
   │ QQuery 16    │  1593.42 ms │   1584.35 ms │     no change │
   │ QQuery 17    │  1586.06 ms │   1578.98 ms │     no change │
   │ QQuery 18    │  2818.83 ms │   2850.55 ms │     no change │
   │ QQuery 19    │    82.25 ms │     79.62 ms │     no change │
   │ QQuery 20    │  1204.54 ms │   1212.60 ms │     no change │
   │ QQuery 21    │  1379.57 ms │   1350.79 ms │     no change │
   │ QQuery 22    │  2278.24 ms │   2259.20 ms │     no change │
   │ QQuery 23    │  7743.86 ms │   6870.55 ms │ +1.13x faster │
   │ QQuery 24    │   416.53 ms │    405.25 ms │     no change │
   │ QQuery 25    │   294.09 ms │    288.28 ms │     no change │
   │ QQuery 26    │   416.90 ms │    405.81 ms │     no change │
   │ QQuery 27    │  1617.30 ms │   1592.63 ms │     no change │
   │ QQuery 28    │ 11970.14 ms │  11759.08 ms │     no change │
   │ QQuery 29    │   508.33 ms │    520.47 ms │     no change │
   │ QQuery 30    │   787.97 ms │    783.59 ms │     no change │
   │ QQuery 31    │   787.57 ms │    773.39 ms │     no change │
   │ QQuery 32    │  2368.78 ms │   2381.14 ms │     no change │
   │ QQuery 33    │  3177.45 ms │   3187.93 ms │     no change │
   │ QQuery 34    │  3188.98 ms │   3216.57 ms │     no change │
   │ QQuery 35    │  1215.92 ms │   1294.23 ms │  1.06x slower │
   │ QQuery 36    │   129.68 ms │    128.39 ms │     no change │
   │ QQuery 37    │    56.08 ms │     56.33 ms │     no change │
   │ QQuery 38    │   124.60 ms │    127.86 ms │     no change │
   │ QQuery 39    │   198.97 ms │    209.04 ms │  1.05x slower │
   │ QQuery 40    │    41.21 ms │     42.17 ms │     no change │
   │ QQuery 41    │    38.62 ms │     39.69 ms │     no change │
   │ QQuery 42    │    31.94 ms │     33.52 ms │     no change │
   └──────────────┴─────────────┴──────────────┴───────────────┘
   ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
   ┃ Benchmark Summary           ┃            ┃
   ┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
   │ Total Time (HEAD)           │ 53959.86ms │
   │ Total Time 
   ```

Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]

2025-08-19 Thread via GitHub


adriangb commented on code in PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#discussion_r2286514826


##
datafusion/physical-plan/src/topk/mod.rs:
##
@@ -121,13 +122,37 @@ pub struct TopK {
 /// Common sort prefix between the input and the sort expressions to allow early exit optimization
 common_sort_prefix: Arc<[PhysicalSortExpr]>,
 /// Filter matching the state of the `TopK` heap used for dynamic filter pushdown
-filter: Option>,
+filter: TopKDynamicFilters,
 /// If true, indicates that all rows of subsequent batches are guaranteed
 /// to be greater (by byte order, after row conversion) than the top K,
 /// which means the top K won't change and the computation can be finished early.
 pub(crate) finished: bool,
 }
 
+#[derive(Debug, Clone)]
+pub struct TopKDynamicFilters {
+/// The current *global* threshold for the dynamic filter.
+/// This is shared across all partitions and is updated by any of them.
+/// Stored as row bytes for efficient comparison.
+threshold_row: Arc>>,
+/// The expression used to evaluate the dynamic filter
+expr: Arc,

Review Comment:
   Yep, good call! Otherwise I think we're bound to run into bugs or 
synchronization issues. Done in bcc0dcdc4.
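Keeping the threshold and the filter expression behind one piece of synchronization is what rules out the bugs mentioned above: no reader can observe a new threshold paired with a stale predicate. A minimal sketch of that invariant, assuming a plain `std::sync::Mutex` instead of the `ArcSwap` in the diff; `SharedFilter`, `FilterState`, `try_update`, and the `build` callback are hypothetical names. As in the diff, thresholds are compared as row-format bytes, where a byte-wise smaller row is more selective.

```rust
use std::sync::Mutex;

/// Threshold and the filter derived from it, guarded together
/// (hypothetical simplification of the PR's `TopKDynamicFilters`).
#[derive(Debug, Default)]
struct FilterState {
    /// Row-format bytes of the current threshold; smaller means more selective.
    threshold_row: Option<Vec<u8>>,
    /// Stand-in for the filter expression built from `threshold_row`.
    predicate: Option<String>,
}

/// Shared across partitions (e.g. behind an `Arc`).
#[derive(Debug, Default)]
pub struct SharedFilter {
    state: Mutex<FilterState>,
}

impl SharedFilter {
    /// Install `candidate`, and the predicate produced by `build`, only if it is
    /// strictly more selective than the current threshold. Returns whether it was installed.
    pub fn try_update(&self, candidate: &[u8], build: impl FnOnce() -> String) -> bool {
        let mut state = self.state.lock().unwrap();
        let more_selective = match state.threshold_row.as_deref() {
            Some(current) => candidate < current,
            None => true,
        };
        if more_selective {
            // Both fields change under the same lock, so they never disagree.
            state.threshold_row = Some(candidate.to_vec());
            state.predicate = Some(build());
        }
        more_selective
    }

    /// Snapshot of the current predicate.
    pub fn predicate(&self) -> Option<String> {
        self.state.lock().unwrap().predicate.clone()
    }
}
```

This sketch builds the predicate while holding the lock, trading a longer critical section for simpler reasoning; the diff instead builds it outside any synchronization.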






Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]

2025-08-19 Thread via GitHub


adriangb commented on code in PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#discussion_r2286494820


##
datafusion/physical-plan/src/topk/mod.rs:
##
@@ -319,19 +342,84 @@ impl TopK {
 /// (a > 2 OR (a = 2 AND b < 3))
 /// ```
 fn update_filter(&mut self) -> Result<()> {
-let Some(filter) = &self.filter else {
+// If the heap doesn't have k elements yet, we can't create thresholds
+let Some(max_row) = self.heap.max() else {
 return Ok(());
 };
-let Some(thresholds) = self.heap.get_threshold_values(&self.expr)? else {
-return Ok(());
+
+let new_threshold_row = &max_row.row;
+
+// Extract scalar values BEFORE acquiring lock to reduce critical section
+let thresholds = match self.heap.get_threshold_values(&self.expr)? {
+Some(t) => t,
+None => return Ok(()),
 };
 
+// Extract filter expression reference before entering critical section
+let filter_expr = Arc::clone(&self.filter.expr);
+
+// Check if we need to update the threshold (lock-free read)
+let current_threshold = self.filter.threshold_row.load();
+let needs_update = match current_threshold.as_ref() {
+Some(current_row) => {
+// new < current means new threshold is more selective
+current_row.as_slice().cmp(new_threshold_row) == Ordering::Greater
+}
+None => true, // No current threshold, so we need to set one
+};
+
+// Only proceed if we need to update
+if needs_update {
+// Build the filter expression OUTSIDE any synchronization
+let predicate = Self::build_filter_expression(&self.expr, thresholds)?;
+let new_threshold_arc = Arc::new(new_threshold_row.to_vec());
+
+// Atomically update the threshold using compare-and-swap
+let old_threshold = self.filter.threshold_row.compare_and_swap(
+&current_threshold,
+Some(Arc::clone(&new_threshold_arc)),
+);
+
+// Only update filter if we successfully updated the threshold
+// (or if there was no previous threshold and we're the first)
+let should_update_filter =
+match (old_threshold.as_ref(), current_threshold.as_ref()) {
+// We successfully swapped
+(Some(old), Some(expected)) if Arc::ptr_eq(old, expected) => true,
+// We were the first to set it
+(None, None) => true,
+// Another thread updated before us, check if our threshold is still better
+(Some(actual_old), _) => {
+actual_old.as_slice().cmp(new_threshold_row) == Ordering::Greater

Review Comment:
   I think you resolved this in your commits; just confirming.
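The compare-and-swap in the diff has to handle a race: between the lock-free `load()` and the swap, another partition may already have installed a tighter threshold, which the `(Some(actual_old), _)` arm tries to account for. For a fixed-width key, the standard library's `AtomicU64::fetch_update` packages the same "load, decide, swap, retry on conflict" loop. The sketch assumes a single ascending `u64` sort key in place of row bytes and uses `u64::MAX` as a hypothetical "no threshold yet" sentinel; `AtomicThreshold` and `tighten` are illustrative names.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Shared threshold for `ORDER BY key ASC LIMIT k` (hypothetical simplification:
/// one u64 key instead of row-format bytes). `u64::MAX` means "no threshold yet".
pub struct AtomicThreshold(AtomicU64);

impl AtomicThreshold {
    pub fn new() -> Self {
        AtomicThreshold(AtomicU64::new(u64::MAX))
    }

    /// Lower the threshold to `candidate` only if that is strictly more selective.
    /// `fetch_update` re-runs the closure against the freshly observed value
    /// whenever another thread wins the race, so a looser candidate can never
    /// overwrite a tighter one. Returns `Ok(previous)` if installed, `Err(current)` otherwise.
    pub fn tighten(&self, candidate: u64) -> Result<u64, u64> {
        self.0.fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
            (candidate < current).then_some(candidate)
        })
    }

    pub fn get(&self) -> u64 {
        self.0.load(Ordering::Acquire)
    }
}
```

Because the selectivity check lives inside the retry loop, this version needs no separate `needs_update` pre-check or `Arc::ptr_eq` bookkeeping. Variable-length row bytes do not fit in a std atomic, which is why the diff reaches for `ArcSwap` (or, alternatively, a lock).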






Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]

2025-08-19 Thread via GitHub


adriangb commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-3202471933

   > 🤖: Benchmark completed
   
   Hmm, quite a bit worse than 
https://github.com/apache/datafusion/pull/16433#issuecomment-3201715769. I 
still think we should move forward with this without ArcSwap and then look at 
ArcSwap in isolation; otherwise it's too hard to know what's responsible for the 
performance difference.





Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]

2025-08-19 Thread via GitHub


alamb commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-3202450803

   🤖 `./gh_compare_branch.sh` [Benchmark Script](https://github.com/alamb/datafusion-benchmarking/blob/main/gh_compare_branch.sh) Running
   Linux aal-dev 6.11.0-1016-gcp #16~24.04.1-Ubuntu SMP Wed May 28 02:40:52 UTC 
2025 x86_64 x86_64 x86_64 GNU/Linux
   Comparing topk-filters (ffe68565b55ba640f9c5197a4f38106ab0460807) to 
8f15991f33bf6aca9d4da8958141b59d196b2ed6 
[diff](https://github.com/apache/datafusion/compare/8f15991f33bf6aca9d4da8958141b59d196b2ed6..ffe68565b55ba640f9c5197a4f38106ab0460807)
 using:  tpch_mem clickbench_partitioned clickbench_extended
   Results will be posted here when complete
   





Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]

2025-08-19 Thread via GitHub


alamb commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-3202450580

   🤖: Benchmark completed
   
   Details
   
   
   
   ```
   Comparing HEAD and topk-filters
   
   Benchmark run_topk_tpch.json
   
   ┏━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
   ┃ Query ┃      HEAD ┃ topk-filters ┃        Change ┃
   ┡━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
   │ Q1    │  24.01 ms │      8.64 ms │ +2.78x faster │
   │ Q2    │  30.33 ms │     30.83 ms │     no change │
   │ Q3    │  88.89 ms │     98.89 ms │  1.11x slower │
   │ Q4    │  34.17 ms │     39.35 ms │  1.15x slower │
   │ Q5    │  23.42 ms │     27.51 ms │  1.17x slower │
   │ Q6    │  48.61 ms │     48.06 ms │     no change │
   │ Q7    │ 121.79 ms │    121.96 ms │     no change │
   │ Q8    │  90.35 ms │     69.09 ms │ +1.31x faster │
   │ Q9    │  93.10 ms │     91.78 ms │     no change │
   │ Q10   │ 163.82 ms │    170.23 ms │     no change │
   │ Q11   │  78.22 ms │     13.29 ms │ +5.89x faster │
   └───────┴───────────┴──────────────┴───────────────┘
   ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━┓
   ┃ Benchmark Summary           ┃          ┃
   ┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━┩
   │ Total Time (HEAD)           │ 796.69ms │
   │ Total Time (topk-filters)   │ 719.62ms │
   │ Average Time (HEAD)         │  72.43ms │
   │ Average Time (topk-filters) │  65.42ms │
   │ Queries Faster              │        3 │
   │ Queries Slower              │        3 │
   │ Queries with No Change      │        5 │
   │ Queries with Failure        │        0 │
   └─────────────────────────────┴──────────┘
   ```
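
The "Change" column and the summary rows in these reports can be approximated with a few lines of code. This is a minimal sketch, not the logic of `gh_compare_branch.sh` or the comparison script it drives: the ±5% noise band is an assumption inferred from the rows posted in this thread (for example Q10 at about 1.04x is "no change" while Q3 at about 1.11x is "slower"), and `classify`, `summarize`, and `Change` are hypothetical names.

```rust
/// Assumed noise band: ratios within ±5% of 1.0 are reported as "no change".
/// Inferred from the tables in this thread, not read from the benchmark script.
const NOISE_BAND: f64 = 0.05;

/// One row's "Change" column (hypothetical re-creation).
#[derive(Debug, PartialEq)]
enum Change {
    /// Speedup factor, HEAD / branch (rendered as "+N.NNx faster").
    Faster(f64),
    /// Slowdown factor, branch / HEAD (rendered as "N.NNx slower").
    Slower(f64),
    NoChange,
}

/// Classifies one query by comparing the branch time to the HEAD time.
fn classify(head_ms: f64, branch_ms: f64) -> Change {
    let ratio = branch_ms / head_ms;
    if ratio < 1.0 - NOISE_BAND {
        Change::Faster(head_ms / branch_ms)
    } else if ratio > 1.0 + NOISE_BAND {
        Change::Slower(ratio)
    } else {
        Change::NoChange
    }
}

/// Returns (total, average) of per-query timings, as in the summary table.
fn summarize(times_ms: &[f64]) -> (f64, f64) {
    let total: f64 = times_ms.iter().sum();
    (total, total / times_ms.len() as f64)
}
```

Summing the rounded HEAD column of the table above this way gives about 796.71 ms rather than the reported 796.69 ms; the small gap is presumably because the script totals unrounded timings.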
   
   
   
   
   
   





Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]

2025-08-19 Thread via GitHub


alamb commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-3202449463

   🤖 `./gh_compare_branch.sh` [Benchmark Script](https://github.com/alamb/datafusion-benchmarking/blob/main/gh_compare_branch.sh) Running
   Linux aal-dev 6.11.0-1016-gcp #16~24.04.1-Ubuntu SMP Wed May 28 02:40:52 UTC 
2025 x86_64 x86_64 x86_64 GNU/Linux
   Comparing topk-filters (ffe68565b55ba640f9c5197a4f38106ab0460807) to 
8f15991f33bf6aca9d4da8958141b59d196b2ed6 
[diff](https://github.com/apache/datafusion/compare/8f15991f33bf6aca9d4da8958141b59d196b2ed6..ffe68565b55ba640f9c5197a4f38106ab0460807)
 using:  topk_tpch
   Results will be posted here when complete
   





Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]

2025-08-19 Thread via GitHub


alamb commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-3202449210

   🤖: Benchmark completed
   
   Details
   
   
   
   ```
   Comparing HEAD and topk-filters
   
   Benchmark tpch_mem_sf1.json
   
   ┏━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
   ┃ Query     ┃      HEAD ┃ topk-filters ┃    Change ┃
   ┡━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
   │ QQuery 1  │  98.51 ms │     98.91 ms │ no change │
   │ QQuery 2  │  20.91 ms │     20.89 ms │ no change │
   │ QQuery 3  │  31.83 ms │     32.16 ms │ no change │
   │ QQuery 4  │  18.38 ms │     18.19 ms │ no change │
   │ QQuery 5  │  49.14 ms │     49.23 ms │ no change │
   │ QQuery 6  │  11.96 ms │     11.79 ms │ no change │
   │ QQuery 7  │  88.37 ms │     85.17 ms │ no change │
   │ QQuery 8  │  24.71 ms │     23.62 ms │ no change │
   │ QQuery 9  │  52.90 ms │     53.36 ms │ no change │
   │ QQuery 10 │  40.20 ms │     39.90 ms │ no change │
   │ QQuery 11 │  39.34 ms │     38.67 ms │ no change │
   │ QQuery 12 │  29.89 ms │     30.23 ms │ no change │
   │ QQuery 13 │  25.64 ms │     26.43 ms │ no change │
   │ QQuery 14 │   9.81 ms │      9.54 ms │ no change │
   │ QQuery 15 │  19.13 ms │     18.48 ms │ no change │
   │ QQuery 16 │  17.07 ms │     17.31 ms │ no change │
   │ QQuery 17 │  94.55 ms │     93.25 ms │ no change │
   │ QQuery 18 │ 189.44 ms │    188.32 ms │ no change │
   │ QQuery 19 │  23.73 ms │     24.38 ms │ no change │
   │ QQuery 20 │  31.93 ms │     30.80 ms │ no change │
   │ QQuery 21 │ 141.87 ms │    139.60 ms │ no change │
   │ QQuery 22 │  13.79 ms │     14.01 ms │ no change │
   └───────────┴───────────┴──────────────┴───────────┘
   ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
   ┃ Benchmark Summary           ┃           ┃
   ┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
   │ Total Time (HEAD)           │ 1073.08ms │
   │ Total Time (topk-filters)   │ 1064.25ms │
   │ Average Time (HEAD)         │   48.78ms │
   │ Average Time (topk-filters) │   48.37ms │
   │ Queries Faster              │         0 │
   │ Queries Slower              │         0 │
   │ Queries with No Change      │        22 │
   │ Queries with Failure        │         0 │
   └─────────────────────────────┴───────────┘
   ```
   
   
   
   
   
   





Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]

2025-08-19 Thread via GitHub


alamb commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-3202448125

   🤖 `./gh_compare_branch.sh` [Benchmark Script](https://github.com/alamb/datafusion-benchmarking/blob/main/gh_compare_branch.sh) Running
   Linux aal-dev 6.11.0-1016-gcp #16~24.04.1-Ubuntu SMP Wed May 28 02:40:52 UTC 
2025 x86_64 x86_64 x86_64 GNU/Linux
   Comparing topk-filters (ffe68565b55ba640f9c5197a4f38106ab0460807) to 
8f15991f33bf6aca9d4da8958141b59d196b2ed6 
[diff](https://github.com/apache/datafusion/compare/8f15991f33bf6aca9d4da8958141b59d196b2ed6..ffe68565b55ba640f9c5197a4f38106ab0460807)
 using:  tpch_mem
   Results will be posted here when complete
   





Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]

2025-08-19 Thread via GitHub


alamb commented on code in PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#discussion_r2286474109


##
datafusion/physical-expr/src/expressions/dynamic_filters.rs:
##
@@ -137,7 +139,7 @@ impl DynamicFilterPhysicalExpr {
 Self {
 children,
 remapped_children: None, // Initially no remapped children
-inner: Arc::new(RwLock::new(Inner::new(inner))),
+inner: Arc::new(ArcSwap::from_pointee(Inner::new(inner))),

Review Comment:
   I may have been burned one too many times by random maintainers on the 
internet deciding to disappear, etc., and I am trying to keep the dependency load 
down for DataFusion. I realize I may be oversensitive on this topic.






Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]

2025-08-19 Thread via GitHub


adriangb commented on code in PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#discussion_r2286456332


##
datafusion/physical-plan/src/topk/mod.rs:
##
@@ -121,13 +122,37 @@ pub struct TopK {
 /// Common sort prefix between the input and the sort expressions to allow early exit optimization
 common_sort_prefix: Arc<[PhysicalSortExpr]>,
 /// Filter matching the state of the `TopK` heap used for dynamic filter pushdown
-filter: Option>,
+filter: TopKDynamicFilters,
 /// If true, indicates that all rows of subsequent batches are guaranteed
 /// to be greater (by byte order, after row conversion) than the top K,
 /// which means the top K won't change and the computation can be finished early.
 pub(crate) finished: bool,
 }
 
+#[derive(Debug, Clone)]
+pub struct TopKDynamicFilters {

Review Comment:
   Yes, because it gets passed into `TopK::try_new`, which is currently public. 
Maybe `TopK` should be `pub(crate)`; then we could make the whole thing 
`pub(crate)`, but as it stands that's a breaking change.
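
If `TopK` were narrowed to `pub(crate)` as suggested, the helper struct could follow it. A minimal sketch of that shape, using hypothetical and heavily simplified stand-ins: the real `TopK::try_new` takes many more arguments, and the `threshold_row` field type and the `has_threshold` method here are invented for illustration.

```rust
// Hypothetical, simplified stand-ins for the types discussed above.
mod topk {
    /// With `TopK` itself crate-private, the helper type can be too.
    #[derive(Debug, Clone, Default)]
    pub(crate) struct TopKDynamicFilters {
        pub(crate) threshold_row: Option<Vec<u8>>,
    }

    pub(crate) struct TopK {
        filter: TopKDynamicFilters,
    }

    impl TopK {
        /// Crate-private constructor: code outside the crate can no longer
        /// call it, so the parameter type no longer needs to be `pub`.
        pub(crate) fn try_new(filter: TopKDynamicFilters) -> Result<Self, String> {
            Ok(Self { filter })
        }

        /// Illustrative accessor used only to exercise the sketch.
        pub(crate) fn has_threshold(&self) -> bool {
            self.filter.threshold_row.is_some()
        }
    }
}
```

Because downstream crates can call `TopK::try_new` today, narrowing it this way is exactly the breaking change mentioned above.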






Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]

2025-08-19 Thread via GitHub


adriangb commented on code in PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#discussion_r2286456332


##
datafusion/physical-plan/src/topk/mod.rs:
##
@@ -121,13 +122,37 @@ pub struct TopK {
 /// Common sort prefix between the input and the sort expressions to allow early exit optimization
 common_sort_prefix: Arc<[PhysicalSortExpr]>,
 /// Filter matching the state of the `TopK` heap used for dynamic filter pushdown
-filter: Option>,
+filter: TopKDynamicFilters,
 /// If true, indicates that all rows of subsequent batches are guaranteed
 /// to be greater (by byte order, after row conversion) than the top K,
 /// which means the top K won't change and the computation can be finished early.
 pub(crate) finished: bool,
 }
 
+#[derive(Debug, Clone)]
+pub struct TopKDynamicFilters {

Review Comment:
   Yes, because it gets passed into `TopK::try_new`.






Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]

2025-08-19 Thread via GitHub


adriangb commented on code in PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#discussion_r2286448656


##
datafusion/physical-expr/src/expressions/dynamic_filters.rs:
##
@@ -137,7 +139,7 @@ impl DynamicFilterPhysicalExpr {
 Self {
 children,
 remapped_children: None, // Initially no remapped children
-inner: Arc::new(RwLock::new(Inner::new(inner))),
+inner: Arc::new(ArcSwap::from_pointee(Inner::new(inner))),

Review Comment:
   > it seems to have plenty of dependencies 
https://crates.io/crates/arc-swap/1.7.1/dependencies
   
   I see no required dependencies, only dev-dependencies.
   
   > And its last release was over a year ago: 
https://crates.io/crates/arc-swap/1.7.1
   
   Personally, I don't see any issue with this: I'd look at the ratio of open 
issues / obvious bugs to releases. If there are no features to add and no major 
bugs, having few releases can be a good thing 😃 






Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]

2025-08-19 Thread via GitHub


alamb commented on code in PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#discussion_r2286140587


##
datafusion/physical-plan/src/topk/mod.rs:
##
@@ -214,41 +239,39 @@ impl TopK {
 
 let mut selected_rows = None;
 
-if let Some(filter) = self.filter.as_ref() {
-// If a filter is provided, update it with the new rows
-let filter = filter.current()?;
-let filtered = filter.evaluate(&batch)?;
-let num_rows = batch.num_rows();
-let array = filtered.into_array(num_rows)?;
-let mut filter = array.as_boolean().clone();
-let true_count = filter.true_count();
-if true_count == 0 {
-// nothing to filter, so no need to update
-return Ok(());
+// If a filter is provided, update it with the new rows
+let filter = self.filter.expr.current()?;
+let filtered = filter.evaluate(&batch)?;

Review Comment:
   A minor nit for a follow-on PR: move this code into its own function, 
like `TopK::update_dynamic_filter` or similar, to make it easier to understand.
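
As a rough illustration of the suggested extraction, the evaluate-and-bail-out step from the hunk above can be isolated in a small function. This is a hypothetical sketch: a plain `&[bool]` stands in for the Arrow `BooleanArray` produced by evaluating the current filter, and `rows_passing_filter` is an invented name, not a DataFusion API.

```rust
/// Hypothetical helper: given the per-row result of evaluating the current
/// dynamic filter against a batch, return the indices of rows that may still
/// enter the top K, or `None` when no row passes (the caller can then skip
/// the whole batch, mirroring the `true_count == 0` early return above).
fn rows_passing_filter(mask: &[bool]) -> Option<Vec<usize>> {
    let true_count = mask.iter().filter(|&&keep| keep).count();
    if true_count == 0 {
        return None;
    }
    // Collect the positions of the surviving rows.
    let mut selected = Vec::with_capacity(true_count);
    for (row, &keep) in mask.iter().enumerate() {
        if keep {
            selected.push(row);
        }
    }
    Some(selected)
}
```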



##
datafusion/physical-expr/src/expressions/dynamic_filters.rs:
##
@@ -137,7 +139,7 @@ impl DynamicFilterPhysicalExpr {
 Self {
 children,
 remapped_children: None, // Initially no remapped children
-inner: Arc::new(RwLock::new(Inner::new(inner))),
+inner: Arc::new(ArcSwap::from_pointee(Inner::new(inner))),

Review Comment:
   I made a PR to show what this looks like without arc_swap
   - https://github.com/apache/datafusion/pull/17245



##
datafusion/physical-plan/src/topk/mod.rs:
##
@@ -121,13 +122,37 @@ pub struct TopK {
 /// Common sort prefix between the input and the sort expressions to allow early exit optimization
 common_sort_prefix: Arc<[PhysicalSortExpr]>,
 /// Filter matching the state of the `TopK` heap used for dynamic filter pushdown
-filter: Option>,
+filter: TopKDynamicFilters,
 /// If true, indicates that all rows of subsequent batches are guaranteed
 /// to be greater (by byte order, after row conversion) than the top K,
 /// which means the top K won't change and the computation can be finished early.
 pub(crate) finished: bool,
 }
 
+#[derive(Debug, Clone)]
+pub struct TopKDynamicFilters {

Review Comment:
   Does it need to be `pub`?



##
datafusion/physical-plan/src/topk/mod.rs:
##
@@ -319,19 +342,84 @@ impl TopK {
 /// (a > 2 OR (a = 2 AND b < 3))
 /// ```
 fn update_filter(&mut self) -> Result<()> {
-let Some(filter) = &self.filter else {
+// If the heap doesn't have k elements yet, we can't create thresholds
+let Some(max_row) = self.heap.max() else {
 return Ok(());
 };
-let Some(thresholds) = self.heap.get_threshold_values(&self.expr)? else {
-return Ok(());
+
+let new_threshold_row = &max_row.row;
+
+// Extract scalar values BEFORE acquiring lock to reduce critical section
+let thresholds = match self.heap.get_threshold_values(&self.expr)? {
+Some(t) => t,
+None => return Ok(()),
 };
 
+// Extract filter expression reference before entering critical section
+let filter_expr = Arc::clone(&self.filter.expr);
+
+// Check if we need to update the threshold (lock-free read)
+let current_threshold = self.filter.threshold_row.load();
+let needs_update = match current_threshold.as_ref() {
+Some(current_row) => {
+// new < current means new threshold is more selective
+current_row.as_slice().cmp(new_threshold_row) == Ordering::Greater
+}
+None => true, // No current threshold, so we need to set one
+};
+
+// Only proceed if we need to update
+if needs_update {
+// Build the filter expression OUTSIDE any synchronization
+let predicate = Self::build_filter_expression(&self.expr, thresholds)?;
+let new_threshold_arc = Arc::new(new_threshold_row.to_vec());
+
+// Atomically update the threshold using compare-and-swap
+let old_threshold = self.filter.threshold_row.compare_and_swap(
+&current_threshold,
+Some(Arc::clone(&new_threshold_arc)),
+);
+
+// Only update filter if we successfully updated the threshold
+// (or if there was no previous threshold and we're the first)
+let should_update_filter =
+match (old_threshold.as_ref(), current_threshold.as_ref()) {
+// We successfully swapped
+(Some(old), Some(expected)) if Arc::ptr_eq(old, expected) => true,
+// We were the first to set it
+(None, None) => true,
+  
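
The two ideas in the `update_filter` hunk above, only replacing the threshold when the new one is more selective and turning the threshold row into a lexicographic predicate like the `(a > 2 OR (a = 2 AND b < 3))` doc example, can be sketched without Arrow or `arc_swap`. Everything here is hypothetical: thresholds are plain byte slices standing in for row-converted rows, predicates are SQL-like strings rather than physical expressions, NULL ordering is ignored, and the doc example is assumed to correspond to a sort of `a DESC, b ASC`.

```rust
use std::cmp::Ordering;

/// Mirrors the `needs_update` check: rows are compared as row-encoded bytes,
/// where sort direction is already baked into the encoding, so a strictly
/// smaller new threshold admits fewer rows, i.e. it is more selective.
fn needs_update(current: Option<&[u8]>, new: &[u8]) -> bool {
    match current {
        Some(current_row) => current_row.cmp(new) == Ordering::Greater,
        None => true, // no threshold yet, so set one
    }
}

/// Hypothetical sort key: column name, threshold rendered as text, direction.
struct SortKey<'a> {
    column: &'a str,
    threshold: &'a str,
    descending: bool,
}

/// Strict comparison for one key: rows on the "better" side of the threshold.
fn strictly_better(key: &SortKey<'_>) -> String {
    let op = if key.descending { ">" } else { "<" };
    format!("{} {} {}", key.column, op, key.threshold)
}

/// Builds `k1 op t1 OR (k1 = t1 AND <predicate for remaining keys>)`,
/// parenthesizing nested ORs so SQL's AND-before-OR precedence holds.
fn threshold_predicate(keys: &[SortKey<'_>]) -> String {
    match keys {
        [] => String::new(),
        [key] => strictly_better(key),
        [key, rest @ ..] => {
            let inner = threshold_predicate(rest);
            let inner = if rest.len() > 1 { format!("({inner})") } else { inner };
            format!(
                "{} OR ({} = {} AND {})",
                strictly_better(key),
                key.column,
                key.threshold,
                inner
            )
        }
    }
}
```

In the hunk itself the comparison runs on row-converted bytes and `build_filter_expression` produces a physical expression rather than a string; the sketch only shows the shape of the logic.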

Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]

2025-08-19 Thread via GitHub


alamb commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-3201878567

   🌢️ 🌢️ 🌢️ 





Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]

2025-08-19 Thread via GitHub


adriangb commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-3201759854

   > Q11  │  80.40 ms │ 11.82 ms │ +6.80x faster
   
   
![image](https://github.com/user-attachments/assets/ac6cf0d6-f7a4-40b9-b595-0f693f1d8f72)





Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]

2025-08-19 Thread via GitHub


alamb commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-3201715769

   🤖: Benchmark completed
   
   Details
   
   
   
   ```
   Comparing HEAD and topk-filters
   
   Benchmark run_topk_tpch.json
   
   ┏━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
   ┃ Query ┃      HEAD ┃ topk-filters ┃        Change ┃
   ┡━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
   │ Q1    │  24.21 ms │      7.05 ms │ +3.43x faster │
   │ Q2    │  31.06 ms │     30.96 ms │     no change │
   │ Q3    │ 100.83 ms │     92.74 ms │ +1.09x faster │
   │ Q4    │  40.93 ms │     33.97 ms │ +1.20x faster │
   │ Q5    │  23.65 ms │     27.52 ms │  1.16x slower │
   │ Q6    │  45.75 ms │     49.21 ms │  1.08x slower │
   │ Q7    │ 119.32 ms │    125.34 ms │  1.05x slower │
   │ Q8    │  93.13 ms │     71.32 ms │ +1.31x faster │
   │ Q9    │  94.83 ms │     92.51 ms │     no change │
   │ Q10   │ 166.33 ms │    160.07 ms │     no change │
   │ Q11   │  80.40 ms │     11.82 ms │ +6.80x faster │
   └───────┴───────────┴──────────────┴───────────────┘
   ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━┓
   ┃ Benchmark Summary           ┃          ┃
   ┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━┩
   │ Total Time (HEAD)           │ 820.45ms │
   │ Total Time (topk-filters)   │ 702.52ms │
   │ Average Time (HEAD)         │  74.59ms │
   │ Average Time (topk-filters) │  63.87ms │
   │ Queries Faster              │        5 │
   │ Queries Slower              │        3 │
   │ Queries with No Change      │        3 │
   │ Queries with Failure        │        0 │
   └─────────────────────────────┴──────────┘
   ```
   
   
   
   
   
   





Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]

2025-08-19 Thread via GitHub


alamb commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-3201681965

   Given the nice performance improvements this seems to provide, I plan to 
review this PR carefully in the next day or two





Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]

2025-08-19 Thread via GitHub


alamb commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-3201680033

   🤖 `./gh_compare_branch.sh` [Benchmark Script](https://github.com/alamb/datafusion-benchmarking/blob/main/gh_compare_branch.sh) Running
   Linux aal-dev 6.11.0-1016-gcp #16~24.04.1-Ubuntu SMP Wed May 28 02:40:52 UTC 
2025 x86_64 x86_64 x86_64 GNU/Linux
   Comparing topk-filters (3945aba663062debc24757c857a8bd679ad8b223) to 
8f15991f33bf6aca9d4da8958141b59d196b2ed6 
[diff](https://github.com/apache/datafusion/compare/8f15991f33bf6aca9d4da8958141b59d196b2ed6..3945aba663062debc24757c857a8bd679ad8b223)
 using:  topk_tpch
   Results will be posted here when complete
   





Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]

2025-08-18 Thread via GitHub


adriangb commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-3197984660

   > QQuery 23 │  7751.39 ms │   6855.54 ms │ +1.13x faster
   > QQuery 23 │  7690.27 ms │   6835.65 ms │ +1.13x faster
   
   I'm guessing this is mostly real 😄 





Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]

2025-08-18 Thread via GitHub


alamb commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-3197955377

   🤖: Benchmark completed
   
   Details
   
   
   
   ```
   Comparing HEAD and topk-filters
   
   Benchmark clickbench_extended.json
   
   ┏━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
   ┃ Query    ┃        HEAD ┃ topk-filters ┃        Change ┃
   ┡━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
   │ QQuery 0 │  2053.78 ms │   1974.31 ms │     no change │
   │ QQuery 1 │   735.78 ms │    660.02 ms │ +1.11x faster │
   │ QQuery 2 │  1460.75 ms │   1307.73 ms │ +1.12x faster │
   │ QQuery 3 │   623.71 ms │    617.23 ms │     no change │
   │ QQuery 4 │  1285.31 ms │   1298.10 ms │     no change │
   │ QQuery 5 │ 13863.88 ms │  14128.38 ms │     no change │
   │ QQuery 6 │  2383.77 ms │   2341.30 ms │     no change │
   │ QQuery 7 │  1951.39 ms │   1953.50 ms │     no change │
   └──────────┴─────────────┴──────────────┴───────────────┘
   ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
   ┃ Benchmark Summary           ┃            ┃
   ┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
   │ Total Time (HEAD)           │ 24358.37ms │
   │ Total Time (topk-filters)   │ 24280.57ms │
   │ Average Time (HEAD)         │  3044.80ms │
   │ Average Time (topk-filters) │  3035.07ms │
   │ Queries Faster              │          2 │
   │ Queries Slower              │          0 │
   │ Queries with No Change      │          6 │
   │ Queries with Failure        │          0 │
   └─────────────────────────────┴────────────┘
   
   Benchmark clickbench_partitioned.json
   
   ┏━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
   ┃ Query     ┃        HEAD ┃ topk-filters ┃        Change ┃
   ┡━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
   │ QQuery 0  │     2.30 ms │      2.32 ms │     no change │
   │ QQuery 1  │    29.71 ms │     28.77 ms │     no change │
   │ QQuery 2  │    77.53 ms │     76.14 ms │     no change │
   │ QQuery 3  │    94.53 ms │     90.05 ms │     no change │
   │ QQuery 4  │   597.57 ms │    609.85 ms │     no change │
   │ QQuery 5  │   880.91 ms │    881.20 ms │     no change │
   │ QQuery 6  │     2.22 ms │      2.35 ms │  1.05x slower │
   │ QQuery 7  │    31.73 ms │     33.48 ms │  1.06x slower │
   │ QQuery 8  │   861.09 ms │    883.18 ms │     no change │
   │ QQuery 9  │  1167.22 ms │   1144.09 ms │     no change │
   │ QQuery 10 │   233.16 ms │    227.48 ms │     no change │
   │ QQuery 11 │   263.75 ms │    259.39 ms │     no change │
   │ QQuery 12 │   865.53 ms │    865.13 ms │     no change │
   │ QQuery 13 │  1180.94 ms │   1231.48 ms │     no change │
   │ QQuery 14 │   809.24 ms │    809.07 ms │     no change │
   │ QQuery 15 │   775.27 ms │    777.78 ms │     no change │
   │ QQuery 16 │  1619.25 ms │   1639.14 ms │     no change │
   │ QQuery 17 │  1608.47 ms │   1643.10 ms │     no change │
   │ QQuery 18 │  2875.25 ms │   3002.07 ms │     no change │
   │ QQuery 19 │    79.14 ms │     81.14 ms │     no change │
   │ QQuery 20 │  1227.30 ms │   1234.70 ms │     no change │
   │ QQuery 21 │  1377.70 ms │   1356.98 ms │     no change │
   │ QQuery 22 │  2268.17 ms │   2238.49 ms │     no change │
   │ QQuery 23 │  7690.27 ms │   6835.65 ms │ +1.13x faster │
   │ QQuery 24 │   426.90 ms │    417.85 ms │     no change │
   │ QQuery 25 │   296.39 ms │    294.12 ms │     no change │
   │ QQuery 26 │   428.93 ms │    417.70 ms │     no change │
   │ QQuery 27 │  1618.21 ms │   1573.12 ms │     no change │
   │ QQuery 28 │ 12002.03 ms │  12016.65 ms │     no change │
   │ QQuery 29 │   520.45 ms │    523.19 ms │     no change │
   │ QQuery 30 │   774.86 ms │    761.23 ms │     no change │
   │ QQuery 31 │   794.38 ms │    783.65 ms │     no change │
   │ QQuery 32 │  2461.37 ms │   2452.40 ms │     no change │
   │ QQuery 33 │  3218.56 ms │   3214.72 ms │     no change │
   │ QQuery 34 │  3200.35 ms │   3223.54 ms │     no change │
   │ QQuery 35 │  1267.63 ms │   1276.62 ms │     no change │
   │ QQuery 36 │   125.57 ms │    121.93 ms │     no change │
   │ QQuery 37 │    53.62 ms │     53.02 ms │     no change │
   │ QQuery 38 │   127.07 ms │    124.19 ms │     no change │
   │ QQuery 39 │   206.77 ms │    199.55 ms │     no change │
   │ QQuery 40 │    41.71 ms │     41.96 ms │     no change │
   │ QQuery 41 │    39.44 ms │     38.61 ms │     no change │
   │ QQuery 42 │    32.99 ms │     32.51 ms │     no change │
   └───────────┴─────────────┴──────────────┴───────────────┘
   ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
   ┃ Benchmark Summary           ┃            ┃
   ┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
   │ Total Time (HEAD)           │ 54255.46ms │
   │ Total Time

Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]

2025-08-18 Thread via GitHub


Dandandan commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-3197945496

   Really nice 🚀 





Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]

2025-08-18 Thread via GitHub


adriangb commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-3197906730

   @alamb I feel like the interesting benchmark to run is `topk_tpch`





Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]

2025-08-18 Thread via GitHub


alamb commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-3197868163

   🤖 `./gh_compare_branch.sh` [Benchmark Script](https://github.com/alamb/datafusion-benchmarking/blob/main/gh_compare_branch.sh) Running
   Linux aal-dev 6.11.0-1016-gcp #16~24.04.1-Ubuntu SMP Wed May 28 02:40:52 UTC 
2025 x86_64 x86_64 x86_64 GNU/Linux
   Comparing topk-filters (3945aba663062debc24757c857a8bd679ad8b223) to 
8f15991f33bf6aca9d4da8958141b59d196b2ed6 
[diff](https://github.com/apache/datafusion/compare/8f15991f33bf6aca9d4da8958141b59d196b2ed6..3945aba663062debc24757c857a8bd679ad8b223)
 using:  tpch_mem clickbench_partitioned clickbench_extended
   Results will be posted here when complete
   





Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]

2025-08-18 Thread via GitHub


adriangb commented on code in PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#discussion_r2282817078


##
datafusion/physical-expr/src/expressions/dynamic_filters.rs:
##
@@ -137,7 +139,7 @@ impl DynamicFilterPhysicalExpr {
 Self {
 children,
 remapped_children: None, // Initially no remapped children
-inner: Arc::new(RwLock::new(Inner::new(inner))),
+inner: Arc::new(ArcSwap::from_pointee(Inner::new(inner))),

Review Comment:
   Well, I stumbled upon it because we originally had some performance 
regressions and I suspected they might be due to hitting the `Arc>`s a 
lot more heavily. It seems that `ArcSwap` is the solution to that, but I haven't 
benchmarked to confirm the performance impact, i.e. whether it's that or other 
refactors. In a microbenchmark situation it makes sense. I can make a 
branch / another PR without ArcSwap and we can see if there's a measurable 
change at the macro level, but I expect it may be hard to measure.
   
   That said, it's a well-maintained dep from a reputable maintainer, has a lot 
of usage, and has no dependencies of its own.
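
For comparison with `ArcSwap`, one dependency-free pattern that also keeps critical sections short is an `RwLock` around an `Arc`: readers clone the `Arc` and release the lock at once, and writers build the new value before taking the write lock. This is a minimal sketch with a hypothetical `SharedSnapshot` type; it is not necessarily what apache/datafusion#17245 does, and unlike `arc_swap` its reads still take a (brief) lock.

```rust
use std::sync::{Arc, RwLock};

/// Hypothetical shared holder for an immutable snapshot (e.g. a filter's
/// current state). The lock is held only to copy or swap an `Arc`.
struct SharedSnapshot<T> {
    inner: RwLock<Arc<T>>,
}

impl<T> SharedSnapshot<T> {
    fn new(value: T) -> Self {
        Self { inner: RwLock::new(Arc::new(value)) }
    }

    /// Clones the current `Arc` under the read lock, then releases the lock.
    /// Panics if a writer panicked while holding the lock (poisoning).
    fn load(&self) -> Arc<T> {
        let guard = self.inner.read().unwrap();
        Arc::clone(&*guard)
    }

    /// Allocates the new snapshot before locking, so the write lock covers
    /// only the pointer swap; the old `Arc` is dropped after unlocking.
    fn store(&self, value: T) {
        let new_value = Arc::new(value);
        let old = {
            let mut guard = self.inner.write().unwrap();
            std::mem::replace(&mut *guard, new_value)
        };
        drop(old); // readers still holding the old snapshot keep it alive
    }
}
```

Readers that loaded a snapshot before a `store` keep using their old `Arc` unaffected, which is the same "readers never block on long work" property that motivated `ArcSwap` here.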






Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]

2025-08-18 Thread via GitHub


adriangb commented on code in PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#discussion_r2282817078


##
datafusion/physical-expr/src/expressions/dynamic_filters.rs:
##
@@ -137,7 +139,7 @@ impl DynamicFilterPhysicalExpr {
 Self {
 children,
 remapped_children: None, // Initially no remapped children
-inner: Arc::new(RwLock::new(Inner::new(inner))),
+inner: Arc::new(ArcSwap::from_pointee(Inner::new(inner))),

Review Comment:
   Well, I stumbled upon it because we originally had some performance 
regressions and I suspected they might be due to hitting the `Arc<RwLock<Inner>>`s 
a lot harder. It seems that `ArcSwap` is the solution to that, but I haven't 
benchmarked to confirm the performance impact, or whether the gain comes from it or 
from other refactors. From a micro-benchmark standpoint it makes sense. I can make a 
branch / another PR without ArcSwap and we can see if there's a measurable 
change at the macro level.
   
   That said, it's a well-maintained dependency from a reputable maintainer, is widely 
used, and has no dependencies of its own.






Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]

2025-08-18 Thread via GitHub


alamb commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-3197557763

   🤖: Benchmark completed
   
   
   
   
   ```
   Comparing HEAD and topk-filters
   
   Benchmark clickbench_extended.json
   
   ┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
   ┃ Query        ┃        HEAD ┃ topk-filters ┃        Change ┃
   ┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
   │ QQuery 0     │  2098.75 ms │   2062.77 ms │     no change │
   │ QQuery 1     │   736.19 ms │    636.32 ms │ +1.16x faster │
   │ QQuery 2     │  1447.86 ms │   1301.32 ms │ +1.11x faster │
   │ QQuery 3     │   640.39 ms │    605.76 ms │ +1.06x faster │
   │ QQuery 4     │  1297.18 ms │   1346.75 ms │     no change │
   │ QQuery 5     │ 13751.05 ms │  14220.36 ms │     no change │
   │ QQuery 6     │  2400.54 ms │   2335.19 ms │     no change │
   │ QQuery 7     │  1933.90 ms │   2000.55 ms │     no change │
   └──────────────┴─────────────┴──────────────┴───────────────┘
   ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
   ┃ Benchmark Summary           ┃            ┃
   ┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
   │ Total Time (HEAD)           │ 24305.85ms │
   │ Total Time (topk-filters)   │ 24509.01ms │
   │ Average Time (HEAD)         │  3038.23ms │
   │ Average Time (topk-filters) │  3063.63ms │
   │ Queries Faster              │          3 │
   │ Queries Slower              │          0 │
   │ Queries with No Change      │          5 │
   │ Queries with Failure        │          0 │
   └─────────────────────────────┴────────────┘
   
   Benchmark clickbench_partitioned.json
   
   ┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
   ┃ Query        ┃        HEAD ┃ topk-filters ┃        Change ┃
   ┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
   │ QQuery 0     │     2.29 ms │      2.43 ms │  1.06x slower │
   │ QQuery 1     │    28.83 ms │     29.37 ms │     no change │
   │ QQuery 2     │    75.39 ms │     76.90 ms │     no change │
   │ QQuery 3     │    88.02 ms │     90.54 ms │     no change │
   │ QQuery 4     │   585.22 ms │    605.52 ms │     no change │
   │ QQuery 5     │   849.36 ms │    886.24 ms │     no change │
   │ QQuery 6     │     2.32 ms │      2.36 ms │     no change │
   │ QQuery 7     │    31.99 ms │     33.28 ms │     no change │
   │ QQuery 8     │   830.49 ms │    866.81 ms │     no change │
   │ QQuery 9     │  1110.42 ms │   1160.29 ms │     no change │
   │ QQuery 10    │   221.99 ms │    221.95 ms │     no change │
   │ QQuery 11    │   255.21 ms │    260.43 ms │     no change │
   │ QQuery 12    │   826.88 ms │    862.45 ms │     no change │
   │ QQuery 13    │  1186.96 ms │   1159.18 ms │     no change │
   │ QQuery 14    │   767.58 ms │    799.01 ms │     no change │
   │ QQuery 15    │   717.27 ms │    794.54 ms │  1.11x slower │
   │ QQuery 16    │  1552.61 ms │   1604.86 ms │     no change │
   │ QQuery 17    │  1542.14 ms │   1589.89 ms │     no change │
   │ QQuery 18    │  2895.45 ms │   2868.94 ms │     no change │
   │ QQuery 19    │    79.79 ms │     81.57 ms │     no change │
   │ QQuery 20    │  1278.75 ms │   1211.54 ms │ +1.06x faster │
   │ QQuery 21    │  1374.67 ms │   1346.29 ms │     no change │
   │ QQuery 22    │  2277.06 ms │   2246.93 ms │     no change │
   │ QQuery 23    │  7751.39 ms │   6855.54 ms │ +1.13x faster │
   │ QQuery 24    │   423.18 ms │    406.23 ms │     no change │
   │ QQuery 25    │   294.80 ms │    289.58 ms │     no change │
   │ QQuery 26    │   421.41 ms │    404.45 ms │     no change │
   │ QQuery 27    │  1609.35 ms │   1548.29 ms │     no change │
   │ QQuery 28    │ 12027.18 ms │  11975.67 ms │     no change │
   │ QQuery 29    │   499.72 ms │    516.83 ms │     no change │
   │ QQuery 30    │   762.44 ms │    768.76 ms │     no change │
   │ QQuery 31    │   788.02 ms │    800.18 ms │     no change │
   │ QQuery 32    │  2404.71 ms │   2486.31 ms │     no change │
   │ QQuery 33    │  3305.56 ms │   3284.68 ms │     no change │
   │ QQuery 34    │  3362.30 ms │   3402.04 ms │     no change │
   │ QQuery 35    │  1299.53 ms │   1264.01 ms │     no change │
   │ QQuery 36    │   133.03 ms │    130.08 ms │     no change │
   │ QQuery 37    │    54.36 ms │     56.59 ms │     no change │
   │ QQuery 38    │   124.72 ms │    124.72 ms │     no change │
   │ QQuery 39    │   210.60 ms │    203.82 ms │     no change │
   │ QQuery 40    │    46.04 ms │     42.45 ms │ +1.08x faster │
   │ QQuery 41    │    39.97 ms │     40.83 ms │     no change │
   │ QQuery 42    │    33.44 ms │     33.46 ms │     no change │
   └──────────────┴─────────────┴──────────────┴───────────────┘
   ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
   ┃ Benchmark Summary           ┃            ┃
   ┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
   │ Total Time (HEAD)           │ 54172.42ms │
   │ Total Time

Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]

2025-08-18 Thread via GitHub


adriangb commented on code in PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#discussion_r2282817078


##
datafusion/physical-expr/src/expressions/dynamic_filters.rs:
##
@@ -137,7 +139,7 @@ impl DynamicFilterPhysicalExpr {
 Self {
 children,
 remapped_children: None, // Initially no remapped children
-inner: Arc::new(RwLock::new(Inner::new(inner))),
+inner: Arc::new(ArcSwap::from_pointee(Inner::new(inner))),

Review Comment:
   Well, I stumbled upon it because we originally had some performance 
regressions and I suspected they might be due to hitting the `Arc<RwLock<Inner>>`s 
a lot harder. It seems that `ArcSwap` is the solution to that, but I haven't 
benchmarked to confirm the performance impact, or whether the gain comes from it or 
from other refactors. From a micro-benchmark standpoint it makes sense. I can make a 
branch / another PR without ArcSwap and we can confirm from there if we want.






Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]

2025-08-18 Thread via GitHub


alamb commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-3197411815

   🤖 `./gh_compare_branch.sh` [Benchmark 
Script](https://github.com/alamb/datafusion-benchmarking/blob/main/gh_compare_branch.sh)
 Running
   Linux aal-dev 6.11.0-1016-gcp #16~24.04.1-Ubuntu SMP Wed May 28 02:40:52 UTC 
2025 x86_64 x86_64 x86_64 GNU/Linux
   Comparing topk-filters (b7ac320c0d9fcc72f2bb3750cf19ea7be12ff320) to 
8f15991f33bf6aca9d4da8958141b59d196b2ed6 
[diff](https://github.com/apache/datafusion/compare/8f15991f33bf6aca9d4da8958141b59d196b2ed6..b7ac320c0d9fcc72f2bb3750cf19ea7be12ff320)
 using:  tpch_mem clickbench_partitioned clickbench_extended
   Results will be posted here when complete
   





Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]

2025-08-18 Thread via GitHub


alamb commented on code in PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#discussion_r2282739532


##
datafusion/physical-expr/src/expressions/dynamic_filters.rs:
##
@@ -137,7 +139,7 @@ impl DynamicFilterPhysicalExpr {
 Self {
 children,
 remapped_children: None, // Initially no remapped children
-inner: Arc::new(RwLock::new(Inner::new(inner))),
+inner: Arc::new(ArcSwap::from_pointee(Inner::new(inner))),

Review Comment:
   Do we have evidence that using ArcSwap is needed for performance?
   
   I think we could use `parking_lot` mutexes if we wanted to make the code 
cleaner (no need to check for lock poisoning).
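   To illustrate the poisoning point: with std locks, `read()`/`write()` return a `Result` because a writer that panics poisons the lock, so every call site has to `unwrap()` or handle `PoisonError`. A minimal std-only sketch of the recover-from-poison variant follows; `Threshold` is a hypothetical type. With `parking_lot::RwLock`, `write()` returns the guard directly and there is no poison case to handle.

```rust
use std::sync::RwLock;

/// Hypothetical threshold store guarded by a std `RwLock`.
struct Threshold {
    value: RwLock<Option<i64>>,
}

impl Threshold {
    /// std's `write()` yields `Err(PoisonError)` if a previous writer panicked;
    /// here we recover the guard instead of propagating the poison.
    fn set(&self, v: i64) {
        let mut guard = self
            .value
            .write()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        *guard = Some(v);
    }

    /// Same recovery on the read path; `Option<i64>` is `Copy`, so we return a copy.
    fn get(&self) -> Option<i64> {
        *self
            .value
            .read()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
    }
}
```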



##
datafusion/physical-plan/src/sorts/sort.rs:
##
@@ -1158,7 +1153,10 @@ impl ExecutionPlan for SortExec {
 context.session_config().batch_size(),
 context.runtime_env(),
 &self.metrics_set,
-self.filter.clone(),
+self.filter
+.as_ref()
+.expect("Filter should be set when fetch is Some")
+.clone(),

Review Comment:
   I recommend turning this into an internal error so that if someone does hit 
this for some reason, the symptom is less severe (an error instead of a panic).
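   A minimal sketch of the suggested shape, replacing `.expect(...)` with `ok_or_else` so the caller gets an error it can surface. `ExecError` and `required_filter` are hypothetical stand-ins; in DataFusion itself this would presumably use `DataFusionError::Internal` via the `internal_datafusion_err!` / `internal_err!` macros from `datafusion_common`.

```rust
use std::fmt;

/// Hypothetical, trimmed-down stand-in for `DataFusionError::Internal`.
#[derive(Debug, PartialEq)]
enum ExecError {
    Internal(String),
}

impl fmt::Display for ExecError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ExecError::Internal(msg) => write!(f, "Internal error: {msg}"),
        }
    }
}

/// Instead of panicking via `.expect(...)`, return an internal error when the
/// filter that should accompany `fetch` is missing.
fn required_filter<T: Clone>(filter: &Option<T>) -> Result<T, ExecError> {
    filter.as_ref().cloned().ok_or_else(|| {
        ExecError::Internal("Filter should be set when fetch is Some".to_string())
    })
}
```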






Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]

2025-08-16 Thread via GitHub


adriangb commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-3193850159

   Okay I've re-run after 310100b and am now seeing only improvements across 
the board. @Dandandan I've requested another review from you since I think this 
may be in a good place now.
   
   ```
   Comparing main and topk-filters
   
   Benchmark run_topk_tpch.json
   
   ┏━━━━━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
   ┃ Query        ┃     main ┃ topk-filters ┃        Change ┃
   ┡━━━━━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
   │ Q1           │ 11.22 ms │      9.41 ms │ +1.19x faster │
   │ Q2           │ 13.90 ms │     11.78 ms │ +1.18x faster │
   │ Q3           │ 33.71 ms │     29.55 ms │ +1.14x faster │
   │ Q4           │ 13.81 ms │     12.89 ms │ +1.07x faster │
   │ Q5           │  8.65 ms │      7.04 ms │ +1.23x faster │
   │ Q6           │ 14.67 ms │     14.65 ms │     no change │
   │ Q7           │ 45.67 ms │     38.13 ms │ +1.20x faster │
   │ Q8           │ 27.73 ms │     25.61 ms │ +1.08x faster │
   │ Q9           │ 35.25 ms │     34.09 ms │     no change │
   │ Q10          │ 56.27 ms │     52.07 ms │ +1.08x faster │
   │ Q11          │ 32.06 ms │     30.42 ms │ +1.05x faster │
   └──────────────┴──────────┴──────────────┴───────────────┘
   ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━┓
   ┃ Benchmark Summary           ┃          ┃
   ┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━┩
   │ Total Time (main)           │ 292.93ms │
   │ Total Time (topk-filters)   │ 265.65ms │
   │ Average Time (main)         │  26.63ms │
   │ Average Time (topk-filters) │  24.15ms │
   │ Queries Faster              │        9 │
   │ Queries Slower              │        0 │
   │ Queries with No Change      │        2 │
   │ Queries with Failure        │        0 │
   └─────────────────────────────┴──────────┘
   ```





Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]

2025-08-13 Thread via GitHub


adriangb commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-3184720872

   ClickBench results for partitioned datasets look much better:
   
   ```
   ┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
   ┃ Query        ┃       main ┃ topk-filters ┃        Change ┃
   ┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
   │ QQuery 0     │    0.92 ms │      1.15 ms │  1.25x slower │
   │ QQuery 1     │   12.45 ms │     11.01 ms │ +1.13x faster │
   │ QQuery 2     │   42.41 ms │     35.19 ms │ +1.21x faster │
   │ QQuery 3     │   40.63 ms │     33.55 ms │ +1.21x faster │
   │ QQuery 4     │  337.67 ms │    267.36 ms │ +1.26x faster │
   │ QQuery 5     │  434.44 ms │    362.25 ms │ +1.20x faster │
   │ QQuery 6     │    1.21 ms │      1.05 ms │ +1.15x faster │
   │ QQuery 7     │   14.20 ms │     11.94 ms │ +1.19x faster │
   │ QQuery 8     │  413.55 ms │    317.13 ms │ +1.30x faster │
   │ QQuery 9     │  601.85 ms │    480.06 ms │ +1.25x faster │
   │ QQuery 10    │  100.43 ms │     84.75 ms │ +1.18x faster │
   │ QQuery 11    │  108.91 ms │     91.61 ms │ +1.19x faster │
   │ QQuery 12    │  419.13 ms │    323.82 ms │ +1.29x faster │
   │ QQuery 13    │  570.70 ms │    481.27 ms │ +1.19x faster │
   │ QQuery 14    │  428.36 ms │    310.63 ms │ +1.38x faster │
   │ QQuery 15    │  426.88 ms │    309.22 ms │ +1.38x faster │
   │ QQuery 16    │ 1251.45 ms │    729.69 ms │ +1.72x faster │
   │ QQuery 17    │ 1175.59 ms │    738.35 ms │ +1.59x faster │
   │ QQuery 18    │ 2670.84 ms │   1945.33 ms │ +1.37x faster │
   │ QQuery 19    │   41.71 ms │     32.77 ms │ +1.27x faster │
   │ QQuery 20    │ 1002.98 ms │    738.13 ms │ +1.36x faster │
   │ QQuery 21    │  969.39 ms │    763.04 ms │ +1.27x faster │
   │ QQuery 22    │ 1506.05 ms │   1214.86 ms │ +1.24x faster │
   │ QQuery 23    │ 4418.24 ms │   4008.72 ms │ +1.10x faster │
   │ QQuery 24    │  198.37 ms │    196.65 ms │     no change │
   │ QQuery 25    │  150.97 ms │    150.72 ms │     no change │
   │ QQuery 26    │  200.45 ms │    191.65 ms │     no change │
   │ QQuery 27    │ 1007.57 ms │    893.24 ms │ +1.13x faster │
   │ QQuery 28    │ 7214.47 ms │   7065.58 ms │     no change │
   │ QQuery 29    │  356.81 ms │    327.34 ms │ +1.09x faster │
   │ QQuery 30    │  323.66 ms │    310.25 ms │     no change │
   │ QQuery 31    │  366.94 ms │    346.37 ms │ +1.06x faster │
   │ QQuery 32    │ 2028.11 ms │   1692.80 ms │ +1.20x faster │
   │ QQuery 33    │ 2286.38 ms │   1775.46 ms │ +1.29x faster │
   │ QQuery 34    │ 1978.34 ms │   2305.40 ms │  1.17x slower │
   │ QQuery 35    │  555.06 ms │    552.82 ms │     no change │
   │ QQuery 36    │   61.16 ms │     62.54 ms │     no change │
   │ QQuery 37    │   24.59 ms │     23.99 ms │     no change │
   │ QQuery 38    │   61.70 ms │     63.87 ms │     no change │
   │ QQuery 39    │  102.67 ms │    103.98 ms │     no change │
   │ QQuery 40    │   17.16 ms │     18.43 ms │  1.07x slower │
   │ QQuery 41    │   16.16 ms │     17.05 ms │  1.06x slower │
   │ QQuery 42    │   12.72 ms │     13.14 ms │     no change │
   └──────────────┴────────────┴──────────────┴───────────────┘
   ```





Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]

2025-08-13 Thread via GitHub


adriangb commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-3184612966

   I tried some stuff. Still not an obvious improvement:
   
   ```
   ./bench.sh compare main topk-filters
   Comparing main and topk-filters
   
   Benchmark run_topk_tpch.json
   
   ┏━━━━━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
   ┃ Query        ┃     main ┃ topk-filters ┃        Change ┃
   ┡━━━━━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
   │ Q1           │ 12.46 ms │     10.55 ms │ +1.18x faster │
   │ Q2           │ 14.02 ms │     12.06 ms │ +1.16x faster │
   │ Q3           │ 29.39 ms │     30.10 ms │     no change │
   │ Q4           │ 13.74 ms │     12.79 ms │ +1.07x faster │
   │ Q5           │  7.44 ms │      7.09 ms │     no change │
   │ Q6           │ 15.87 ms │     13.79 ms │ +1.15x faster │
   │ Q7           │ 37.22 ms │     44.24 ms │  1.19x slower │
   │ Q8           │ 27.51 ms │     25.17 ms │ +1.09x faster │
   │ Q9           │ 33.04 ms │     31.28 ms │ +1.06x faster │
   │ Q10          │ 55.26 ms │     58.96 ms │  1.07x slower │
   │ Q11          │ 28.74 ms │     29.75 ms │     no change │
   └──────────────┴──────────┴──────────────┴───────────────┘
   ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━┓
   ┃ Benchmark Summary           ┃          ┃
   ┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━┩
   │ Total Time (main)           │ 274.68ms │
   │ Total Time (topk-filters)   │ 275.79ms │
   │ Average Time (main)         │  24.97ms │
   │ Average Time (topk-filters) │  25.07ms │
   │ Queries Faster              │        6 │
   │ Queries Slower              │        2 │
   │ Queries with No Change      │        3 │
   │ Queries with Failure        │        0 │
   └─────────────────────────────┴──────────┘
   ```





Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]

2025-07-04 Thread via GitHub


Dandandan commented on code in PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#discussion_r2186109216


##
datafusion/physical-plan/src/topk/mod.rs:
##
@@ -319,19 +342,75 @@ impl TopK {
 /// (a > 2 OR (a = 2 AND b < 3))
 /// ```
 fn update_filter(&mut self) -> Result<()> {
-let Some(filter) = &self.filter else {
+// If the heap doesn't have k elements yet, we can't create thresholds
+let Some(max_row) = self.heap.max() else {
 return Ok(());
 };
-let Some(thresholds) = self.heap.get_threshold_values(&self.expr)? else {
-return Ok(());
+
+let new_threshold_row = &max_row.row;
+
+// Extract filter expression reference before entering critical section
+let filter_expr = Arc::clone(&self.filter.expr);
+
+// Check if we need to update and do both threshold and filter update atomically
+{
+let mut threshold_guard = self.filter.threshold_row.write();
+if let Some(current_row) = threshold_guard.as_ref() {
+match current_row.as_slice().cmp(new_threshold_row) {
+Ordering::Greater => {
+// new < current, so new threshold is more selective

Review Comment:
   I think we should try switching to the global heap approach: write updates 
to the shared/global heap and update the filter based on the global heap.






Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]

2025-07-04 Thread via GitHub


Dandandan commented on code in PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#discussion_r2186107501


##
datafusion/physical-plan/src/topk/mod.rs:
##
@@ -319,19 +342,75 @@ impl TopK {
 /// (a > 2 OR (a = 2 AND b < 3))
 /// ```
 fn update_filter(&mut self) -> Result<()> {
-let Some(filter) = &self.filter else {
+// If the heap doesn't have k elements yet, we can't create thresholds
+let Some(max_row) = self.heap.max() else {
 return Ok(());
 };
-let Some(thresholds) = self.heap.get_threshold_values(&self.expr)? else {
-return Ok(());
+
+let new_threshold_row = &max_row.row;
+
+// Extract filter expression reference before entering critical section
+let filter_expr = Arc::clone(&self.filter.expr);
+
+// Check if we need to update and do both threshold and filter update atomically
+{
+let mut threshold_guard = self.filter.threshold_row.write();
+if let Some(current_row) = threshold_guard.as_ref() {
+match current_row.as_slice().cmp(new_threshold_row) {
+Ordering::Greater => {
+// new < current, so new threshold is more selective

Review Comment:
   The current solution seems roughly on par with main; I don't observe a 
speedup.






Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]

2025-07-04 Thread via GitHub


Dandandan commented on code in PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#discussion_r2186106170


##
datafusion/physical-plan/src/topk/mod.rs:
##
@@ -319,19 +342,75 @@ impl TopK {
 /// (a > 2 OR (a = 2 AND b < 3))
 /// ```
 fn update_filter(&mut self) -> Result<()> {
-let Some(filter) = &self.filter else {
+// If the heap doesn't have k elements yet, we can't create thresholds
+let Some(max_row) = self.heap.max() else {
 return Ok(());
 };
-let Some(thresholds) = self.heap.get_threshold_values(&self.expr)? else {
-return Ok(());
+
+let new_threshold_row = &max_row.row;
+
+// Extract filter expression reference before entering critical section
+let filter_expr = Arc::clone(&self.filter.expr);
+
+// Check if we need to update and do both threshold and filter update atomically
+{
+let mut threshold_guard = self.filter.threshold_row.write();
+if let Some(current_row) = threshold_guard.as_ref() {
+match current_row.as_slice().cmp(new_threshold_row) {
+Ordering::Greater => {
+// new < current, so new threshold is more selective

Review Comment:
   I think this was wrong before, @adriangb: in the heap, lower means more 
selective.
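   To spell out the ordering rule: assuming the heap keeps the K smallest rows in sort order (so its max row is the threshold) and rows are encoded so that byte-wise comparison matches the sort order (as arrow's row format is designed to do), a lower threshold admits fewer rows. A hypothetical helper capturing "only update if the new threshold is more selective":

```rust
use std::cmp::Ordering;

/// Returns true if `new` should replace `current` as the TopK threshold.
/// Hypothetical helper: assumes byte-wise order == sort order and that the
/// heap keeps the k smallest rows, so a *lower* threshold is more selective.
fn should_update_threshold(current: Option<&[u8]>, new: &[u8]) -> bool {
    match current {
        // No threshold yet: any threshold is better than none.
        None => true,
        Some(current) => match current.cmp(new) {
            // current > new: the new threshold is strictly more selective.
            Ordering::Greater => true,
            // Equal or less selective: keep the existing filter untouched.
            Ordering::Equal | Ordering::Less => false,
        },
    }
}
```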






Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]

2025-07-04 Thread via GitHub


Dandandan commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-3037193167

   I'm taking a look now to see if I can find anything.





Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]

2025-07-04 Thread via GitHub


adriangb commented on code in PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#discussion_r2185669579


##
datafusion/physical-plan/src/topk/mod.rs:
##
@@ -214,41 +238,39 @@ impl TopK {
 
 let mut selected_rows = None;
 
-if let Some(filter) = self.filter.as_ref() {
-// If a filter is provided, update it with the new rows
-let filter = filter.current()?;
-let filtered = filter.evaluate(&batch)?;
-let num_rows = batch.num_rows();
-let array = filtered.into_array(num_rows)?;
-let mut filter = array.as_boolean().clone();
-let true_count = filter.true_count();
-if true_count == 0 {
-// nothing to filter, so no need to update
-return Ok(());
+// If a filter is provided, update it with the new rows

Review Comment:
   Agreed






Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]

2025-07-04 Thread via GitHub


Dandandan commented on code in PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#discussion_r2185350825


##
datafusion/physical-plan/src/topk/mod.rs:
##
@@ -214,41 +238,39 @@ impl TopK {
 
 let mut selected_rows = None;
 
-if let Some(filter) = self.filter.as_ref() {
-// If a filter is provided, update it with the new rows
-let filter = filter.current()?;
-let filtered = filter.evaluate(&batch)?;
-let num_rows = batch.num_rows();
-let array = filtered.into_array(num_rows)?;
-let mut filter = array.as_boolean().clone();
-let true_count = filter.true_count();
-if true_count == 0 {
-// nothing to filter, so no need to update
-return Ok(());
+// If a filter is provided, update it with the new rows

Review Comment:
   Ah yeah, I understand... maybe we should check whether it is unset / always true 
as an additional "fast path".
   But I don't think that's the source of the slowdown.



##
datafusion/physical-plan/src/topk/mod.rs:
##
@@ -214,41 +238,39 @@ impl TopK {
 
 let mut selected_rows = None;
 
-if let Some(filter) = self.filter.as_ref() {
-// If a filter is provided, update it with the new rows
-let filter = filter.current()?;
-let filtered = filter.evaluate(&batch)?;
-let num_rows = batch.num_rows();
-let array = filtered.into_array(num_rows)?;
-let mut filter = array.as_boolean().clone();
-let true_count = filter.true_count();
-if true_count == 0 {
-// nothing to filter, so no need to update
-return Ok(());
+// If a filter is provided, update it with the new rows

Review Comment:
   Ah yeah, I understand... maybe we should check whether it is unset / returning 
a constant true as an additional "fast path".
   But I don't think that's the source of the slowdown.
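   A sketch of that fast path: when the filter is still "accept everything", skip per-row evaluation and signal "keep all rows" with `None`, mirroring the `selected_rows = None` starting point in the diff. `Filter` is hypothetical; in DataFusion the filter is a physical expression, and the check would presumably be whether it is still a literal `true`.

```rust
/// Hypothetical, simplified filter representation.
#[derive(Debug)]
enum Filter {
    /// Not yet tightened by TopK: every row passes.
    AlwaysTrue,
    /// Keep rows whose sort key is strictly below the threshold.
    LessThan(i64),
}

/// Returns `None` when every row is selected (the fast path: no per-row
/// evaluation, no mask built), otherwise the indices of the rows that pass.
fn selected_rows(filter: &Filter, keys: &[i64]) -> Option<Vec<usize>> {
    match filter {
        Filter::AlwaysTrue => None,
        Filter::LessThan(threshold) => Some(
            keys.iter()
                .enumerate()
                .filter(|(_, key)| **key < *threshold)
                .map(|(i, _)| i)
                .collect(),
        ),
    }
}
```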






Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]

2025-07-04 Thread via GitHub


adriangb commented on code in PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#discussion_r2185318951


##
datafusion/physical-plan/src/topk/mod.rs:
##
@@ -214,41 +238,39 @@ impl TopK {
 
 let mut selected_rows = None;
 
-if let Some(filter) = self.filter.as_ref() {
-// If a filter is provided, update it with the new rows
-let filter = filter.current()?;
-let filtered = filter.evaluate(&batch)?;
-let num_rows = batch.num_rows();
-let array = filtered.into_array(num_rows)?;
-let mut filter = array.as_boolean().clone();
-let true_count = filter.true_count();
-if true_count == 0 {
-// nothing to filter, so no need to update
-return Ok(());
+// If a filter is provided, update it with the new rows

Review Comment:
   I think `Some(filter)` was always passed into the constructor.






Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]

2025-07-04 Thread via GitHub


Dandandan commented on code in PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#discussion_r2185314221


##
datafusion/physical-plan/src/topk/mod.rs:
##
@@ -214,41 +238,39 @@ impl TopK {
 
 let mut selected_rows = None;
 
-if let Some(filter) = self.filter.as_ref() {
-// If a filter is provided, update it with the new rows
-let filter = filter.current()?;
-let filtered = filter.evaluate(&batch)?;
-let num_rows = batch.num_rows();
-let array = filtered.into_array(num_rows)?;
-let mut filter = array.as_boolean().clone();
-let true_count = filter.true_count();
-if true_count == 0 {
-// nothing to filter, so no need to update
-return Ok(());
+// If a filter is provided, update it with the new rows

Review Comment:
   Hm... it was used: in the initial phase of TopK the filter might not be 
initialized yet (the heap is not yet filled), especially for larger limit values.
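   Why the filter can stay unset early on: until the heap holds K rows, every incoming row could still belong to the result, so no threshold exists yet, and the larger K is, the longer that phase lasts. A simplified sketch for `ORDER BY key ASC LIMIT k` over `i64` keys; `TopKHeap` here is a hypothetical stand-in, not DataFusion's type.

```rust
use std::collections::BinaryHeap;

/// Keeps the k smallest keys in a max-heap, so the current threshold
/// (the largest kept key) is always at the top.
struct TopKHeap {
    k: usize,
    heap: BinaryHeap<i64>,
}

impl TopKHeap {
    fn new(k: usize) -> Self {
        Self { k, heap: BinaryHeap::with_capacity(k) }
    }

    fn push(&mut self, key: i64) {
        if self.heap.len() < self.k {
            self.heap.push(key);
        } else if let Some(&max) = self.heap.peek() {
            // Replace the current worst row only if the new key beats it.
            if key < max {
                self.heap.pop();
                self.heap.push(key);
            }
        }
    }

    /// `None` until k rows have been seen: no filter can be derived yet.
    fn threshold(&self) -> Option<i64> {
        if self.heap.len() == self.k {
            self.heap.peek().copied()
        } else {
            None
        }
    }
}
```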






Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]

2025-07-04 Thread via GitHub


adriangb commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-3036131650

   I did a bench run, confounding results:
   
   ```
   ┏━━━━━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
   ┃ Query        ┃     main ┃ topk-filters ┃        Change ┃
   ┡━━━━━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
   │ Q1           │  9.96 ms │     10.70 ms │  1.07x slower │
   │ Q2           │ 12.63 ms │     15.30 ms │  1.21x slower │
   │ Q3           │ 33.45 ms │     35.22 ms │  1.05x slower │
   │ Q4           │ 15.22 ms │     16.82 ms │  1.11x slower │
   │ Q5           │  9.87 ms │      8.85 ms │ +1.12x faster │
   │ Q6           │ 16.54 ms │     22.16 ms │  1.34x slower │
   │ Q7           │ 43.41 ms │     44.00 ms │     no change │
   │ Q8           │ 22.91 ms │     34.56 ms │  1.51x slower │
   │ Q9           │ 38.68 ms │     39.60 ms │     no change │
   │ Q10          │ 65.73 ms │     73.24 ms │  1.11x slower │
   │ Q11          │ 34.78 ms │     37.14 ms │  1.07x slower │
   └──────────────┴──────────┴──────────────┴───────────────┘
   ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━┓
   ┃ Benchmark Summary           ┃          ┃
   ┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━┩
   │ Total Time (main)           │ 303.17ms │
   │ Total Time (topk-filters)   │ 337.58ms │
   │ Average Time (main)         │  27.56ms │
   │ Average Time (topk-filters) │  30.69ms │
   │ Queries Faster              │        1 │
   │ Queries Slower              │        8 │
   │ Queries with No Change      │        2 │
   │ Queries with Failure        │        0 │
   └─────────────────────────────┴──────────┘
   ```
   
   Mostly... slower





Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]

2025-06-19 Thread via GitHub


adriangb commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-2989360115

   > how would you compute the shared heap on the fly?
   
   I was thinking you'd compute, on the fly, the top K of the (top K * partitions) candidate rows.
   
   But maybe your proposal makes more sense.
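   A sketch of the "compute the combined heap on the fly" idea, assuming each partition exposes its current top-K keys (`i64` keys, ascending order; both helper names are hypothetical). Any row in the global top K must also be in its own partition's top K, so the K * partitions candidates are sufficient.

```rust
/// Global top-k (smallest keys) from each partition's local top-k.
/// Hypothetical helper: collects all candidates and keeps the k smallest.
fn global_top_k(partition_heaps: &[Vec<i64>], k: usize) -> Vec<i64> {
    let mut candidates: Vec<i64> = partition_heaps.iter().flatten().copied().collect();
    candidates.sort_unstable();
    candidates.truncate(k);
    candidates
}

/// The combined threshold is the k-th smallest candidate, once k exist.
fn global_threshold(partition_heaps: &[Vec<i64>], k: usize) -> Option<i64> {
    let top = global_top_k(partition_heaps, k);
    if top.len() == k {
        top.last().copied()
    } else {
        None
    }
}
```

   The combined threshold is never looser than any single partition's, since the k-th smallest value of a union cannot exceed the k-th smallest of any one part; the cost is sorting up to K * partitions candidates on every read.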





Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]

2025-06-19 Thread via GitHub


Dandandan commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-2989336247

   > We could try a shared heap. It might work? I guess it will be a sort of balance between lock contention and better selectivity. Maybe we can balance it by having distinct heaps for writes with no locks but read only references to all of them so that when we do reads we compute on the fly the "combined" heap? Then we don't need any locks. The cost is that computations on the heap are larger but as long as `k ~ constant` then it should be fine.
   
   How would you compute the shared heap on the fly?
   
   I was thinking of something similar: each partition writes to its own heap, and only the values that updated the local heap are also written to the shared heap (limiting the time spent holding the lock).
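   A sketch of that scheme with `std` primitives, again assuming a single ascending `i64` key and hypothetical names (`TopKHeap`, `process_partition`): the local heap absorbs most rows without locking, and only values that enter it are forwarded to the shared heap under a `Mutex`.

```rust
use std::collections::BinaryHeap;
use std::sync::{Arc, Mutex};

/// Bounded max-heap holding the K smallest values seen so far.
struct TopKHeap {
    k: usize,
    heap: BinaryHeap<i64>,
}

impl TopKHeap {
    fn new(k: usize) -> Self {
        Self { k, heap: BinaryHeap::with_capacity(k + 1) }
    }

    /// Inserts `v`; returns true if the heap's contents changed.
    fn insert(&mut self, v: i64) -> bool {
        if self.heap.len() < self.k {
            self.heap.push(v);
            return true;
        }
        match self.heap.peek() {
            Some(&max) if v < max => {
                self.heap.pop(); // drop the current K-th value
                self.heap.push(v);
                true
            }
            _ => false, // v would be evicted immediately (or k == 0)
        }
    }

    /// Current K-th smallest value, once the heap is full.
    fn threshold(&self) -> Option<i64> {
        if self.k > 0 && self.heap.len() == self.k {
            self.heap.peek().copied()
        } else {
            None
        }
    }
}

/// Each partition updates its own heap without locking and takes the shared
/// lock only for values that actually entered its local top K.
fn process_partition(values: &[i64], k: usize, shared: &Mutex<TopKHeap>) {
    let mut local = TopKHeap::new(k);
    for &v in values {
        if local.insert(v) {
            shared.lock().unwrap().insert(v);
        }
    }
}

fn main() {
    let k = 3;
    let shared = Arc::new(Mutex::new(TopKHeap::new(k)));
    let partitions = vec![vec![9, 7, 1, 8], vec![5, 2, 6, 3]];
    let handles: Vec<_> = partitions
        .into_iter()
        .map(|part| {
            let shared = Arc::clone(&shared);
            std::thread::spawn(move || process_partition(&part, k, &shared))
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    println!("{:?}", shared.lock().unwrap().threshold()); // Some(3)
}
```

   Any value in the final global top K must have entered its own partition's local heap when it arrived, so it was forwarded; the shared heap therefore ends up holding the exact global top K, regardless of thread interleaving.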



Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]

2025-06-19 Thread via GitHub


adriangb commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-2988328335

   We could try a shared heap. It might work? I guess it will be a balance between lock contention and better selectivity. Maybe we can strike that balance by having a separate heap per partition that only that partition writes to (no locks), plus read-only references to all of them, so that reads compute the "combined" heap on the fly? Then we don't need any locks. The cost is that computations on the heap are larger, but as long as `k ~ constant` it should be fine.



Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]

2025-06-19 Thread via GitHub


Dandandan commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-2988289302

   > > Hm my earlier benchmarks didn't seem correct. not sure where the earlier run came from 🤔
   > 
   > What do the current ones show? Not much improvement?
   
   Yes, about the same as main.



Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]

2025-06-19 Thread via GitHub


adriangb commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-2987967895

   > Hm my earlier benchmarks didn't seem correct. not sure where the earlier run came from 🤔
   
   What do the current ones show? Not much improvement?



Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]

2025-06-19 Thread via GitHub


adriangb commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-2987966449

   > I am wondering if we can somehow synchronize the values in the heap efficiently in order to make the filter for a topk(n*partitions) as efficient as topk(n) 🤔
   
   I think the only way to do that would be to make a global heap and put a lock on it, similar to what we do with the filters?



Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]

2025-06-19 Thread via GitHub


adriangb commented on code in PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#discussion_r2156917768


##
datafusion/physical-plan/src/topk/mod.rs:
##
@@ -214,41 +238,39 @@ impl TopK {
 
 let mut selected_rows = None;
 
-if let Some(filter) = self.filter.as_ref() {
-// If a filter is provided, update it with the new rows
-let filter = filter.current()?;
-let filtered = filter.evaluate(&batch)?;
-let num_rows = batch.num_rows();
-let array = filtered.into_array(num_rows)?;
-let mut filter = array.as_boolean().clone();
-let true_count = filter.true_count();
-if true_count == 0 {
-// nothing to filter, so no need to update
-return Ok(());
+// If a filter is provided, update it with the new rows

Review Comment:
   Yeah, I don't think that code path was useful, or even being used?




Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]

2025-06-19 Thread via GitHub


Dandandan commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-2987413335

   (Removed a non-working proposal).
   
   I am wondering if we can somehow synchronize the values in the heap efficiently in order to make the filter for a `topk(n*partitions)` as efficient as `topk(n)` 🤔



Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]

2025-06-19 Thread via GitHub


Dandandan commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-2987392519

   WDYT @adriangb ?



Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]

2025-06-19 Thread via GitHub


Dandandan commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-2987381689

   Synchronizing the max between all partitions will help a bit, but I wonder if we shouldn't just synchronize it based on the true TopK in `SortPreservingMergeExec`?
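   As a cheaper alternative to a shared heap, "synchronizing the max" can be sketched with a single atomic, assuming an ascending TopK on one `i64` column (`SharedBound` is a hypothetical name). Each partition publishes its local K-th smallest value with `AtomicI64::fetch_min`. Because a full local heap already proves that K values are at or below its max, the minimum over partitions is a safe bound, though usually looser than the true global threshold that `SortPreservingMergeExec` would see.

```rust
use std::sync::atomic::{AtomicI64, Ordering};

/// Hypothetical shared bound for an ascending TopK on one i64 column.
/// Each partition publishes its local K-th smallest value (its heap max)
/// whenever that value tightens. A full local heap already proves that K
/// values are <= its max, so the minimum over partitions is a safe bound.
struct SharedBound(AtomicI64);

impl SharedBound {
    fn new() -> Self {
        // i64::MAX means "no bound yet": nothing is filtered out
        SharedBound(AtomicI64::new(i64::MAX))
    }

    /// Publish a local threshold and return the resulting shared bound.
    fn publish(&self, local_threshold: i64) -> i64 {
        // Relaxed is enough for a monotonically shrinking hint: a stale read
        // only means filtering a little less, never dropping a needed row.
        let previous = self.0.fetch_min(local_threshold, Ordering::Relaxed);
        previous.min(local_threshold)
    }

    /// Rows whose value is strictly greater than this can be skipped.
    fn current(&self) -> i64 {
        self.0.load(Ordering::Relaxed)
    }
}

fn main() {
    let bound = SharedBound::new();
    // Hypothetical heap maxima reported by three partitions
    for &local in [50, 12, 30].iter() {
        bound.publish(local);
    }
    println!("{}", bound.current()); // 12
}
```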



Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]

2025-06-19 Thread via GitHub


Dandandan commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-2987370039

   In hindsight, I think another reason we don't see TopK being as effective with more partitions is that spreading the work over partitions essentially turns `topk(n)` into `topk(n*partitions)`.
   So a limit of `10` becomes `200` rows with 20 partitions.
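   A small simulation of that effect, assuming values `0..10_000` spread round-robin over 20 partitions and `K = 10` (the data layout is hypothetical): each partition's local 10th-smallest value lands between 180 and 199, while the global 10th-smallest is 9, so per-partition filters stay far looser and 200 rows reach the final merge.

```rust
/// K-th smallest value of a slice (None if it has fewer than K values).
fn kth_smallest(values: &[i64], k: usize) -> Option<i64> {
    if k == 0 || values.len() < k {
        return None;
    }
    let mut sorted = values.to_vec();
    sorted.sort_unstable();
    Some(sorted[k - 1])
}

fn main() {
    let (n, partitions, k) = (10_000i64, 20usize, 10usize);
    // Hypothetical round-robin distribution: value v lands in partition v % 20
    let parts: Vec<Vec<i64>> = (0..partitions)
        .map(|p| (0..n).filter(|v| (*v as usize) % partitions == p).collect())
        .collect();

    let all: Vec<i64> = (0..n).collect();
    let global = kth_smallest(&all, k).unwrap();
    let local: Vec<i64> = parts.iter().map(|p| kth_smallest(p, k).unwrap()).collect();

    // Each partition keeps its own K rows, so partitions * K rows survive
    // to the final merge even though only K are needed.
    println!("global threshold = {}", global); // 9
    println!("local thresholds = {}..={}", local[0], local[partitions - 1]); // 180..=199
    println!("rows kept across partitions = {}", partitions * k); // 200
}
```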



Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]

2025-06-18 Thread via GitHub


Dandandan commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-2986650188

   Hm, my earlier benchmarks don't seem correct. Not sure where the earlier run came from 🤔



Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]

2025-06-18 Thread via GitHub


Dandandan commented on code in PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#discussion_r2156139567


##
datafusion/physical-plan/src/topk/mod.rs:
##
@@ -319,13 +341,87 @@ impl TopK {
 /// (a > 2 OR (a = 2 AND b < 3))
 /// ```
 fn update_filter(&mut self) -> Result<()> {
-let Some(filter) = &self.filter else {
-return Ok(());
-};
 let Some(thresholds) = self.heap.get_threshold_values(&self.expr)? else {
 return Ok(());
 };
 
+// Are the new thresholds more selective than our existing ones?
+let should_update = {
+let mut current = self.filter.thresholds.write();

Review Comment:
   I think this would be a good idea anyway to simplify the code.




Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]

2025-06-18 Thread via GitHub


Dandandan commented on code in PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#discussion_r2156133514


##
datafusion/physical-plan/src/topk/mod.rs:
##
@@ -319,13 +341,87 @@ impl TopK {
 /// (a > 2 OR (a = 2 AND b < 3))
 /// ```
 fn update_filter(&mut self) -> Result<()> {
-let Some(filter) = &self.filter else {
-return Ok(());
-};
 let Some(thresholds) = self.heap.get_threshold_values(&self.expr)? else {
 return Ok(());
 };
 
+// Are the new thresholds more selective than our existing ones?
+let should_update = {
+let mut current = self.filter.thresholds.write();

Review Comment:
   Perhaps this lock is held too long, plus there's the overhead of doing this for every update across all partitions quite often?
   We could also store a `Row` instead of `thresholds` to make the comparison much quicker (which should also let us avoid allocations)?
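   To illustrate why a `Row` makes the check cheap, here is a hand-rolled order-preserving encoding for nullable `i64` keys. It only mimics the idea behind arrow's row format (`arrow::row::RowConverter`) and is not its actual byte layout; `encode_key`, `encode_row` and `should_update` are hypothetical. Once both thresholds are encoded, the "more selective" test is a single byte-slice comparison with no per-column branching.

```rust
/// Hypothetical order-preserving byte encoding of one nullable i64 sort key.
/// Every key occupies 9 bytes, so comparing two encoded rows as byte slices
/// compares their keys lexicographically, column by column.
fn encode_key(value: Option<i64>, descending: bool, nulls_first: bool, out: &mut Vec<u8>) {
    match value {
        None => {
            // 0x00 sorts before the non-null marker 0x01, 0x02 sorts after it
            out.push(if nulls_first { 0x00 } else { 0x02 });
            out.extend_from_slice(&[0u8; 8]); // fixed-width padding
        }
        Some(v) => {
            out.push(0x01);
            // Flipping the sign bit makes two's-complement order match unsigned byte order
            let mut bytes = ((v as u64) ^ (1u64 << 63)).to_be_bytes();
            if descending {
                // Inverting every byte reverses the order for descending sorts
                for b in bytes.iter_mut() {
                    *b = !*b;
                }
            }
            out.extend_from_slice(&bytes);
        }
    }
}

/// Encode a full threshold row; `options[i]` is `(descending, nulls_first)` for column i.
fn encode_row(values: &[Option<i64>], options: &[(bool, bool)]) -> Vec<u8> {
    assert_eq!(values.len(), options.len());
    let mut out = Vec::with_capacity(values.len() * 9);
    for (v, &(descending, nulls_first)) in values.iter().zip(options) {
        encode_key(*v, descending, nulls_first, &mut out);
    }
    out
}

/// The new threshold is more selective iff it sorts strictly before the current one.
fn should_update(current: &[u8], new: &[u8]) -> bool {
    new < current
}

fn main() {
    let asc = [(false, false)]; // ascending, nulls last
    let current = encode_row(&[Some(10)], &asc);
    let tighter = encode_row(&[Some(-3)], &asc);
    println!("{}", should_update(&current, &tighter)); // true
}
```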




Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]

2025-06-18 Thread via GitHub


Dandandan commented on code in PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#discussion_r2156106525


##
datafusion/physical-plan/src/topk/mod.rs:
##
@@ -214,41 +238,39 @@ impl TopK {
 
 let mut selected_rows = None;
 
-if let Some(filter) = self.filter.as_ref() {
-// If a filter is provided, update it with the new rows
-let filter = filter.current()?;
-let filtered = filter.evaluate(&batch)?;
-let num_rows = batch.num_rows();
-let array = filtered.into_array(num_rows)?;
-let mut filter = array.as_boolean().clone();
-let true_count = filter.true_count();
-if true_count == 0 {
-// nothing to filter, so no need to update
-return Ok(());
+// If a filter is provided, update it with the new rows

Review Comment:
   The `if let Some(filter)` condition is gone?
   It provided the "uninitialized filter" case before (which just runs the normal TopK without any filtering overhead).
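   A sketch of the fast path being discussed, assuming an ascending TopK on one `i64` column (`candidate_rows` is hypothetical): while the filter is uninitialized (`None`), every row is a candidate and no predicate is evaluated at all.

```rust
/// Hypothetical TopK pre-filter for an ascending sort on one i64 column.
/// `threshold` is `None` until the heap holds K rows ("uninitialized filter"):
/// in that state every row is a candidate, so no predicate is evaluated.
fn candidate_rows(batch: &[i64], threshold: Option<i64>) -> Vec<usize> {
    match threshold {
        None => (0..batch.len()).collect(),
        Some(t) => batch
            .iter()
            .enumerate()
            // keep only rows that sort strictly before the current K-th value
            .filter(|&(_, &v)| v < t)
            .map(|(i, _)| i)
            .collect(),
    }
}

fn main() {
    let batch = [7, 3, 9, 1];
    println!("{:?}", candidate_rows(&batch, None)); // [0, 1, 2, 3]
    println!("{:?}", candidate_rows(&batch, Some(5))); // [1, 3]
}
```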





Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]

2025-06-18 Thread via GitHub


adriangb commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-2985967718

   Seems like a bug in my implementation, right? I'd be surprised if the update checks I added were that expensive compared to the other work...



Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]

2025-06-18 Thread via GitHub


Dandandan commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-2985955904

   It seems in some cases it's faster:
   
   ```
   ┏━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
   ┃ Query ┃ topk-dynamic-filter ┃ topk-filters ┃        Change ┃
   ┡━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
   │ Q1    │            16.17 ms │     16.89 ms │     no change │
   │ Q2    │            27.60 ms │     20.22 ms │ +1.36x faster │
   │ Q3    │            55.41 ms │     54.45 ms │     no change │
   │ Q4    │            22.26 ms │     22.07 ms │     no change │
   │ Q5    │            11.98 ms │     12.10 ms │     no change │
   │ Q6    │            26.25 ms │     27.26 ms │     no change │
   │ Q7    │            71.56 ms │     67.86 ms │ +1.05x faster │
   │ Q8    │            90.02 ms │     46.15 ms │ +1.95x faster │
   │ Q9    │            61.64 ms │     58.91 ms │     no change │
   │ Q10   │           101.13 ms │     98.24 ms │     no change │
   │ Q11   │            58.43 ms │     56.85 ms │     no change │
   └───────┴─────────────────────┴──────────────┴───────────────┘
   ```
   
   And in other cases it's mixed / slower:
   
   ```
   ┏━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
   ┃ Query     ┃ topk-dynamic-filter ┃ topk-filters ┃        Change ┃
   ┡━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
   │ QQuery 0  │            14.00 ms │     13.07 ms │ +1.07x faster │
   │ QQuery 1  │            20.55 ms │     18.81 ms │ +1.09x faster │
   │ QQuery 2  │            56.58 ms │     56.33 ms │     no change │
   │ QQuery 3  │            56.30 ms │     54.94 ms │     no change │
   │ QQuery 4  │           468.00 ms │    516.71 ms │  1.10x slower │
   │ QQuery 5  │           626.85 ms │    659.29 ms │  1.05x slower │
   │ QQuery 6  │            15.31 ms │     15.26 ms │     no change │
   │ QQuery 7  │            30.44 ms │     21.48 ms │ +1.42x faster │
   │ QQuery 8  │           547.48 ms │    569.67 ms │     no change │
   │ QQuery 9  │           744.50 ms │    778.67 ms │     no change │
   │ QQuery 10 │           182.53 ms │    176.03 ms │     no change │
   │ QQuery 11 │           217.10 ms │    190.23 ms │ +1.14x faster │
   │ QQuery 12 │           832.33 ms │    715.38 ms │ +1.16x faster │
   │ QQuery 13 │          1102.37 ms │    914.96 ms │ +1.20x faster │
   │ QQuery 14 │           823.97 ms │    576.94 ms │ +1.43x faster │
   │ QQuery 15 │           574.53 ms │    496.51 ms │ +1.16x faster │
   │ QQuery 16 │          1324.12 ms │   1150.68 ms │ +1.15x faster │
   │ QQuery 17 │          1324.29 ms │   1245.55 ms │ +1.06x faster │
   │ QQuery 18 │          2603.07 ms │   2248.86 ms │ +1.16x faster │
   │ QQuery 19 │            48.17 ms │     49.60 ms │     no change │
   │ QQuery 20 │           978.10 ms │    964.90 ms │     no change │
   │ QQuery 21 │          1021.47 ms │   1050.51 ms │     no change │
   │ QQuery 22 │          1958.07 ms │   1660.81 ms │ +1.18x faster │
   │ QQuery 23 │           760.66 ms │   5353.24 ms │  7.04x slower │
   │ QQuery 24 │           525.78 ms │    342.39 ms │ +1.54x faster │
   │ QQuery 25 │           480.36 ms │    278.59 ms │ +1.72x faster │
   │ QQuery 26 │           542.13 ms │    342.32 ms │ +1.58x faster │
   │ QQuery 27 │          1826.05 ms │   1260.73 ms │ +1.45x faster │
   │ QQuery 28 │         11094.31 ms │  10685.64 ms │     no change │
   │ QQuery 29 │           427.18 ms │    432.16 ms │     no change │
   │ QQuery 30 │           826.43 ms │    539.39 ms │ +1.53x faster │
   │ QQuery 31 │           789.89 ms │    553.49 ms │ +1.43x faster │
   │ QQuery 32 │          2580.93 ms │   2339.06 ms │ +1.10x faster │
   │ QQuery 33 │          2660.40 ms │   2761.14 ms │     no change │
   │ QQuery 34 │          2948.08 ms │   2923.44 ms │     no change │
   │ QQuery 35 │           886.56 ms │    810.35 ms │ +1.09x faster │
   │ QQuery 36 │            18.59 ms │     83.97 ms │  4.52x slower │
   │ QQuery 37 │            18.67 ms │     35.52 ms │  1.90x slower │
   │ QQuery 38 │            18.10 ms │     81.86 ms │  4.52x slower │
   │ QQuery 39 │            17.89 ms │    135.04 ms │  7.55x slower │
   │ QQuery 40 │            18.30 ms │     27.70 ms │  1.51x slower │
   │ QQuery 41 │            18.15 ms │     26.07 ms │  1.44x slower │
   │ QQuery 42 │            17.78 ms │     22.56 ms │  1.27x slower │
   └───────────┴─────────────────────┴──────────────┴───────────────┘
   ```



Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]

2025-06-18 Thread via GitHub


Dandandan commented on code in PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#discussion_r2154078467


##
datafusion/physical-plan/src/topk/mod.rs:
##
@@ -319,13 +341,88 @@ impl TopK {
 /// (a > 2 OR (a = 2 AND b < 3))
 /// ```
 fn update_filter(&mut self) -> Result<()> {
-let Some(filter) = &self.filter else {
-return Ok(());
-};
 let Some(thresholds) = self.heap.get_threshold_values(&self.expr)? else {
 return Ok(());
 };
 
+// Are the new thresholds more selective than our existing ones?
+let should_update = {
+if let Some(current) = self.filter.thresholds.write().as_mut() {
+assert!(current.len() == thresholds.len());
+// Check if new thresholds are more selective than current ones
+let mut more_selective = false;
+for ((current_value, new_value), sort_expr) in
+current.iter().zip(thresholds.iter()).zip(self.expr.iter())
+{
+// Handle null cases
+let (current_is_null, new_is_null) =
+(current_value.is_null(), new_value.is_null());

Review Comment:
   Can't we use `ScalarValue::partial_cmp`?
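   One way to collapse the nested null handling into a single comparator, sketched with `Option<i64>` standing in for `ScalarValue` and a local `SortOptions` struct mirroring arrow's `descending` / `nulls_first` flags (both are assumptions for illustration): compare the new threshold against the current one in output order, column by column, and update only if it sorts strictly earlier.

```rust
use std::cmp::Ordering;

/// Local stand-in for the `descending` / `nulls_first` flags of a sort expression.
#[derive(Clone, Copy)]
struct SortOptions {
    descending: bool,
    nulls_first: bool,
}

/// Compare two nullable keys in output order: `Less` means `a` is emitted before `b`.
fn cmp_in_sort_order(a: Option<i64>, b: Option<i64>, opts: SortOptions) -> Ordering {
    match (a, b) {
        (None, None) => Ordering::Equal,
        // Null placement does not depend on `descending`
        (None, Some(_)) => if opts.nulls_first { Ordering::Less } else { Ordering::Greater },
        (Some(_), None) => if opts.nulls_first { Ordering::Greater } else { Ordering::Less },
        (Some(x), Some(y)) => {
            let ord = x.cmp(&y);
            if opts.descending { ord.reverse() } else { ord }
        }
    }
}

/// A new threshold is more selective iff it sorts strictly before the current
/// one: the filter then admits a subset of the rows it admitted before.
fn is_more_selective(current: &[Option<i64>], new: &[Option<i64>], opts: &[SortOptions]) -> bool {
    assert_eq!(current.len(), new.len());
    assert_eq!(current.len(), opts.len());
    for ((c, n), o) in current.iter().zip(new).zip(opts) {
        match cmp_in_sort_order(*n, *c, *o) {
            Ordering::Equal => continue, // tie on this column: decide on the next one
            Ordering::Less => return true,
            Ordering::Greater => return false,
        }
    }
    false // identical thresholds: nothing to update
}

fn main() {
    let asc = [SortOptions { descending: false, nulls_first: false }];
    println!("{}", is_more_selective(&[Some(10)], &[Some(5)], &asc)); // true
}
```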




Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]

2025-06-17 Thread via GitHub


adriangb commented on code in PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#discussion_r2153281386


##
datafusion/physical-plan/src/sorts/sort.rs:
##
@@ -846,8 +846,10 @@ pub struct SortExec {
 common_sort_prefix: Vec,
 /// Cache holding plan properties like equivalences, output partitioning etc.
 cache: PlanProperties,
-/// Filter matching the state of the sort for dynamic filter pushdown
-filter: Option>,
+/// Filter matching the state of the sort for dynamic filter pushdown.
+/// If `fetch` is `Some`, this will also be set and a TopK operator may be used.
+/// If `fetch` is `None`, this will be `None`.
+filter: Option,

Review Comment:
   I feel like there's some further refactoring that could happen here, e.g. splitting up `SortExec`, but I'm leaving that for another day.



##
datafusion/physical-plan/src/sorts/sort.rs:
##
@@ -1158,7 +1153,10 @@ impl ExecutionPlan for SortExec {
 context.session_config().batch_size(),
 context.runtime_env(),
 &self.metrics_set,
-self.filter.clone(),
+self.filter
+.as_ref()
+.expect("Filter should be set when fetch is Some")
+.clone(),

Review Comment:
   I refactored so that the `TopK` struct always expects this parameter which 
better reflects the reality of execution. But since it's strangely tied to the 
`fetch` param I am doing an `expect` assertion here. It should never fail at 
runtime.
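   A sketch of the invariant described here, using hypothetical simplified types (`SortExecSketch`, `DynamicFilter`) rather than DataFusion's own: if the filter is only ever derived from `fetch`, both in the constructor and in a hypothetical fetch setter, the `expect` is unreachable by construction.

```rust
/// Hypothetical, simplified stand-in for the dynamic filter.
#[derive(Debug, Clone, PartialEq)]
struct DynamicFilter {
    thresholds: Option<Vec<i64>>, // None until a TopK heap fills up
}

/// Hypothetical, simplified stand-in for `SortExec`.
#[derive(Debug)]
struct SortExecSketch {
    fetch: Option<usize>,
    filter: Option<DynamicFilter>,
}

impl SortExecSketch {
    /// The constructor derives `filter` from `fetch`.
    fn new(fetch: Option<usize>) -> Self {
        let filter = fetch.map(|_| DynamicFilter { thresholds: None });
        SortExecSketch { fetch, filter }
    }

    /// Hypothetical setter: changing `fetch` recomputes the filter in the same step.
    fn with_fetch(mut self, fetch: Option<usize>) -> Self {
        self.fetch = fetch;
        self.filter = fetch.map(|_| DynamicFilter { thresholds: None });
        self
    }

    /// Filter for the TopK path; the `expect` cannot fire because every
    /// `Some(fetch)` was set together with `Some(filter)`.
    fn topk_filter(&self) -> Option<&DynamicFilter> {
        self.fetch.map(|_| {
            self.filter
                .as_ref()
                .expect("Filter should be set when fetch is Some")
        })
    }
}

fn main() {
    let plain = SortExecSketch::new(None);
    let topk = SortExecSketch::new(Some(10));
    println!("{:?}", plain.topk_filter()); // None
    println!("{:?}", topk.topk_filter()); // Some(DynamicFilter { thresholds: None })
}
```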



##
datafusion/physical-plan/src/sorts/sort.rs:
##
@@ -935,11 +940,6 @@ impl SortExec {
 }
 }
 
-pub fn with_filter(mut self, filter: Arc) -> Self {
-self.filter = Some(filter);
-self
-}

Review Comment:
   This was unused and had slipped through the cracks. I can make a new PR to 
just remove these methods.



##
datafusion/physical-plan/src/topk/mod.rs:
##
@@ -319,13 +341,88 @@ impl TopK {
 /// (a > 2 OR (a = 2 AND b < 3))
 /// ```
 fn update_filter(&mut self) -> Result<()> {
-let Some(filter) = &self.filter else {
-return Ok(());
-};
 let Some(thresholds) = self.heap.get_threshold_values(&self.expr)? else {
 return Ok(());
 };
 
+// Are the new thresholds more selective than our existing ones?
+let should_update = {
+if let Some(current) = self.filter.thresholds.write().as_mut() {
+assert!(current.len() == thresholds.len());
+// Check if new thresholds are more selective than current ones
+let mut more_selective = false;
+for ((current_value, new_value), sort_expr) in
+current.iter().zip(thresholds.iter()).zip(self.expr.iter())
+{
+// Handle null cases
+let (current_is_null, new_is_null) =
+(current_value.is_null(), new_value.is_null());
+
+match (current_is_null, new_is_null) {
+(true, true) => {
+// Both null, continue checking next values
+}
+(true, false) => {
+// Current is null, new is not null
+// For nulls_first: null < non-null, so new value is less selective
+// For nulls_last: null > non-null, so new value is more selective
+more_selective = !sort_expr.options.nulls_first;
+break;
+}
+(false, true) => {
+// Current is not null, new is null
+// For nulls_first: non-null > null, so new value is more selective
+// For nulls_last: non-null < null, so new value is less selective
+more_selective = sort_expr.options.nulls_first;
+break;
+}
+(false, false) => {
+// Neither is null, compare values
+match current_value.partial_cmp(new_value) {
+Some(ordering) => {
+match ordering {
+Ordering::Equal => {
+// Continue checking next values
+}
+Ordering::Less => {
+// For descending sort: new > current means more selective
+// For ascending sort: new > current means less selective
+more_selective = sort_expr.