Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]
adriangb merged PR #16433: URL: https://github.com/apache/datafusion/pull/16433 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] - To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]
adriangb commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-3215218818
Benchmarks look good, several faster queries, no queries really slower! I'll merge this in the next couple hours if you don't first, Andrew.
Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]
alamb commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-3215208209
🤖: Benchmark completed
Details
```
Comparing HEAD and topk-filters
Benchmark clickbench_extended.json
┏━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query     ┃        HEAD ┃ topk-filters ┃        Change ┃
┡━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 0  │  2639.26 ms │   2604.08 ms │     no change │
│ QQuery 1  │  1219.97 ms │   1240.23 ms │     no change │
│ QQuery 2  │  2324.31 ms │   2392.51 ms │     no change │
│ QQuery 3  │  1207.99 ms │   1172.77 ms │     no change │
│ QQuery 4  │  2236.03 ms │   2231.24 ms │     no change │
│ QQuery 5  │ 27457.81 ms │  27345.17 ms │     no change │
│ QQuery 6  │  4125.85 ms │   4209.18 ms │     no change │
│ QQuery 7  │  3599.32 ms │   3503.08 ms │     no change │
└───────────┴─────────────┴──────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary           ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (HEAD)           │ 44810.53ms │
│ Total Time (topk-filters)   │ 44698.26ms │
│ Average Time (HEAD)         │  5601.32ms │
│ Average Time (topk-filters) │  5587.28ms │
│ Queries Faster              │          0 │
│ Queries Slower              │          0 │
│ Queries with No Change      │          8 │
│ Queries with Failure        │          0 │
└─────────────────────────────┴────────────┘
Benchmark clickbench_partitioned.json
┏━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query     ┃        HEAD ┃ topk-filters ┃        Change ┃
┡━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 0  │     2.12 ms │      2.15 ms │     no change │
│ QQuery 1  │    50.61 ms │     51.14 ms │     no change │
│ QQuery 2  │   135.87 ms │    137.82 ms │     no change │
│ QQuery 3  │   162.54 ms │    162.41 ms │     no change │
│ QQuery 4  │  1055.31 ms │   1027.29 ms │     no change │
│ QQuery 5  │  1475.94 ms │   1472.47 ms │     no change │
│ QQuery 6  │     2.13 ms │      2.17 ms │     no change │
│ QQuery 7  │    54.79 ms │     54.63 ms │     no change │
│ QQuery 8  │  1472.25 ms │   1422.87 ms │     no change │
│ QQuery 9  │  1837.78 ms │   1812.93 ms │     no change │
│ QQuery 10 │   369.82 ms │    385.13 ms │     no change │
│ QQuery 11 │   428.04 ms │    435.19 ms │     no change │
│ QQuery 12 │  1347.17 ms │   1365.75 ms │     no change │
│ QQuery 13 │  2098.07 ms │   2115.15 ms │     no change │
│ QQuery 14 │  1237.31 ms │   1239.03 ms │     no change │
│ QQuery 15 │  1173.37 ms │   1173.14 ms │     no change │
│ QQuery 16 │  2612.56 ms │   2607.94 ms │     no change │
│ QQuery 17 │  2597.59 ms │   2607.24 ms │     no change │
│ QQuery 18 │  5156.33 ms │   4834.22 ms │ +1.07x faster │
│ QQuery 19 │   126.62 ms │    128.06 ms │     no change │
│ QQuery 20 │  1961.14 ms │   2007.25 ms │     no change │
│ QQuery 21 │  2279.44 ms │   2322.19 ms │     no change │
│ QQuery 22 │  3932.07 ms │   3939.04 ms │     no change │
│ QQuery 23 │ 15969.33 ms │  12635.10 ms │ +1.26x faster │
│ QQuery 24 │   265.46 ms │    208.07 ms │ +1.28x faster │
│ QQuery 25 │   492.34 ms │    505.17 ms │     no change │
│ QQuery 26 │   259.62 ms │    229.17 ms │ +1.13x faster │
│ QQuery 27 │  2872.01 ms │   2888.04 ms │     no change │
│ QQuery 28 │ 23296.75 ms │  23107.00 ms │     no change │
│ QQuery 29 │   984.75 ms │   1005.99 ms │     no change │
│ QQuery 30 │  1328.94 ms │   1289.57 ms │     no change │
│ QQuery 31 │  1326.20 ms │   1330.12 ms │     no change │
│ QQuery 32 │  4487.03 ms │   4331.85 ms │     no change │
│ QQuery 33 │  5750.40 ms │   5628.25 ms │     no change │
│ QQuery 34 │  5811.68 ms │   5931.34 ms │     no change │
│ QQuery 35 │  2014.76 ms │   2008.45 ms │     no change │
│ QQuery 36 │   122.81 ms │    120.62 ms │     no change │
│ QQuery 37 │    53.34 ms │     53.77 ms │     no change │
│ QQuery 38 │   122.13 ms │    123.16 ms │     no change │
│ QQuery 39 │   194.75 ms │    197.72 ms │     no change │
│ QQuery 40 │    43.97 ms │     43.22 ms │     no change │
│ QQuery 41 │    40.40 ms │     39.76 ms │     no change │
│ QQuery 42 │    32.38 ms │     31.56 ms │     no change │
└───────────┴─────────────┴──────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary           ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (HEAD)           │ 97037.93ms │
│ Total Time (topk-filters)   │ 93013.16ms │
│ Average Tim
```
Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]
alamb commented on code in PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#discussion_r2294333805
##
datafusion/datasource-parquet/src/opener.rs:
##
@@ -409,12 +409,58 @@ impl FileOpener for ParquetOpener {
.with_row_groups(row_group_indexes)
.build()?;
-let adapted = stream
-.map_err(|e| ArrowError::ExternalError(Box::new(e)))
-.map(move |maybe_batch| {
-maybe_batch
-.and_then(|b|
schema_mapping.map_batch(b).map_err(Into::into))
-});
+// Create a stateful stream that can check pruning after each batch
+let adapted = {
Review Comment:
- https://github.com/apache/datafusion/pull/17293
Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]
alamb commented on code in PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#discussion_r2294236933
##
datafusion/physical-plan/src/topk/mod.rs:
##
@@ -319,19 +343,89 @@ impl TopK {
/// (a > 2 OR (a = 2 AND b < 3))
/// ```
fn update_filter(&mut self) -> Result<()> {
-let Some(filter) = &self.filter else {
+// If the heap doesn't have k elements yet, we can't create thresholds
+let Some(max_row) = self.heap.max() else {
return Ok(());
};
-let Some(thresholds) = self.heap.get_threshold_values(&self.expr)?
else {
+
+let new_threshold_row = &max_row.row;
+
+// Extract scalar values BEFORE acquiring lock to reduce critical
section
+let thresholds = match self.heap.get_threshold_values(&self.expr)? {
Review Comment:
I think we can move this down after the check for update too:
- https://github.com/pydantic/datafusion/pull/37
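To illustrate the ordering suggested in the comment above (do the cheap row-byte comparison first, and only extract scalar thresholds when an update is actually needed), here is a minimal sketch. It is not the PR's code: `SharedThreshold`, `is_more_selective`, and `update_if_better` are hypothetical names, and a plain `std::sync::RwLock` stands in for whatever synchronization the real `TopKDynamicFilters` uses.

```rust
use std::sync::RwLock;

/// Hypothetical stand-in for the shared, cross-partition threshold.
/// This is NOT DataFusion's type; it only models "row bytes behind a lock".
struct SharedThreshold {
    row: RwLock<Option<Vec<u8>>>,
}

impl SharedThreshold {
    fn new() -> Self {
        Self { row: RwLock::new(None) }
    }

    /// Cheap check: true when `candidate` sorts strictly before the stored
    /// row bytes (or nothing is stored yet), i.e. it is a tighter threshold.
    fn is_more_selective(&self, candidate: &[u8]) -> bool {
        match self.row.read().unwrap().as_deref() {
            Some(current) => candidate < current,
            None => true,
        }
    }

    /// Runs the (assumed expensive) `extract` closure only after the cheap
    /// comparison passes. Returns the extracted value if the threshold was
    /// actually replaced.
    fn update_if_better<T>(
        &self,
        candidate: &[u8],
        extract: impl FnOnce() -> T,
    ) -> Option<T> {
        if !self.is_more_selective(candidate) {
            return None; // early exit: the expensive extraction never runs
        }
        let extracted = extract();
        let mut guard = self.row.write().unwrap();
        // Re-check under the write lock: another partition may have
        // installed an even tighter threshold in the meantime.
        let still_better = match guard.as_deref() {
            Some(current) => candidate < current,
            None => true,
        };
        if still_better {
            *guard = Some(candidate.to_vec());
            Some(extracted)
        } else {
            None
        }
    }
}
```

In this sketch the closure plays the role of `get_threshold_values`: a partition whose heap maximum is not tighter than the global threshold returns before doing any scalar extraction.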
##
datafusion/datasource-parquet/src/opener.rs:
##
@@ -409,12 +409,58 @@ impl FileOpener for ParquetOpener {
.with_row_groups(row_group_indexes)
.build()?;
-let adapted = stream
-.map_err(|e| ArrowError::ExternalError(Box::new(e)))
-.map(move |maybe_batch| {
-maybe_batch
-.and_then(|b|
schema_mapping.map_batch(b).map_err(Into::into))
-});
+// Create a stateful stream that can check pruning after each batch
+let adapted = {
Review Comment:
I found this code somewhat 🤯 (and this function is already 100s of lines
long). I spent some time refactoring it into its own stream for readability and
I also understand it better now. I'll put up a follow-on PR to extract this
logic -- no need to do it in this one.
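As a rough picture of what "its own stream" could look like, the sketch below wraps a source of batches and stops yielding once a pruning check says the rest of the file can be skipped. It is only an illustration under simplifying assumptions: it uses a synchronous `Iterator` instead of an async `Stream`, and `StopWhenPruned` and `should_stop` are hypothetical names, not the types used in `opener.rs` or in the follow-on PR.

```rust
/// Hypothetical adapter: yields items from `inner` until `should_stop`
/// reports that the remaining input can be pruned.
struct StopWhenPruned<I, F> {
    inner: I,
    should_stop: F,
    done: bool,
}

impl<I, F> StopWhenPruned<I, F> {
    fn new(inner: I, should_stop: F) -> Self {
        Self { inner, should_stop, done: false }
    }
}

impl<I, F> Iterator for StopWhenPruned<I, F>
where
    I: Iterator,
    F: FnMut() -> bool,
{
    type Item = I::Item;

    fn next(&mut self) -> Option<Self::Item> {
        if self.done {
            return None;
        }
        // The check runs before pulling the next batch, which means it runs
        // after every batch that was already yielded to the consumer.
        if (self.should_stop)() {
            self.done = true;
            return None;
        }
        let item = self.inner.next();
        if item.is_none() {
            self.done = true;
        }
        item
    }
}
```

Keeping the state (`done`, the check closure) in a named type rather than in a closure captured inside a long function is the readability point being made in the comment; the real implementation would also need to handle errors and async polling.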
Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]
alamb commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-3215116380
🤖: Benchmark completed
Details
```
Comparing HEAD and topk-filters
Benchmark run_topk_tpch.json
┏━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query ┃      HEAD ┃ topk-filters ┃        Change ┃
┡━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ Q1    │  36.60 ms │      8.76 ms │ +4.18x faster │
│ Q2    │  45.45 ms │     45.55 ms │     no change │
│ Q3    │ 124.01 ms │    125.24 ms │     no change │
│ Q4    │  51.42 ms │     48.09 ms │ +1.07x faster │
│ Q5    │  35.31 ms │     38.12 ms │  1.08x slower │
│ Q6    │  66.22 ms │     67.99 ms │     no change │
│ Q7    │ 162.63 ms │    166.41 ms │     no change │
│ Q8    │ 106.50 ms │    108.96 ms │     no change │
│ Q9    │ 135.16 ms │    135.68 ms │     no change │
│ Q10   │ 223.96 ms │    221.17 ms │     no change │
│ Q11   │ 113.44 ms │    117.74 ms │     no change │
└───────┴───────────┴──────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary           ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (HEAD)           │  1100.71ms │
│ Total Time (topk-filters)   │  1083.72ms │
│ Average Time (HEAD)         │   100.06ms │
│ Average Time (topk-filters) │    98.52ms │
│ Queries Faster              │          2 │
│ Queries Slower              │          1 │
│ Queries with No Change      │          8 │
│ Queries with Failure        │          0 │
└─────────────────────────────┴────────────┘
```
Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]
alamb commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-3215116778
🤖 `./gh_compare_branch.sh` [Benchmark Script](https://github.com/alamb/datafusion-benchmarking/blob/main/gh_compare_branch.sh) Running
Linux aal-dev 6.14.0-1014-gcp #15~24.04.1-Ubuntu SMP Fri Jul 25 23:26:08 UTC 2025 x86_64 x86_64 x86_64 GNU/Linux
Comparing topk-filters (d4e2312d2c3b28db15d194bb279577f495f84029) to f363e382661a4f45dad2912e9988f1703e46939b [diff](https://github.com/apache/datafusion/compare/f363e382661a4f45dad2912e9988f1703e46939b..d4e2312d2c3b28db15d194bb279577f495f84029) using: tpch_mem clickbench_partitioned clickbench_extended
Results will be posted here when complete
Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]
alamb commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-3215086665
🤖 `./gh_compare_branch.sh` [Benchmark Script](https://github.com/alamb/datafusion-benchmarking/blob/main/gh_compare_branch.sh) Running
Linux aal-dev 6.14.0-1014-gcp #15~24.04.1-Ubuntu SMP Fri Jul 25 23:26:08 UTC 2025 x86_64 x86_64 x86_64 GNU/Linux
Comparing topk-filters (d4e2312d2c3b28db15d194bb279577f495f84029) to f363e382661a4f45dad2912e9988f1703e46939b [diff](https://github.com/apache/datafusion/compare/f363e382661a4f45dad2912e9988f1703e46939b..d4e2312d2c3b28db15d194bb279577f495f84029) using: topk_tpch
Results will be posted here when complete
Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]
alamb commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-3215086584
🤖: Benchmark completed
Details
```
Comparing HEAD and topk-filters
Benchmark tpch_mem_sf1.json
┏━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query     ┃      HEAD ┃ topk-filters ┃        Change ┃
┡━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1  │ 169.63 ms │    165.07 ms │     no change │
│ QQuery 2  │  27.55 ms │     27.70 ms │     no change │
│ QQuery 3  │  44.86 ms │     45.11 ms │     no change │
│ QQuery 4  │  26.77 ms │     26.41 ms │     no change │
│ QQuery 5  │  72.18 ms │     73.65 ms │     no change │
│ QQuery 6  │  19.67 ms │     19.50 ms │     no change │
│ QQuery 7  │ 144.25 ms │    139.83 ms │     no change │
│ QQuery 8  │  30.48 ms │     33.56 ms │  1.10x slower │
│ QQuery 9  │  83.26 ms │     82.54 ms │     no change │
│ QQuery 10 │  57.63 ms │     58.02 ms │     no change │
│ QQuery 11 │  42.63 ms │     40.83 ms │     no change │
│ QQuery 12 │  49.97 ms │     50.12 ms │     no change │
│ QQuery 13 │  44.87 ms │     46.68 ms │     no change │
│ QQuery 14 │  13.69 ms │     13.90 ms │     no change │
│ QQuery 15 │  23.59 ms │     23.69 ms │     no change │
│ QQuery 16 │  23.76 ms │     23.45 ms │     no change │
│ QQuery 17 │ 141.23 ms │    146.02 ms │     no change │
│ QQuery 18 │ 318.80 ms │    313.81 ms │     no change │
│ QQuery 19 │  48.11 ms │     34.80 ms │ +1.38x faster │
│ QQuery 20 │  60.59 ms │     47.85 ms │ +1.27x faster │
│ QQuery 21 │ 224.40 ms │    216.34 ms │     no change │
│ QQuery 22 │  20.00 ms │     19.36 ms │     no change │
└───────────┴───────────┴──────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary           ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (HEAD)           │  1687.90ms │
│ Total Time (topk-filters)   │  1648.24ms │
│ Average Time (HEAD)         │    76.72ms │
│ Average Time (topk-filters) │    74.92ms │
│ Queries Faster              │          2 │
│ Queries Slower              │          1 │
│ Queries with No Change      │         19 │
│ Queries with Failure        │          0 │
└─────────────────────────────┴────────────┘
```
Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]
alamb commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-3215012818
Looking at it now -- I kicked off the benchmarks again after making some changes to my gcp machine that hopefully will make the results more consistent
Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]
alamb commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-3215011892
🤖 `./gh_compare_branch.sh` [Benchmark Script](https://github.com/alamb/datafusion-benchmarking/blob/main/gh_compare_branch.sh) Running
Linux aal-dev 6.14.0-1014-gcp #15~24.04.1-Ubuntu SMP Fri Jul 25 23:26:08 UTC 2025 x86_64 x86_64 x86_64 GNU/Linux
Comparing topk-filters (0b1cb583b781e99e0163c6caa62b8f735bafdcf8) to f363e382661a4f45dad2912e9988f1703e46939b [diff](https://github.com/apache/datafusion/compare/f363e382661a4f45dad2912e9988f1703e46939b..0b1cb583b781e99e0163c6caa62b8f735bafdcf8) using: tpch_mem
Results will be posted here when complete
Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]
adriangb commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-3214807633
@alamb I think you've reviewed but not approved, I thought you had approved. Can you take another look at this PR when you get a chance? I think I've addressed all of the feedback.
Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]
adriangb commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-3210820667
I've rebased this on `main` and plan to merge it once CI passes
Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]
adriangb commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-3203042277
@alamb I think most of the feedback has been addressed, thank you for your review and improvements
Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]
alamb commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-3202553698
🤖: Benchmark completed
Details
```
Comparing HEAD and topk-filters
Benchmark clickbench_extended.json
┏━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query     ┃        HEAD ┃ topk-filters ┃        Change ┃
┡━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 0  │  1941.87 ms │   1911.66 ms │     no change │
│ QQuery 1  │   745.79 ms │    645.87 ms │ +1.15x faster │
│ QQuery 2  │  1448.99 ms │   1286.31 ms │ +1.13x faster │
│ QQuery 3  │   620.31 ms │    614.76 ms │     no change │
│ QQuery 4  │  1293.11 ms │   1282.47 ms │     no change │
│ QQuery 5  │ 13859.62 ms │  13799.54 ms │     no change │
│ QQuery 6  │  2359.44 ms │   2349.22 ms │     no change │
│ QQuery 7  │  1800.51 ms │   1811.91 ms │     no change │
└───────────┴─────────────┴──────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary           ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (HEAD)           │ 24069.65ms │
│ Total Time (topk-filters)   │ 23701.73ms │
│ Average Time (HEAD)         │  3008.71ms │
│ Average Time (topk-filters) │  2962.72ms │
│ Queries Faster              │          2 │
│ Queries Slower              │          0 │
│ Queries with No Change      │          6 │
│ Queries with Failure        │          0 │
└─────────────────────────────┴────────────┘
Benchmark clickbench_partitioned.json
┏━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query     ┃        HEAD ┃ topk-filters ┃        Change ┃
┡━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 0  │     2.47 ms │      2.23 ms │ +1.11x faster │
│ QQuery 1  │    29.14 ms │     29.27 ms │     no change │
│ QQuery 2  │    75.23 ms │     76.03 ms │     no change │
│ QQuery 3  │    95.06 ms │     89.60 ms │ +1.06x faster │
│ QQuery 4  │   595.70 ms │    594.73 ms │     no change │
│ QQuery 5  │   862.32 ms │    877.24 ms │     no change │
│ QQuery 6  │     2.31 ms │      2.18 ms │ +1.06x faster │
│ QQuery 7  │    32.52 ms │     33.61 ms │     no change │
│ QQuery 8  │   848.36 ms │    863.85 ms │     no change │
│ QQuery 9  │  1168.82 ms │   1152.74 ms │     no change │
│ QQuery 10 │   229.56 ms │    225.21 ms │     no change │
│ QQuery 11 │   254.54 ms │    249.47 ms │     no change │
│ QQuery 12 │   882.94 ms │    836.94 ms │ +1.05x faster │
│ QQuery 13 │  1234.12 ms │   1189.38 ms │     no change │
│ QQuery 14 │   805.61 ms │    789.22 ms │     no change │
│ QQuery 15 │   783.32 ms │    783.07 ms │     no change │
│ QQuery 16 │  1593.42 ms │   1584.35 ms │     no change │
│ QQuery 17 │  1586.06 ms │   1578.98 ms │     no change │
│ QQuery 18 │  2818.83 ms │   2850.55 ms │     no change │
│ QQuery 19 │    82.25 ms │     79.62 ms │     no change │
│ QQuery 20 │  1204.54 ms │   1212.60 ms │     no change │
│ QQuery 21 │  1379.57 ms │   1350.79 ms │     no change │
│ QQuery 22 │  2278.24 ms │   2259.20 ms │     no change │
│ QQuery 23 │  7743.86 ms │   6870.55 ms │ +1.13x faster │
│ QQuery 24 │   416.53 ms │    405.25 ms │     no change │
│ QQuery 25 │   294.09 ms │    288.28 ms │     no change │
│ QQuery 26 │   416.90 ms │    405.81 ms │     no change │
│ QQuery 27 │  1617.30 ms │   1592.63 ms │     no change │
│ QQuery 28 │ 11970.14 ms │  11759.08 ms │     no change │
│ QQuery 29 │   508.33 ms │    520.47 ms │     no change │
│ QQuery 30 │   787.97 ms │    783.59 ms │     no change │
│ QQuery 31 │   787.57 ms │    773.39 ms │     no change │
│ QQuery 32 │  2368.78 ms │   2381.14 ms │     no change │
│ QQuery 33 │  3177.45 ms │   3187.93 ms │     no change │
│ QQuery 34 │  3188.98 ms │   3216.57 ms │     no change │
│ QQuery 35 │  1215.92 ms │   1294.23 ms │  1.06x slower │
│ QQuery 36 │   129.68 ms │    128.39 ms │     no change │
│ QQuery 37 │    56.08 ms │     56.33 ms │     no change │
│ QQuery 38 │   124.60 ms │    127.86 ms │     no change │
│ QQuery 39 │   198.97 ms │    209.04 ms │  1.05x slower │
│ QQuery 40 │    41.21 ms │     42.17 ms │     no change │
│ QQuery 41 │    38.62 ms │     39.69 ms │     no change │
│ QQuery 42 │    31.94 ms │     33.52 ms │     no change │
└───────────┴─────────────┴──────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary           ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (HEAD)           │ 53959.86ms │
│ Total Time
```
Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]
adriangb commented on code in PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#discussion_r2286514826
##
datafusion/physical-plan/src/topk/mod.rs:
##
@@ -121,13 +122,37 @@ pub struct TopK {
/// Common sort prefix between the input and the sort expressions to allow
early exit optimization
common_sort_prefix: Arc<[PhysicalSortExpr]>,
/// Filter matching the state of the `TopK` heap used for dynamic filter
pushdown
-filter: Option>,
+filter: TopKDynamicFilters,
/// If true, indicates that all rows of subsequent batches are guaranteed
/// to be greater (by byte order, after row conversion) than the top K,
/// which means the top K won't change and the computation can be finished
early.
pub(crate) finished: bool,
}
+#[derive(Debug, Clone)]
+pub struct TopKDynamicFilters {
+/// The current *global* threshold for the dynamic filter.
+/// This is shared across all partitions and is updated by any of them.
+/// Stored as row bytes for efficient comparison.
+threshold_row: Arc>>,
+/// The expression used to evaluate the dynamic filter
+expr: Arc,
Review Comment:
Yep good call! I think otherwise we're bound to run into bugs /
synchronization issues. Did it in bcc0dcdc4.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]
adriangb commented on code in PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#discussion_r2286494820
##
datafusion/physical-plan/src/topk/mod.rs:
##
@@ -319,19 +342,84 @@ impl TopK {
/// (a > 2 OR (a = 2 AND b < 3))
/// ```
fn update_filter(&mut self) -> Result<()> {
-let Some(filter) = &self.filter else {
+// If the heap doesn't have k elements yet, we can't create thresholds
+let Some(max_row) = self.heap.max() else {
return Ok(());
};
-let Some(thresholds) = self.heap.get_threshold_values(&self.expr)?
else {
-return Ok(());
+
+let new_threshold_row = &max_row.row;
+
+// Extract scalar values BEFORE acquiring lock to reduce critical
section
+let thresholds = match self.heap.get_threshold_values(&self.expr)? {
+Some(t) => t,
+None => return Ok(()),
};
+// Extract filter expression reference before entering critical section
+let filter_expr = Arc::clone(&self.filter.expr);
+
+// Check if we need to update the threshold (lock-free read)
+let current_threshold = self.filter.threshold_row.load();
+let needs_update = match current_threshold.as_ref() {
+Some(current_row) => {
+// new < current means new threshold is more selective
+current_row.as_slice().cmp(new_threshold_row) ==
Ordering::Greater
+}
+None => true, // No current threshold, so we need to set one
+};
+
+// Only proceed if we need to update
+if needs_update {
+// Build the filter expression OUTSIDE any synchronization
+let predicate = Self::build_filter_expression(&self.expr,
thresholds)?;
+let new_threshold_arc = Arc::new(new_threshold_row.to_vec());
+
+// Atomically update the threshold using compare-and-swap
+let old_threshold = self.filter.threshold_row.compare_and_swap(
+&current_threshold,
+Some(Arc::clone(&new_threshold_arc)),
+);
+
+// Only update filter if we successfully updated the threshold
+// (or if there was no previous threshold and we're the first)
+let should_update_filter =
+match (old_threshold.as_ref(), current_threshold.as_ref()) {
+// We successfully swapped
+(Some(old), Some(expected)) if Arc::ptr_eq(old, expected)
=> true,
+// We were the first to set it
+(None, None) => true,
+// Another thread updated before us, check if our
threshold is still better
+(Some(actual_old), _) => {
+actual_old.as_slice().cmp(new_threshold_row) ==
Ordering::Greater
Review Comment:
I think you resolved this in your commits, just confirming
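The doc comment at the top of this hunk gives the filter shape `(a > 2 OR (a = 2 AND b < 3))`. As a small illustration of that lexicographic form, the sketch below evaluates it for an arbitrary number of sort columns. Everything here is hypothetical and simplified: `Direction` and `passes_threshold` are made-up names, values are plain `i64`, nulls are ignored, and the example assumes the quoted filter corresponds to `ORDER BY a DESC, b ASC` with threshold `(2, 3)`.

```rust
/// Sort direction of one ORDER BY column (hypothetical helper type).
#[derive(Clone, Copy)]
enum Direction {
    Asc,
    Desc,
}

/// Returns true if `row` sorts strictly before `threshold`, i.e. it could
/// still displace the current worst row in the top K.
///
/// For two columns (a DESC, b ASC) and threshold (2, 3) this evaluates
/// `a > 2 OR (a = 2 AND b < 3)`.
fn passes_threshold(row: &[i64], threshold: &[i64], dirs: &[Direction]) -> bool {
    for ((value, bound), dir) in row.iter().zip(threshold).zip(dirs) {
        let better = match dir {
            Direction::Asc => value < bound,
            Direction::Desc => value > bound,
        };
        if better {
            return true; // strictly better on the first differing column
        }
        if value != bound {
            return false; // strictly worse on this column
        }
        // Tie on this column: the decision falls to the next column.
    }
    false // equal on every column, so not strictly better
}
```

A row that ties the threshold on every column is rejected here, matching the strict comparisons in the quoted filter.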
Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]
adriangb commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-3202471933
> 🤖: Benchmark completed
>
> Details
Hmm quite a bit worse than https://github.com/apache/datafusion/pull/16433#issuecomment-3201715769. I still think we should move forward with this sans ArcSwap and then we can look at that in isolation, otherwise it's too hard to know what's responsible for performance.
Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]
alamb commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-3202450803
🤖 `./gh_compare_branch.sh` [Benchmark Script](https://github.com/alamb/datafusion-benchmarking/blob/main/gh_compare_branch.sh) Running
Linux aal-dev 6.11.0-1016-gcp #16~24.04.1-Ubuntu SMP Wed May 28 02:40:52 UTC 2025 x86_64 x86_64 x86_64 GNU/Linux
Comparing topk-filters (ffe68565b55ba640f9c5197a4f38106ab0460807) to 8f15991f33bf6aca9d4da8958141b59d196b2ed6 [diff](https://github.com/apache/datafusion/compare/8f15991f33bf6aca9d4da8958141b59d196b2ed6..ffe68565b55ba640f9c5197a4f38106ab0460807) using: tpch_mem clickbench_partitioned clickbench_extended
Results will be posted here when complete
Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]
alamb commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-3202450580
🤖: Benchmark completed
Details
```
Comparing HEAD and topk-filters
Benchmark run_topk_tpch.json
┏━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query ┃      HEAD ┃ topk-filters ┃        Change ┃
┡━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ Q1    │  24.01 ms │      8.64 ms │ +2.78x faster │
│ Q2    │  30.33 ms │     30.83 ms │     no change │
│ Q3    │  88.89 ms │     98.89 ms │  1.11x slower │
│ Q4    │  34.17 ms │     39.35 ms │  1.15x slower │
│ Q5    │  23.42 ms │     27.51 ms │  1.17x slower │
│ Q6    │  48.61 ms │     48.06 ms │     no change │
│ Q7    │ 121.79 ms │    121.96 ms │     no change │
│ Q8    │  90.35 ms │     69.09 ms │ +1.31x faster │
│ Q9    │  93.10 ms │     91.78 ms │     no change │
│ Q10   │ 163.82 ms │    170.23 ms │     no change │
│ Q11   │  78.22 ms │     13.29 ms │ +5.89x faster │
└───────┴───────────┴──────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary           ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (HEAD)           │   796.69ms │
│ Total Time (topk-filters)   │   719.62ms │
│ Average Time (HEAD)         │    72.43ms │
│ Average Time (topk-filters) │    65.42ms │
│ Queries Faster              │          3 │
│ Queries Slower              │          3 │
│ Queries with No Change      │          5 │
│ Queries with Failure        │          0 │
└─────────────────────────────┴────────────┘
```
Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]
alamb commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-3202449463
🤖 `./gh_compare_branch.sh` [Benchmark Script](https://github.com/alamb/datafusion-benchmarking/blob/main/gh_compare_branch.sh) Running
Linux aal-dev 6.11.0-1016-gcp #16~24.04.1-Ubuntu SMP Wed May 28 02:40:52 UTC 2025 x86_64 x86_64 x86_64 GNU/Linux
Comparing topk-filters (ffe68565b55ba640f9c5197a4f38106ab0460807) to 8f15991f33bf6aca9d4da8958141b59d196b2ed6 [diff](https://github.com/apache/datafusion/compare/8f15991f33bf6aca9d4da8958141b59d196b2ed6..ffe68565b55ba640f9c5197a4f38106ab0460807) using: topk_tpch
Results will be posted here when complete
Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]
alamb commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-3202449210
🤖: Benchmark completed
Details
```
Comparing HEAD and topk-filters
Benchmark tpch_mem_sf1.json
┏━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query     ┃      HEAD ┃ topk-filters ┃        Change ┃
┡━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1  │  98.51 ms │     98.91 ms │     no change │
│ QQuery 2  │  20.91 ms │     20.89 ms │     no change │
│ QQuery 3  │  31.83 ms │     32.16 ms │     no change │
│ QQuery 4  │  18.38 ms │     18.19 ms │     no change │
│ QQuery 5  │  49.14 ms │     49.23 ms │     no change │
│ QQuery 6  │  11.96 ms │     11.79 ms │     no change │
│ QQuery 7  │  88.37 ms │     85.17 ms │     no change │
│ QQuery 8  │  24.71 ms │     23.62 ms │     no change │
│ QQuery 9  │  52.90 ms │     53.36 ms │     no change │
│ QQuery 10 │  40.20 ms │     39.90 ms │     no change │
│ QQuery 11 │  39.34 ms │     38.67 ms │     no change │
│ QQuery 12 │  29.89 ms │     30.23 ms │     no change │
│ QQuery 13 │  25.64 ms │     26.43 ms │     no change │
│ QQuery 14 │   9.81 ms │      9.54 ms │     no change │
│ QQuery 15 │  19.13 ms │     18.48 ms │     no change │
│ QQuery 16 │  17.07 ms │     17.31 ms │     no change │
│ QQuery 17 │  94.55 ms │     93.25 ms │     no change │
│ QQuery 18 │ 189.44 ms │    188.32 ms │     no change │
│ QQuery 19 │  23.73 ms │     24.38 ms │     no change │
│ QQuery 20 │  31.93 ms │     30.80 ms │     no change │
│ QQuery 21 │ 141.87 ms │    139.60 ms │     no change │
│ QQuery 22 │  13.79 ms │     14.01 ms │     no change │
└───────────┴───────────┴──────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary           ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (HEAD)           │  1073.08ms │
│ Total Time (topk-filters)   │  1064.25ms │
│ Average Time (HEAD)         │    48.78ms │
│ Average Time (topk-filters) │    48.37ms │
│ Queries Faster              │          0 │
│ Queries Slower              │          0 │
│ Queries with No Change      │         22 │
│ Queries with Failure        │          0 │
└─────────────────────────────┴────────────┘
```
Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]
alamb commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-3202448125
🤖 `./gh_compare_branch.sh` [Benchmark Script](https://github.com/alamb/datafusion-benchmarking/blob/main/gh_compare_branch.sh)
Running Linux aal-dev 6.11.0-1016-gcp #16~24.04.1-Ubuntu SMP Wed May 28 02:40:52 UTC 2025 x86_64 x86_64 x86_64 GNU/Linux
Comparing topk-filters (ffe68565b55ba640f9c5197a4f38106ab0460807) to 8f15991f33bf6aca9d4da8958141b59d196b2ed6 [diff](https://github.com/apache/datafusion/compare/8f15991f33bf6aca9d4da8958141b59d196b2ed6..ffe68565b55ba640f9c5197a4f38106ab0460807) using: tpch_mem
Results will be posted here when complete
Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]
alamb commented on code in PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#discussion_r2286474109
##
datafusion/physical-expr/src/expressions/dynamic_filters.rs:
##
@@ -137,7 +139,7 @@ impl DynamicFilterPhysicalExpr {
         Self {
             children,
             remapped_children: None, // Initially no remapped children
-            inner: Arc::new(RwLock::new(Inner::new(inner))),
+            inner: Arc::new(ArcSwap::from_pointee(Inner::new(inner))),
Review Comment:
I may have been burned one too many times by random maintainers on the
internet deciding to disappear, etc., and I am trying to keep the dependency
load down for DataFusion. I realize I may be oversensitive on this topic.
Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]
adriangb commented on code in PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#discussion_r2286456332
##
datafusion/physical-plan/src/topk/mod.rs:
##
@@ -121,13 +122,37 @@ pub struct TopK {
     /// Common sort prefix between the input and the sort expressions to allow early exit optimization
     common_sort_prefix: Arc<[PhysicalSortExpr]>,
     /// Filter matching the state of the `TopK` heap used for dynamic filter pushdown
-    filter: Option<Arc<DynamicFilterPhysicalExpr>>,
+    filter: TopKDynamicFilters,
     /// If true, indicates that all rows of subsequent batches are guaranteed
     /// to be greater (by byte order, after row conversion) than the top K,
     /// which means the top K won't change and the computation can be finished early.
     pub(crate) finished: bool,
 }
+#[derive(Debug, Clone)]
+pub struct TopKDynamicFilters {
Review Comment:
Yes because it gets passed into `TopK::try_new` which is currently public.
Maybe `TopK` should be `pub(crate)` then we could make the whole thing
`pub(crate)` but as it stands that's a breaking change.
Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]
adriangb commented on code in PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#discussion_r2286456332
##
datafusion/physical-plan/src/topk/mod.rs:
##
@@ -121,13 +122,37 @@ pub struct TopK {
     /// Common sort prefix between the input and the sort expressions to allow early exit optimization
     common_sort_prefix: Arc<[PhysicalSortExpr]>,
     /// Filter matching the state of the `TopK` heap used for dynamic filter pushdown
-    filter: Option<Arc<DynamicFilterPhysicalExpr>>,
+    filter: TopKDynamicFilters,
     /// If true, indicates that all rows of subsequent batches are guaranteed
     /// to be greater (by byte order, after row conversion) than the top K,
     /// which means the top K won't change and the computation can be finished early.
     pub(crate) finished: bool,
 }
+#[derive(Debug, Clone)]
+pub struct TopKDynamicFilters {
Review Comment:
Yes because it gets passed into `TopK::try_new`
Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]
adriangb commented on code in PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#discussion_r2286448656
##
datafusion/physical-expr/src/expressions/dynamic_filters.rs:
##
@@ -137,7 +139,7 @@ impl DynamicFilterPhysicalExpr {
         Self {
             children,
             remapped_children: None, // Initially no remapped children
-            inner: Arc::new(RwLock::new(Inner::new(inner))),
+            inner: Arc::new(ArcSwap::from_pointee(Inner::new(inner))),
Review Comment:
> it seems to have plenty of dependencies
https://crates.io/crates/arc-swap/1.7.1/dependencies
I see no required dependencies, only dev deps.
> And its last release was over a year ago:
https://crates.io/crates/arc-swap/1.7.1
Personally I don't see any issue with this: I'd look at the ratio of open
issues / obvious bugs vs. releases. If there are no features to add and no
major bugs, having few releases can be a good thing.
Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]
alamb commented on code in PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#discussion_r2286140587
##
datafusion/physical-plan/src/topk/mod.rs:
##
@@ -214,41 +239,39 @@ impl TopK {
         let mut selected_rows = None;
-        if let Some(filter) = self.filter.as_ref() {
-            // If a filter is provided, update it with the new rows
-            let filter = filter.current()?;
-            let filtered = filter.evaluate(&batch)?;
-            let num_rows = batch.num_rows();
-            let array = filtered.into_array(num_rows)?;
-            let mut filter = array.as_boolean().clone();
-            let true_count = filter.true_count();
-            if true_count == 0 {
-                // nothing to filter, so no need to update
-                return Ok(());
+        // If a filter is provided, update it with the new rows
+        let filter = self.filter.expr.current()?;
+        let filtered = filter.evaluate(&batch)?;
Review Comment:
A minor nit for a follow-on PR would be to move this code into its own
function, like `TopK::update_dynamic_filter` or something, to make it easier
to understand.
##
datafusion/physical-expr/src/expressions/dynamic_filters.rs:
##
@@ -137,7 +139,7 @@ impl DynamicFilterPhysicalExpr {
         Self {
             children,
             remapped_children: None, // Initially no remapped children
-            inner: Arc::new(RwLock::new(Inner::new(inner))),
+            inner: Arc::new(ArcSwap::from_pointee(Inner::new(inner))),
Review Comment:
I made a PR to show what this looks like without arc_swap
- https://github.com/apache/datafusion/pull/17245
##
datafusion/physical-plan/src/topk/mod.rs:
##
@@ -121,13 +122,37 @@ pub struct TopK {
     /// Common sort prefix between the input and the sort expressions to allow early exit optimization
     common_sort_prefix: Arc<[PhysicalSortExpr]>,
     /// Filter matching the state of the `TopK` heap used for dynamic filter pushdown
-    filter: Option<Arc<DynamicFilterPhysicalExpr>>,
+    filter: TopKDynamicFilters,
     /// If true, indicates that all rows of subsequent batches are guaranteed
     /// to be greater (by byte order, after row conversion) than the top K,
     /// which means the top K won't change and the computation can be finished early.
     pub(crate) finished: bool,
 }
+#[derive(Debug, Clone)]
+pub struct TopKDynamicFilters {
Review Comment:
does it need to be pub?
##
datafusion/physical-plan/src/topk/mod.rs:
##
@@ -319,19 +342,84 @@ impl TopK {
/// (a > 2 OR (a = 2 AND b < 3))
/// ```
     fn update_filter(&mut self) -> Result<()> {
-        let Some(filter) = &self.filter else {
+        // If the heap doesn't have k elements yet, we can't create thresholds
+        let Some(max_row) = self.heap.max() else {
             return Ok(());
         };
-        let Some(thresholds) = self.heap.get_threshold_values(&self.expr)? else {
-            return Ok(());
+
+        let new_threshold_row = &max_row.row;
+
+        // Extract scalar values BEFORE acquiring lock to reduce critical section
+        let thresholds = match self.heap.get_threshold_values(&self.expr)? {
+            Some(t) => t,
+            None => return Ok(()),
         };
+        // Extract filter expression reference before entering critical section
+        let filter_expr = Arc::clone(&self.filter.expr);
+
+        // Check if we need to update the threshold (lock-free read)
+        let current_threshold = self.filter.threshold_row.load();
+        let needs_update = match current_threshold.as_ref() {
+            Some(current_row) => {
+                // new < current means new threshold is more selective
+                current_row.as_slice().cmp(new_threshold_row) == Ordering::Greater
+            }
+            None => true, // No current threshold, so we need to set one
+        };
+
+        // Only proceed if we need to update
+        if needs_update {
+            // Build the filter expression OUTSIDE any synchronization
+            let predicate = Self::build_filter_expression(&self.expr, thresholds)?;
+            let new_threshold_arc = Arc::new(new_threshold_row.to_vec());
+
+            // Atomically update the threshold using compare-and-swap
+            let old_threshold = self.filter.threshold_row.compare_and_swap(
+                &current_threshold,
+                Some(Arc::clone(&new_threshold_arc)),
+            );
+
+            // Only update filter if we successfully updated the threshold
+            // (or if there was no previous threshold and we're the first)
+            let should_update_filter =
+                match (old_threshold.as_ref(), current_threshold.as_ref()) {
+                    // We successfully swapped
+                    (Some(old), Some(expected)) if Arc::ptr_eq(old, expected) => true,
+                    // We were the first to set it
+                    (None, None) => true,
+
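For readers following the thread, the "only update if more selective" check in the hunk above can be sketched in isolation. This is a minimal std-only sketch, not the PR's code: `SharedThreshold` and `tighten` are hypothetical names, and a `Mutex` is swapped in for the lock-free compare-and-swap the diff uses. The rule is the same as in the diff: a candidate row replaces the stored row only when it sorts strictly before it byte-wise.

```rust
use std::cmp::Ordering;
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for the shared threshold state (not the PR's type).
#[derive(Debug, Default)]
pub struct SharedThreshold {
    row: Mutex<Option<Arc<Vec<u8>>>>,
}

impl SharedThreshold {
    /// Stores `candidate` only if no row is stored yet or if it sorts strictly
    /// before the stored row (byte-wise). Returns true when an update happened.
    pub fn tighten(&self, candidate: &[u8]) -> bool {
        // Recover the guard even if another thread panicked while holding it.
        let mut guard = self.row.lock().unwrap_or_else(|e| e.into_inner());
        let more_selective = match guard.as_deref() {
            // new < current means the new threshold is more selective
            Some(current) => candidate.cmp(current.as_slice()) == Ordering::Less,
            None => true,
        };
        if more_selective {
            *guard = Some(Arc::new(candidate.to_vec()));
        }
        more_selective
    }

    /// Returns a cheap snapshot of the stored row, if any.
    pub fn current(&self) -> Option<Arc<Vec<u8>>> {
        self.row.lock().unwrap_or_else(|e| e.into_inner()).clone()
    }
}
```

Calling `tighten(&[5, 0])` on an empty threshold stores the row; a later `tighten(&[7, 0])` is rejected because it is less selective, while `tighten(&[3, 9])` replaces the stored row.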
Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]
alamb commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-3201878567
🌶️ 🌶️ 🌶️
Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]
adriangb commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-3201759854
> Q11 │ 80.40 ms │ 11.82 ms │ +6.80x faster
Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]
alamb commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-3201715769
🤖: Benchmark completed
Details
```
Comparing HEAD and topk-filters
Benchmark run_topk_tpch.json
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃      HEAD ┃ topk-filters ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ Q1           │  24.21 ms │      7.05 ms │ +3.43x faster │
│ Q2           │  31.06 ms │     30.96 ms │     no change │
│ Q3           │ 100.83 ms │     92.74 ms │ +1.09x faster │
│ Q4           │  40.93 ms │     33.97 ms │ +1.20x faster │
│ Q5           │  23.65 ms │     27.52 ms │  1.16x slower │
│ Q6           │  45.75 ms │     49.21 ms │  1.08x slower │
│ Q7           │ 119.32 ms │    125.34 ms │  1.05x slower │
│ Q8           │  93.13 ms │     71.32 ms │ +1.31x faster │
│ Q9           │  94.83 ms │     92.51 ms │     no change │
│ Q10          │ 166.33 ms │    160.07 ms │     no change │
│ Q11          │  80.40 ms │     11.82 ms │ +6.80x faster │
└──────────────┴───────────┴──────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary           ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (HEAD)           │   820.45ms │
│ Total Time (topk-filters)   │   702.52ms │
│ Average Time (HEAD)         │    74.59ms │
│ Average Time (topk-filters) │    63.87ms │
│ Queries Faster              │          5 │
│ Queries Slower              │          3 │
│ Queries with No Change      │          3 │
│ Queries with Failure        │          0 │
└─────────────────────────────┴────────────┘
```
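The Change column in these tables appears to be the ratio of the two timings, with small differences reported as noise. A rough sketch of that arithmetic follows. The function name is hypothetical, and the 5% cutoff is an assumption inferred from the rows above rather than taken from the benchmark script. Because the printed timings are rounded, recomputing from them may differ from the bot's output in the last digit for some rows.

```rust
/// Relative difference at or below which two timings count as noise.
/// Assumed value, inferred from the benchmark rows, not from the script.
const NOISE_THRESHOLD: f64 = 0.05;

/// Describes how `branch_ms` compares to `head_ms`, in the style of the
/// Change column. Both timings are assumed to be positive.
pub fn describe_change(head_ms: f64, branch_ms: f64) -> String {
    // Always divide the larger timing by the smaller one so the ratio is >= 1.
    let (ratio, faster) = if branch_ms <= head_ms {
        (head_ms / branch_ms, true)
    } else {
        (branch_ms / head_ms, false)
    };
    if ratio - 1.0 <= NOISE_THRESHOLD {
        "no change".to_string()
    } else if faster {
        format!("+{ratio:.2}x faster")
    } else {
        format!("{ratio:.2}x slower")
    }
}
```

For Q11, 80.40 / 11.82 is about 6.80, so the row reads "+6.80x faster"; for Q5, 27.52 / 23.65 is about 1.16, so it reads "1.16x slower".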
Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]
alamb commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-3201681965
Given the nice performance improvements this seems to provide, I plan to review this PR carefully in the next day or two
Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]
alamb commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-3201680033
🤖 `./gh_compare_branch.sh` [Benchmark Script](https://github.com/alamb/datafusion-benchmarking/blob/main/gh_compare_branch.sh)
Running Linux aal-dev 6.11.0-1016-gcp #16~24.04.1-Ubuntu SMP Wed May 28 02:40:52 UTC 2025 x86_64 x86_64 x86_64 GNU/Linux
Comparing topk-filters (3945aba663062debc24757c857a8bd679ad8b223) to 8f15991f33bf6aca9d4da8958141b59d196b2ed6 [diff](https://github.com/apache/datafusion/compare/8f15991f33bf6aca9d4da8958141b59d196b2ed6..3945aba663062debc24757c857a8bd679ad8b223) using: topk_tpch
Results will be posted here when complete
Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]
adriangb commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-3197984660
> QQuery 23 │ 7751.39 ms │ 6855.54 ms │ +1.13x faster
> QQuery 23 │ 7690.27 ms │ 6835.65 ms │ +1.13x faster
I'm guessing this is mostly real
Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]
alamb commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-3197955377
🤖: Benchmark completed
Details
```
Comparing HEAD and topk-filters
Benchmark clickbench_extended.json
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃        HEAD ┃ topk-filters ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 0     │  2053.78 ms │   1974.31 ms │     no change │
│ QQuery 1     │   735.78 ms │    660.02 ms │ +1.11x faster │
│ QQuery 2     │  1460.75 ms │   1307.73 ms │ +1.12x faster │
│ QQuery 3     │   623.71 ms │    617.23 ms │     no change │
│ QQuery 4     │  1285.31 ms │   1298.10 ms │     no change │
│ QQuery 5     │ 13863.88 ms │  14128.38 ms │     no change │
│ QQuery 6     │  2383.77 ms │   2341.30 ms │     no change │
│ QQuery 7     │  1951.39 ms │   1953.50 ms │     no change │
└──────────────┴─────────────┴──────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary           ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (HEAD)           │ 24358.37ms │
│ Total Time (topk-filters)   │ 24280.57ms │
│ Average Time (HEAD)         │  3044.80ms │
│ Average Time (topk-filters) │  3035.07ms │
│ Queries Faster              │          2 │
│ Queries Slower              │          0 │
│ Queries with No Change      │          6 │
│ Queries with Failure        │          0 │
└─────────────────────────────┴────────────┘
Benchmark clickbench_partitioned.json
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃        HEAD ┃ topk-filters ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 0     │     2.30 ms │      2.32 ms │     no change │
│ QQuery 1     │    29.71 ms │     28.77 ms │     no change │
│ QQuery 2     │    77.53 ms │     76.14 ms │     no change │
│ QQuery 3     │    94.53 ms │     90.05 ms │     no change │
│ QQuery 4     │   597.57 ms │    609.85 ms │     no change │
│ QQuery 5     │   880.91 ms │    881.20 ms │     no change │
│ QQuery 6     │     2.22 ms │      2.35 ms │  1.05x slower │
│ QQuery 7     │    31.73 ms │     33.48 ms │  1.06x slower │
│ QQuery 8     │   861.09 ms │    883.18 ms │     no change │
│ QQuery 9     │  1167.22 ms │   1144.09 ms │     no change │
│ QQuery 10    │   233.16 ms │    227.48 ms │     no change │
│ QQuery 11    │   263.75 ms │    259.39 ms │     no change │
│ QQuery 12    │   865.53 ms │    865.13 ms │     no change │
│ QQuery 13    │  1180.94 ms │   1231.48 ms │     no change │
│ QQuery 14    │   809.24 ms │    809.07 ms │     no change │
│ QQuery 15    │   775.27 ms │    777.78 ms │     no change │
│ QQuery 16    │  1619.25 ms │   1639.14 ms │     no change │
│ QQuery 17    │  1608.47 ms │   1643.10 ms │     no change │
│ QQuery 18    │  2875.25 ms │   3002.07 ms │     no change │
│ QQuery 19    │    79.14 ms │     81.14 ms │     no change │
│ QQuery 20    │  1227.30 ms │   1234.70 ms │     no change │
│ QQuery 21    │  1377.70 ms │   1356.98 ms │     no change │
│ QQuery 22    │  2268.17 ms │   2238.49 ms │     no change │
│ QQuery 23    │  7690.27 ms │   6835.65 ms │ +1.13x faster │
│ QQuery 24    │   426.90 ms │    417.85 ms │     no change │
│ QQuery 25    │   296.39 ms │    294.12 ms │     no change │
│ QQuery 26    │   428.93 ms │    417.70 ms │     no change │
│ QQuery 27    │  1618.21 ms │   1573.12 ms │     no change │
│ QQuery 28    │ 12002.03 ms │  12016.65 ms │     no change │
│ QQuery 29    │   520.45 ms │    523.19 ms │     no change │
│ QQuery 30    │   774.86 ms │    761.23 ms │     no change │
│ QQuery 31    │   794.38 ms │    783.65 ms │     no change │
│ QQuery 32    │  2461.37 ms │   2452.40 ms │     no change │
│ QQuery 33    │  3218.56 ms │   3214.72 ms │     no change │
│ QQuery 34    │  3200.35 ms │   3223.54 ms │     no change │
│ QQuery 35    │  1267.63 ms │   1276.62 ms │     no change │
│ QQuery 36    │   125.57 ms │    121.93 ms │     no change │
│ QQuery 37    │    53.62 ms │     53.02 ms │     no change │
│ QQuery 38    │   127.07 ms │    124.19 ms │     no change │
│ QQuery 39    │   206.77 ms │    199.55 ms │     no change │
│ QQuery 40    │    41.71 ms │     41.96 ms │     no change │
│ QQuery 41    │    39.44 ms │     38.61 ms │     no change │
│ QQuery 42    │    32.99 ms │     32.51 ms │     no change │
└──────────────┴─────────────┴──────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary           ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (HEAD)           │ 54255.46ms │
│ Total Time
```
Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]
Dandandan commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-3197945496
Really nice
Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]
adriangb commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-3197906730
@alamb I feel like the interesting benchmark to run is `topk_tpch`
Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]
alamb commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-3197868163
🤖 `./gh_compare_branch.sh` [Benchmark Script](https://github.com/alamb/datafusion-benchmarking/blob/main/gh_compare_branch.sh)
Running Linux aal-dev 6.11.0-1016-gcp #16~24.04.1-Ubuntu SMP Wed May 28 02:40:52 UTC 2025 x86_64 x86_64 x86_64 GNU/Linux
Comparing topk-filters (3945aba663062debc24757c857a8bd679ad8b223) to 8f15991f33bf6aca9d4da8958141b59d196b2ed6 [diff](https://github.com/apache/datafusion/compare/8f15991f33bf6aca9d4da8958141b59d196b2ed6..3945aba663062debc24757c857a8bd679ad8b223) using: tpch_mem clickbench_partitioned clickbench_extended
Results will be posted here when complete
Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]
adriangb commented on code in PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#discussion_r2282817078
##
datafusion/physical-expr/src/expressions/dynamic_filters.rs:
##
@@ -137,7 +139,7 @@ impl DynamicFilterPhysicalExpr {
         Self {
             children,
             remapped_children: None, // Initially no remapped children
-            inner: Arc::new(RwLock::new(Inner::new(inner))),
+            inner: Arc::new(ArcSwap::from_pointee(Inner::new(inner))),
Review Comment:
Well, I stumbled upon it because we originally had some performance
regressions and I suspected they might be due to hitting the
`Arc<RwLock<_>>`s a lot heavier. It seems that `ArcSwap` is the solution to
that, but I haven't benchmarked to confirm the performance impact, or whether
the improvement comes from that or from other refactors. In a microbenchmark
situation it makes sense. I can make a branch / another PR without ArcSwap and
we can see if there's a measurable change at the macro level, but I expect it
may be hard to measure.
That said, it's a well-maintained dep from a reputable maintainer, has a lot
of usage, and has no dependencies of its own.
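The contention concern in this comment can be illustrated without the extra dependency. Below is a minimal std-only sketch under stated assumptions: `Snapshot` is a hypothetical type, not DataFusion's, and it is not lock-free the way `ArcSwap` is. It only shows the general idea of readers taking a cheap `Arc` snapshot and releasing the lock immediately, so the expensive work happens outside the critical section.

```rust
use std::sync::{Arc, RwLock};

/// Hypothetical holder for a value that is read often and replaced rarely.
pub struct Snapshot<T> {
    slot: RwLock<Arc<T>>,
}

impl<T> Snapshot<T> {
    pub fn new(value: T) -> Self {
        Self { slot: RwLock::new(Arc::new(value)) }
    }

    /// Readers hold the lock only long enough to clone the `Arc`.
    pub fn load(&self) -> Arc<T> {
        let guard = self.slot.read().unwrap_or_else(|e| e.into_inner());
        Arc::clone(&*guard)
    }

    /// Writers replace the whole value; existing snapshots stay valid.
    pub fn store(&self, value: T) {
        let mut guard = self.slot.write().unwrap_or_else(|e| e.into_inner());
        *guard = Arc::new(value);
    }
}
```

A snapshot taken before `store` keeps pointing at the old value, which is the property a dynamic filter reader needs while it evaluates a batch.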
Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]
adriangb commented on code in PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#discussion_r2282817078
##
datafusion/physical-expr/src/expressions/dynamic_filters.rs:
##
@@ -137,7 +139,7 @@ impl DynamicFilterPhysicalExpr {
         Self {
             children,
             remapped_children: None, // Initially no remapped children
-            inner: Arc::new(RwLock::new(Inner::new(inner))),
+            inner: Arc::new(ArcSwap::from_pointee(Inner::new(inner))),
Review Comment:
Well, I stumbled upon it because we originally had some performance
regressions and I suspected they might be due to hitting the
`Arc<RwLock<_>>`s a lot heavier. It seems that `ArcSwap` is the solution to
that, but I haven't benchmarked to confirm the performance impact, or whether
the improvement comes from that or from other refactors. In a microbenchmark
situation it makes sense. I can make a branch / another PR without ArcSwap and
we can see if there's a measurable change at the macro level.
That said, it's a well-maintained dep from a reputable maintainer, has a lot
of usage, and has no dependencies of its own.
Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]
alamb commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-3197557763
🤖: Benchmark completed
Details
```
Comparing HEAD and topk-filters
Benchmark clickbench_extended.json
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃        HEAD ┃ topk-filters ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 0     │  2098.75 ms │   2062.77 ms │     no change │
│ QQuery 1     │   736.19 ms │    636.32 ms │ +1.16x faster │
│ QQuery 2     │  1447.86 ms │   1301.32 ms │ +1.11x faster │
│ QQuery 3     │   640.39 ms │    605.76 ms │ +1.06x faster │
│ QQuery 4     │  1297.18 ms │   1346.75 ms │     no change │
│ QQuery 5     │ 13751.05 ms │  14220.36 ms │     no change │
│ QQuery 6     │  2400.54 ms │   2335.19 ms │     no change │
│ QQuery 7     │  1933.90 ms │   2000.55 ms │     no change │
└──────────────┴─────────────┴──────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary           ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (HEAD)           │ 24305.85ms │
│ Total Time (topk-filters)   │ 24509.01ms │
│ Average Time (HEAD)         │  3038.23ms │
│ Average Time (topk-filters) │  3063.63ms │
│ Queries Faster              │          3 │
│ Queries Slower              │          0 │
│ Queries with No Change      │          5 │
│ Queries with Failure        │          0 │
└─────────────────────────────┴────────────┘
Benchmark clickbench_partitioned.json
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃        HEAD ┃ topk-filters ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 0     │     2.29 ms │      2.43 ms │  1.06x slower │
│ QQuery 1     │    28.83 ms │     29.37 ms │     no change │
│ QQuery 2     │    75.39 ms │     76.90 ms │     no change │
│ QQuery 3     │    88.02 ms │     90.54 ms │     no change │
│ QQuery 4     │   585.22 ms │    605.52 ms │     no change │
│ QQuery 5     │   849.36 ms │    886.24 ms │     no change │
│ QQuery 6     │     2.32 ms │      2.36 ms │     no change │
│ QQuery 7     │    31.99 ms │     33.28 ms │     no change │
│ QQuery 8     │   830.49 ms │    866.81 ms │     no change │
│ QQuery 9     │  1110.42 ms │   1160.29 ms │     no change │
│ QQuery 10    │   221.99 ms │    221.95 ms │     no change │
│ QQuery 11    │   255.21 ms │    260.43 ms │     no change │
│ QQuery 12    │   826.88 ms │    862.45 ms │     no change │
│ QQuery 13    │  1186.96 ms │   1159.18 ms │     no change │
│ QQuery 14    │   767.58 ms │    799.01 ms │     no change │
│ QQuery 15    │   717.27 ms │    794.54 ms │  1.11x slower │
│ QQuery 16    │  1552.61 ms │   1604.86 ms │     no change │
│ QQuery 17    │  1542.14 ms │   1589.89 ms │     no change │
│ QQuery 18    │  2895.45 ms │   2868.94 ms │     no change │
│ QQuery 19    │    79.79 ms │     81.57 ms │     no change │
│ QQuery 20    │  1278.75 ms │   1211.54 ms │ +1.06x faster │
│ QQuery 21    │  1374.67 ms │   1346.29 ms │     no change │
│ QQuery 22    │  2277.06 ms │   2246.93 ms │     no change │
│ QQuery 23    │  7751.39 ms │   6855.54 ms │ +1.13x faster │
│ QQuery 24    │   423.18 ms │    406.23 ms │     no change │
│ QQuery 25    │   294.80 ms │    289.58 ms │     no change │
│ QQuery 26    │   421.41 ms │    404.45 ms │     no change │
│ QQuery 27    │  1609.35 ms │   1548.29 ms │     no change │
│ QQuery 28    │ 12027.18 ms │  11975.67 ms │     no change │
│ QQuery 29    │   499.72 ms │    516.83 ms │     no change │
│ QQuery 30    │   762.44 ms │    768.76 ms │     no change │
│ QQuery 31    │   788.02 ms │    800.18 ms │     no change │
│ QQuery 32    │  2404.71 ms │   2486.31 ms │     no change │
│ QQuery 33    │  3305.56 ms │   3284.68 ms │     no change │
│ QQuery 34    │  3362.30 ms │   3402.04 ms │     no change │
│ QQuery 35    │  1299.53 ms │   1264.01 ms │     no change │
│ QQuery 36    │   133.03 ms │    130.08 ms │     no change │
│ QQuery 37    │    54.36 ms │     56.59 ms │     no change │
│ QQuery 38    │   124.72 ms │    124.72 ms │     no change │
│ QQuery 39    │   210.60 ms │    203.82 ms │     no change │
│ QQuery 40    │    46.04 ms │     42.45 ms │ +1.08x faster │
│ QQuery 41    │    39.97 ms │     40.83 ms │     no change │
│ QQuery 42    │    33.44 ms │     33.46 ms │     no change │
└──────────────┴─────────────┴──────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary           ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (HEAD)           │ 54172.42ms │
│ Total Time
```
Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]
adriangb commented on code in PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#discussion_r2282817078
##
datafusion/physical-expr/src/expressions/dynamic_filters.rs:
##
@@ -137,7 +139,7 @@ impl DynamicFilterPhysicalExpr {
         Self {
             children,
             remapped_children: None, // Initially no remapped children
-            inner: Arc::new(RwLock::new(Inner::new(inner))),
+            inner: Arc::new(ArcSwap::from_pointee(Inner::new(inner))),
Review Comment:
Well, I stumbled upon it because we originally had some performance
regressions and I suspected they might be due to hitting the
`Arc<RwLock<_>>`s a lot heavier. It seems that `ArcSwap` is the solution to
that, but I haven't benchmarked to confirm the performance impact, or whether
the improvement comes from that or from other refactors. In a microbenchmark
situation it makes sense. I can make a branch / another PR without ArcSwap and
we can confirm from there if we want.
Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]
alamb commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-3197411815
🤖 `./gh_compare_branch.sh` [Benchmark Script](https://github.com/alamb/datafusion-benchmarking/blob/main/gh_compare_branch.sh)
Running Linux aal-dev 6.11.0-1016-gcp #16~24.04.1-Ubuntu SMP Wed May 28 02:40:52 UTC 2025 x86_64 x86_64 x86_64 GNU/Linux
Comparing topk-filters (b7ac320c0d9fcc72f2bb3750cf19ea7be12ff320) to 8f15991f33bf6aca9d4da8958141b59d196b2ed6 [diff](https://github.com/apache/datafusion/compare/8f15991f33bf6aca9d4da8958141b59d196b2ed6..b7ac320c0d9fcc72f2bb3750cf19ea7be12ff320) using: tpch_mem clickbench_partitioned clickbench_extended
Results will be posted here when complete
Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]
alamb commented on code in PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#discussion_r2282739532
##
datafusion/physical-expr/src/expressions/dynamic_filters.rs:
##
@@ -137,7 +139,7 @@ impl DynamicFilterPhysicalExpr {
         Self {
             children,
             remapped_children: None, // Initially no remapped children
-            inner: Arc::new(RwLock::new(Inner::new(inner))),
+            inner: Arc::new(ArcSwap::from_pointee(Inner::new(inner))),
Review Comment:
Do we have evidence that using ArcSwap is needed for performance?
I think we could use `parking_lot` mutexes if we wanted to make the code
cleaner (no need to check for lock poisoning).
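For context on the poisoning remark: `std::sync::RwLock` returns a `LockResult` from `read()` and `write()`, so every call site has to decide what to do when a writer panicked while holding the lock, whereas `parking_lot` locks do not poison and hand back the guard directly. A small std-only sketch of the boilerplate in question, with hypothetical helper names:

```rust
use std::sync::{Arc, RwLock};
use std::thread;

/// Poisons `lock` by panicking in another thread while holding the write guard.
pub fn poison(lock: &Arc<RwLock<i32>>) {
    let lock = Arc::clone(lock);
    // The join result is an Err carrying the panic payload; it is ignored here.
    let _ = thread::spawn(move || {
        let _guard = lock.write().unwrap();
        panic!("simulated failure while holding the lock");
    })
    .join();
}

/// Reads the value, recovering the guard if the lock was poisoned.
pub fn read_ignoring_poison(lock: &RwLock<i32>) -> i32 {
    *lock.read().unwrap_or_else(|poisoned| poisoned.into_inner())
}
```

After `poison` runs, a plain `lock.read()` returns `Err`, while `read_ignoring_poison` still yields the stored value.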
##
datafusion/physical-plan/src/sorts/sort.rs:
##
@@ -1158,7 +1153,10 @@ impl ExecutionPlan for SortExec {
                     context.session_config().batch_size(),
                     context.runtime_env(),
                     &self.metrics_set,
-                    self.filter.clone(),
+                    self.filter
+                        .as_ref()
+                        .expect("Filter should be set when fetch is Some")
+                        .clone(),
Review Comment:
I recommend turning this into an internal error so that if someone does hit
this for some reason the symptom is less severe
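A minimal sketch of the suggestion above, assuming a simplified error type: instead of panicking through `.expect(...)`, the missing filter is reported as an error the caller can propagate. `PlanError`, `DynamicFilter`, and `require_filter` are hypothetical stand-ins for illustration and are not DataFusion APIs.

```rust
use std::fmt;
use std::sync::Arc;

/// Hypothetical stand-in for an engine error type with an "internal" variant.
#[derive(Debug, PartialEq)]
pub enum PlanError {
    Internal(String),
}

impl fmt::Display for PlanError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            PlanError::Internal(msg) => write!(f, "Internal error: {msg}"),
        }
    }
}

impl std::error::Error for PlanError {}

/// Hypothetical stand-in for the shared dynamic filter handle.
#[derive(Debug)]
pub struct DynamicFilter;

/// Returns a clone of the filter handle, or an internal error if it is
/// missing. A violated invariant then fails one query instead of
/// panicking the whole process.
pub fn require_filter(
    filter: Option<&Arc<DynamicFilter>>,
) -> Result<Arc<DynamicFilter>, PlanError> {
    filter.cloned().ok_or_else(|| {
        PlanError::Internal("Filter should be set when fetch is Some".to_string())
    })
}
```

A caller would write `let filter = require_filter(self_filter.as_ref())?;` and let the error bubble up through the usual `Result` path.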
Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]
adriangb commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-3193850159
Okay I've re-run after 310100b and am now seeing only improvements across the board. @Dandandan I've requested another review from you since I think this may be in a good place now.
```
Comparing main and topk-filters
Benchmark run_topk_tpch.json
| Query | main     | topk-filters | Change        |
|-------|----------|--------------|---------------|
| Q1    | 11.22 ms | 9.41 ms      | +1.19x faster |
| Q2    | 13.90 ms | 11.78 ms     | +1.18x faster |
| Q3    | 33.71 ms | 29.55 ms     | +1.14x faster |
| Q4    | 13.81 ms | 12.89 ms     | +1.07x faster |
| Q5    | 8.65 ms  | 7.04 ms      | +1.23x faster |
| Q6    | 14.67 ms | 14.65 ms     | no change     |
| Q7    | 45.67 ms | 38.13 ms     | +1.20x faster |
| Q8    | 27.73 ms | 25.61 ms     | +1.08x faster |
| Q9    | 35.25 ms | 34.09 ms     | no change     |
| Q10   | 56.27 ms | 52.07 ms     | +1.08x faster |
| Q11   | 32.06 ms | 30.42 ms     | +1.05x faster |

| Benchmark Summary           |          |
|-----------------------------|----------|
| Total Time (main)           | 292.93ms |
| Total Time (topk-filters)   | 265.65ms |
| Average Time (main)         | 26.63ms  |
| Average Time (topk-filters) | 24.15ms  |
| Queries Faster              | 9        |
| Queries Slower              | 0        |
| Queries with No Change      | 2        |
| Queries with Failure        | 0        |
```
Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]
adriangb commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-3184720872
ClickBench results for partitioned datasets look much better:
```
| Query     | main       | topk-filters | Change        |
|-----------|------------|--------------|---------------|
| QQuery 0  | 0.92 ms    | 1.15 ms      | 1.25x slower  |
| QQuery 1  | 12.45 ms   | 11.01 ms     | +1.13x faster |
| QQuery 2  | 42.41 ms   | 35.19 ms     | +1.21x faster |
| QQuery 3  | 40.63 ms   | 33.55 ms     | +1.21x faster |
| QQuery 4  | 337.67 ms  | 267.36 ms    | +1.26x faster |
| QQuery 5  | 434.44 ms  | 362.25 ms    | +1.20x faster |
| QQuery 6  | 1.21 ms    | 1.05 ms      | +1.15x faster |
| QQuery 7  | 14.20 ms   | 11.94 ms     | +1.19x faster |
| QQuery 8  | 413.55 ms  | 317.13 ms    | +1.30x faster |
| QQuery 9  | 601.85 ms  | 480.06 ms    | +1.25x faster |
| QQuery 10 | 100.43 ms  | 84.75 ms     | +1.18x faster |
| QQuery 11 | 108.91 ms  | 91.61 ms     | +1.19x faster |
| QQuery 12 | 419.13 ms  | 323.82 ms    | +1.29x faster |
| QQuery 13 | 570.70 ms  | 481.27 ms    | +1.19x faster |
| QQuery 14 | 428.36 ms  | 310.63 ms    | +1.38x faster |
| QQuery 15 | 426.88 ms  | 309.22 ms    | +1.38x faster |
| QQuery 16 | 1251.45 ms | 729.69 ms    | +1.72x faster |
| QQuery 17 | 1175.59 ms | 738.35 ms    | +1.59x faster |
| QQuery 18 | 2670.84 ms | 1945.33 ms   | +1.37x faster |
| QQuery 19 | 41.71 ms   | 32.77 ms     | +1.27x faster |
| QQuery 20 | 1002.98 ms | 738.13 ms    | +1.36x faster |
| QQuery 21 | 969.39 ms  | 763.04 ms    | +1.27x faster |
| QQuery 22 | 1506.05 ms | 1214.86 ms   | +1.24x faster |
| QQuery 23 | 4418.24 ms | 4008.72 ms   | +1.10x faster |
| QQuery 24 | 198.37 ms  | 196.65 ms    | no change     |
| QQuery 25 | 150.97 ms  | 150.72 ms    | no change     |
| QQuery 26 | 200.45 ms  | 191.65 ms    | no change     |
| QQuery 27 | 1007.57 ms | 893.24 ms    | +1.13x faster |
| QQuery 28 | 7214.47 ms | 7065.58 ms   | no change     |
| QQuery 29 | 356.81 ms  | 327.34 ms    | +1.09x faster |
| QQuery 30 | 323.66 ms  | 310.25 ms    | no change     |
| QQuery 31 | 366.94 ms  | 346.37 ms    | +1.06x faster |
| QQuery 32 | 2028.11 ms | 1692.80 ms   | +1.20x faster |
| QQuery 33 | 2286.38 ms | 1775.46 ms   | +1.29x faster |
| QQuery 34 | 1978.34 ms | 2305.40 ms   | 1.17x slower  |
| QQuery 35 | 555.06 ms  | 552.82 ms    | no change     |
| QQuery 36 | 61.16 ms   | 62.54 ms     | no change     |
| QQuery 37 | 24.59 ms   | 23.99 ms     | no change     |
| QQuery 38 | 61.70 ms   | 63.87 ms     | no change     |
| QQuery 39 | 102.67 ms  | 103.98 ms    | no change     |
| QQuery 40 | 17.16 ms   | 18.43 ms     | 1.07x slower  |
| QQuery 41 | 16.16 ms   | 17.05 ms     | 1.06x slower  |
| QQuery 42 | 12.72 ms   | 13.14 ms     | no change     |
```
Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]
adriangb commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-3184612966
I tried some stuff. Still not an obvious improvement:
```
/bench.sh compare main topk-filters
Comparing main and topk-filters
Benchmark run_topk_tpch.json
| Query | main     | topk-filters | Change        |
|-------|----------|--------------|---------------|
| Q1    | 12.46 ms | 10.55 ms     | +1.18x faster |
| Q2    | 14.02 ms | 12.06 ms     | +1.16x faster |
| Q3    | 29.39 ms | 30.10 ms     | no change     |
| Q4    | 13.74 ms | 12.79 ms     | +1.07x faster |
| Q5    | 7.44 ms  | 7.09 ms      | no change     |
| Q6    | 15.87 ms | 13.79 ms     | +1.15x faster |
| Q7    | 37.22 ms | 44.24 ms     | 1.19x slower  |
| Q8    | 27.51 ms | 25.17 ms     | +1.09x faster |
| Q9    | 33.04 ms | 31.28 ms     | +1.06x faster |
| Q10   | 55.26 ms | 58.96 ms     | 1.07x slower  |
| Q11   | 28.74 ms | 29.75 ms     | no change     |

| Benchmark Summary           |          |
|-----------------------------|----------|
| Total Time (main)           | 274.68ms |
| Total Time (topk-filters)   | 275.79ms |
| Average Time (main)         | 24.97ms  |
| Average Time (topk-filters) | 25.07ms  |
| Queries Faster              | 6        |
| Queries Slower              | 2        |
| Queries with No Change      | 3        |
| Queries with Failure        | 0        |
```
Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]
Dandandan commented on code in PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#discussion_r2186109216
##
datafusion/physical-plan/src/topk/mod.rs:
##
@@ -319,19 +342,75 @@ impl TopK {
/// (a > 2 OR (a = 2 AND b < 3))
/// ```
fn update_filter(&mut self) -> Result<()> {
-let Some(filter) = &self.filter else {
+// If the heap doesn't have k elements yet, we can't create thresholds
+let Some(max_row) = self.heap.max() else {
return Ok(());
};
-let Some(thresholds) = self.heap.get_threshold_values(&self.expr)? else {
-return Ok(());
+
+let new_threshold_row = &max_row.row;
+
+// Extract filter expression reference before entering critical section
+let filter_expr = Arc::clone(&self.filter.expr);
+
+// Check if we need to update and do both threshold and filter update atomically
+{
+let mut threshold_guard = self.filter.threshold_row.write();
+if let Some(current_row) = threshold_guard.as_ref() {
+match current_row.as_slice().cmp(new_threshold_row) {
+Ordering::Greater => {
+// new < current, so new threshold is more selective
Review Comment:
I guess we should try changing it to the global heap thing - write updates
to the shared/global heap and update the filter based on global heap.
Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]
Dandandan commented on code in PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#discussion_r2186107501
##
datafusion/physical-plan/src/topk/mod.rs:
##
@@ -319,19 +342,75 @@ impl TopK {
/// (a > 2 OR (a = 2 AND b < 3))
/// ```
fn update_filter(&mut self) -> Result<()> {
-let Some(filter) = &self.filter else {
+// If the heap doesn't have k elements yet, we can't create thresholds
+let Some(max_row) = self.heap.max() else {
return Ok(());
};
-let Some(thresholds) = self.heap.get_threshold_values(&self.expr)? else {
-return Ok(());
+
+let new_threshold_row = &max_row.row;
+
+// Extract filter expression reference before entering critical section
+let filter_expr = Arc::clone(&self.filter.expr);
+
+// Check if we need to update and do both threshold and filter update atomically
+{
+let mut threshold_guard = self.filter.threshold_row.write();
+if let Some(current_row) = threshold_guard.as_ref() {
+match current_row.as_slice().cmp(new_threshold_row) {
+Ordering::Greater => {
+// new < current, so new threshold is more selective
Review Comment:
The current solution seems roughly on par with main, I don't observe a
speedup.
Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]
Dandandan commented on code in PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#discussion_r2186106170
##
datafusion/physical-plan/src/topk/mod.rs:
##
@@ -319,19 +342,75 @@ impl TopK {
/// (a > 2 OR (a = 2 AND b < 3))
/// ```
fn update_filter(&mut self) -> Result<()> {
-let Some(filter) = &self.filter else {
+// If the heap doesn't have k elements yet, we can't create thresholds
+let Some(max_row) = self.heap.max() else {
return Ok(());
};
-let Some(thresholds) = self.heap.get_threshold_values(&self.expr)? else {
-return Ok(());
+
+let new_threshold_row = &max_row.row;
+
+// Extract filter expression reference before entering critical section
+let filter_expr = Arc::clone(&self.filter.expr);
+
+// Check if we need to update and do both threshold and filter update atomically
+{
+let mut threshold_guard = self.filter.threshold_row.write();
+if let Some(current_row) = threshold_guard.as_ref() {
+match current_row.as_slice().cmp(new_threshold_row) {
+Ordering::Greater => {
+// new < current, so new threshold is more selective
Review Comment:
I think this was wrong before @adriangb - in the heap lower means more
selective
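To make the direction of the comparison concrete, here is a minimal sketch of "only replace the shared threshold when the new one is more selective", assuming rows are stored as order-preserving bytes so that a smaller row means a tighter filter. `SharedThreshold` and `try_tighten` are hypothetical names; the PR itself uses its own types and a `parking_lot`-style lock rather than the `std` lock shown here.

```rust
use std::cmp::Ordering;
use std::sync::RwLock;

/// Hypothetical shared state: the tightest threshold row seen so far,
/// stored as order-preserving bytes (smaller bytes = more selective).
pub struct SharedThreshold {
    row: RwLock<Option<Vec<u8>>>,
}

impl SharedThreshold {
    pub fn new() -> Self {
        Self { row: RwLock::new(None) }
    }

    /// Stores `candidate` only if no threshold exists yet or if it is
    /// strictly more selective than the current one.
    /// Returns true when the shared threshold was replaced.
    pub fn try_tighten(&self, candidate: &[u8]) -> bool {
        // Compare and update under one write lock so that two partitions
        // cannot interleave between the check and the store.
        let mut guard = self.row.write().expect("threshold lock poisoned");
        let should_update = match guard.as_deref() {
            None => true,
            // current > candidate means candidate < current: more selective.
            Some(current) => current.cmp(candidate) == Ordering::Greater,
        };
        if should_update {
            *guard = Some(candidate.to_vec());
        }
        should_update
    }

    /// Returns a copy of the current threshold row, if any.
    pub fn current(&self) -> Option<Vec<u8>> {
        self.row.read().expect("threshold lock poisoned").clone()
    }
}
```

An equal or larger candidate leaves the stored row untouched, so a partition whose heap lags behind the others never loosens the shared filter.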
Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]
Dandandan commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-3037193167
I am taking a look now to see if I can find anything.
Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]
adriangb commented on code in PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#discussion_r2185669579
##
datafusion/physical-plan/src/topk/mod.rs:
##
@@ -214,41 +238,39 @@ impl TopK {
let mut selected_rows = None;
-if let Some(filter) = self.filter.as_ref() {
-// If a filter is provided, update it with the new rows
-let filter = filter.current()?;
-let filtered = filter.evaluate(&batch)?;
-let num_rows = batch.num_rows();
-let array = filtered.into_array(num_rows)?;
-let mut filter = array.as_boolean().clone();
-let true_count = filter.true_count();
-if true_count == 0 {
-// nothing to filter, so no need to update
-return Ok(());
+// If a filter is provided, update it with the new rows
Review Comment:
Agreed
Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]
Dandandan commented on code in PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#discussion_r2185350825
##
datafusion/physical-plan/src/topk/mod.rs:
##
@@ -214,41 +238,39 @@ impl TopK {
let mut selected_rows = None;
-if let Some(filter) = self.filter.as_ref() {
-// If a filter is provided, update it with the new rows
-let filter = filter.current()?;
-let filtered = filter.evaluate(&batch)?;
-let num_rows = batch.num_rows();
-let array = filtered.into_array(num_rows)?;
-let mut filter = array.as_boolean().clone();
-let true_count = filter.true_count();
-if true_count == 0 {
-// nothing to filter, so no need to update
-return Ok(());
+// If a filter is provided, update it with the new rows
Review Comment:
Ah yeah I understand... maybe we should check if it is unset / returning
constant true as additional "fast path".
But I think that's not the source of the slow down.
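The "fast path" idea can be sketched as follows, assuming a deliberately simplified filter over a single ascending `i64` sort key. `Filter` and `candidate_rows` are hypothetical and only illustrate skipping evaluation while the filter is still unset or constant true.

```rust
/// Hypothetical, simplified filter states for one ascending i64 sort key.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Filter {
    /// No threshold yet (heap not full): every row may still enter the heap.
    AlwaysTrue,
    /// Only rows with key < threshold can still enter the heap.
    LessThan(i64),
}

/// Returns the indices of rows that may still matter for the top-k result.
/// On the fast path no per-row comparison is performed at all.
pub fn candidate_rows(filter: Filter, keys: &[i64]) -> Vec<usize> {
    match filter {
        // Fast path: nothing to evaluate, keep every row.
        Filter::AlwaysTrue => (0..keys.len()).collect(),
        // Slow path: evaluate the threshold against each key.
        Filter::LessThan(threshold) => keys
            .iter()
            .enumerate()
            .filter(|(_, key)| **key < threshold)
            .map(|(index, _)| index)
            .collect(),
    }
}
```

As the comment notes, this path may not be what caused the slowdown; the sketch only shows what the check would look like.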
Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]
adriangb commented on code in PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#discussion_r2185318951
##
datafusion/physical-plan/src/topk/mod.rs:
##
@@ -214,41 +238,39 @@ impl TopK {
let mut selected_rows = None;
-if let Some(filter) = self.filter.as_ref() {
-// If a filter is provided, update it with the new rows
-let filter = filter.current()?;
-let filtered = filter.evaluate(&batch)?;
-let num_rows = batch.num_rows();
-let array = filtered.into_array(num_rows)?;
-let mut filter = array.as_boolean().clone();
-let true_count = filter.true_count();
-if true_count == 0 {
-// nothing to filter, so no need to update
-return Ok(());
+// If a filter is provided, update it with the new rows
Review Comment:
I think Some(filter) was always passed into the constructor
Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]
Dandandan commented on code in PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#discussion_r2185314221
##
datafusion/physical-plan/src/topk/mod.rs:
##
@@ -214,41 +238,39 @@ impl TopK {
let mut selected_rows = None;
-if let Some(filter) = self.filter.as_ref() {
-// If a filter is provided, update it with the new rows
-let filter = filter.current()?;
-let filtered = filter.evaluate(&batch)?;
-let num_rows = batch.num_rows();
-let array = filtered.into_array(num_rows)?;
-let mut filter = array.as_boolean().clone();
-let true_count = filter.true_count();
-if true_count == 0 {
-// nothing to filter, so no need to update
-return Ok(());
+// If a filter is provided, update it with the new rows
Review Comment:
Hm, it was used: in the initial phase of TopK the filter might not be
initialized yet (heap not yet filled) for larger values of the limit.
Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]
adriangb commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-3036131650
I did a bench run, confounding results:
```
| Query | main     | topk-filters | Change        |
|-------|----------|--------------|---------------|
| Q1    | 9.96 ms  | 10.70 ms     | 1.07x slower  |
| Q2    | 12.63 ms | 15.30 ms     | 1.21x slower  |
| Q3    | 33.45 ms | 35.22 ms     | 1.05x slower  |
| Q4    | 15.22 ms | 16.82 ms     | 1.11x slower  |
| Q5    | 9.87 ms  | 8.85 ms      | +1.12x faster |
| Q6    | 16.54 ms | 22.16 ms     | 1.34x slower  |
| Q7    | 43.41 ms | 44.00 ms     | no change     |
| Q8    | 22.91 ms | 34.56 ms     | 1.51x slower  |
| Q9    | 38.68 ms | 39.60 ms     | no change     |
| Q10   | 65.73 ms | 73.24 ms     | 1.11x slower  |
| Q11   | 34.78 ms | 37.14 ms     | 1.07x slower  |

| Benchmark Summary           |          |
|-----------------------------|----------|
| Total Time (main)           | 303.17ms |
| Total Time (topk-filters)   | 337.58ms |
| Average Time (main)         | 27.56ms  |
| Average Time (topk-filters) | 30.69ms  |
| Queries Faster              | 1        |
| Queries Slower              | 8        |
| Queries with No Change      | 2        |
| Queries with Failure        | 0        |
```
Mostly... slower
Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]
adriangb commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-2989360115
> how would you compute the shared heap on the fly?

I was thinking you'd compute the top K of the top K * partitions on the fly. But maybe your proposal makes more sense.
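One possible reading of "compute the top K of the top K * partitions on the fly" is sketched below, assuming each partition exposes its current top-k values as a plain list of `i64` keys (smallest is best). `combined_top_k` and `combined_threshold` are hypothetical helpers, not code from the PR.

```rust
/// Merges per-partition top-k lists (smallest values win) into the
/// global top k. The input holds at most k * partitions values, so the
/// work stays small as long as k is roughly constant.
pub fn combined_top_k(per_partition: &[Vec<i64>], k: usize) -> Vec<i64> {
    let mut all: Vec<i64> = per_partition.iter().flatten().copied().collect();
    all.sort_unstable();
    all.truncate(k);
    all
}

/// The global filter threshold: the k-th smallest value overall, or
/// None while fewer than k values have been seen across all partitions.
pub fn combined_threshold(per_partition: &[Vec<i64>], k: usize) -> Option<i64> {
    if k == 0 {
        return None;
    }
    combined_top_k(per_partition, k).get(k - 1).copied()
}
```

Readers pay for the merge on every read, which is the trade-off against taking a lock on a single shared heap.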
Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]
Dandandan commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-2989336247
> We could try a shared heap. It might work? I guess it will be a sort of balance between lock contention and better selectivity. Maybe we can balance it by having distinct heaps for writes with no locks but read only references to all of them so that when we do reads we compute on the fly the "combined" heap? Then we don't need any locks. The cost is that computations on the heap are larger but as long as `k ~ constant` then it should be fine.

How would you compute the shared heap on the fly? I was thinking of something similar: each partition writes to its own heap and only writes the values that updated the local heap to the shared heap (limiting the lock access time).
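The local-plus-shared heap proposal could look roughly like this, assuming `i64` sort keys and a `std::sync::Mutex` around the shared heap. `TopKHeap` and `PartitionTopK` are hypothetical types written for this sketch, not the PR's implementation.

```rust
use std::collections::BinaryHeap;
use std::sync::Mutex;

/// Bounded max-heap that keeps the k smallest values seen so far.
/// Once full, its root is the current filter threshold.
pub struct TopKHeap {
    k: usize,
    heap: BinaryHeap<i64>,
}

impl TopKHeap {
    pub fn new(k: usize) -> Self {
        Self { k, heap: BinaryHeap::with_capacity(k) }
    }

    /// Offers a value; returns true if it entered the heap.
    pub fn offer(&mut self, value: i64) -> bool {
        if self.k == 0 {
            return false;
        }
        if self.heap.len() < self.k {
            self.heap.push(value);
            return true;
        }
        // Heap is full: only a value below the current maximum gets in.
        let current_max = match self.heap.peek() {
            Some(&max) => max,
            None => return false,
        };
        if value < current_max {
            self.heap.pop();
            self.heap.push(value);
            true
        } else {
            false
        }
    }

    /// The k-th smallest value, or None until k values have been seen.
    pub fn threshold(&self) -> Option<i64> {
        if self.k > 0 && self.heap.len() == self.k {
            self.heap.peek().copied()
        } else {
            None
        }
    }
}

/// Per-partition writer: the local heap needs no lock, and the shared
/// lock is taken only for values that actually changed the local heap.
pub struct PartitionTopK<'a> {
    local: TopKHeap,
    shared: &'a Mutex<TopKHeap>,
}

impl<'a> PartitionTopK<'a> {
    pub fn new(k: usize, shared: &'a Mutex<TopKHeap>) -> Self {
        Self { local: TopKHeap::new(k), shared }
    }

    pub fn insert(&mut self, value: i64) {
        if self.local.offer(value) {
            self.shared
                .lock()
                .expect("shared heap lock poisoned")
                .offer(value);
        }
    }
}
```

Any value that belongs to the global top k also belongs to its own partition's top k, so forwarding only locally accepted values does not lose results; it just keeps most rows from ever touching the lock.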
Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]
adriangb commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-2988328335
We could try a shared heap. It might work? I guess it will be a sort of balance between lock contention and better selectivity. Maybe we can balance it by having distinct heaps for writes with no locks but read only references to all of them so that when we do reads we compute on the fly the "combined" heap? Then we don't need any locks. The cost is that computations on the heap are larger but as long as `k ~ constant` then it should be fine.
Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]
Dandandan commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-2988289302
> > Hm my earlier benchmarks didn't seem correct. not sure where the earlier run came from 🤔
>
> What do the current ones show? Not much improvement?

Yes, about the same compared to main.
Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]
adriangb commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-2987967895
> Hm my earlier benchmarks didn't seem correct. not sure where the earlier run came from 🤔

What do the current ones show? Not much improvement?
Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]
adriangb commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-2987966449
> I am wondering if we can somehow synchronize the values in the heap efficiently in order to make the filter for a topk(n*partitions) as efficient as topk(n) 🤔

I think the only way to do that would be to make a global heap and put a lock on it, similar to what we do with the filters?
Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]
adriangb commented on code in PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#discussion_r2156917768
##
datafusion/physical-plan/src/topk/mod.rs:
##
@@ -214,41 +238,39 @@ impl TopK {
let mut selected_rows = None;
-if let Some(filter) = self.filter.as_ref() {
-// If a filter is provided, update it with the new rows
-let filter = filter.current()?;
-let filtered = filter.evaluate(&batch)?;
-let num_rows = batch.num_rows();
-let array = filtered.into_array(num_rows)?;
-let mut filter = array.as_boolean().clone();
-let true_count = filter.true_count();
-if true_count == 0 {
-// nothing to filter, so no need to update
-return Ok(());
+// If a filter is provided, update it with the new rows
Review Comment:
Yeah, I don't think that was useful or being used as a code path?
Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]
Dandandan commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-2987413335
(Removed a non-working proposal). I am wondering if we can somehow synchronize the values in the heap efficiently in order to make the filter for a `topk(n*partitions)` as efficient as `topk(n)` 🤔
Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]
Dandandan commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-2987392519
WDYT @adriangb ?
Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]
Dandandan commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-2987381689
Synchronizing the max between all partitions will help a bit - but I wonder if we shouldn't just synchronize it based on the true TopK in SortPreservingMergeExec?
Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]
Dandandan commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-2987370039
In hindsight, I think another reason we don't see TopK being as effective with more partitions is that spreading the rows over partitions essentially turns `topk(n)` into `topk(n*partitions)`. So a limit of `10` becomes `200` with 20 partitions.
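The arithmetic can be checked with a toy model, assuming the values `0..n` are dealt round-robin to the partitions and each partition keeps its own top k smallest. `kth_smallest` and `per_partition_thresholds` are hypothetical helpers for this illustration only.

```rust
/// The k-th smallest value (1-based), or None if fewer than k values exist.
pub fn kth_smallest(values: &[u64], k: usize) -> Option<u64> {
    if k == 0 {
        return None;
    }
    let mut sorted = values.to_vec();
    sorted.sort_unstable();
    sorted.get(k - 1).copied()
}

/// Deals the values 0..n round-robin to `partitions` partitions and
/// returns each partition's own top-k threshold (its k-th smallest value).
pub fn per_partition_thresholds(n: u64, partitions: usize, k: usize) -> Vec<Option<u64>> {
    if partitions == 0 {
        return Vec::new();
    }
    let mut parts: Vec<Vec<u64>> = vec![Vec::new(); partitions];
    for value in 0..n {
        parts[(value as usize) % partitions].push(value);
    }
    parts.iter().map(|part| kth_smallest(part, k)).collect()
}
```

With `n = 1000`, 20 partitions and `k = 10`, every per-partition threshold falls between 180 and 199, while the true global threshold is 9: together the partitions retain 200 rows, which is the `topk(n*partitions)` effect described above.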
Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]
Dandandan commented on PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#issuecomment-2986650188
Hm my earlier benchmarks didn't seem correct. not sure where the earlier run came from 🤔
Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]
Dandandan commented on code in PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#discussion_r2156139567
##
datafusion/physical-plan/src/topk/mod.rs:
##
@@ -319,13 +341,87 @@ impl TopK {
/// (a > 2 OR (a = 2 AND b < 3))
/// ```
fn update_filter(&mut self) -> Result<()> {
-let Some(filter) = &self.filter else {
-return Ok(());
-};
let Some(thresholds) = self.heap.get_threshold_values(&self.expr)? else {
return Ok(());
};
+// Are the new thresholds more selective than our existing ones?
+let should_update = {
+let mut current = self.filter.thresholds.write();
Review Comment:
I think this would be a good idea anyway to simplify the code.
Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]
Dandandan commented on code in PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#discussion_r2156133514
##
datafusion/physical-plan/src/topk/mod.rs:
##
@@ -319,13 +341,87 @@ impl TopK {
/// (a > 2 OR (a = 2 AND b < 3))
/// ```
fn update_filter(&mut self) -> Result<()> {
-let Some(filter) = &self.filter else {
-return Ok(());
-};
let Some(thresholds) = self.heap.get_threshold_values(&self.expr)? else {
return Ok(());
};
+// Are the new thresholds more selective than our existing ones?
+let should_update = {
+let mut current = self.filter.thresholds.write();
Review Comment:
Perhaps this lock is held for too long, on top of the overhead of running this
check on every update, for all partitions, quite often?
We could also store a `Row` instead of `thresholds` to make the comparison much
quicker (this should also let us avoid allocations).
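To illustrate why a row-style encoding makes the comparison cheap, here is a minimal sketch. It assumes nullable `i64` sort keys and a hand-rolled byte encoding; it is not arrow's actual row format, and `encode_key`, `encode_threshold` and `row_is_more_selective` are hypothetical names. Once each threshold is one byte buffer, "is the new threshold more selective?" reduces to a single slice comparison.

```rust
/// Appends one nullable `i64` sort key to `out`, encoded so that plain
/// byte-wise comparison of the buffers matches the requested sort order.
/// (Hypothetical helper; arrow's real row encoding differs in detail.)
fn encode_key(value: Option<i64>, descending: bool, nulls_first: bool, out: &mut Vec<u8>) {
    match value {
        None => {
            // Sentinel byte places nulls before (0) or after (2) non-nulls (1).
            out.push(if nulls_first { 0 } else { 2 });
            out.extend_from_slice(&[0u8; 8]);
        }
        Some(v) => {
            out.push(1);
            // Flip the sign bit so negative values sort before positive ones
            // when the big-endian bytes are compared as unsigned.
            let mut bytes = ((v as u64) ^ (1u64 << 63)).to_be_bytes();
            if descending {
                // Inverting every byte reverses the order.
                for b in bytes.iter_mut() {
                    *b = !*b;
                }
            }
            out.extend_from_slice(&bytes);
        }
    }
}

/// Encodes a full threshold (one value per sort expression) into one buffer.
/// `options` holds assumed `(descending, nulls_first)` pairs, one per key.
fn encode_threshold(values: &[Option<i64>], options: &[(bool, bool)]) -> Vec<u8> {
    let mut out = Vec::with_capacity(values.len() * 9);
    for (value, (descending, nulls_first)) in values.iter().zip(options) {
        encode_key(*value, *descending, *nulls_first, &mut out);
    }
    out
}

/// A new threshold is more selective when it sorts strictly before the current one.
fn row_is_more_selective(current: &[u8], new: &[u8]) -> bool {
    new < current
}
```

With this shape the per-key null and direction handling happens once at encode time, so the check done under the lock is a lexicographic byte comparison.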
Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]
Dandandan commented on code in PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#discussion_r2156106525
##
datafusion/physical-plan/src/topk/mod.rs:
##
@@ -214,41 +238,39 @@ impl TopK {
         let mut selected_rows = None;
-        if let Some(filter) = self.filter.as_ref() {
-            // If a filter is provided, update it with the new rows
-            let filter = filter.current()?;
-            let filtered = filter.evaluate(&batch)?;
-            let num_rows = batch.num_rows();
-            let array = filtered.into_array(num_rows)?;
-            let mut filter = array.as_boolean().clone();
-            let true_count = filter.true_count();
-            if true_count == 0 {
-                // nothing to filter, so no need to update
-                return Ok(());
+        // If a filter is provided, update it with the new rows
Review Comment:
The `if let Some(filter)` condition is gone?
It previously handled the "uninitialized filter" case (which just runs the
normal TopK without any filtering overhead).
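As a rough illustration of the fast path being described, the sketch below uses a hypothetical `ThresholdFilter` over plain `i64` values rather than the real dynamic filter expression. When no filter is present the batch is passed through untouched, and when one is present only rows that beat the threshold are kept.

```rust
/// Hypothetical stand-in for a dynamic filter on an ascending sort:
/// only values strictly below `threshold` can still enter the heap.
struct ThresholdFilter {
    threshold: i64,
}

/// Returns the rows of `batch` that still need to be considered for the heap.
fn rows_to_insert(filter: Option<&ThresholdFilter>, batch: &[i64]) -> Vec<i64> {
    match filter {
        // No filter yet: plain TopK path, no filter evaluation at all.
        None => batch.to_vec(),
        // Filter present: drop rows that cannot make it into the top K.
        Some(f) => batch
            .iter()
            .copied()
            .filter(|value| *value < f.threshold)
            .collect(),
    }
}
```

An empty result in the filtered case corresponds to the early return in the removed code: nothing in the batch can change the heap, so there is nothing to update.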
Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]
adriangb commented on PR #16433: URL: https://github.com/apache/datafusion/pull/16433#issuecomment-2985967718 Seems like a bug in my implementation, right? I'd be surprised if the update checks I added are that heavy compared to the other work...
Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]
Dandandan commented on PR #16433: URL: https://github.com/apache/datafusion/pull/16433#issuecomment-2985955904
It seems in some cases it's faster:
```
┏━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query ┃ topk-dynamic-filter ┃ topk-filters ┃        Change ┃
┡━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ Q1    │            16.17 ms │     16.89 ms │     no change │
│ Q2    │            27.60 ms │     20.22 ms │ +1.36x faster │
│ Q3    │            55.41 ms │     54.45 ms │     no change │
│ Q4    │            22.26 ms │     22.07 ms │     no change │
│ Q5    │            11.98 ms │     12.10 ms │     no change │
│ Q6    │            26.25 ms │     27.26 ms │     no change │
│ Q7    │            71.56 ms │     67.86 ms │ +1.05x faster │
│ Q8    │            90.02 ms │     46.15 ms │ +1.95x faster │
│ Q9    │            61.64 ms │     58.91 ms │     no change │
│ Q10   │           101.13 ms │     98.24 ms │     no change │
│ Q11   │            58.43 ms │     56.85 ms │     no change │
└───────┴─────────────────────┴──────────────┴───────────────┘
```
And in other cases it's mixed / slower:
```
┏━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query     ┃ topk-dynamic-filter ┃ topk-filters ┃        Change ┃
┡━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 0  │            14.00 ms │     13.07 ms │ +1.07x faster │
│ QQuery 1  │            20.55 ms │     18.81 ms │ +1.09x faster │
│ QQuery 2  │            56.58 ms │     56.33 ms │     no change │
│ QQuery 3  │            56.30 ms │     54.94 ms │     no change │
│ QQuery 4  │           468.00 ms │    516.71 ms │  1.10x slower │
│ QQuery 5  │           626.85 ms │    659.29 ms │  1.05x slower │
│ QQuery 6  │            15.31 ms │     15.26 ms │     no change │
│ QQuery 7  │            30.44 ms │     21.48 ms │ +1.42x faster │
│ QQuery 8  │           547.48 ms │    569.67 ms │     no change │
│ QQuery 9  │           744.50 ms │    778.67 ms │     no change │
│ QQuery 10 │           182.53 ms │    176.03 ms │     no change │
│ QQuery 11 │           217.10 ms │    190.23 ms │ +1.14x faster │
│ QQuery 12 │           832.33 ms │    715.38 ms │ +1.16x faster │
│ QQuery 13 │          1102.37 ms │    914.96 ms │ +1.20x faster │
│ QQuery 14 │           823.97 ms │    576.94 ms │ +1.43x faster │
│ QQuery 15 │           574.53 ms │    496.51 ms │ +1.16x faster │
│ QQuery 16 │          1324.12 ms │   1150.68 ms │ +1.15x faster │
│ QQuery 17 │          1324.29 ms │   1245.55 ms │ +1.06x faster │
│ QQuery 18 │          2603.07 ms │   2248.86 ms │ +1.16x faster │
│ QQuery 19 │            48.17 ms │     49.60 ms │     no change │
│ QQuery 20 │           978.10 ms │    964.90 ms │     no change │
│ QQuery 21 │          1021.47 ms │   1050.51 ms │     no change │
│ QQuery 22 │          1958.07 ms │   1660.81 ms │ +1.18x faster │
│ QQuery 23 │           760.66 ms │   5353.24 ms │  7.04x slower │
│ QQuery 24 │           525.78 ms │    342.39 ms │ +1.54x faster │
│ QQuery 25 │           480.36 ms │    278.59 ms │ +1.72x faster │
│ QQuery 26 │           542.13 ms │    342.32 ms │ +1.58x faster │
│ QQuery 27 │          1826.05 ms │   1260.73 ms │ +1.45x faster │
│ QQuery 28 │         11094.31 ms │  10685.64 ms │     no change │
│ QQuery 29 │           427.18 ms │    432.16 ms │     no change │
│ QQuery 30 │           826.43 ms │    539.39 ms │ +1.53x faster │
│ QQuery 31 │           789.89 ms │    553.49 ms │ +1.43x faster │
│ QQuery 32 │          2580.93 ms │   2339.06 ms │ +1.10x faster │
│ QQuery 33 │          2660.40 ms │   2761.14 ms │     no change │
│ QQuery 34 │          2948.08 ms │   2923.44 ms │     no change │
│ QQuery 35 │           886.56 ms │    810.35 ms │ +1.09x faster │
│ QQuery 36 │            18.59 ms │     83.97 ms │  4.52x slower │
│ QQuery 37 │            18.67 ms │     35.52 ms │  1.90x slower │
│ QQuery 38 │            18.10 ms │     81.86 ms │  4.52x slower │
│ QQuery 39 │            17.89 ms │    135.04 ms │  7.55x slower │
│ QQuery 40 │            18.30 ms │     27.70 ms │  1.51x slower │
│ QQuery 41 │            18.15 ms │     26.07 ms │  1.44x slower │
│ QQuery 42 │            17.78 ms │     22.56 ms │  1.27x slower │
└───────────┴─────────────────────┴──────────────┴───────────────┘
```
Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]
Dandandan commented on code in PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#discussion_r2154078467
##
datafusion/physical-plan/src/topk/mod.rs:
##
@@ -319,13 +341,88 @@ impl TopK {
     /// (a > 2 OR (a = 2 AND b < 3))
     /// ```
     fn update_filter(&mut self) -> Result<()> {
-        let Some(filter) = &self.filter else {
-            return Ok(());
-        };
         let Some(thresholds) = self.heap.get_threshold_values(&self.expr)? else {
             return Ok(());
         };
+        // Are the new thresholds more selective than our existing ones?
+        let should_update = {
+            if let Some(current) = self.filter.thresholds.write().as_mut() {
+                assert!(current.len() == thresholds.len());
+                // Check if new thresholds are more selective than current ones
+                let mut more_selective = false;
+                for ((current_value, new_value), sort_expr) in
+                    current.iter().zip(thresholds.iter()).zip(self.expr.iter())
+                {
+                    // Handle null cases
+                    let (current_is_null, new_is_null) =
+                        (current_value.is_null(), new_value.is_null());
Review Comment:
Can't we use `ScalarValue::partial_cmp`?
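One way to read this suggestion is to fold the null handling and the sort direction into a single per-key ordering, then compare thresholds key by key. The sketch below is only an illustration under assumptions: `Option<i64>` stands in for `ScalarValue`, and the local `SortOptions` struct, `compare_key` and `is_more_selective` are hypothetical names (the struct mirrors the `descending` and `nulls_first` fields the diff reads from `sort_expr.options`).

```rust
use std::cmp::Ordering;

/// Local stand-in for the per-expression sort options used in the diff.
#[derive(Clone, Copy)]
struct SortOptions {
    descending: bool,
    nulls_first: bool,
}

/// Orders two nullable keys the way the sort itself would order them.
fn compare_key(a: Option<i64>, b: Option<i64>, opts: SortOptions) -> Ordering {
    match (a, b) {
        (None, None) => Ordering::Equal,
        // Null placement depends only on `nulls_first`, not on direction.
        (None, Some(_)) => {
            if opts.nulls_first { Ordering::Less } else { Ordering::Greater }
        }
        (Some(_), None) => {
            if opts.nulls_first { Ordering::Greater } else { Ordering::Less }
        }
        (Some(x), Some(y)) => {
            let ord = x.cmp(&y);
            if opts.descending { ord.reverse() } else { ord }
        }
    }
}

/// The new threshold is more selective if it sorts strictly before the
/// current one; the first non-equal key decides.
fn is_more_selective(
    current: &[Option<i64>],
    new: &[Option<i64>],
    opts: &[SortOptions],
) -> bool {
    for ((c, n), o) in current.iter().zip(new).zip(opts) {
        match compare_key(*n, *c, *o) {
            Ordering::Less => return true,
            Ordering::Greater => return false,
            Ordering::Equal => continue,
        }
    }
    // All keys equal: same threshold, no update needed.
    false
}
```

This yields the same outcomes as the four-way null match in the diff: with `nulls_first`, replacing a null threshold by a non-null one is less selective, and with descending order a larger new value is more selective.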
Re: [PR] Only update TopK dynamic filters if the new ones are more selective [datafusion]
adriangb commented on code in PR #16433:
URL: https://github.com/apache/datafusion/pull/16433#discussion_r2153281386
##
datafusion/physical-plan/src/sorts/sort.rs:
##
@@ -846,8 +846,10 @@ pub struct SortExec {
     common_sort_prefix: Vec,
     /// Cache holding plan properties like equivalences, output partitioning etc.
     cache: PlanProperties,
-    /// Filter matching the state of the sort for dynamic filter pushdown
-    filter: Option>,
+    /// Filter matching the state of the sort for dynamic filter pushdown.
+    /// If `fetch` is `Some`, this will also be set and a TopK operator may be used.
+    /// If `fetch` is `None`, this will be `None`.
+    filter: Option,
Review Comment:
I feel like there's some further refactoring that could happen here, e.g.
splitting up SortExec, but I'm leaving that for another day.
##
datafusion/physical-plan/src/sorts/sort.rs:
##
@@ -1158,7 +1153,10 @@ impl ExecutionPlan for SortExec {
                     context.session_config().batch_size(),
                     context.runtime_env(),
                     &self.metrics_set,
-                    self.filter.clone(),
+                    self.filter
+                        .as_ref()
+                        .expect("Filter should be set when fetch is Some")
+                        .clone(),
Review Comment:
I refactored so that the `TopK` struct always expects this parameter which
better reflects the reality of execution. But since it's strangely tied to the
`fetch` param I am doing an `expect` assertion here. It should never fail at
runtime.
##
datafusion/physical-plan/src/sorts/sort.rs:
##
@@ -935,11 +940,6 @@ impl SortExec {
         }
     }
-    pub fn with_filter(mut self, filter: Arc) -> Self {
-        self.filter = Some(filter);
-        self
-    }
Review Comment:
This was unused and had slipped through the cracks. I can make a new PR to
just remove these methods.
##
datafusion/physical-plan/src/topk/mod.rs:
##
@@ -319,13 +341,88 @@ impl TopK {
     /// (a > 2 OR (a = 2 AND b < 3))
     /// ```
     fn update_filter(&mut self) -> Result<()> {
-        let Some(filter) = &self.filter else {
-            return Ok(());
-        };
         let Some(thresholds) = self.heap.get_threshold_values(&self.expr)? else {
             return Ok(());
         };
+        // Are the new thresholds more selective than our existing ones?
+        let should_update = {
+            if let Some(current) = self.filter.thresholds.write().as_mut() {
+                assert!(current.len() == thresholds.len());
+                // Check if new thresholds are more selective than current ones
+                let mut more_selective = false;
+                for ((current_value, new_value), sort_expr) in
+                    current.iter().zip(thresholds.iter()).zip(self.expr.iter())
+                {
+                    // Handle null cases
+                    let (current_is_null, new_is_null) =
+                        (current_value.is_null(), new_value.is_null());
+
+                    match (current_is_null, new_is_null) {
+                        (true, true) => {
+                            // Both null, continue checking next values
+                        }
+                        (true, false) => {
+                            // Current is null, new is not null
+                            // For nulls_first: null < non-null, so new value is less selective
+                            // For nulls_last: null > non-null, so new value is more selective
+                            more_selective = !sort_expr.options.nulls_first;
+                            break;
+                        }
+                        (false, true) => {
+                            // Current is not null, new is null
+                            // For nulls_first: non-null > null, so new value is more selective
+                            // For nulls_last: non-null < null, so new value is less selective
+                            more_selective = sort_expr.options.nulls_first;
+                            break;
+                        }
+                        (false, false) => {
+                            // Neither is null, compare values
+                            match current_value.partial_cmp(new_value) {
+                                Some(ordering) => {
+                                    match ordering {
+                                        Ordering::Equal => {
+                                            // Continue checking next values
+                                        }
+                                        Ordering::Less => {
+                                            // For descending sort: new > current means more selective
+                                            // For ascending sort: new > current means less selective
+                                            more_selective = sort_expr.
