Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
alamb commented on PR #15591: URL: https://github.com/apache/datafusion/pull/15591#issuecomment-2917574821 Thanks for all your help @Rachelint and congratulations on the new job -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] - To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
Rachelint commented on PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#issuecomment-2917518553

Thanks @adriangb @Dandandan.
I just started my new job this week and am a bit busy; I will continue to push this forward this weekend.

The new targets for this PR may be:
- Mainly, to serve as a base for better aggregation spilling
- Slightly improved performance

So the remaining work, I think, is:
- File an issue about the new aggregation spilling proposal based on this PR
- Support `block mode` for `NullState` again
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
Dandandan commented on PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#issuecomment-2911070606

I agree with @adriangb: even if it doesn't provide performance improvements, it is still super valuable to push it forward.
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
adriangb commented on PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#issuecomment-2910963654

> Part of the better aggregation spilling (I will file an issue to discuss it maybe tonight)

Without having a full understanding of this PR (I have just been following the conversation because the change is exciting), my 2¢ is: for us, memory management is currently one of the biggest thorns with DataFusion. It is quite hard at the moment to run with a fixed memory budget, given the mix of exceeding memory through under-accounting and refusing to run operators that can't spill / try to claim more memory than they actually use.
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
Rachelint commented on PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#issuecomment-2910945348

> > My biggest concern is if we can get more obvious improvement to make the change worthy...
>
> As I understood the way to get a bigger improvement would be to implement the chunked approach for more group storage / aggregates so that more queries in our benchmarks (like ClickBench) could use the new code path
>
> Though of course that would make this PR even bigger
>
> We could also make a "POC" type PR with some more implementation to prove out the performance and then break it into smaller pieces for review

Yes, I am trying to implement it for `count`, to see whether there are improvements in `q4` and `q15`.
But according to the flamegraph, the bottleneck of all such queries is actually the `hashtable`.
I think the performance improvement from this PR will not be obvious until we overcome the `hashtable` (I am experimenting with a ClickHouse-like hashtable).

I think the current benefits of this PR are:
- It is part of the better aggregation spilling (I will file an issue to discuss it, maybe tonight)
- Lower memory usage, because memory is freed sooner, batch by batch
- It can make really high cardinality queries around 1.1x faster

Do you think it is still worth continuing to push forward for the above benefits?
I plan to sort out the code tonight, so that we make just the simplest implementation in the first stage (it is a bit complex now because of some experiments on further improving performance)?
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
alamb commented on PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#issuecomment-2909172790

> My biggest concern is if we can get more obvious improvement to make the change worthy...

As I understood the way to get a bigger improvement would be to implement the chunked approach for more group storage / aggregates so that more queries in our benchmarks (like ClickBench) could use the new code path
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
Rachelint commented on PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#issuecomment-2907882330

> I wonder what the plan is for this PR?
>
> From what I understand, it currently improves performance for aggregates with large numbers of groups, but (slightly) slows down aggregates for smaller numbers of groups. I think this is due to accessing group storage via two indirections (block index / actual index)
>
> It seems like the [proposal](https://github.com/apache/datafusion/pull/15591#issuecomment-2890775426) is to have some sort of adaptive structure that uses one part indexes for small numbers of groups and then switches to two part indexes for larger numbers.

I am experimenting to see whether there is something we can do based on this PR to improve performance further, like [memory reuse](https://github.com/apache/datafusion/pull/15591#issuecomment-2890663260).
Actually, #16135 is part of that attempt.

My biggest concern is whether we can get a more obvious improvement to make the change worthwhile...
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
alamb commented on PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#issuecomment-2907769450

I wonder what the plan is for this PR?

From what I understand, it currently improves performance for aggregates with large numbers of groups, but (slightly) slows down aggregates for smaller numbers of groups. I think this is due to accessing group storage via two indirections (block index / actual index)

It seems like the [proposal](https://github.com/apache/datafusion/pull/15591#issuecomment-2890775426) is to have some sort of adaptive structure that uses one part indexes for small numbers of groups and then switches to two part indexes for larger numbers.
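To make the "two indirections" point concrete, here is a minimal sketch that contrasts a flat store with a blocked one. All names (`FlatStore`, `BlockedStore`, `split`) are hypothetical and chosen only for illustration; this is not the PR's `GroupIndexOperations` API, and the division/modulo split is an assumption about how a group index could be mapped to a (block index, offset) pair.

```rust
/// Flat layout: one contiguous vector, a single lookup per access.
pub struct FlatStore {
    values: Vec<i64>,
}

impl FlatStore {
    pub fn new() -> Self {
        Self { values: Vec::new() }
    }

    /// Appends a value and returns its group index.
    pub fn push(&mut self, v: i64) -> usize {
        self.values.push(v);
        self.values.len() - 1
    }

    pub fn get(&self, group_index: usize) -> i64 {
        self.values[group_index]
    }
}

/// Blocked layout: fixed-size blocks, so each access first selects a block
/// and then an offset inside it (the second indirection).
pub struct BlockedStore {
    block_size: usize,
    blocks: Vec<Vec<i64>>,
}

impl BlockedStore {
    pub fn new(block_size: usize) -> Self {
        assert!(block_size > 0);
        Self { block_size, blocks: Vec::new() }
    }

    /// Splits a group index into (block index, offset within block).
    pub fn split(&self, group_index: usize) -> (usize, usize) {
        (group_index / self.block_size, group_index % self.block_size)
    }

    /// Appends a value, allocating a new block when the last one is full,
    /// and returns the resulting group index.
    pub fn push(&mut self, v: i64) -> usize {
        let need_new_block = match self.blocks.last() {
            Some(block) => block.len() == self.block_size,
            None => true,
        };
        if need_new_block {
            self.blocks.push(Vec::with_capacity(self.block_size));
        }
        self.blocks.last_mut().unwrap().push(v);
        (self.blocks.len() - 1) * self.block_size + self.blocks.last().unwrap().len() - 1
    }

    pub fn get(&self, group_index: usize) -> i64 {
        let (block, offset) = self.split(group_index);
        self.blocks[block][offset]
    }
}
```

In this sketch the blocked store never reallocates existing data, but every `get` pays for the extra block lookup, which is one plausible source of the small slowdown on low-cardinality queries.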
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
alamb commented on PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#issuecomment-2890443367

`./gh_compare_branch.sh` [Benchmark Script](https://github.com/alamb/datafusion-benchmarking/blob/main/gh_compare_branch.sh) Running
Linux aal-dev 6.11.0-1013-gcp #13~24.04.1-Ubuntu SMP Wed Apr 2 16:34:16 UTC 2025 x86_64 x86_64 x86_64 GNU/Linux
Comparing intermeidate-result-blocked-approach (600318f348908f3aa3a80ddc3c8cb6d9c76deabc) to 07fe23f03dcb53b2715d3f3b3e0c476d54202c00 [diff](https://github.com/apache/datafusion/compare/07fe23f03dcb53b2715d3f3b3e0c476d54202c00..600318f348908f3aa3a80ddc3c8cb6d9c76deabc)
Benchmarks: clickbench_1 tpch_mem
Results will be posted here when complete
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
Dandandan commented on PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#issuecomment-2890775426

> - We make the block size large enough
> - For the first block, we still perform resizing at first
> - But after it grows large enough, we switch to the blocked approach?

Yes - exactly!
Alternatively, maybe we can even switch from the flat to the blocked approach after reaching a certain threshold (for example, 1 million groups) to avoid the overhead of the blocked approach in lower-cardinality cases?
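The threshold-based switch suggested here could look roughly like the sketch below. It is an illustration only: `AdaptiveStore` and its methods are hypothetical names, the element type is fixed to `i64`, and the block size is assumed to equal the switch threshold so that the already-grown flat vector can become the first block without copying.

```rust
use std::mem;

enum State {
    /// Single vector that grows by ordinary resizing.
    Flat(Vec<i64>),
    /// Fixed-size blocks; the former flat vector is block 0.
    Blocked(Vec<Vec<i64>>),
}

pub struct AdaptiveStore {
    threshold: usize,
    state: State,
}

impl AdaptiveStore {
    pub fn new(threshold: usize) -> Self {
        assert!(threshold > 0);
        Self { threshold, state: State::Flat(Vec::new()) }
    }

    pub fn is_blocked(&self) -> bool {
        matches!(self.state, State::Blocked(_))
    }

    pub fn push(&mut self, v: i64) {
        if let State::Flat(values) = &mut self.state {
            if values.len() < self.threshold {
                // Low cardinality: stay on the flat path.
                values.push(v);
                return;
            }
            // Threshold reached: reuse the flat vector as the first block.
            let first = mem::take(values);
            self.state = State::Blocked(vec![first]);
        }
        if let State::Blocked(blocks) = &mut self.state {
            if blocks.last().map_or(true, |b| b.len() == self.threshold) {
                blocks.push(Vec::with_capacity(self.threshold));
            }
            blocks.last_mut().unwrap().push(v);
        }
    }

    pub fn get(&self, i: usize) -> Option<i64> {
        match &self.state {
            State::Flat(values) => values.get(i).copied(),
            State::Blocked(blocks) => blocks
                .get(i / self.threshold)?
                .get(i % self.threshold)
                .copied(),
        }
    }
}
```

With a threshold such as the 1 million groups mentioned above, small aggregations would never leave the flat path in this sketch, while large ones stop reallocating once the first block is full.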
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
Rachelint commented on PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#issuecomment-2890634742

> I wonder what happens if we make it more like at least 1 million or 1MiB so the effect on cache-friendliness is smaller?
> We could optimize a growing strategy for the first allocated Vec if memory usage / overhead of first block is a concern.

I have tried enlarging the block size (8 * batch, 16 * batch, ...), but it seems to make only a slight difference to performance.

So after experimenting, I think `single vector + resizing` is actually efficient enough...
- It is more efficient for random access
- Resizing only happens a few times, so it is acceptable
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
Rachelint commented on PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#issuecomment-2890735910

> I wonder what happens if we make it more like at least 1 million or 1MiB so the effect on cache-friendliness is smaller?
> We could optimize a growing strategy for the first allocated Vec if memory usage / overhead of first block is a concern.

> I think we should try to minimize the impact of this on low-cardinality cases (e.g. make sure they fit in one array, minimize the overhead of blocks)...

If I am not misunderstanding, does that mean a strategy like this:
- We make the block size large enough
- For the first block, we still perform `resizing` at first
- But after it grows large enough, we switch to the `blocked approach`?

> Yeah it is quite efficient, although problematic for large inputs

Agreed. It also leads to large memory usage, because we only release memory after all the batches have been returned (we currently hold one really large single batch and only return slices of it).
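The memory-release point can be sketched as follows. This is a simplified model with hypothetical names (`FlatOutput`, `BlockedOutput`, `retained`), using plain `Vec<i64>` in place of Arrow arrays: the flat variant hands out slices of one buffer that stays alive until the end, while the blocked variant hands over ownership of each block so it can be freed as soon as the consumer drops it.

```rust
use std::collections::VecDeque;

/// Flat output: batches are slices of one large buffer, so the whole
/// buffer stays allocated until the last batch has been returned.
pub struct FlatOutput {
    values: Vec<i64>,
    offset: usize,
}

impl FlatOutput {
    pub fn new(values: Vec<i64>) -> Self {
        Self { values, offset: 0 }
    }

    pub fn next_batch(&mut self, batch_size: usize) -> Option<&[i64]> {
        assert!(batch_size > 0);
        if self.offset >= self.values.len() {
            return None;
        }
        let end = (self.offset + batch_size).min(self.values.len());
        let batch = &self.values[self.offset..end];
        self.offset = end;
        Some(batch)
    }

    /// Number of values still held in memory by this output.
    pub fn retained(&self) -> usize {
        self.values.len()
    }
}

/// Blocked output: each batch is an owned block that leaves the structure,
/// so its memory can be released as soon as the consumer is done with it.
pub struct BlockedOutput {
    blocks: VecDeque<Vec<i64>>,
}

impl BlockedOutput {
    pub fn new(values: Vec<i64>, block_size: usize) -> Self {
        assert!(block_size > 0);
        let blocks = values.chunks(block_size).map(|c| c.to_vec()).collect();
        Self { blocks }
    }

    pub fn next_batch(&mut self) -> Option<Vec<i64>> {
        self.blocks.pop_front()
    }

    /// Number of values still held in memory by this output.
    pub fn retained(&self) -> usize {
        self.blocks.iter().map(Vec::len).sum()
    }
}
```

In this model `FlatOutput::retained` stays constant while batches are emitted, whereas `BlockedOutput::retained` shrinks by one block per emitted batch.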
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
Dandandan commented on PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#issuecomment-2890700369

> Yes, in the current implementation, it only helps performance with a large number of intermediate groups, because `slice` will be called many, many times and the cost becomes unacceptable. And for queries with only a small number of groups, it makes nearly no difference.

Yeah I think that was expected. I think we should try to minimize the impact of this on low-cardinality cases (e.g. make sure they fit in one array, minimize the overhead of blocks)...

> So after experimenting, I think single vector + resizing is actually efficient enough...

Yeah it is quite efficient, although problematic for large inputs
* Offset out of bounds for utf8 / binary data.
* Overallocation due to exponential allocation strategy

So even with roughly the same performance I think we should still strive to make the change.
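A rough way to quantify the two problems listed above is sketched below. The helper names are hypothetical, and `doubling_capacity` is a modeling assumption (growth that lands on the next power of two), not a statement about what `Vec` guarantees. The offset check relies on the fact that Arrow's `Utf8` / `Binary` layouts use 32-bit signed offsets, so a single array cannot address more than `i32::MAX` bytes of value data.

```rust
/// Capacity reached when growing by doubling until `n` values fit.
/// Modeling assumption: capacity ends up at the next power of two.
pub fn doubling_capacity(n: usize) -> usize {
    n.next_power_of_two()
}

/// Capacity reached when allocating fixed-size blocks until `n` values fit.
pub fn blocked_capacity(n: usize, block_size: usize) -> usize {
    assert!(block_size > 0);
    // Round up to a whole number of blocks.
    (n + block_size - 1) / block_size * block_size
}

/// Whether `total_bytes` of string/binary data can still be addressed
/// with 32-bit signed offsets in a single array.
pub fn fits_i32_offsets(total_bytes: u64) -> bool {
    total_bytes <= i32::MAX as u64
}
```

For example, under this model 1,048,577 values need a capacity of 2,097,152 with doubling but only 1,056,768 with 8,192-value blocks, so the waste is bounded by one block instead of growing with the input.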
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
Rachelint commented on PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#issuecomment-2890663260

> Me too -- I looked at the flamegraph you provided and I agree it seems like almost half the allocation time is spent with pagefaults / zeroing memory. However, I can't tell if that is because there is slowness with the underlying Vec that wasn't initialized or if there is something else going on.

I think I nearly understand the reason: it is possibly caused by `lto`. `lto` seems to have found that the initialization is actually unnecessary, so it removes it (just like calling `set_len` manually).

> I suspect you already know this, but I think you can get back the original Vec from an array via

Got it!

> From those numbers, is it a fair assessment that the blocked approach improves performance when there is a large number of intermediate groups, but does not when there is a small number of groups?

I think it may be possible in this situation?
- When the `input` for the `accumulator` and `group values` has been consumed, we collect it and transform it back into `Vec`s.
- Then we push them into the `accumulator` and `group values`.
- Finally, we reuse them in the next round of computation of the `accumulator` and `group values`?

> From those numbers, is it a fair assessment that the blocked approach improves performance when there is a large number of intermediate groups, but does not when there is a small number of groups?

Yes, in the current implementation, it only helps performance with a `large number` of intermediate groups, because `slice` will be called many, many times and the cost becomes unacceptable.
And for queries with only a small number of groups, it makes nearly no difference.
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
alamb commented on PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#issuecomment-2890562106

> BTW, I am confused about why there are so many `page_fault`s in the `blocked accumulate`.

Me too -- I looked at the flamegraph you provided and I agree it seems like almost half the allocation time is spent with pagefaults / zeroing memory. However, I can't tell if that is because there is slowness with the underlying Vec that wasn't initialized or if there is something else going on.

> * But it may not really help performance much currently (the performance improvement is mainly due to removing the use of the expensive `slice`).

Yes, that was my understanding -- that blocked aggregation would only help performance when the number of intermediate groups was large (which forced additional memory allocations)

> But inspired by the `batch_size` based memory allocation, I am wondering whether we can find some way to reuse memory. I am trying that today.

I suspect you already know this, but I think you can get back the original Vec from an array via
1. `PrimitiveArray::into_parts()` --> get a `ScalarBuffer`
2. `ScalarBuffer::into_inner()` --> get a `Buffer`
3. [`Buffer::into_vec()`](https://docs.rs/arrow/latest/arrow/buffer/struct.Buffer.html#method.into_vec)

However, in the high cardinality case, I am not sure there are buffers to reuse during aggregation (the buffers are all held until the output is needed, and then once output is needed they don't get re-created)
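The memory-reuse idea in this exchange can be illustrated with a standard-library-only stand-in. The sketch below does not exercise the Arrow conversion steps listed above; `BufferPool`, `take`, and `give_back` are hypothetical names, and the pool simply recycles `Vec<i64>` allocations so that a later round can start from an already-allocated buffer instead of asking the allocator for fresh (page-faulting) memory.

```rust
/// A tiny pool of reusable buffers.
pub struct BufferPool {
    free: Vec<Vec<i64>>,
}

impl BufferPool {
    pub fn new() -> Self {
        Self { free: Vec::new() }
    }

    /// Returns an empty buffer with at least `min_capacity` capacity,
    /// reusing a previously returned allocation when one is available.
    pub fn take(&mut self, min_capacity: usize) -> Vec<i64> {
        match self.free.pop() {
            Some(mut buf) => {
                // Keep the allocation, drop the old contents.
                buf.clear();
                buf.reserve(min_capacity);
                buf
            }
            None => Vec::with_capacity(min_capacity),
        }
    }

    /// Hands a buffer back to the pool once its contents are consumed.
    pub fn give_back(&mut self, buf: Vec<i64>) {
        self.free.push(buf);
    }

    /// Number of buffers currently waiting for reuse.
    pub fn idle(&self) -> usize {
        self.free.len()
    }
}
```

Whether such recycling pays off in the high cardinality case is exactly the open question raised above, since buffers would only return to the pool once output has been emitted.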
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
alamb commented on PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#issuecomment-2890589301

> I think still need some time to evaluate what benefits can this brings

I think I am somewhat lost with the current state. Your comment on https://github.com/apache/datafusion/pull/15591#issuecomment-2823308363 states

> I add a query in extended.sql, the blocked approach can get a obvious improvement as expected.
>
> I have confidence it can even improve more in some other queries according to poc https://github.com/apache/datafusion/pull/11943

And the most recent benchmark run https://github.com/apache/datafusion/pull/15591#issuecomment-2890539606 seems to confirm this finding:

> | Query    | HEAD       | intermeidate-result-blocked-approach | Change        |
> |----------|------------|--------------------------------------|---------------|
> | QQuery 0 | 1916.94ms  | 1866.52ms                            | no change     |
> | QQuery 1 | 709.88ms   | 673.73ms                             | +1.05x faster |
> | QQuery 2 | 1482.93ms  | 1391.57ms                            | +1.07x faster |
> | QQuery 3 | 701.88ms   | 721.01ms                             | no change     |
> | QQuery 4 | 1494.91ms  | 1494.55ms                            | no change     |
> | QQuery 5 | 15627.54ms | 15418.84ms                           | no change     |
> | QQuery 6 | 2070.08ms  | 2042.39ms                            | no change     |
> | QQuery 7 | 2108.54ms  | 1952.66ms                            | +1.08x faster |

However, some of the results on shorter running queries (few groups) shows perhaps a slowdown: https://github.com/apache/datafusion/pull/15591#issuecomment-2890527702

From those numbers, is it a fair assessment that the blocked approach improves performance when there is a large number of intermediate groups, but does not when there is a small number of groups?
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
alamb commented on code in PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#discussion_r2095456466
##
datafusion/functions-aggregate-common/benches/null_state_accumulate.rs:
##
@@ -0,0 +1,202 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+extern crate criterion;
+
+use std::sync::Arc;
+
+use arrow::array::ArrowNativeTypeOp;
+use arrow::{
+    array::{ArrayRef, AsArray, BooleanArray, Int64Array},
+    datatypes::Int64Type,
+};
+use criterion::{criterion_group, criterion_main, Criterion};
+use datafusion_functions_aggregate_common::aggregate::groups_accumulator::group_index_operations::FlatGroupIndexOperations;
+use datafusion_functions_aggregate_common::aggregate::groups_accumulator::{
+    accumulate::{self, accumulate_indices, NullStateAdapter},
+    blocks::GeneralBlocks,
+    group_index_operations::{BlockedGroupIndexOperations, GroupIndexOperations},
+};
+
+fn generate_group_indices(len: usize) -> Vec<usize> {
+    (0..len).collect()
Review Comment:
Does using indices `0..len` mean this benchmark is testing the case where
each input has a new unique group?
If this is the case, then I think it would make sense that the benchmark
shows lots of time allocating / zeroing memory: the bulk of the work will be
copying each value into a new accumulator slot.
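To benchmark something other than the all-unique case, the generator could take the number of distinct groups as a parameter. The function below is a hypothetical variant (its name and the round-robin assignment are assumptions, not code from the PR): with `num_groups == len` it reproduces the `0..len` behavior, and with a smaller `num_groups` most inputs update an existing accumulator slot instead of creating a new one.

```rust
/// Generates `len` group indices drawn round-robin from `num_groups`
/// distinct groups. Hypothetical helper for illustration only.
pub fn generate_group_indices_with_cardinality(len: usize, num_groups: usize) -> Vec<usize> {
    assert!(num_groups > 0);
    (0..len).map(|i| i % num_groups).collect()
}
```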
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
Dandandan commented on PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#issuecomment-2890549663

> * We perform a few large memory allocations in `flat`, but many more small memory allocations in `blocked`
> * The memory is no longer contiguous; I think it may not be so friendly to the CPU (like cache prefetch?)

I wonder what happens if we make it more like at least 1 million or 1MiB (e.g. one) so the effect on cache is smaller?
We could optimize a growing strategy for the first allocated `Vec` if memory usage / overhead of first block is a concern.
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
alamb commented on PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#issuecomment-2890539606

Benchmark completed

Details

```
Comparing HEAD and intermeidate-result-blocked-approach
Benchmark clickbench_extended.json
| Query    | HEAD       | intermeidate-result-blocked-approach | Change        |
|----------|------------|--------------------------------------|---------------|
| QQuery 0 | 1916.94ms  | 1866.52ms                            | no change     |
| QQuery 1 | 709.88ms   | 673.73ms                             | +1.05x faster |
| QQuery 2 | 1482.93ms  | 1391.57ms                            | +1.07x faster |
| QQuery 3 | 701.88ms   | 721.01ms                             | no change     |
| QQuery 4 | 1494.91ms  | 1494.55ms                            | no change     |
| QQuery 5 | 15627.54ms | 15418.84ms                           | no change     |
| QQuery 6 | 2070.08ms  | 2042.39ms                            | no change     |
| QQuery 7 | 2108.54ms  | 1952.66ms                            | +1.08x faster |

| Benchmark Summary                                   |            |
|-----------------------------------------------------|------------|
| Total Time (HEAD)                                   | 26112.69ms |
| Total Time (intermeidate-result-blocked-approach)   | 25561.26ms |
| Average Time (HEAD)                                 | 3264.09ms  |
| Average Time (intermeidate-result-blocked-approach) | 3195.16ms  |
| Queries Faster                                      | 3          |
| Queries Slower                                      | 0          |
| Queries with No Change                              | 5          |
```
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
alamb commented on PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#issuecomment-2890527834

`./gh_compare_branch.sh` [Benchmark Script](https://github.com/alamb/datafusion-benchmarking/blob/main/gh_compare_branch.sh) Running
Linux aal-dev 6.11.0-1013-gcp #13~24.04.1-Ubuntu SMP Wed Apr 2 16:34:16 UTC 2025 x86_64 x86_64 x86_64 GNU/Linux
Comparing intermeidate-result-blocked-approach (600318f348908f3aa3a80ddc3c8cb6d9c76deabc) to 07fe23f03dcb53b2715d3f3b3e0c476d54202c00 [diff](https://github.com/apache/datafusion/compare/07fe23f03dcb53b2715d3f3b3e0c476d54202c00..600318f348908f3aa3a80ddc3c8cb6d9c76deabc)
Benchmarks: clickbench_extended
Results will be posted here when complete
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
alamb commented on PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#issuecomment-2890527702

Benchmark completed

Details

```
Comparing HEAD and intermeidate-result-blocked-approach
Benchmark clickbench_1.json
| Query     | HEAD       | intermeidate-result-blocked-approach | Change        |
|-----------|------------|--------------------------------------|---------------|
| QQuery 0  | 0.64ms     | 0.61ms                               | +1.06x faster |
| QQuery 1  | 73.32ms    | 77.79ms                              | 1.06x slower  |
| QQuery 2  | 116.11ms   | 121.93ms                             | 1.05x slower  |
| QQuery 3  | 127.64ms   | 128.08ms                             | no change     |
| QQuery 4  | 650.79ms   | 627.02ms                             | no change     |
| QQuery 5  | 862.07ms   | 864.44ms                             | no change     |
| QQuery 6  | 0.76ms     | 0.71ms                               | +1.07x faster |
| QQuery 7  | 89.64ms    | 96.33ms                              | 1.07x slower  |
| QQuery 8  | 945.07ms   | 945.95ms                             | no change     |
| QQuery 9  | 1243.07ms  | 1248.13ms                            | no change     |
| QQuery 10 | 294.76ms   | 302.60ms                             | no change     |
| QQuery 11 | 327.74ms   | 342.77ms                             | no change     |
| QQuery 12 | 876.40ms   | 901.09ms                             | no change     |
| QQuery 13 | 1301.86ms  | 1359.55ms                            | no change     |
| QQuery 14 | 864.23ms   | 850.71ms                             | no change     |
| QQuery 15 | 845.06ms   | 852.93ms                             | no change     |
| QQuery 16 | 1750.37ms  | 1757.44ms                            | no change     |
| QQuery 17 | 1605.74ms  | 1621.73ms                            | no change     |
| QQuery 18 | 3082.30ms  | 3123.52ms                            | no change     |
| QQuery 19 | 124.11ms   | 124.83ms                             | no change     |
| QQuery 20 | 1179.74ms  | 1168.95ms                            | no change     |
| QQuery 21 | 1413.11ms  | 1400.95ms                            | no change     |
| QQuery 22 | 2537.54ms  | 2451.09ms                            | no change     |
| QQuery 23 | 8489.89ms  | 8532.97ms                            | no change     |
| QQuery 24 | 511.43ms   | 505.62ms                             | no change     |
| QQuery 25 | 427.56ms   | 425.53ms                             | no change     |
| QQuery 26 | 577.57ms   | 577.86ms                             | no change     |
| QQuery 27 | 1707.08ms  | 1709.85ms                            | no change     |
| QQuery 28 | 12952.06ms | 13194.62ms                           | no change     |
| QQuery 29 | 573.95ms   | 570.25ms                             | no change     |
| QQuery 30 | 846.22ms   | 855.72ms                             | no change     |
| QQuery 31 | 903.90ms   | 896.13ms                             | no change     |
| QQuery 32 | 2682.87ms  | 2680.35ms                            | no change     |
| QQuery 33 | 3413.44ms  | 3409.35ms                            | no change     |
| QQuery 34 | 3453.88ms  | 3441.08ms                            | no change     |
| QQuery 35 | 1287.13ms  | 1278.56ms                            | no change     |
| QQuery 36 | 175.07ms   | 178.36ms                             | no change     |
| QQuery 37 | 104.82ms   | 103.96ms                             | no change     |
| QQuery 38 | 174.73ms   | 172.98ms                             | no change     |
| QQuery 39 | 261.50ms   | 258.02ms                             | no change     |
| QQuery 40 | 87.21ms    | 89.21ms                              | no change     |
| QQuery 41 | 86.33ms    | 85.57ms                              | no change     |
| QQuery 42 | 82.95ms    | 79.43ms                              | no change     |

| Benchmark Summary                                   |            |
|-----------------------------------------------------|------------|
| Total Time (HEAD)                                   | 59111.65ms |
| Total Time (intermeidate-result-blocked-approach)   | 59414.57ms |
| Average Time (HEAD)                                 | 1374.69ms  |
| Average Time (intermeidate-result-blocked-approach) | 1381.73ms  |
```
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
Rachelint commented on PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#issuecomment-2890524255

> Will do

Thanks!

> No worries -- I am sorry I have not given it as much attention as I would like. Is there anything else I can do to help (like implement one of the grouped memory examples)? I am about to file a ticket to start organizing the follow on work

I think I still need some time to evaluate what benefits this can bring.

After experimenting, I found:
- It can really help reduce memory usage, as @joroKr21 mentioned in #9562
- But it may not help performance much currently (the performance improvement is mainly due to removing the use of the expensive `slice`).

But inspired by the `batch_size` based memory allocation, I am wondering whether we can find some way to reuse memory. I am trying that today.
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
Dandandan commented on PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#issuecomment-2890507769

> @alamb @Dandandan Sorry, I am still working on some other new ideas for this PR, so I have switched it back to draft.

No worries
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
alamb commented on PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#issuecomment-2890475454
> @alamb Hello, is it ok to trigger the benchmark again?

Will do

> @alamb @Dandandan Sorry, I am still working on some other new ideas for this PR, so I have switched it back to draft.

No worries -- I am sorry I have not given it as much attention as I would like. Is there anything else I can do to help (like implement one of the grouped memory examples)? I am about to file a ticket to start organizing the follow-on work
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
Rachelint commented on PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#issuecomment-2890398419
@alamb @Dandandan Sorry, I am still working on some other new ideas for this PR, so I have switched it back to draft.
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
Rachelint commented on PR #15591: URL: https://github.com/apache/datafusion/pull/15591#issuecomment-2889120962 @alamb Hello, is it ok to trigger the benchmark again?
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
Rachelint commented on PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#issuecomment-229262
My plan is to adjust the `block size` so that `block size = N * batch size`. That way we allocate more memory at once, which should also make it easier for the CPU to prefetch.
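A minimal sketch of the `block size = N * batch size` plan, for illustration only. The helper names `block_size_for` and `num_blocks` are hypothetical and do not come from the PR; the sketch only shows how a larger multiplier reduces the number of separate allocations needed for a given number of groups.

```rust
/// Hypothetical helper (not from the PR): block size as a multiple of the batch size.
fn block_size_for(batch_size: usize, multiplier: usize) -> usize {
    batch_size
        .checked_mul(multiplier)
        .expect("block size overflows usize")
}

/// Number of blocks (that is, separate allocations) needed to hold `num_groups` groups.
fn num_blocks(num_groups: usize, block_size: usize) -> usize {
    assert!(block_size > 0, "block size must be non-zero");
    num_groups.div_ceil(block_size)
}

fn main() {
    let batch_size = 8192;
    let num_groups = 100_000;
    for multiplier in [1, 2, 4] {
        let block_size = block_size_for(batch_size, multiplier);
        println!(
            "N = {multiplier}: block size = {block_size}, blocks = {}",
            num_blocks(num_groups, block_size)
        );
    }
}
```

With a batch size of 8192, going from `N = 1` to `N = 4` means fewer, larger allocations for the same group count, which is the trade-off the comment describes.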
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
Rachelint commented on PR #15591: URL: https://github.com/apache/datafusion/pull/15591#issuecomment-227881
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
Rachelint commented on PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#issuecomment-227007
I made a dedicated benchmark for `accumulate`. As in the overall flamegraph, there is actually no improvement for `blocked accumulate`, although the resizing cost is reduced (query performance did improve on my local machine; I guess that is due to removing the expensive array `slice`). I guess the reasons may be:
- We perform a few large memory allocations in `flat`, but many more small memory allocations in `blocked`
- The memory is no longer contiguous, which I think may be less friendly to the CPU (cache prefetching?)

```
Flat accumulate
time:   [135.16 ms 135.20 ms 135.25 ms]
change: [-1.9077% -1.8671% -1.8244%] (p = 0.00 < 0.05)
Performance has improved.

Blocked accumulate
time:   [139.89 ms 139.91 ms 139.92 ms] (even a bit slower)
change: [+0.3130% +0.3246% +0.3359%] (p = 0.00 < 0.05)
Change within noise threshold.
```
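To make the `flat` versus `blocked` distinction concrete, here is a minimal sketch of blocked storage. The `Blocks` type is hypothetical and much simpler than the PR's accumulators; it only illustrates that each block is a separate fixed-capacity allocation (many small allocations, non-contiguous memory), whereas a flat `Vec` grows by reallocating and copying one large buffer.

```rust
/// Hypothetical blocked storage (not the PR's type): values live in
/// fixed-capacity blocks, so existing blocks are never reallocated or copied.
struct Blocks<T> {
    block_size: usize,
    blocks: Vec<Vec<T>>,
}

impl<T> Blocks<T> {
    fn new(block_size: usize) -> Self {
        assert!(block_size > 0, "block size must be non-zero");
        Self { block_size, blocks: Vec::new() }
    }

    /// Append a value, allocating a new block only when the last one is full.
    fn push(&mut self, value: T) {
        let need_new_block = self
            .blocks
            .last()
            .map_or(true, |block| block.len() == self.block_size);
        if need_new_block {
            self.blocks.push(Vec::with_capacity(self.block_size));
        }
        self.blocks.last_mut().unwrap().push(value);
    }

    /// Look up a value by its flat group index.
    fn get(&self, group_index: usize) -> Option<&T> {
        self.blocks
            .get(group_index / self.block_size)?
            .get(group_index % self.block_size)
    }

    fn num_blocks(&self) -> usize {
        self.blocks.len()
    }
}

fn main() {
    let mut blocks = Blocks::new(4);
    for i in 0..10u64 {
        blocks.push(i);
    }
    println!("blocks allocated: {}", blocks.num_blocks());
    println!("value at group index 9: {:?}", blocks.get(9));
}
```

The lookup in `get` needs a division and a remainder per access, which is the per-row cost discussed elsewhere in this thread.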
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
Rachelint commented on PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#issuecomment-2888275782
> we could enforce blocksize being a power of two so we can avoid the expensive div and mod operations?

Yes, I am trying to do it.
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
Dandandan commented on PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#issuecomment-2888271159
> OK... I guess it is due to the computation of `block_id` and
`block_offset`.
>
> I found a machine with Intel cores similar to the testing machine and
profiled on it.
>
> Then I found that on the old and not so good Intel cores, `/` and `%` are
really expensive.
>
> I am trying to optimize it, but I am still not sure if it is the most
important bottleneck
>
> ```rust
> impl GroupIndexOperations for BlockedGroupIndexOperations {
>     fn get_block_id(group_index: usize, block_size: usize) -> usize {
>         group_index / block_size
>     }
>
>     fn get_block_offset(group_index: usize, block_size: usize) -> usize {
>         group_index % block_size
>     }
> }
> ```
we could enforce blocksize being a power of two so we can avoid the
expensive div and mod operations?
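A minimal sketch of the power-of-two idea, under the assumption that the block size is validated once up front. The `PowerOfTwoBlockIndex` type and its methods are hypothetical, not the PR's `GroupIndexOperations` API; they only show how a shift and a mask reproduce `group_index / block_size` and `group_index % block_size`.

```rust
/// Hypothetical helper (not the PR's API): precomputes the shift and mask
/// for a power-of-two block size.
#[derive(Debug, Clone, Copy)]
struct PowerOfTwoBlockIndex {
    shift: u32,
    mask: usize,
}

impl PowerOfTwoBlockIndex {
    /// Returns `None` if `block_size` is not a power of two (including zero).
    fn try_new(block_size: usize) -> Option<Self> {
        if !block_size.is_power_of_two() {
            return None;
        }
        Some(Self {
            shift: block_size.trailing_zeros(),
            mask: block_size - 1,
        })
    }

    /// Same result as `group_index / block_size`.
    fn get_block_id(&self, group_index: usize) -> usize {
        group_index >> self.shift
    }

    /// Same result as `group_index % block_size`.
    fn get_block_offset(&self, group_index: usize) -> usize {
        group_index & self.mask
    }
}

fn main() {
    let block_size = 8192;
    let ops = PowerOfTwoBlockIndex::try_new(block_size).expect("power of two");
    for group_index in [0usize, 1, 8191, 8192, 20_000] {
        assert_eq!(ops.get_block_id(group_index), group_index / block_size);
        assert_eq!(ops.get_block_offset(group_index), group_index % block_size);
    }
    println!("shift/mask agree with div/mod");
}
```

Whether this removes the bottleneck seen in the benchmarks is not established in the thread; the sketch only shows that the two formulations are equivalent for power-of-two block sizes.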
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
Rachelint commented on PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#issuecomment-2888248445
OK... I guess it is due to the computation of `block_id` and `block_offset`.
I found a machine with similar cores and profiled on it.
Then I found that on the old and not so good Intel cores, `/` and `%` are
really expensive.
I am trying to optimize it.
```rust
impl GroupIndexOperations for BlockedGroupIndexOperations {
    fn get_block_id(group_index: usize, block_size: usize) -> usize {
        group_index / block_size
    }

    fn get_block_offset(group_index: usize, block_size: usize) -> usize {
        group_index % block_size
    }
}
```
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
Rachelint commented on PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#issuecomment-2888023128
> 🤖: Benchmark completed
> Details
>
> ```
> Comparing HEAD and intermeidate-result-blocked-approach
>
> Benchmark clickbench_extended.json
>
> | Query    | HEAD       | intermeidate-result-blocked-approach | Change       |
> |----------|------------|--------------------------------------|--------------|
> | QQuery 0 | 1952.39ms  | 2165.55ms                            | 1.11x slower |
> | QQuery 1 | 725.35ms   | 776.75ms                             | 1.07x slower |
> | QQuery 2 | 1474.77ms  | 1525.72ms                            | no change    |
> | QQuery 3 | 720.28ms   | 733.30ms                             | no change    |
> | QQuery 4 | 1491.93ms  | 1549.46ms                            | no change    |
> | QQuery 5 | 15503.37ms | 15748.81ms                           | no change    |
> | QQuery 6 | 2118.87ms  | 2106.19ms                            | no change    |
> | QQuery 7 | 2215.33ms  | 2189.80ms                            | no change    |
>
> | Benchmark Summary                                   |            |
> |-----------------------------------------------------|------------|
> | Total Time (HEAD)                                   | 26202.29ms |
> | Total Time (intermeidate-result-blocked-approach)   | 26795.58ms |
> | Average Time (HEAD)                                 | 3275.29ms  |
> | Average Time (intermeidate-result-blocked-approach) | 3349.45ms  |
> | Queries Faster                                      | 0          |
> | Queries Slower                                      | 2          |
> | Queries with No Change                              | 6          |
>
> Benchmark clickbench_partitioned.json
>
> | Query     | HEAD      | intermeidate-result-blocked-approach | Change       |
> |-----------|-----------|--------------------------------------|--------------|
> | QQuery 0  | 2.42ms    | 2.74ms                               | 1.13x slower |
> | QQuery 1  | 36.72ms   | 39.03ms                              | 1.06x slower |
> | QQuery 2  | 91.22ms   | 95.52ms                              | no change    |
> | QQuery 3  | 101.22ms  | 101.34ms                             | no change    |
> | QQuery 4  | 638.98ms  | 639.74ms                             | no change    |
> | QQuery 5  | 906.48ms  | 875.22ms                             | no change    |
> | QQuery 6  | 2.31ms    | 2.46ms                               | 1.06x slower |
> | QQuery 7  | 43.69ms   | 43.48ms                              | no change    |
> | QQuery 8  | 960.12ms  | 947.47ms                             | no change    |
> | QQuery 9  | 1290.26ms | 1263.99ms                            | no change    |
> | QQuery 10 | 273.55ms  | 273.89ms                             | no change    |
> | QQuery 11 | 306.30ms  | 319.30ms                             | no change    |
> | QQuery 12 | 969.05ms  | 954.35ms                             | no change    |
> | QQuery 13 | 1311.18ms | 1265.49ms                            | no change    |
> | QQuery 14 | 891.85ms  | 867.59ms                             | no change    |
> | QQuery 15 | 860.97ms  | 883.83ms                             | no change    |
> | QQuery 16 | 1764.37ms | 1767.56ms                            | no change    |
> | QQuery 17 | 1653.48ms | 1634.83ms                            | no change    |
> | QQuery 18 | 3151.14ms | 3177.39ms                            | no change    |
> | QQuery 19 | 86.57ms   | 83.49ms                              | no change    |
> | QQuery 20 | 1164.58ms | 1161.57ms                            | no change    |
> | QQuery 21 | 1380.48ms | 1337.39ms                            | no change    |
> | QQuery 22 | 2295.06ms | 2227.17ms                            | no change    |
> | QQuery 23 | 8437.28ms | 8619.52ms                            | no change    |
> | QQuery 24 | 487.41ms  | 475.78ms                             | no change    |
> | QQuery 25 | 403.06ms  | 401.45ms                             | no change    |
> | QQuery 26 | 537.58ms  | 540.40ms                             |
> ```
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
alamb commented on PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#issuecomment-2887720153
🤖: Benchmark completed
Details
```
Comparing HEAD and intermeidate-result-blocked-approach

Benchmark clickbench_extended.json

| Query    | HEAD       | intermeidate-result-blocked-approach | Change       |
|----------|------------|--------------------------------------|--------------|
| QQuery 0 | 1952.39ms  | 2165.55ms                            | 1.11x slower |
| QQuery 1 | 725.35ms   | 776.75ms                             | 1.07x slower |
| QQuery 2 | 1474.77ms  | 1525.72ms                            | no change    |
| QQuery 3 | 720.28ms   | 733.30ms                             | no change    |
| QQuery 4 | 1491.93ms  | 1549.46ms                            | no change    |
| QQuery 5 | 15503.37ms | 15748.81ms                           | no change    |
| QQuery 6 | 2118.87ms  | 2106.19ms                            | no change    |
| QQuery 7 | 2215.33ms  | 2189.80ms                            | no change    |

| Benchmark Summary                                   |            |
|-----------------------------------------------------|------------|
| Total Time (HEAD)                                   | 26202.29ms |
| Total Time (intermeidate-result-blocked-approach)   | 26795.58ms |
| Average Time (HEAD)                                 | 3275.29ms  |
| Average Time (intermeidate-result-blocked-approach) | 3349.45ms  |
| Queries Faster                                      | 0          |
| Queries Slower                                      | 2          |
| Queries with No Change                              | 6          |

Benchmark clickbench_partitioned.json

| Query     | HEAD      | intermeidate-result-blocked-approach | Change       |
|-----------|-----------|--------------------------------------|--------------|
| QQuery 0  | 2.42ms    | 2.74ms                               | 1.13x slower |
| QQuery 1  | 36.72ms   | 39.03ms                              | 1.06x slower |
| QQuery 2  | 91.22ms   | 95.52ms                              | no change    |
| QQuery 3  | 101.22ms  | 101.34ms                             | no change    |
| QQuery 4  | 638.98ms  | 639.74ms                             | no change    |
| QQuery 5  | 906.48ms  | 875.22ms                             | no change    |
| QQuery 6  | 2.31ms    | 2.46ms                               | 1.06x slower |
| QQuery 7  | 43.69ms   | 43.48ms                              | no change    |
| QQuery 8  | 960.12ms  | 947.47ms                             | no change    |
| QQuery 9  | 1290.26ms | 1263.99ms                            | no change    |
| QQuery 10 | 273.55ms  | 273.89ms                             | no change    |
| QQuery 11 | 306.30ms  | 319.30ms                             | no change    |
| QQuery 12 | 969.05ms  | 954.35ms                             | no change    |
| QQuery 13 | 1311.18ms | 1265.49ms                            | no change    |
| QQuery 14 | 891.85ms  | 867.59ms                             | no change    |
| QQuery 15 | 860.97ms  | 883.83ms                             | no change    |
| QQuery 16 | 1764.37ms | 1767.56ms                            | no change    |
| QQuery 17 | 1653.48ms | 1634.83ms                            | no change    |
| QQuery 18 | 3151.14ms | 3177.39ms                            | no change    |
| QQuery 19 | 86.57ms   | 83.49ms                              | no change    |
| QQuery 20 | 1164.58ms | 1161.57ms                            | no change    |
| QQuery 21 | 1380.48ms | 1337.39ms                            | no change    |
| QQuery 22 | 2295.06ms | 2227.17ms                            | no change    |
| QQuery 23 | 8437.28ms | 8619.52ms                            | no change    |
| QQuery 24 | 487.41ms  | 475.78ms                             | no change    |
| QQuery 25 | 403.06ms  | 401.45ms                             | no change    |
| QQuery 26 | 537.58ms  | 540.40ms                             | no change    |
| QQuery 27 | 1608.99ms | 1597.83ms                            | no change    |
| QQuery 28 |
```
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
alamb commented on PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#issuecomment-2887661504
🤖 `./gh_compare_branch.sh` [Benchmark Script](https://github.com/alamb/datafusion-benchmarking/blob/main/gh_compare_branch.sh) Running
Linux aal-dev 6.11.0-1013-gcp #13~24.04.1-Ubuntu SMP Wed Apr 2 16:34:16 UTC 2025 x86_64 x86_64 x86_64 GNU/Linux
Comparing intermeidate-result-blocked-approach (7869779cc59494baa88fb94b096c995eabd87535) to 07fe23f03dcb53b2715d3f3b3e0c476d54202c00 [diff](https://github.com/apache/datafusion/compare/07fe23f03dcb53b2715d3f3b3e0c476d54202c00..7869779cc59494baa88fb94b096c995eabd87535)
Benchmarks: tpch_mem clickbench_partitioned clickbench_extended
Results will be posted here when complete
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
alamb commented on PR #15591: URL: https://github.com/apache/datafusion/pull/15591#issuecomment-2887592362 Thank you so much for this PR btw @Rachelint -- it is really really nice. I can't wait to see how the performance looks
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
alamb commented on code in PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#discussion_r2093618262
##
datafusion/physical-plan/src/aggregates/row_hash.rs:
##
@@ -339,6 +344,35 @@ impl SkipAggregationProbe {
/// β 2 β 2 β 3.0 ββ 2 β 2 β 3.0 β ββ
/// ββββββ
/// ```
+///
+/// # Blocked approach for intermediate results
+///
+/// An important optimization for [`group_values`] and [`accumulators`]
+/// is to manage such intermediate results using the blocked approach.
+///
+/// In the original method, intermediate results are managed within a single
large block
Review Comment:
Is it possible (not in this PR) to eventually remove the original, single
block approach?
##
datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive.rs:
##
@@ -205,20 +284,462 @@ where
Some(_) => self.null_group.take(),
None => None,
};
-let mut split = self.values.split_off(n);
-std::mem::swap(&mut self.values, &mut split);
+
+let single_block = self.values.last_mut().unwrap();
+let mut split = single_block.split_off(n);
+mem::swap(single_block, &mut split);
build_primitive(split, null_group)
}
+
+// ===
+// Emitting in blocked mode
+// ===
+EmitTo::NextBlock => {
+let (total_num_groups, block_size) = if !self.is_emitting() {
+// Similar as `EmitTo:All`, we will clear the old index
infos(like `map`)
Review Comment:
is this still a todo item?
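For context on the `EmitTo::NextBlock` branch in the hunk above, a minimal sketch of what block-by-block emission can look like. The `BlockedValues` type, its `VecDeque` layout, and the method names are assumptions made for illustration; they are not the PR's `GroupValuesPrimitive` implementation. The point is only that handing out the oldest block moves it out whole, without the `split_off` and copy that the single-block path needs.

```rust
use std::collections::VecDeque;

/// Hypothetical stand-in for blocked group values (not the PR's type).
struct BlockedValues {
    blocks: VecDeque<Vec<u64>>,
}

impl BlockedValues {
    fn from_blocks(blocks: Vec<Vec<u64>>) -> Self {
        Self { blocks: VecDeque::from(blocks) }
    }

    /// Emit the oldest block by moving it out; remaining blocks are untouched.
    fn emit_next_block(&mut self) -> Option<Vec<u64>> {
        self.blocks.pop_front()
    }

    /// Emit everything that is left as one flat vector.
    fn emit_all(&mut self) -> Vec<u64> {
        self.blocks.drain(..).flatten().collect()
    }
}

fn main() {
    let mut values = BlockedValues::from_blocks(vec![vec![1, 2], vec![3, 4], vec![5]]);
    while let Some(block) = values.emit_next_block() {
        println!("emitted block: {block:?}");
    }
}
```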
##
datafusion/physical-plan/src/aggregates/row_hash.rs:
##
@@ -639,6 +686,53 @@ pub(crate) fn create_group_accumulator(
}
}
+/// Check if we can enable the blocked optimization for `GroupValues` and
`GroupsAccumulator`s.
+/// The blocked optimization will be enabled when:
+/// - When `enable_aggregation_blocked_groups` is true(default to true)
+/// - It is not streaming aggregation(because blocked mode can't support
Emit::first(exact n))
+/// - The spilling is disabled(still need to consider more to support it
efficiently)
+/// - The accumulator is not empty(I am still not sure about logic in this
case)
+/// - [`GroupValues::supports_blocked_groups`] and all
[`GroupsAccumulator::supports_blocked_groups`] are true
+///
+/// [`GroupValues::supports_blocked_groups`]:
crate::aggregates::group_values::GroupValues::supports_blocked_groups
+/// [`GroupsAccumulator::supports_blocked_groups`]:
datafusion_expr::GroupsAccumulator::supports_blocked_groups
+///
+// TODO: support blocked optimization in streaming, spilling, and maybe empty
accumulators case?
Review Comment:
yes, I think the goal should be to support blocked optimizations in all
these cases (so we can remove the existing code)
I can file tickets to help organize this work (obviously you don't have to
do so)
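The conditions listed in the doc comment above can be sketched as a single predicate. This is an illustrative simplification: the `BlockedModeInputs` struct and `can_enable_blocked_groups` function are hypothetical, and only the field meanings mirror the bullet list in the hunk (config flag, not streaming, spilling disabled, non-empty accumulators, and `supports_blocked_groups` on both sides).

```rust
/// Hypothetical, simplified inputs to the decision (not the PR's types).
struct BlockedModeInputs {
    /// Value of the `enable_aggregation_blocked_groups` option.
    enable_aggregation_blocked_groups: bool,
    /// Streaming aggregation needs `EmitTo::First(n)`, which blocked mode cannot serve.
    is_streaming: bool,
    /// Spilling is not yet supported together with blocked mode.
    spilling_enabled: bool,
    /// Result of `GroupValues::supports_blocked_groups`.
    group_values_supports_blocked: bool,
    /// Result of `GroupsAccumulator::supports_blocked_groups`, one entry per accumulator.
    accumulators_support_blocked: Vec<bool>,
}

/// Returns true only when every condition from the doc comment holds.
fn can_enable_blocked_groups(inputs: &BlockedModeInputs) -> bool {
    inputs.enable_aggregation_blocked_groups
        && !inputs.is_streaming
        && !inputs.spilling_enabled
        && !inputs.accumulators_support_blocked.is_empty()
        && inputs.group_values_supports_blocked
        && inputs.accumulators_support_blocked.iter().all(|supported| *supported)
}

fn main() {
    let inputs = BlockedModeInputs {
        enable_aggregation_blocked_groups: true,
        is_streaming: false,
        spilling_enabled: false,
        group_values_supports_blocked: true,
        accumulators_support_blocked: vec![true, true],
    };
    println!("blocked mode enabled: {}", can_enable_blocked_groups(&inputs));
}
```

Supporting the streaming, spilling, and empty-accumulator cases, as the review comment proposes, would amount to dropping the corresponding terms from this predicate.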
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
alamb commented on PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#issuecomment-2887367750
> @alamb @Dandandan Hi, this pr is ready again now.

I will do so as soon as possible (hopefully later today but probably more like Sunday morning as I am away Saturday / tomorrow morning)
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
Rachelint commented on PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#issuecomment-2877460331
I plan to tidy up the code and make it ready again today.
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
Rachelint commented on PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#issuecomment-2863608739
BTW, I also simplified the code, although that does not help performance. For `NullState`, for example, I found we actually don't need to introduce the blocked approach (doing so even leads to a slight regression). I removed the related code so that `NullState` is nearly unchanged.
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
Rachelint commented on PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#issuecomment-2863584067
The current benchmark results on my local 16-core machine:
- target_partitions = 8 (amazing, 1.71x faster)
```
// main
Query 7 iteration 0 took 4256.6 ms and returned 10 rows
Query 7 iteration 1 took 4265.5 ms and returned 10 rows
Query 7 iteration 2 took 3822.5 ms and returned 10 rows
Query 7 iteration 3 took 4136.2 ms and returned 10 rows
Query 7 iteration 4 took 3851.7 ms and returned 10 rows
Query 7 avg time: 4066.49 ms

// blocked
Query 7 iteration 0 took 2278.9 ms and returned 10 rows
Query 7 iteration 1 took 2123.4 ms and returned 10 rows
Query 7 iteration 2 took 2400.0 ms and returned 10 rows
Query 7 iteration 3 took 2315.6 ms and returned 10 rows
Query 7 iteration 4 took 2716.3 ms and returned 10 rows
Query 7 avg time: 2366.87 ms
```
- target_partitions = 16 (also 1.35x faster)
```
// main
Query 7 iteration 0 took 1955.1 ms and returned 10 rows
Query 7 iteration 1 took 2002.4 ms and returned 10 rows
Query 7 iteration 2 took 1762.5 ms and returned 10 rows
Query 7 iteration 3 took 1805.2 ms and returned 10 rows
Query 7 iteration 4 took 1758.7 ms and returned 10 rows
Query 7 avg time: 1856.78 ms

// blocked
Query 7 iteration 0 took 1392.8 ms and returned 10 rows
Query 7 iteration 1 took 1357.3 ms and returned 10 rows
Query 7 iteration 2 took 1378.4 ms and returned 10 rows
Query 7 iteration 3 took 1376.6 ms and returned 10 rows
Query 7 iteration 4 took 1367.1 ms and returned 10 rows
Query 7 avg time: 1374.47 ms
```
- target_partitions = 32 (still 1.09x faster)
```
// main
Query 7 iteration 0 took 1471.3 ms and returned 10 rows
Query 7 iteration 1 took 1350.5 ms and returned 10 rows
Query 7 iteration 2 took 1361.4 ms and returned 10 rows
Query 7 iteration 3 took 1393.6 ms and returned 10 rows
Query 7 iteration 4 took 1427.4 ms and returned 10 rows
Query 7 avg time: 1400.82 ms

// blocked
Query 7 iteration 0 took 1289.6 ms and returned 10 rows
Query 7 iteration 1 took 1281.9 ms and returned 10 rows
Query 7 iteration 2 took 1293.4 ms and returned 10 rows
Query 7 iteration 3 took 1258.8 ms and returned 10 rows
Query 7 iteration 4 took 1279.4 ms and returned 10 rows
Query 7 avg time: 1280.62 ms
```
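The speedup figures can be rechecked from the reported average times. The `speedup` helper below is hypothetical and simply divides the `main` average by the `blocked` average; the input numbers are the averages reported in this comment.

```rust
/// Hypothetical helper: how many times faster the candidate is than the baseline.
fn speedup(baseline_ms: f64, candidate_ms: f64) -> f64 {
    baseline_ms / candidate_ms
}

fn main() {
    // (target_partitions, main avg ms, blocked avg ms), taken from the results above.
    let runs = [
        (8, 4066.49, 2366.87),
        (16, 1856.78, 1374.47),
        (32, 1400.82, 1280.62),
    ];
    for (target_partitions, main_ms, blocked_ms) in runs {
        println!(
            "target_partitions = {target_partitions}: {:.2}x",
            speedup(main_ms, blocked_ms)
        );
    }
}
```

The ratios come out at roughly 1.72, 1.35 and 1.09, so the benefit shrinks as `target_partitions` grows.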
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
alamb commented on PR #15591: URL: https://github.com/apache/datafusion/pull/15591#issuecomment-2854413639 Marking as draft as @Rachelint works on the next set of things
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
Rachelint commented on PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#issuecomment-2854306764
> 🤖: Benchmark completed

@alamb I think the faster queries here are mainly due to the hash reusing optimization. I have submitted a new PR for it here: https://github.com/apache/datafusion/pull/15962
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
alamb commented on PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#issuecomment-2854295196
🤖: Benchmark completed
Details
```
Comparing HEAD and intermeidate-result-blocked-approach

Benchmark clickbench_extended.json

| Query    | HEAD       | intermeidate-result-blocked-approach | Change        |
|----------|------------|--------------------------------------|---------------|
| QQuery 0 | 1940.88ms  | 1976.66ms                            | no change     |
| QQuery 1 | 760.85ms   | 766.84ms                             | no change     |
| QQuery 2 | 1501.65ms  | 1542.77ms                            | no change     |
| QQuery 3 | 719.82ms   | 727.87ms                             | no change     |
| QQuery 4 | 1484.50ms  | 1494.48ms                            | no change     |
| QQuery 5 | 15426.35ms | 15206.88ms                           | no change     |
| QQuery 6 | 2069.76ms  | 2128.21ms                            | no change     |
| QQuery 7 | 2708.23ms  | 2071.61ms                            | +1.31x faster |

| Benchmark Summary                                   |            |
|-----------------------------------------------------|------------|
| Total Time (HEAD)                                   | 26612.04ms |
| Total Time (intermeidate-result-blocked-approach)   | 25915.33ms |
| Average Time (HEAD)                                 | 3326.50ms  |
| Average Time (intermeidate-result-blocked-approach) | 3239.42ms  |
| Queries Faster                                      | 1          |
| Queries Slower                                      | 0          |
| Queries with No Change                              | 7          |

Benchmark clickbench_partitioned.json

| Query     | HEAD      | intermeidate-result-blocked-approach | Change        |
|-----------|-----------|--------------------------------------|---------------|
| QQuery 0  | 2.25ms    | 2.20ms                               | no change     |
| QQuery 1  | 36.40ms   | 39.03ms                              | 1.07x slower  |
| QQuery 2  | 90.29ms   | 91.70ms                              | no change     |
| QQuery 3  | 99.54ms   | 102.47ms                             | no change     |
| QQuery 4  | 803.59ms  | 611.45ms                             | +1.31x faster |
| QQuery 5  | 874.76ms  | 860.26ms                             | no change     |
| QQuery 6  | 2.29ms    | 2.29ms                               | no change     |
| QQuery 7  | 43.79ms   | 45.26ms                              | no change     |
| QQuery 8  | 902.84ms  | 913.05ms                             | no change     |
| QQuery 9  | 1166.55ms | 1218.38ms                            | no change     |
| QQuery 10 | 265.93ms  | 276.09ms                             | no change     |
| QQuery 11 | 307.22ms  | 315.94ms                             | no change     |
| QQuery 12 | 929.85ms  | 927.87ms                             | no change     |
| QQuery 13 | 1362.99ms | 1364.08ms                            | no change     |
| QQuery 14 | 831.28ms  | 855.27ms                             | no change     |
| QQuery 15 | 1027.92ms | 841.50ms                             | +1.22x faster |
| QQuery 16 | 1764.73ms | 1738.44ms                            | no change     |
| QQuery 17 | 1595.54ms | 1615.47ms                            | no change     |
| QQuery 18 | 3163.56ms | 3090.69ms                            | no change     |
| QQuery 19 | 83.33ms   | 84.19ms                              | no change     |
| QQuery 20 | 1148.16ms | 1163.96ms                            | no change     |
| QQuery 21 | 1335.22ms | 1362.75ms                            | no change     |
| QQuery 22 | 2173.36ms | 2251.51ms                            | no change     |
| QQuery 23 | 8305.24ms | 8570.07ms                            | no change     |
| QQuery 24 | 481.58ms  | 478.64ms                             | no change     |
| QQuery 25 | 400.16ms  | 404.96ms                             | no change     |
| QQuery 26 | 542.87ms  | 547.19ms                             | no change     |
| QQuery 27 | 1569.96ms | 1610.
```
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
Rachelint commented on PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#issuecomment-2854237216
> FWIW the machine I am running this benchmark on has 16 not very good cores

OK, it may really be related to `target_partitions`. I also see only a slight improvement locally when `target_partitions = 16` (the default, equal to the CPU count). When `target_partitions` is large, the intermediate results `Vec` in each partition is small, so the blocked approach makes little difference.
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
alamb commented on PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#issuecomment-2854200337
🤖 `./gh_compare_branch.sh` [Benchmark Script](https://github.com/alamb/datafusion-benchmarking/blob/main/gh_compare_branch.sh) Running
Linux aal-dev 6.11.0-1013-gcp #13~24.04.1-Ubuntu SMP Wed Apr 2 16:34:16 UTC 2025 x86_64 x86_64 x86_64 GNU/Linux
Comparing intermeidate-result-blocked-approach (62157a9ab896014c17927c7ae97d2177d20e9ec6) to 6cc49533f9c1c60c1737b7139d867b6544caa993 [diff](https://github.com/apache/datafusion/compare/6cc49533f9c1c60c1737b7139d867b6544caa993..62157a9ab896014c17927c7ae97d2177d20e9ec6)
Benchmarks: tpch_mem clickbench_partitioned clickbench_extended
Results will be posted here when complete
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
alamb commented on PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#issuecomment-2854199522
> > 🤖: Benchmark completed
> > Details
>
> It is really surprising that it shows slower?

FWIW the machine I am running this benchmark on has 16 not very good cores:
```
cat /proc/cpuinfo
...
processor       : 15
vendor_id       : GenuineIntel
cpu family      : 6
model           : 85
model name      : Intel(R) Xeon(R) CPU @ 3.10GHz
stepping        : 7
microcode       : 0x
cpu MHz         : 3100.320
cache size      : 25344 KB
physical id     : 0
siblings        : 16
core id         : 7
cpu cores       : 8
apicid          : 15
initial apicid  : 15
fpu             : yes
fpu_exception   : yes
cpuid level     : 13
wp              : yes
flags           : fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush mmx fxsr sse sse2 ss ht syscall nx pdpe1gb rdtscp lm constant_tsc rep_good nopl xtopology nonstop_tsc cpuid tsc_known_freq pni pclmulqdq ssse3 fma cx16 pcid sse4_1 sse4_2 x2apic movbe popcnt aes xsave avx f16c rdrand hypervisor lahf_lm abm 3dnowprefetch ssbd ibrs ibpb stibp ibrs_enhanced fsgsbase tsc_adjust bmi1 hle avx2 smep bmi2 erms invpcid rtm mpx avx512f avx512dq rdseed adx smap clflushopt clwb avx512cd avx512bw avx512vl xsaveopt xsavec xgetbv1 xsaves arat avx512_vnni md_clear arch_capabilities
bugs            : spectre_v1 spectre_v2 spec_store_bypass swapgs taa mmio_stale_data retbleed eibrs_pbrsb bhi
bogomips        : 6200.64
clflush size    : 64
cache_alignment : 64
address sizes   : 46 bits physical, 48 bits virtual
power management:
```
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
Dandandan commented on code in PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#discussion_r2075240973
##
datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive.rs:
##
@@ -81,28 +85,52 @@ hash_float!(f16, f32, f64);
pub struct GroupValuesPrimitive {
/// The data type of the output array
data_type: DataType,
+
/// Stores the group index based on the hash of its value
///
/// We don't store the hashes as hashing fixed width primitives
/// is fast enough for this not to benefit performance
-map: HashTable,
+map: HashTable<(u64, u64)>,
+
/// The group index of the null value if any
-null_group: Option,
+null_group: Option,
+
/// The values for each group index
-values: Vec,
+values: Vec>,
Review Comment:
Could we use
```suggestion
values: Vec>,
```
here?
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
Rachelint commented on code in PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#discussion_r2075218539
##
datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/prim_op.rs:
##
@@ -93,20 +94,27 @@ where
opt_filter: Option<&BooleanArray>,
total_num_groups: usize,
) -> Result<()> {
Review Comment:
> Ah I see that batch size is used by default :). Yeah I think it makes
> sense to test it a bit further, maybe for this a slightly larger value (e.g.
> 2x, 4x batch size) will be beneficial when the cardinality is above the batch
> size.
>
> Also at some point might make sense to think of it in size in memory
> instead of number of elements (e.g. block of `u8` values might hold 16x more
> values than u128).

Splitting blocks by size in memory rather than by number of rows is a really
good idea. I will experiment with it after trying some existing ideas.
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
Rachelint commented on code in PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#discussion_r2075198294
##
datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/prim_op.rs:
##
@@ -93,20 +94,27 @@ where
opt_filter: Option<&BooleanArray>,
total_num_groups: usize,
) -> Result<()> {
Review Comment:
I found that computing the hash for primitives is actually non-trivial...
And in the high cardinality case, the duplicated hash computation caused by
rehashing is really expensive!
I am experimenting with also saving the hash in the hash table, as multi group by does.
I have already seen an improvement in the newly added query in `extended.sql`.
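To illustrate the idea of keeping the hash next to the group index, here is a minimal, std-only sketch. It is not the code in this PR: `CachedHashTable`, `intern`, `grow`, and the `hash_calls` counter are hypothetical names for a toy open-addressing table, and the real implementation builds on a different hash table type. The point is only that growing the table can re-place entries from the cached hash without hashing the values again.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical toy table: each slot stores (cached hash, group index).
struct CachedHashTable {
    slots: Vec<Option<(u64, usize)>>,
    len: usize,
    /// Counts how many times a value was actually hashed.
    hash_calls: usize,
}

impl CachedHashTable {
    fn new() -> Self {
        // Capacity is always a power of two so `hash & mask` picks a slot.
        Self { slots: vec![None; 8], len: 0, hash_calls: 0 }
    }

    fn hash_value(&mut self, v: u64) -> u64 {
        self.hash_calls += 1;
        let mut hasher = DefaultHasher::new();
        v.hash(&mut hasher);
        hasher.finish()
    }

    /// Returns the group index for `v`, creating a new group if needed.
    /// `values[g]` holds the value of group `g`.
    fn intern(&mut self, v: u64, values: &mut Vec<u64>) -> usize {
        // Keep the load factor at or below 0.5 so probing always terminates.
        if (self.len + 1) * 2 > self.slots.len() {
            self.grow();
        }
        let hash = self.hash_value(v);
        let mask = self.slots.len() - 1;
        let mut i = (hash as usize) & mask;
        loop {
            match self.slots[i] {
                Some((h, g)) if h == hash && values[g] == v => return g,
                Some(_) => i = (i + 1) & mask,
                None => {
                    let g = values.len();
                    values.push(v);
                    self.slots[i] = Some((hash, g));
                    self.len += 1;
                    return g;
                }
            }
        }
    }

    /// Doubles the capacity and re-places entries using cached hashes only,
    /// so no value is hashed again during a resize.
    fn grow(&mut self) {
        let new_cap = self.slots.len() * 2;
        let old = std::mem::replace(&mut self.slots, vec![None; new_cap]);
        let mask = new_cap - 1;
        for (hash, g) in old.into_iter().flatten() {
            let mut i = (hash as usize) & mask;
            while self.slots[i].is_some() {
                i = (i + 1) & mask;
            }
            self.slots[i] = Some((hash, g));
        }
    }
}
```

With this layout the number of hash computations equals the number of lookups, no matter how often the table grows. The cost is 8 extra bytes per entry.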
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
Dandandan commented on code in PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#discussion_r2075197407
##
datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/prim_op.rs:
##
@@ -93,20 +94,27 @@ where
opt_filter: Option<&BooleanArray>,
total_num_groups: usize,
) -> Result<()> {
Review Comment:
Ah I see that batch size is used by default :).
Yeah I think it makes sense to test it a bit further, maybe for this a
slightly larger value (e.g. 2x, 4x batch size) will be beneficial when the
cardinality is above the batch size.
Also at some point might make sense to think of it in size in memory instead
of number of elements (e.g. block of `u8` values might hold 16x more values
than u128).
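A rough sketch of what sizing a block by bytes instead of by element count could look like. `rows_per_block` and the 64 KiB budget are made up for illustration and are not part of this PR:

```rust
use std::mem::size_of;

/// Hypothetical helper: how many values of type `T` fit into a block with a
/// fixed byte budget. Always returns at least 1 so a block is never empty.
fn rows_per_block<T>(block_bytes: usize) -> usize {
    // `max(1)` on the divisor guards against zero-sized types.
    (block_bytes / size_of::<T>().max(1)).max(1)
}

/// Assumed byte budget, chosen only for this example.
const BLOCK_BYTES: usize = 64 * 1024;
```

With the same byte budget, `rows_per_block::<u8>(BLOCK_BYTES)` is 16 times `rows_per_block::<u128>(BLOCK_BYTES)`, which is the ratio mentioned above.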
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
Rachelint commented on code in PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#discussion_r2075191981
##
datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/prim_op.rs:
##
@@ -93,20 +94,27 @@ where
opt_filter: Option<&BooleanArray>,
total_num_groups: usize,
) -> Result<()> {
Review Comment:
But it makes sense to make `block_size` a dedicated config.
I think it can bring an improvement when `batch_size` is set to too
small a value and the group by cardinality is actually large.
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
Rachelint commented on code in PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#discussion_r2075154369
##
datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/prim_op.rs:
##
@@ -93,20 +94,27 @@ where
opt_filter: Option<&BooleanArray>,
total_num_groups: usize,
) -> Result<()> {
Review Comment:
> yes, i mean block size. I would expect something like batchsize (4-8k), or
> maybe even bigger to have lower overhead? Did you run some experiments?

Yes, I tried it.
For now I set `block_size = batch_size`.
I tried a smaller `batch_size` such as 1024, and `this pr` shows an improvement
compared to `main`.
That is because `this pr` also eliminates the call to `Array::slice`,
which is non-trivial due to the computation of the null count. For details, see:
https://github.com/apache/arrow-rs/pull/6155
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
Dandandan commented on code in PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#discussion_r2075117009
##
datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/prim_op.rs:
##
@@ -93,20 +94,27 @@ where
opt_filter: Option<&BooleanArray>,
total_num_groups: usize,
) -> Result<()> {
Review Comment:
yes, i mean block size. I would expect something like batchsize (4-8k), or
maybe even bigger to have lower overhead? Did you run some experiments?
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
Rachelint commented on code in PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#discussion_r2075095403
##
datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/prim_op.rs:
##
@@ -93,20 +94,27 @@ where
opt_filter: Option<&BooleanArray>,
total_num_groups: usize,
) -> Result<()> {
Review Comment:
> this seems rather small? won't larger values have lower overhead?
Does it mean block size?
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
Dandandan commented on code in PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#discussion_r2075086531
##
datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/prim_op.rs:
##
@@ -93,20 +94,27 @@ where
opt_filter: Option<&BooleanArray>,
total_num_groups: usize,
) -> Result<()> {
Review Comment:
this seems rather small? won't larger values have lower overhead?
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
Rachelint commented on PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#issuecomment-2853367261
@jayzhan211 @alamb I think I nearly understand why.
It is related to `target_partitions`: we maintain the `intermediate results`
separately in each `partition`.
So when `target_partitions` is larger, the amount of `intermediate results` in
one `partition` is smaller.
And when the `intermediate results` assigned to a `partition` are small enough, the
blocked approach makes almost no difference (consider the extreme case where the
`Vec` used to store results never resizes).
Here are my benchmark results on a production machine (x86_64 + 16 cores +
3699.178 MHz CPU + 64G RAM) with different `target_partitions`:
- target_partitions = 4 (amazing improvement!)
```
// blocked approach
Q7: SELECT "WatchID", MIN("ResolutionWidth") as wmin, MAX("ResolutionWidth")
as wmax, SUM("IsRefresh") as srefresh FROM hits GROUP BY "WatchID" ORDER BY
"WatchID" DESC LIMIT 10;
Query 7 iteration 0 took 4996.7 ms and returned 10 rows
Query 7 iteration 1 took 5177.6 ms and returned 10 rows
Query 7 iteration 2 took 5335.4 ms and returned 10 rows
Query 7 iteration 3 took 5232.7 ms and returned 10 rows
Query 7 iteration 4 took 5350.5 ms and returned 10 rows
Query 7 avg time: 5218.59 ms
// main
Q7: SELECT "WatchID", MIN("ResolutionWidth") as wmin, MAX("ResolutionWidth")
as wmax, SUM("IsRefresh") as srefresh FROM hits GROUP BY "WatchID" ORDER BY
"WatchID" DESC LIMIT 10;
Query 7 iteration 0 took 8484.9 ms and returned 10 rows
Query 7 iteration 1 took 8194.4 ms and returned 10 rows
Query 7 iteration 2 took 8317.9 ms and returned 10 rows
Query 7 iteration 3 took 8183.2 ms and returned 10 rows
Query 7 iteration 4 took 8225.7 ms and returned 10 rows
Query 7 avg time: 8281.19 ms
```
- target_partitions = 8 (emmm... some improvement)
```
// blocked approach
Q7: SELECT "WatchID", MIN("ResolutionWidth") as wmin, MAX("ResolutionWidth")
as wmax, SUM("IsRefresh") as srefresh FROM hits GROUP BY "WatchID" ORDER BY
"WatchID" DESC LIMIT 10;
Query 7 iteration 0 took 2599.2 ms and returned 10 rows
Query 7 iteration 1 took 2857.9 ms and returned 10 rows
Query 7 iteration 2 took 3046.3 ms and returned 10 rows
Query 7 iteration 3 took 2766.0 ms and returned 10 rows
Query 7 iteration 4 took 2830.8 ms and returned 10 rows
Query 7 avg time: 2820.04 ms
// main
Q7: SELECT "WatchID", MIN("ResolutionWidth") as wmin, MAX("ResolutionWidth")
as wmax, SUM("IsRefresh") as srefresh FROM hits GROUP BY "WatchID" ORDER BY
"WatchID" DESC LIMIT 10;
Query 7 iteration 0 took 3820.0 ms and returned 10 rows
Query 7 iteration 1 took 3716.6 ms and returned 10 rows
Query 7 iteration 2 took 3728.3 ms and returned 10 rows
Query 7 iteration 3 took 3628.2 ms and returned 10 rows
Query 7 iteration 4 took 3912.6 ms and returned 10 rows
Query 7 avg time: 3761.15 ms
```
- target_partitions = 32 (sadly, almost no improvement...)
```
// blocked approach
Q7: SELECT "WatchID", MIN("ResolutionWidth") as wmin, MAX("ResolutionWidth")
as wmax, SUM("IsRefresh") as srefresh FROM hits GROUP BY "WatchID" ORDER BY
"WatchID" DESC LIMIT 10;
Query 7 iteration 0 took 1383.7 ms and returned 10 rows
Query 7 iteration 1 took 1274.3 ms and returned 10 rows
Query 7 iteration 2 took 1321.7 ms and returned 10 rows
Query 7 iteration 3 took 1308.1 ms and returned 10 rows
Query 7 iteration 4 took 1310.6 ms and returned 10 rows
Query 7 avg time: 1319.68 ms
// main
Q7: SELECT "WatchID", MIN("ResolutionWidth") as wmin, MAX("ResolutionWidth")
as wmax, SUM("IsRefresh") as srefresh FROM hits GROUP BY "WatchID" ORDER BY
"WatchID" DESC LIMIT 10;
Query 7 iteration 0 took 1440.7 ms and returned 10 rows
Query 7 iteration 1 took 1430.3 ms and returned 10 rows
Query 7 iteration 2 took 1357.5 ms and returned 10 rows
Query 7 iteration 3 took 1352.6 ms and returned 10 rows
Query 7 iteration 4 took 1342.2 ms and returned 10 rows
Query 7 avg time: 1384.64 ms
```
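For reference, the speedups implied by the average times above can be worked out with a few lines. This is just arithmetic on the reported averages; `speedup` and `RUNS` are names made up for this sketch:

```rust
/// Speedup of the blocked approach over main: values above 1.0 mean faster.
fn speedup(main_ms: f64, blocked_ms: f64) -> f64 {
    main_ms / blocked_ms
}

/// (target_partitions, main avg ms, blocked avg ms), copied from the runs above.
const RUNS: [(u32, f64, f64); 3] = [
    (4, 8281.19, 5218.59),
    (8, 3761.15, 2820.04),
    (32, 1384.64, 1319.68),
];

/// Formats one line per run, for example for printing in a benchmark report.
fn report() -> Vec<String> {
    RUNS.iter()
        .map(|(parts, main_ms, blocked_ms)| {
            format!(
                "target_partitions = {}: {:.2}x",
                parts,
                speedup(*main_ms, *blocked_ms)
            )
        })
        .collect()
}
```

That gives roughly 1.59x at 4 partitions, 1.33x at 8, and 1.05x at 32, which matches the trend described: the fewer groups each partition holds, the less the blocked approach helps.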
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
Rachelint commented on PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#issuecomment-2853164420
I suspect it is due to the row-level random access of `VecDeque`, so I have replaced `VecDeque` with `Vec`.
I am also trying to run the benchmark in more environments rather than only my dev machine (x86 + CentOS + 6 cores).
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
Rachelint commented on PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#issuecomment-2852844732
> > It is really surprising that it shows as slower?
>
> This is my only concern too.

Yes, I think we should find out the reason. It is really surprising, since it actually shows a reproducible improvement locally.
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
jayzhan211 commented on PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#issuecomment-2852827157
> It is really surprising that it shows as slower?

This is my only concern too.
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
Rachelint commented on PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#issuecomment-2852744312
> 🤖: Benchmark completed
> Details

It is really surprising that it shows as slower?
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
alamb commented on PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#issuecomment-2852381080
🤖: Benchmark completed
Details
```
Comparing HEAD and intermeidate-result-blocked-approach

Benchmark clickbench_extended.json
| Query    | HEAD       | intermeidate-result-blocked-approach | Change        |
|----------|------------|--------------------------------------|---------------|
| QQuery 0 | 1941.28ms  | 2043.89ms                            | 1.05x slower  |
| QQuery 1 | 772.84ms   | 712.97ms                             | +1.08x faster |
| QQuery 2 | 1522.71ms  | 1443.21ms                            | +1.06x faster |
| QQuery 3 | 719.60ms   | 764.04ms                             | 1.06x slower  |
| QQuery 4 | 1468.51ms  | 1528.06ms                            | no change     |
| QQuery 5 | 15673.72ms | 15342.97ms                           | no change     |
| QQuery 6 | 2080.95ms  | 2074.26ms                            | no change     |
| QQuery 7 | 2533.15ms  | 3035.39ms                            | 1.20x slower  |

| Benchmark Summary                                   |            |
|-----------------------------------------------------|------------|
| Total Time (HEAD)                                   | 26712.76ms |
| Total Time (intermeidate-result-blocked-approach)   | 26944.80ms |
| Average Time (HEAD)                                 | 3339.09ms  |
| Average Time (intermeidate-result-blocked-approach) | 3368.10ms  |
| Queries Faster                                      | 2          |
| Queries Slower                                      | 3          |
| Queries with No Change                              | 3          |

Benchmark clickbench_partitioned.json
| Query     | HEAD      | intermeidate-result-blocked-approach | Change       |
|-----------|-----------|--------------------------------------|--------------|
| QQuery 0  | 2.40ms    | 2.62ms                               | 1.09x slower |
| QQuery 1  | 36.42ms   | 39.04ms                              | 1.07x slower |
| QQuery 2  | 89.04ms   | 93.59ms                              | 1.05x slower |
| QQuery 3  | 98.34ms   | 97.56ms                              | no change    |
| QQuery 4  | 798.63ms  | 793.86ms                             | no change    |
| QQuery 5  | 857.58ms  | 838.99ms                             | no change    |
| QQuery 6  | 2.29ms    | 2.20ms                               | no change    |
| QQuery 7  | 43.03ms   | 43.70ms                              | no change    |
| QQuery 8  | 926.38ms  | 940.06ms                             | no change    |
| QQuery 9  | 1200.58ms | 1264.19ms                            | 1.05x slower |
| QQuery 10 | 266.44ms  | 271.84ms                             | no change    |
| QQuery 11 | 312.69ms  | 308.31ms                             | no change    |
| QQuery 12 | 921.14ms  | 915.99ms                             | no change    |
| QQuery 13 | 1246.19ms | 1360.62ms                            | 1.09x slower |
| QQuery 14 | 861.03ms  | 849.35ms                             | no change    |
| QQuery 15 | 1031.14ms | 1065.46ms                            | no change    |
| QQuery 16 | 1724.44ms | 1734.01ms                            | no change    |
| QQuery 17 | 1600.58ms | 1619.84ms                            | no change    |
| QQuery 18 | 3083.17ms | 3112.74ms                            | no change    |
| QQuery 19 | 86.39ms   | 84.08ms                              | no change    |
| QQuery 20 | 1152.51ms | 1142.67ms                            | no change    |
| QQuery 21 | 1340.71ms | 1335.70ms                            | no change    |
| QQuery 22 | 2206.78ms | 2189.52ms                            | no change    |
| QQuery 23 | 8437.51ms | 8423.58ms                            | no change    |
| QQuery 24 | 482.26ms  | 471.91ms                             | no change    |
| QQuery 25 | 389.71ms  | 384.09ms                             | no change    |
| QQuery 26 | 539.21ms  | 533.60ms                             | no change    |
| QQuery 27 | 1582.07ms | 1608.14ms                            | no change    |
...
```
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
alamb commented on code in PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#discussion_r2074192086
##
datafusion/core/tests/fuzz_cases/aggregation_fuzzer/context_generator.rs:
##
@@ -146,11 +147,14 @@ impl SessionContextGenerator {
(provider, false)
};
+let enable_aggregation_blocked_groups = rng.gen_bool(0.5);
Review Comment:
I wonder if there is any value to testing the old code path
(`enable_aggregation_blocked_groups = false`) if our goal is to remove it
eventually.
I recommend only testing with the flag set to the default value
##
datafusion/expr-common/src/groups_accumulator.rs:
##
@@ -39,6 +43,9 @@ impl EmitTo {
/// remaining values in `v`.
///
/// This avoids copying if Self::All
+///
+/// NOTICE: only support emit strategies: `Self::All` and `Self::First`
Review Comment:
```suggestion
/// NOTICE: only support emit strategies: `Self::All` and `Self::First`
/// Will call `panic` if called with `Self::NextBlock`
```
##
datafusion/physical-plan/src/aggregates/row_hash.rs:
##
@@ -982,6 +1099,9 @@ impl GroupedHashAggregateStream {
&& self.update_memory_reservation().is_err()
{
assert_ne!(self.mode, AggregateMode::Partial);
+// TODO: support spilling when blocked group optimization is on
Review Comment:
I think spilling with blocks is actually likely to be much better performing
anyways (as we can spill each block potentially in parallel, for example)
##
datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs:
##
@@ -212,25 +234,261 @@ impl NullState {
///
/// resets the internal state appropriately
pub fn build(&mut self, emit_to: EmitTo) -> NullBuffer {
-let nulls: BooleanBuffer = self.seen_values.finish();
+self.seen_values.emit(emit_to)
+}
+}
-let nulls = match emit_to {
-EmitTo::All => nulls,
-EmitTo::First(n) => {
-// split off the first N values in seen_values
-//
-// TODO make this more efficient rather than two
-// copies and bitwise manipulation
-let first_n_null: BooleanBuffer =
nulls.iter().take(n).collect();
-// reset the existing seen buffer
-for seen in nulls.iter().skip(n) {
-self.seen_values.append(seen);
+/// Adapter for supporting dynamic dispatching of [`FlatNullState`] and
[`BlockedNullState`].
+/// For performance, the cost of batch-level dynamic dispatching is acceptable.
+#[derive(Debug)]
+pub enum NullStateAdapter {
+Flat(FlatNullState),
+Blocked(BlockedNullState),
+}
+
+impl NullStateAdapter {
+pub fn new(block_size: Option) -> Self {
+if let Some(blk_size) = block_size {
+Self::Blocked(BlockedNullState::new(blk_size))
+} else {
+Self::Flat(FlatNullState::new())
+}
+}
+
+pub fn accumulate(
+&mut self,
+group_indices: &[usize],
+values: &PrimitiveArray,
+opt_filter: Option<&BooleanArray>,
+total_num_groups: usize,
+value_fn: F,
+) where
+T: ArrowPrimitiveType + Send,
+F: FnMut(u32, u64, T::Native) + Send,
+{
+match self {
+NullStateAdapter::Flat(null_state) => null_state.accumulate(
+group_indices,
+values,
+opt_filter,
+total_num_groups,
+value_fn,
+),
+NullStateAdapter::Blocked(null_state) => null_state.accumulate(
+group_indices,
+values,
+opt_filter,
+total_num_groups,
+value_fn,
+),
+}
+}
+
+pub fn accumulate_boolean(
+&mut self,
+group_indices: &[usize],
+values: &BooleanArray,
+opt_filter: Option<&BooleanArray>,
+total_num_groups: usize,
+value_fn: F,
+) where
+F: FnMut(u32, u64, bool) + Send,
+{
+match self {
+NullStateAdapter::Flat(null_state) =>
null_state.accumulate_boolean(
+group_indices,
+values,
+opt_filter,
+total_num_groups,
+value_fn,
+),
+NullStateAdapter::Blocked(null_state) =>
null_state.accumulate_boolean(
+group_indices,
+values,
+opt_filter,
+total_num_groups,
+value_fn,
+),
+}
+}
+
+pub fn build(&mut self, emit_to: EmitTo) -> NullBuffer {
+match self {
+NullStateAdapter::Flat(null_state) => null_state.build(emit_to),
+NullStateAdapter::Blocked(null_state) => null_state.build(emit_to),
+}
+}
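The adapter in the hunk above picks the flat or blocked state once per batch through an enum `match`. A stripped-down sketch of the same pattern follows, using plain `Vec<bool>` storage instead of the real `FlatNullState` / `BlockedNullState`. `SeenAdapter`, `mark_seen`, and `is_seen` are hypothetical names for illustration only, and `block_size` is assumed to be greater than zero:

```rust
/// Hypothetical stand-in for the adapter: one enum, two storage layouts.
#[derive(Debug)]
enum SeenAdapter {
    /// Single contiguous buffer, indexed directly by group index.
    Flat(Vec<bool>),
    /// Fixed-size blocks; a group index maps to (block id, offset in block).
    Blocked { block_size: usize, blocks: Vec<Vec<bool>> },
}

impl SeenAdapter {
    /// `Some(block_size)` selects the blocked layout, `None` the flat one.
    fn new(block_size: Option<usize>) -> Self {
        match block_size {
            Some(block_size) => Self::Blocked { block_size, blocks: Vec::new() },
            None => Self::Flat(Vec::new()),
        }
    }

    fn mark_seen(&mut self, group_index: usize) {
        match self {
            Self::Flat(seen) => {
                // Growing the flat buffer may reallocate and copy everything.
                if seen.len() <= group_index {
                    seen.resize(group_index + 1, false);
                }
                seen[group_index] = true;
            }
            Self::Blocked { block_size, blocks } => {
                let block_id = group_index / *block_size;
                let offset = group_index % *block_size;
                // Growing only appends new blocks; existing ones never move.
                while blocks.len() <= block_id {
                    blocks.push(vec![false; *block_size]);
                }
                blocks[block_id][offset] = true;
            }
        }
    }

    fn is_seen(&self, group_index: usize) -> bool {
        match self {
            Self::Flat(seen) => seen.get(group_index).copied().unwrap_or(false),
            Self::Blocked { block_size, blocks } => blocks
                .get(group_index / *block_size)
                .map(|block| block[group_index % *block_size])
                .unwrap_or(false),
        }
    }
}
```

Both variants answer the same questions, so callers do not need to know which layout is active.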
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
alamb commented on PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#issuecomment-285229
🤖 `./gh_compare_branch.sh` [Benchmark Script](https://github.com/alamb/datafusion-benchmarking/blob/main/gh_compare_branch.sh) Running
Linux aal-dev 6.11.0-1013-gcp #13~24.04.1-Ubuntu SMP Wed Apr 2 16:34:16 UTC 2025 x86_64 x86_64 x86_64 GNU/Linux
Comparing intermeidate-result-blocked-approach (7542b49a2add7acae700de6e88a7cf8227cc6735) to 6cc49533f9c1c60c1737b7139d867b6544caa993 [diff](https://github.com/apache/datafusion/compare/6cc49533f9c1c60c1737b7139d867b6544caa993..7542b49a2add7acae700de6e88a7cf8227cc6735)
Benchmarks: tpch_mem clickbench_partitioned clickbench_extended
Results will be posted here when complete
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
alamb commented on PR #15591: URL: https://github.com/apache/datafusion/pull/15591#issuecomment-2850575408 Thanks @Rachelint -- I will give this PR another good look today -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] - To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
Rachelint commented on code in PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#discussion_r2072992809
##
datafusion/physical-plan/src/aggregates/row_hash.rs:
##
@@ -982,6 +1099,9 @@ impl GroupedHashAggregateStream {
&& self.update_memory_reservation().is_err()
{
assert_ne!(self.mode, AggregateMode::Partial);
+// TODO: support spilling when blocked group optimization is on
Review Comment:
Yes, I will file it after this is merged.
There are also some other problems in spilling, and I will fix them
together when working on supporting blocked management in the spilling path.
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
Rachelint commented on PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#issuecomment-2850176179
Hi @alamb, I have addressed most of the points raised in code review.
The new query in `extended.sql`, which I think can show the improvement, has been merged.
I checked it locally, and it improved as expected (see Q7).
```
Benchmark clickbench_extended.json
| Query    | main       | intermeidate-result-blocked-approach | Change        |
|----------|------------|--------------------------------------|---------------|
| QQuery 0 | 1963.28ms  | 1947.08ms                            | no change     |
| QQuery 1 | 1180.94ms  | 1135.65ms                            | no change     |
| QQuery 2 | 2420.21ms  | 2372.42ms                            | no change     |
| QQuery 3 | 968.18ms   | 1091.04ms                            | 1.13x slower  |
| QQuery 4 | 2955.90ms  | 2975.36ms                            | no change     |
| QQuery 5 | 32623.33ms | 32108.79ms                           | no change     |
| QQuery 6 | 5201.07ms  | 5205.19ms                            | no change     |
| QQuery 7 | 7865.80ms  | 7237.53ms                            | +1.09x faster |

| Benchmark Summary                                   |            |
|-----------------------------------------------------|------------|
| Total Time (main)                                   | 55178.69ms |
| Total Time (intermeidate-result-blocked-approach)   | 54073.06ms |
| Average Time (main)                                 | 6897.34ms  |
| Average Time (intermeidate-result-blocked-approach) | 6759.13ms  |
| Queries Faster                                      | 1          |
| Queries Slower                                      | 1          |
| Queries with No Change                              | 6          |
```
> My big concern is that we have a plan to eventually avoid both single and multi blocked management

Here are my thoughts about how we can eventually remove the `single block management` code path:
- The hard point is that `streaming aggregation` needs `Emit::First(n)`, and as I see it we should not support `Emit::First(n)` in `blocked management` (mainly for performance reasons: it is already expensive and complex, and would be even more so in `blocked management`).
- However, I think it may be possible to remove the usage of `Emit::First(n)`, and I am considering how to do that. If we manage it, the `single block management` path can be removed once we implement `blocked management` for all existing `GroupsAccumulator` and `GroupValues` implementations.
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
Rachelint commented on code in PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#discussion_r2072895547
##
datafusion/functions-aggregate/src/correlation.rs:
##
@@ -448,6 +448,9 @@ impl GroupsAccumulator for CorrelationGroupsAccumulator {
let n = match emit_to {
EmitTo::All => self.count.len(),
EmitTo::First(n) => n,
+EmitTo::NextBlock => {
+unreachable!("this accumulator still not support blocked groups")
Review Comment:
Addressed.
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
Rachelint commented on code in PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#discussion_r2072859880
##
datafusion/functions-aggregate-common/src/aggregate/groups_accumulator.rs:
##
@@ -507,3 +509,157 @@ pub(crate) fn slice_and_maybe_filter(
Ok(sliced_arrays)
}
}
+
+// ===
+// Useful tools for group index
+// ===
+
+/// Operations about group index parsing
+///
+/// There are mainly 2 `group index` needing parsing: `flat` and `blocked`.
+///
+/// # Flat group index
+/// `flat group index` format is like:
+///
+/// ```text
+/// | block_offset(64bit) |
+/// ```
+///
+/// It is used in `flat GroupValues/GroupAccumulator`, only a single block
+/// exists, so its `block_id` is always 0, and use all 64 bits to store the
+/// `block offset`.
+///
+/// # Blocked group index
+/// `blocked group index` format is like:
+///
+/// ```text
+/// | block_id(32bit) | block_offset(32bit)
+/// ```
+///
+/// It is used in `blocked GroupValues/GroupAccumulator`, multiple blocks
+/// exist, and we use high 32 bits to store `block_id`, and low 32 bit to
+/// store `block_offset`.
+///
+/// The `get_block_offset` method requires to return `block_offset` as u64,
+/// that is for compatible for `flat group index`'s parsing.
+///
+pub trait GroupIndexOperations: Debug {
Review Comment:
Addressed.
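To make the two index layouts in the doc comment above concrete, here is a minimal sketch of packing and unpacking a blocked group index. The helpers `packed_index_of_nth_group` and `lookup` are hypothetical and not part of the PR, and placing the n-th group in block `n / block_size` is an assumption made only for illustration.

```rust
/// Pack a block id and an in-block offset into one u64
/// (high 32 bits: block id, low 32 bits: offset).
fn pack_index(block_id: u32, block_offset: u64) -> u64 {
    debug_assert!(block_offset <= u32::MAX as u64);
    ((block_id as u64) << 32) | block_offset
}

/// Split a packed index back into (block_id, block_offset).
fn unpack_index(packed: u64) -> (u32, u64) {
    ((packed >> 32) as u32, (packed as u32) as u64)
}

/// Hypothetical helper: packed index of the `n`-th group, assuming every
/// block holds exactly `block_size` groups.
fn packed_index_of_nth_group(n: usize, block_size: usize) -> u64 {
    pack_index((n / block_size) as u32, (n % block_size) as u64)
}

/// Hypothetical helper: read a value from a list of blocks by packed index.
fn lookup<T: Copy>(blocks: &[Vec<T>], packed: u64) -> T {
    let (block_id, offset) = unpack_index(packed);
    blocks[block_id as usize][offset as usize]
}
```

With a block size of 4, the group with running number 5 lands in block 1 at offset 1, so one `u64` is enough to address it without a separate block lookup table.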
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
Rachelint commented on code in PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#discussion_r2072859602
##
datafusion/functions-aggregate-common/src/aggregate/groups_accumulator.rs:
##
@@ -507,3 +509,157 @@ pub(crate) fn slice_and_maybe_filter(
Ok(sliced_arrays)
}
}
+
+// ===
+// Useful tools for group index
+// ===
+
+/// Operations about group index parsing
+///
+/// There are mainly 2 `group index` needing parsing: `flat` and `blocked`.
+///
+/// # Flat group index
+/// `flat group index` format is like:
+///
+/// ```text
+/// | block_offset(64bit) |
+/// ```
+///
+/// It is used in `flat GroupValues/GroupAccumulator`, only a single block
+/// exists, so its `block_id` is always 0, and use all 64 bits to store the
+/// `block offset`.
+///
+/// # Blocked group index
+/// `blocked group index` format is like:
+///
+/// ```text
+/// | block_id(32bit) | block_offset(32bit)
+/// ```
+///
+/// It is used in `blocked GroupValues/GroupAccumulator`, multiple blocks
+/// exist, and we use high 32 bits to store `block_id`, and low 32 bit to
+/// store `block_offset`.
+///
+/// The `get_block_offset` method requires to return `block_offset` as u64,
+/// that is for compatible for `flat group index`'s parsing.
+///
+pub trait GroupIndexOperations: Debug {
+fn pack_index(block_id: u32, block_offset: u64) -> u64;
+
+fn get_block_id(packed_index: u64) -> u32;
+
+fn get_block_offset(packed_index: u64) -> u64;
+}
+
+#[derive(Debug)]
+pub struct BlockedGroupIndexOperations;
+
+impl GroupIndexOperations for BlockedGroupIndexOperations {
+fn pack_index(block_id: u32, block_offset: u64) -> u64 {
+((block_id as u64) << 32) | block_offset
+}
+
+fn get_block_id(packed_index: u64) -> u32 {
+(packed_index >> 32) as u32
+}
+
+fn get_block_offset(packed_index: u64) -> u64 {
+(packed_index as u32) as u64
+}
+}
+
+#[derive(Debug)]
+pub struct FlatGroupIndexOperations;
+
+impl GroupIndexOperations for FlatGroupIndexOperations {
+fn pack_index(_block_id: u32, block_offset: u64) -> u64 {
+block_offset
+}
+
+fn get_block_id(_packed_index: u64) -> u32 {
+0
+}
+
+fn get_block_offset(packed_index: u64) -> u64 {
+packed_index
+}
+}
+
+// ===
+// Useful tools for block
+// ===
+pub(crate) fn ensure_room_enough_for_blocks<B, F>(
+blocks: &mut VecDeque<B>,
+total_num_groups: usize,
+block_size: usize,
+new_block: F,
+default_value: B::T,
+) where
+B: Block,
+F: Fn(usize) -> B,
+{
+// For resize, we need to:
+// 1. Ensure the blks are enough first
+// 2. and then ensure slots in blks are enough
+let (mut cur_blk_idx, exist_slots) = if !blocks.is_empty() {
+let cur_blk_idx = blocks.len() - 1;
+let exist_slots = (blocks.len() - 1) * block_size + blocks.back().unwrap().len();
+
+(cur_blk_idx, exist_slots)
+} else {
+(0, 0)
+};
+
+// No new groups, don't need to expand, just return
+if exist_slots >= total_num_groups {
+return;
+}
+
+// 1. Ensure blks are enough
+let exist_blks = blocks.len();
+let new_blks = total_num_groups.div_ceil(block_size) - exist_blks;
+if new_blks > 0 {
+for _ in 0..new_blks {
+let block = new_block(block_size);
+blocks.push_back(block);
+}
+}
+
+// 2. Ensure slots are enough
+let mut new_slots = total_num_groups - exist_slots;
+
+// 2.1 Only fill current blk if it may be already enough
+let cur_blk_rest_slots = block_size - blocks[cur_blk_idx].len();
+if cur_blk_rest_slots >= new_slots {
+blocks[cur_blk_idx].fill_default_value(new_slots, default_value.clone());
+return;
+}
+
+// 2.2 Fill current blk to full
+blocks[cur_blk_idx].fill_default_value(cur_blk_rest_slots, default_value.clone());
+new_slots -= cur_blk_rest_slots;
+
+// 2.3 Fill complete blks
+let complete_blks = new_slots / block_size;
+for _ in 0..complete_blks {
+cur_blk_idx += 1;
+blocks[cur_blk_idx].fill_default_value(block_size, default_value.clone());
+}
+
+// 2.4 Fill last blk if needed
+let rest_slots = new_slots % block_size;
+if rest_slots > 0 {
+blocks
+.back_mut()
+.unwrap()
+.fill_default_value(rest_slots, default_value);
+}
+}
+
+pub(crate) trait Block {
Review Comment:
Addressed.
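For readers following the resize steps in the diff above, here is a simplified sketch of the same idea over plain `VecDeque<Vec<T>>` blocks. It is not the PR's code: `ensure_room` and `block_lens` are hypothetical names, the `Block` trait is replaced by `Vec<T>`, and it assumes every block except the last one is already full.

```rust
use std::collections::VecDeque;

/// Grow `blocks` until they hold `total_num_groups` slots in total,
/// filling every new slot with `default_value`.
fn ensure_room<T: Clone>(
    blocks: &mut VecDeque<Vec<T>>,
    total_num_groups: usize,
    block_size: usize,
    default_value: T,
) {
    assert!(block_size > 0, "block_size must be non-zero");

    // Slots that already exist: all full blocks plus the last, partial one.
    let exist_slots = match blocks.back() {
        Some(last) => (blocks.len() - 1) * block_size + last.len(),
        None => 0,
    };
    // No new groups, nothing to do.
    if exist_slots >= total_num_groups {
        return;
    }
    let mut remaining = total_num_groups - exist_slots;

    // Top up the current last block first.
    if let Some(last) = blocks.back_mut() {
        let n = block_size.saturating_sub(last.len()).min(remaining);
        let new_len = last.len() + n;
        last.resize(new_len, default_value.clone());
        remaining -= n;
    }

    // Then append full blocks, and finally one partial block if needed.
    while remaining > 0 {
        let n = block_size.min(remaining);
        blocks.push_back(vec![default_value.clone(); n]);
        remaining -= n;
    }
}

/// Lengths of all blocks, handy for inspecting the layout.
fn block_lens<T>(blocks: &VecDeque<Vec<T>>) -> Vec<usize> {
    blocks.iter().map(|b| b.len()).collect()
}
```

Growing from 3 to 10 groups with a block size of 4 first fills the existing block to 4 slots, then adds one full block and one block of 2, so existing blocks are never reallocated or copied.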
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
Rachelint commented on code in PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#discussion_r2072859439
##
datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/prim_op.rs:
##
@@ -198,4 +232,28 @@ where
fn size(&self) -> usize {
self.values.capacity() * size_of::<T::Native>() + self.null_state.size()
}
+
+fn supports_blocked_groups(&self) -> bool {
+true
+}
+
+fn alter_block_size(&mut self, block_size: Option<usize>) -> Result<()> {
+self.values.clear();
+self.null_state = NullStateAdapter::new(block_size);
+self.block_size = block_size;
+
+Ok(())
+}
+}
+
+impl Block for Vec {
Review Comment:
Addressed.
##
datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive.rs:
##
@@ -81,28 +85,49 @@ hash_float!(f16, f32, f64);
pub struct GroupValuesPrimitive {
/// The data type of the output array
data_type: DataType,
+
/// Stores the group index based on the hash of its value
///
/// We don't store the hashes as hashing fixed width primitives
/// is fast enough for this not to benefit performance
-map: HashTable,
+map: HashTable,
+
/// The group index of the null value if any
-null_group: Option,
+null_group: Option,
+
/// The values for each group index
-values: Vec<T::Native>,
+values: VecDeque<Vec<T::Native>>,
+
/// The random state used to generate hashes
random_state: RandomState,
+
+/// Block size of current `GroupValues` if exist:
+/// - If `None`, it means block optimization is disabled,
+/// all `group values`` will be stored in a single `Vec`
+///
+/// - If `Some(blk_size)`, it means block optimization is enabled,
+/// `group values` will be stored in multiple `Vec`s, and each
+/// `Vec` if of `blk_size` len, and we call it a `block`
+///
+block_size: Option<usize>,
Review Comment:
Addressed.
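The `block_size` semantics documented above can be sketched as follows. `GroupStore` is a hypothetical stand-in for the values part of `GroupValuesPrimitive`, not the PR's type; it only shows how `None` keeps everything in one `Vec` while `Some(blk_size)` rolls over to a new block when the current one is full.

```rust
use std::collections::VecDeque;

/// Hypothetical store for group values, flat or blocked.
struct GroupStore<T> {
    values: VecDeque<Vec<T>>,
    /// `None`: one single `Vec`; `Some(n)`: blocks of at most `n` values.
    block_size: Option<usize>,
}

impl<T> GroupStore<T> {
    fn new(block_size: Option<usize>) -> Self {
        Self { values: VecDeque::new(), block_size }
    }

    /// Append a value and return its (block_id, block_offset).
    fn push(&mut self, value: T) -> (usize, usize) {
        let need_new_block = match (self.values.back(), self.block_size) {
            // Nothing allocated yet.
            (None, _) => true,
            // Blocked mode: start a new block once the last one is full.
            (Some(last), Some(blk_size)) => last.len() >= blk_size,
            // Flat mode: keep appending to the single block.
            (Some(_), None) => false,
        };
        if need_new_block {
            self.values.push_back(Vec::new());
        }
        let block_id = self.values.len() - 1;
        let block = self
            .values
            .back_mut()
            .expect("a block exists after the check above");
        block.push(value);
        (block_id, block.len() - 1)
    }
}
```

In flat mode the block id is always 0, which matches the `flat group index` layout where all 64 bits are used for the offset.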
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
Rachelint commented on code in PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#discussion_r2072857296
##
datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/prim_op.rs:
##
@@ -43,8 +46,8 @@ where
T: ArrowPrimitiveType + Send,
F: Fn(&mut T::Native, T::Native) + Send + Sync,
{
-/// values per group, stored as the native type
-values: Vec,
+/// Values per group, stored as the native type
Review Comment:
Addressed.
I have defined `Blocks` in a dedicated module, `blocks.rs`, and simplified all
related code using it.
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
Rachelint commented on code in PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#discussion_r2072440427
##
datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs:
##
@@ -212,7 +229,66 @@ impl NullState {
///
/// resets the internal state appropriately
pub fn build(&mut self, emit_to: EmitTo) -> NullBuffer {
-let nulls: BooleanBuffer = self.seen_values.finish();
+self.seen_values.emit(emit_to)
+}
+}
+
+/// Structure marking if accumulating groups are seen at least one
+pub trait SeenValues: Default + Debug + Send {
+fn resize(&mut self, total_num_groups: usize, default_value: bool);
+
+fn set_bit(&mut self, block_id: u32, block_offset: u64, value: bool);
+
+fn emit(&mut self, emit_to: EmitTo) -> NullBuffer;
+
+fn capacity(&self) -> usize;
+}
+
+/// [`SeenValues`] for `flat groups input`
+///
+/// The `flat groups input` are organized like:
+///
+/// ```text
+/// row_0 group_index_0
+/// row_1 group_index_1
+/// row_2 group_index_2
+/// ...
+/// row_n group_index_n
+/// ```
+///
+/// If `row_x group_index_x` is not filtered(`group_index_x` is seen)
+/// `seen_values[group_index_x]` will be set to `true`.
+///
+/// For `set_bit(block_id, block_offset, value)`, `block_id` is unused,
Review Comment:
Addressed.
I added related comments to `GroupsAccumulator::supports_blocked_groups` and
`GroupValues::supports_blocked_groups`, and linked them from `HashAggregateStream`.
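As an illustration of the `flat groups input` case in the doc comment above, here is a minimal sketch of a flat "seen values" tracker. `FlatSeen` is a hypothetical name; a plain `Vec<bool>` stands in for the Arrow boolean buffer, and `emit_all` only covers the emit-everything case rather than the full `emit(emit_to)` from the trait.

```rust
/// Hypothetical flat tracker of which groups have seen at least one value.
#[derive(Debug, Default)]
struct FlatSeen {
    seen: Vec<bool>,
}

impl FlatSeen {
    /// Make sure there is one flag per group; never shrinks.
    fn resize(&mut self, total_num_groups: usize, default_value: bool) {
        if self.seen.len() < total_num_groups {
            self.seen.resize(total_num_groups, default_value);
        }
    }

    /// In flat mode `block_id` is ignored: `block_offset` is the group index.
    fn set_bit(&mut self, _block_id: u32, block_offset: u64, value: bool) {
        self.seen[block_offset as usize] = value;
    }

    /// Hand out all flags and reset the internal state.
    fn emit_all(&mut self) -> Vec<bool> {
        std::mem::take(&mut self.seen)
    }
}
```

Keeping the `block_id` parameter even when it is unused lets flat and blocked implementations share one call site.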
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
Rachelint commented on code in PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#discussion_r2072440252
##
datafusion/functions-aggregate/src/average.rs:
##
@@ -667,8 +668,8 @@ where
partial_counts,
opt_filter,
total_num_groups,
-|group_index, partial_count| {
-self.counts[group_index] += partial_count;
+|_, group_index, partial_count| {
Review Comment:
Addressed
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
Rachelint commented on code in PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#discussion_r2072440207
##
datafusion/expr-common/src/groups_accumulator.rs:
##
@@ -17,29 +17,52 @@
//! Vectorized [`GroupsAccumulator`]
+use std::collections::VecDeque;
+
use arrow::array::{ArrayRef, BooleanArray};
-use datafusion_common::{not_impl_err, Result};
+use datafusion_common::{not_impl_err, DataFusionError, Result};
/// Describes how many rows should be emitted during grouping.
#[derive(Debug, Clone, Copy)]
pub enum EmitTo {
-/// Emit all groups
+/// Emit all groups, will clear all existing group indexes
All,
/// Emit only the first `n` groups and shift all existing group
/// indexes down by `n`.
///
/// For example, if `n=10`, group_index `0, 1, ... 9` are emitted
/// and group indexes `10, 11, 12, ...` become `0, 1, 2, ...`.
First(usize),
+/// Emit next block in the blocked managed groups
+///
+/// Similar as `Emit::All`, will also clear all existing group indexes
+NextBlock,
}
impl EmitTo {
+/// Remove and return `needed values` from `values`.
+pub fn take_needed(
Review Comment:
Addressed
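To show how the three emit modes differ, here is a hedged sketch over `VecDeque<Vec<T>>`. The local `EmitTo` enum only mirrors the variants in the diff above, and this `take_needed` is a hypothetical free function whose signature is an assumption, not the one in the PR. `First(n)` is handled for the flat (single block) case only.

```rust
use std::collections::VecDeque;

/// Local mirror of the variants shown in the diff (illustration only).
#[derive(Debug, Clone, Copy)]
enum EmitTo {
    All,
    First(usize),
    NextBlock,
}

/// Remove and return the values selected by `emit_to`.
fn take_needed<T>(emit_to: EmitTo, blocks: &mut VecDeque<Vec<T>>) -> Vec<T> {
    match emit_to {
        // Everything goes out; no blocks remain.
        EmitTo::All => blocks.drain(..).flatten().collect(),
        // Flat mode: split the single block and keep the tail.
        EmitTo::First(n) => {
            let Some(block) = blocks.front_mut() else {
                return Vec::new();
            };
            let n = n.min(block.len());
            let rest = block.split_off(n);
            std::mem::replace(block, rest)
        }
        // Blocked mode: hand out the front block as a whole.
        EmitTo::NextBlock => blocks.pop_front().unwrap_or_default(),
    }
}
```

`NextBlock` moves one existing allocation out of the queue, so no values are copied or shifted, whereas `First(n)` has to move the tail of the block.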
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
Rachelint commented on code in PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#discussion_r2072425201
##
datafusion/common/src/config.rs:
##
@@ -405,6 +405,18 @@ config_namespace! {
/// in joins can reduce memory usage when joining large
/// tables with a highly-selective join filter, but is also slightly slower.
pub enforce_batch_size_in_joins: bool, default = false
+
Review Comment:
Yes, it is only there for `going back + testing`.
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
Rachelint commented on code in PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#discussion_r2072417977
##
datafusion/expr-common/src/groups_accumulator.rs:
##
@@ -250,4 +288,30 @@ pub trait GroupsAccumulator: Send {
/// This function is called once per batch, so it should be `O(n)` to
/// compute, not `O(num_groups)`
fn size(&self) -> usize;
+
+/// Returns `true` if this accumulator supports blocked groups.
+fn supports_blocked_groups(&self) -> bool {
+false
+}
+
+/// Alter the block size in the accumulator
+///
+/// If the target block size is `None`, it will use a single big
+/// block(can think it a `Vec`) to manage the state.
+///
+/// If the target block size` is `Some(blk_size)`, it will try to
+/// set the block size to `blk_size`, and the try will only success
+/// when the accumulator has supported blocked mode.
+///
+/// NOTICE: After altering block size, all data in previous will be cleared.
Review Comment:
Yes, it will usually be used in the `load back + merge` step of spilling:
- First, emit the remaining blocks
- Then clear all stale data, switch to `flat mode`, and perform `sorted
aggregation`
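A minimal sketch of that switch, under stated assumptions: `ToyAccumulator` is hypothetical, errors are plain `String`s instead of `DataFusionError`, and returning an error for an unsupported blocked mode is one reading of the doc comment rather than confirmed behavior of the PR.

```rust
/// Hypothetical accumulator state, just enough to show the mode switch.
#[derive(Debug)]
struct ToyAccumulator {
    values: Vec<i64>,
    block_size: Option<usize>,
    supports_blocked_groups: bool,
}

impl ToyAccumulator {
    /// Switch between flat (`None`) and blocked (`Some(n)`) mode.
    /// All previously accumulated data is dropped on success.
    fn alter_block_size(&mut self, block_size: Option<usize>) -> Result<(), String> {
        if block_size.is_some() && !self.supports_blocked_groups {
            return Err("this accumulator does not support blocked groups".to_string());
        }
        self.values.clear();
        self.block_size = block_size;
        Ok(())
    }
}
```

Because the call clears the state, the caller has to emit every remaining block before switching to flat mode.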
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
Rachelint commented on code in PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#discussion_r2072414793
##
benchmarks/queries/clickbench/extended.sql:
##
@@ -5,3 +5,4 @@ SELECT "SocialSourceNetworkID", "RegionID", COUNT(*), AVG("Age"), AVG("ParamPric
SELECT "ClientIP", "WatchID", COUNT(*) c, MIN("ResponseStartTiming") tmin, MEDIAN("ResponseStartTiming") tmed, MAX("ResponseStartTiming") tmax FROM hits WHERE "JavaEnable" = 0 GROUP BY "ClientIP", "WatchID" HAVING c > 1 ORDER BY tmed DESC LIMIT 10;
SELECT "ClientIP", "WatchID", COUNT(*) c, MIN("ResponseStartTiming") tmin, APPROX_PERCENTILE_CONT("ResponseStartTiming", 0.95) tp95, MAX("ResponseStartTiming") tmax FROM 'hits' WHERE "JavaEnable" = 0 GROUP BY "ClientIP", "WatchID" HAVING c > 1 ORDER BY tp95 DESC LIMIT 10;
SELECT COUNT(*) AS ShareCount FROM hits WHERE "IsMobile" = 1 AND "MobilePhoneModel" LIKE 'iPhone%' AND "SocialAction" = 'share' AND "SocialSourceNetworkID" IN (5, 12) AND "ClientTimeZone" BETWEEN -5 AND 5 AND regexp_match("Referer", '\/campaign\/(spring|summer)_promo') IS NOT NULL AND CASE WHEN split_part(split_part("URL", 'resolution=', 2), '&', 1) ~ '^\d+$' THEN split_part(split_part("URL", 'resolution=', 2), '&', 1)::INT ELSE 0 END > 1920 AND levenshtein(CAST("UTMSource" AS STRING), CAST("UTMCampaign" AS STRING)) < 3;
+SELECT "WatchID", MIN("ResolutionWidth"), MAX("ResolutionWidth"), SUM("IsRefresh") FROM hits GROUP BY "WatchID" ORDER BY "WatchID" DESC LIMIT 10;
Review Comment:
Addressed with #15936
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
Rachelint commented on PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#issuecomment-2848629234

@alamb I have submitted a PR for the query newly added in this PR: #15936
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
alamb commented on PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#issuecomment-2848573594

🤖: Benchmark completed

Details

```
Comparing HEAD and intermeidate-result-blocked-approach

Benchmark clickbench_extended.json
| Query     | HEAD       | intermeidate-result-blocked-approach | Change        |
|-----------|------------|--------------------------------------|---------------|
| QQuery 0  | 1937.34ms  | 1929.54ms                            | no change     |
| QQuery 1  | 664.33ms   | 719.97ms                             | 1.08x slower  |
| QQuery 2  | 1395.20ms  | 1411.13ms                            | no change     |
| QQuery 3  | 712.50ms   | 710.80ms                             | no change     |
| QQuery 4  | 1477.28ms  | 1475.06ms                            | no change     |
| QQuery 5  | 15310.28ms | 15186.79ms                           | no change     |
| QQuery 6  | 2049.34ms  | 2066.82ms                            | no change     |

| Benchmark Summary                                   |            |
|-----------------------------------------------------|------------|
| Total Time (HEAD)                                   | 23546.29ms |
| Total Time (intermeidate-result-blocked-approach)   | 23500.10ms |
| Average Time (HEAD)                                 | 3363.76ms  |
| Average Time (intermeidate-result-blocked-approach) | 3357.16ms  |
| Queries Faster                                      | 0          |
| Queries Slower                                      | 1          |
| Queries with No Change                              | 6          |

Benchmark clickbench_partitioned.json
| Query     | HEAD       | intermeidate-result-blocked-approach | Change        |
|-----------|------------|--------------------------------------|---------------|
| QQuery 0  | 2.53ms     | 2.09ms                               | +1.21x faster |
| QQuery 1  | 35.62ms    | 39.76ms                              | 1.12x slower  |
| QQuery 2  | 89.48ms    | 92.73ms                              | no change     |
| QQuery 3  | 100.30ms   | 100.22ms                             | no change     |
| QQuery 4  | 750.96ms   | 754.32ms                             | no change     |
| QQuery 5  | 833.28ms   | 857.42ms                             | no change     |
| QQuery 6  | 2.06ms     | 2.08ms                               | no change     |
| QQuery 7  | 40.36ms    | 42.52ms                              | 1.05x slower  |
| QQuery 8  | 924.79ms   | 902.95ms                             | no change     |
| QQuery 9  | 1209.71ms  | 1224.03ms                            | no change     |
| QQuery 10 | 264.71ms   | 273.51ms                             | no change     |
| QQuery 11 | 302.87ms   | 313.03ms                             | no change     |
| QQuery 12 | 917.10ms   | 899.06ms                             | no change     |
| QQuery 13 | 1338.05ms  | 1255.47ms                            | +1.07x faster |
| QQuery 14 | 833.92ms   | 846.94ms                             | no change     |
| QQuery 15 | 1030.69ms  | 1073.94ms                            | no change     |
| QQuery 16 | 1724.70ms  | 1732.73ms                            | no change     |
| QQuery 17 | 1605.96ms  | 1581.98ms                            | no change     |
| QQuery 18 | 3069.41ms  | 3072.93ms                            | no change     |
| QQuery 19 | 83.51ms    | 85.65ms                              | no change     |
| QQuery 20 | 1123.00ms  | 1140.04ms                            | no change     |
| QQuery 21 | 1333.50ms  | 1336.50ms                            | no change     |
| QQuery 22 | 2175.85ms  | 2185.26ms                            | no change     |
| QQuery 23 | 8284.06ms  | 8470.20ms                            | no change     |
| QQuery 24 | 461.57ms   | 485.77ms                             | 1.05x slower  |
| QQuery 25 | 384.10ms   | 394.93ms                             | no change     |
| QQuery 26 | 531.30ms   | 545.07ms                             | no change     |
| QQuery 27 | 1670.90ms  | 1705.20ms                            | no change     |
| QQuery 28 | 12167.48ms | 12606.28ms                           |
```
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
Rachelint commented on PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#issuecomment-2848571591

> Given we have past evidence this approach will work I think we could merge it before the queries really sped up. My big concern is that we have a plan to eventually avoid both single and multi blocked management

The main difficulty in removing single block management is `sorted aggregation`:
- We need `EmitTo::First(exactly n)` for `sorted aggregation`
- Supporting `EmitTo::First(exactly n)` in blocked management is actually too expensive...

I actually ran into the same problem in #12996... I think it may be a common problem of how to support `sorted aggregation`.
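To give a feel for why emitting exactly the first `n` groups is costly once values live in blocks, here is a deliberately naive sketch. `emit_first_n` is hypothetical and not how the PR works; it only shows that every remaining value has to be re-packed so that the blocks stay full, on top of shifting the remaining group indexes down by `n`.

```rust
use std::collections::VecDeque;

/// Naive "first n" over blocks: take the first `n` values, then re-pack the
/// rest so that every block except the last is full again.
fn emit_first_n<T>(blocks: &mut VecDeque<Vec<T>>, n: usize, block_size: usize) -> Vec<T> {
    assert!(block_size > 0, "block_size must be non-zero");

    // Flatten everything; this already touches every stored value.
    let mut emitted: Vec<T> = blocks.drain(..).flatten().collect();
    let n = n.min(emitted.len());
    let rest = emitted.split_off(n);

    // Rebuild the blocks from the remaining values.
    let mut rest = rest.into_iter().peekable();
    while rest.peek().is_some() {
        let block: Vec<T> = rest.by_ref().take(block_size).collect();
        blocks.push_back(block);
    }
    emitted
}
```

The work grows with the total number of stored groups, while emitting a whole block only pops one `Vec` from the front of the queue.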
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
Rachelint commented on PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#issuecomment-2848565268

> What would be required to improve the performance for one or more of the real clickbench queries? Implementing group management for other data types?

Yes. High-cardinality aggregations usually benefit from it, such as `q16~q18` and `q32` (group by `UserID` or `WatchID`). But we would need to implement group management for multiple data types in `GroupValues`, or for multiple `GroupsAccumulator` implementations. I think that may be too complex for the initial PR...
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
alamb commented on PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#issuecomment-2848561993

🤖 `./gh_compare_branch.sh` [Benchmark Script](https://github.com/alamb/datafusion-benchmarking/blob/main/gh_compare_branch.sh) Running
Linux aal-dev 6.11.0-1013-gcp #13~24.04.1-Ubuntu SMP Wed Apr 2 16:34:16 UTC 2025 x86_64 x86_64 x86_64 GNU/Linux
Comparing intermeidate-result-blocked-approach (ff9c3adc4523e7df83ee635679926f083dda) to 74dc4196858784d7872b21bbfc97edc564e47c5e [diff](https://github.com/apache/datafusion/compare/74dc4196858784d7872b21bbfc97edc564e47c5e..ff9c3adc4523e7df83ee635679926f083dda)
Benchmarks: tpch_mem clickbench_partitioned clickbench_extended
Results will be posted here when complete
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
alamb commented on PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#issuecomment-2848559813

> I am back from holiday, and continue to work on this today.

Welcome back!

> Emmm... As expected the new added query is not covered, I think I should submit a new PR for adding the query to extended.sql for checking the improvement... Just wait a minute.

Makes sense

> And actually still don't have such a simple query can show the improvement now...

What would be required to improve the performance for one or more of the real clickbench queries? Implementing group management for other data types?

Given we have past evidence this approach will work I think we could merge it before the queries really sped up. My big concern is that we have a plan to eventually avoid both single and multi blocked management
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
Rachelint commented on PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#issuecomment-2848557998
> 🤖: Benchmark completed
> Details
>
> ```
> Comparing HEAD and intermeidate-result-blocked-approach
>
> Benchmark clickbench_extended.json
>
> ┏━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
> ┃ Query     ┃       HEAD ┃ intermeidate-result-blocked-approach ┃        Change ┃
> ┡━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
> │ QQuery 0  │  1921.22ms │                            1898.23ms │     no change │
> │ QQuery 1  │   657.26ms │                             694.92ms │  1.06x slower │
> │ QQuery 2  │  1398.94ms │                            1419.32ms │     no change │
> │ QQuery 3  │   713.54ms │                             717.51ms │     no change │
> │ QQuery 4  │  1498.02ms │                            1511.57ms │     no change │
> │ QQuery 5  │ 15341.63ms │                           15578.65ms │     no change │
> │ QQuery 6  │  2029.05ms │                            2049.81ms │     no change │
> └───────────┴────────────┴──────────────────────────────────────┴───────────────┘
> ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
> ┃ Benchmark Summary                                   ┃            ┃
> ┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
> │ Total Time (HEAD)                                   │ 23559.67ms │
> │ Total Time (intermeidate-result-blocked-approach)   │ 23870.00ms │
> │ Average Time (HEAD)                                 │  3365.67ms │
> │ Average Time (intermeidate-result-blocked-approach) │  3410.00ms │
> │ Queries Faster                                      │          0 │
> │ Queries Slower                                      │          1 │
> │ Queries with No Change                              │          6 │
> └─────────────────────────────────────────────────────┴────────────┘
>
> Benchmark clickbench_partitioned.json
>
> ┏━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
> ┃ Query     ┃       HEAD ┃ intermeidate-result-blocked-approach ┃        Change ┃
> ┡━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
> │ QQuery 0  │     2.32ms │                               2.31ms │     no change │
> │ QQuery 1  │    36.90ms │                              38.43ms │     no change │
> │ QQuery 2  │    91.60ms │                              93.54ms │     no change │
> │ QQuery 3  │   100.24ms │                              99.54ms │     no change │
> │ QQuery 4  │   777.83ms │                             801.76ms │     no change │
> │ QQuery 5  │   860.11ms │                             850.06ms │     no change │
> │ QQuery 6  │     2.10ms │                               2.13ms │     no change │
> │ QQuery 7  │    41.49ms │                              43.64ms │  1.05x slower │
> │ QQuery 8  │   925.11ms │                             877.46ms │ +1.05x faster │
> │ QQuery 9  │  1201.13ms │                            1193.97ms │     no change │
> │ QQuery 10 │   262.49ms │                             270.08ms │     no change │
> │ QQuery 11 │   302.47ms │                             313.89ms │     no change │
> │ QQuery 12 │   902.20ms │                             956.22ms │  1.06x slower │
> │ QQuery 13 │  1325.06ms │                            1388.51ms │     no change │
> │ QQuery 14 │   826.08ms │                             851.33ms │     no change │
> │ QQuery 15 │  1026.88ms │                            1061.12ms │     no change │
> │ QQuery 16 │  1720.69ms │                            1731.12ms │     no change │
> │ QQuery 17 │  1582.12ms │                            1599.34ms │     no change │
> │ QQuery 18 │  3066.54ms │                            3254.59ms │  1.06x slower │
> │ QQuery 19 │    84.76ms │                              82.92ms │     no change │
> │ QQuery 20 │  1116.50ms │                            1178.43ms │  1.06x slower │
> │ QQuery 21 │  1315.33ms │                            1318.33ms │     no change │
> │ QQuery 22 │  2190.24ms │                            2170.50ms │     no change │
> │ QQuery 23 │  8290.39ms │                            8370.27ms │     no change │
> │ QQuery 24 │   470.87ms │                             486.49ms │     no change │
> │ QQuery 25 │   388.04ms │                             399.37ms │     no change │
> │ QQuery 26 │   527.09ms │                             542.36ms │     no change │
> │ QQuery 27 │  1686.91ms │
> ```
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
alamb commented on PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#issuecomment-2848554286
🤖: Benchmark completed
Details
```
Comparing HEAD and intermeidate-result-blocked-approach

Benchmark clickbench_extended.json

┏━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query     ┃       HEAD ┃ intermeidate-result-blocked-approach ┃        Change ┃
┡━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 0  │  1921.22ms │                            1898.23ms │     no change │
│ QQuery 1  │   657.26ms │                             694.92ms │  1.06x slower │
│ QQuery 2  │  1398.94ms │                            1419.32ms │     no change │
│ QQuery 3  │   713.54ms │                             717.51ms │     no change │
│ QQuery 4  │  1498.02ms │                            1511.57ms │     no change │
│ QQuery 5  │ 15341.63ms │                           15578.65ms │     no change │
│ QQuery 6  │  2029.05ms │                            2049.81ms │     no change │
└───────────┴────────────┴──────────────────────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary                                   ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (HEAD)                                   │ 23559.67ms │
│ Total Time (intermeidate-result-blocked-approach)   │ 23870.00ms │
│ Average Time (HEAD)                                 │  3365.67ms │
│ Average Time (intermeidate-result-blocked-approach) │  3410.00ms │
│ Queries Faster                                      │          0 │
│ Queries Slower                                      │          1 │
│ Queries with No Change                              │          6 │
└─────────────────────────────────────────────────────┴────────────┘

Benchmark clickbench_partitioned.json

┏━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query     ┃       HEAD ┃ intermeidate-result-blocked-approach ┃        Change ┃
┡━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 0  │     2.32ms │                               2.31ms │     no change │
│ QQuery 1  │    36.90ms │                              38.43ms │     no change │
│ QQuery 2  │    91.60ms │                              93.54ms │     no change │
│ QQuery 3  │   100.24ms │                              99.54ms │     no change │
│ QQuery 4  │   777.83ms │                             801.76ms │     no change │
│ QQuery 5  │   860.11ms │                             850.06ms │     no change │
│ QQuery 6  │     2.10ms │                               2.13ms │     no change │
│ QQuery 7  │    41.49ms │                              43.64ms │  1.05x slower │
│ QQuery 8  │   925.11ms │                             877.46ms │ +1.05x faster │
│ QQuery 9  │  1201.13ms │                            1193.97ms │     no change │
│ QQuery 10 │   262.49ms │                             270.08ms │     no change │
│ QQuery 11 │   302.47ms │                             313.89ms │     no change │
│ QQuery 12 │   902.20ms │                             956.22ms │  1.06x slower │
│ QQuery 13 │  1325.06ms │                            1388.51ms │     no change │
│ QQuery 14 │   826.08ms │                             851.33ms │     no change │
│ QQuery 15 │  1026.88ms │                            1061.12ms │     no change │
│ QQuery 16 │  1720.69ms │                            1731.12ms │     no change │
│ QQuery 17 │  1582.12ms │                            1599.34ms │     no change │
│ QQuery 18 │  3066.54ms │                            3254.59ms │  1.06x slower │
│ QQuery 19 │    84.76ms │                              82.92ms │     no change │
│ QQuery 20 │  1116.50ms │                            1178.43ms │  1.06x slower │
│ QQuery 21 │  1315.33ms │                            1318.33ms │     no change │
│ QQuery 22 │  2190.24ms │                            2170.50ms │     no change │
│ QQuery 23 │  8290.39ms │                            8370.27ms │     no change │
│ QQuery 24 │   470.87ms │                             486.49ms │     no change │
│ QQuery 25 │   388.04ms │                             399.37ms │     no change │
│ QQuery 26 │   527.09ms │                             542.36ms │     no change │
│ QQuery 27 │  1686.91ms │                            1727.53ms │     no change │
│ QQuery 28 │ 12444.07ms │                           12631.97ms │
```
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
Rachelint commented on PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#issuecomment-2848554248
> how is the benchmark triggered and can we run clickbench extended too?
>
> upd: I didn't find improvement for extended query locally
Also for the newly added one?
```
SELECT "WatchID", MIN("ResolutionWidth"), MAX("ResolutionWidth"),
SUM("IsRefresh") FROM hits GROUP BY "WatchID" ORDER BY "WatchID" DESC LIMIT 10;
```
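For readers without the ClickBench data at hand, the shape of this query can be tried on a toy table. The sketch below is only a stand-in under stated assumptions: it uses Python's built-in `sqlite3` rather than DataFusion, the rows in `TOY_ROWS` are made up, and the helper `run_toy_query` is hypothetical. Only the SQL text is taken from the comment above.

```python
import sqlite3

# The query from the comment above, unchanged.
QUERY = '''
SELECT "WatchID", MIN("ResolutionWidth"), MAX("ResolutionWidth"),
SUM("IsRefresh") FROM hits GROUP BY "WatchID" ORDER BY "WatchID" DESC LIMIT 10;
'''

# Made-up rows for illustration: (WatchID, ResolutionWidth, IsRefresh).
TOY_ROWS = [
    (3, 1920, 0),
    (1, 1280, 1),
    (3, 1366, 1),
    (2, 1024, 0),
    (1, 1600, 1),
]


def run_toy_query():
    """Run the query against an in-memory SQLite table (not DataFusion)."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        'CREATE TABLE hits ("WatchID" INTEGER, '
        '"ResolutionWidth" INTEGER, "IsRefresh" INTEGER)'
    )
    conn.executemany("INSERT INTO hits VALUES (?, ?, ?)", TOY_ROWS)
    rows = conn.execute(QUERY).fetchall()
    conn.close()
    return rows


result = run_toy_query()
for row in result:
    # One output row per distinct WatchID: (id, min width, max width, refresh sum)
    print(row)
```

Each distinct `WatchID` becomes one group that carries three pieces of intermediate state (a minimum, a maximum, and a sum), so the amount of group state grows with the number of distinct keys.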
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
Rachelint commented on PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#issuecomment-2848551270
> > 🤖: Benchmark completed
>
> (I am surprised this PR didn't yield better results, I am rerunning now to see if the results are reproducible)
I am back from holiday and will continue working on this today.
For simplicity, the initial PR only:
- implements the blocked version of `GroupValuesPrimitive` for `GroupValues`
- and of `PrimitiveGroupsAccumulator` for `GroupsAccumulator`
So there is still no simple existing query that can show the improvement
yet...
To show the improvement, I added a new query to
`clickbench/extended.sql`:
```sql
SELECT "WatchID", MIN("ResolutionWidth"), MAX("ResolutionWidth"),
SUM("IsRefresh") FROM hits GROUP BY "WatchID" ORDER BY "WatchID" DESC LIMIT 10;
```
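To make the "blocked" idea in the PR title more concrete: instead of keeping all intermediate values in one contiguous buffer that has to be reallocated and copied as the number of groups grows, values can be kept in fixed-size blocks, with a flat group index mapped to a (block, offset) pair. The following is a rough Python sketch of that general idea only. The class `BlockedValues`, its methods, and the block size of 4 are hypothetical and not taken from the PR; the actual Rust implementation may differ.

```python
class BlockedValues:
    """Append-only storage split into fixed-size blocks (hypothetical sketch)."""

    def __init__(self, block_size):
        if block_size <= 0:
            raise ValueError("block_size must be positive")
        self.block_size = block_size
        self.blocks = []  # list of lists, each holding at most block_size values
        self.count = 0    # total number of stored values

    def push(self, value):
        """Append a value and return its flat group index."""
        if not self.blocks or len(self.blocks[-1]) == self.block_size:
            # Start a new block instead of growing (and copying) one big buffer.
            self.blocks.append([])
        self.blocks[-1].append(value)
        index = self.count
        self.count += 1
        return index

    def locate(self, index):
        """Map a flat group index to (block id, offset inside the block)."""
        if not 0 <= index < self.count:
            raise IndexError(index)
        return divmod(index, self.block_size)

    def get(self, index):
        block_id, offset = self.locate(index)
        return self.blocks[block_id][offset]

    def set(self, index, value):
        block_id, offset = self.locate(index)
        self.blocks[block_id][offset] = value

    def emit_first_block(self):
        """Hand out the oldest block as-is; later indices shift down by its length."""
        block = self.blocks.pop(0)
        self.count -= len(block)
        return block


# A toy SUM accumulator over 10 groups, stored in blocks of 4.
sums = BlockedValues(block_size=4)
for _ in range(10):
    sums.push(0)
for group_index, value in [(0, 5), (9, 7), (0, 1), (4, 2)]:
    sums.set(group_index, sums.get(group_index) + value)
print(sums.blocks)
```

In this sketch, growing the state never touches existing blocks, and emitting results can proceed one block at a time without slicing a large buffer.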
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
Rachelint commented on PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#issuecomment-2848551267
> > 🤖: Benchmark completed
>
> (I am surprised this PR didn't yield better results, I am rerunning now to see if the results are reproducible)
I am back from holiday and will continue working on this today.
For simplicity, the initial PR only:
- implements the blocked version of `GroupValuesPrimitive` for `GroupValues`
- and of `PrimitiveGroupsAccumulator` for `GroupsAccumulator`
So there is still no simple existing query that can show the improvement
yet...
To show the improvement, I added a new query to
`clickbench/extended.sql`:
```sql
SELECT "WatchID", MIN("ResolutionWidth"), MAX("ResolutionWidth"),
SUM("IsRefresh") FROM hits GROUP BY "WatchID" ORDER BY "WatchID" DESC LIMIT 10;
```
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
alamb commented on PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#issuecomment-2848548914
🤖 `./gh_compare_branch.sh` [Benchmark Script](https://github.com/alamb/datafusion-benchmarking/blob/main/gh_compare_branch.sh) Running
Linux aal-dev 6.11.0-1013-gcp #13~24.04.1-Ubuntu SMP Wed Apr 2 16:34:16 UTC 2025 x86_64 x86_64 x86_64 GNU/Linux
Comparing intermeidate-result-blocked-approach (ff9c3adc4523e7df83ee635679926f083dda) to 74dc4196858784d7872b21bbfc97edc564e47c5e [diff](https://github.com/apache/datafusion/compare/74dc4196858784d7872b21bbfc97edc564e47c5e..ff9c3adc4523e7df83ee635679926f083dda)
Benchmarks: tpch_mem clickbench_partitioned clickbench_extended
Results will be posted here when complete
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
alamb commented on PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#issuecomment-2848548029
> 🤖: Benchmark completed
(I am surprised this PR didn't yield better results; I am rerunning now to see)
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
alamb commented on PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#issuecomment-2848547823
> how is the benchmark triggered and can we run clickbench extended too?
I am using some scripts in https://github.com/alamb/datafusion-benchmarking on a GCP machine (I haven't figured out how we would do this for the community in general).
I didn't run the extended tests because they ran into the following issue (which is how I found it initially):
- https://github.com/apache/datafusion/issues/15927
Now that they are fixed, I think I can run the extended tests and will kick them off.
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
jayzhan211 commented on PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#issuecomment-2848409762
How is the benchmark triggered, and can we run clickbench extended too?
Re: [PR] Implement intermediate result blocked approach to aggregation memory management [datafusion]
alamb commented on PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#issuecomment-2848062289
🤖: Benchmark completed
Details
```
Comparing HEAD and intermeidate-result-blocked-approach

Benchmark clickbench_1.json

┏━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query     ┃       HEAD ┃ intermeidate-result-blocked-approach ┃        Change ┃
┡━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 0  │     0.57ms │                               0.55ms │     no change │
│ QQuery 1  │    73.94ms │                              75.00ms │     no change │
│ QQuery 2  │   113.27ms │                             118.51ms │     no change │
│ QQuery 3  │   126.34ms │                             126.51ms │     no change │
│ QQuery 4  │   754.71ms │                             789.90ms │     no change │
│ QQuery 5  │   834.40ms │                             836.35ms │     no change │
│ QQuery 6  │     0.70ms │                               0.62ms │ +1.13x faster │
│ QQuery 7  │    87.54ms │                              89.84ms │     no change │
│ QQuery 8  │   921.68ms │                             928.83ms │     no change │
│ QQuery 9  │  1217.57ms │                            1197.21ms │     no change │
│ QQuery 10 │   291.89ms │                             308.86ms │  1.06x slower │
│ QQuery 11 │   334.85ms │                             344.94ms │     no change │
│ QQuery 12 │   892.72ms │                             900.44ms │     no change │
│ QQuery 13 │  1317.81ms │                            1308.76ms │     no change │
│ QQuery 14 │   840.55ms │                             857.04ms │     no change │
│ QQuery 15 │  1039.30ms │                            1101.46ms │  1.06x slower │
│ QQuery 16 │  1749.21ms │                            1742.09ms │     no change │
│ QQuery 17 │  1583.83ms │                            1614.13ms │     no change │
│ QQuery 18 │  3067.32ms │                            3093.76ms │     no change │
│ QQuery 19 │   123.92ms │                             115.09ms │ +1.08x faster │
│ QQuery 20 │  1153.71ms │                            1169.49ms │     no change │
│ QQuery 21 │  1396.36ms │                            1397.95ms │     no change │
│ QQuery 22 │  2417.82ms │                            2395.93ms │     no change │
│ QQuery 23 │  8439.01ms │                            8627.38ms │     no change │
│ QQuery 24 │   502.46ms │                             491.22ms │     no change │
│ QQuery 25 │   427.03ms │                             433.83ms │     no change │
│ QQuery 26 │   558.27ms │                             575.80ms │     no change │
│ QQuery 27 │  1822.79ms │                            1834.22ms │     no change │
│ QQuery 28 │ 12992.87ms │                           12962.28ms │     no change │
│ QQuery 29 │   567.61ms │                             569.25ms │     no change │
│ QQuery 30 │   834.68ms │                             849.39ms │     no change │
│ QQuery 31 │   874.85ms │                             911.69ms │     no change │
│ QQuery 32 │  2620.91ms │                            2712.46ms │     no change │
│ QQuery 33 │  3374.68ms │                            3414.17ms │     no change │
│ QQuery 34 │  3508.23ms │                            3471.25ms │     no change │
│ QQuery 35 │  1299.74ms │                            1283.55ms │     no change │
│ QQuery 36 │   168.83ms │                             178.94ms │  1.06x slower │
│ QQuery 37 │   100.58ms │                             104.48ms │     no change │
│ QQuery 38 │   169.06ms │                             175.10ms │     no change │
│ QQuery 39 │   251.47ms │                             256.55ms │     no change │
│ QQuery 40 │    84.92ms │                              88.19ms │     no change │
│ QQuery 41 │    85.41ms │                              83.15ms │     no change │
│ QQuery 42 │    79.02ms │                              77.87ms │     no change │
└───────────┴────────────┴──────────────────────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary                                   ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (HEAD)                                   │ 59102.46ms │
│ Total Time (intermeidate-result-blocked-approach)   │ 59614.05ms │
│ Average Time (HEAD)                                 │  1374.48ms │
│ Average Time (intermeidate-result-blocked-approach) │  1386.37ms │
```
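As a reading aid for the "Change" column: the labels in the clickbench_1 results are consistent with comparing the two timings as a ratio and treating anything within roughly 5% as noise. The sketch below reproduces that labelling for a few of the rows. The 5% threshold and the function name `classify_change` are assumptions inferred from the numbers shown, not taken from the benchmark script.

```python
def classify_change(head_ms, branch_ms, noise_threshold=0.05):
    """Label a timing pair the way the 'Change' column appears to (assumed rule)."""
    if head_ms <= 0 or branch_ms <= 0:
        raise ValueError("timings must be positive")
    if branch_ms > head_ms:
        # Branch took longer: report how many times slower it is.
        ratio = branch_ms / head_ms
        if ratio - 1.0 > noise_threshold:
            return f"{ratio:.2f}x slower"
    else:
        # Branch took the same time or less: report how many times faster it is.
        ratio = head_ms / branch_ms
        if ratio - 1.0 > noise_threshold:
            return f"+{ratio:.2f}x faster"
    return "no change"


# (query, HEAD ms, branch ms) copied from the clickbench_1 results.
ROWS = [
    ("QQuery 4", 754.71, 789.90),
    ("QQuery 6", 0.70, 0.62),
    ("QQuery 10", 291.89, 308.86),
    ("QQuery 19", 123.92, 115.09),
]

for name, head_ms, branch_ms in ROWS:
    print(name, classify_change(head_ms, branch_ms))
```

Under this rule, a row such as QQuery 4 (about 4.7% slower) is still reported as "no change", which is worth keeping in mind when reading the summary counts.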
