alamb opened a new issue, #6937: URL: https://github.com/apache/arrow-datafusion/issues/6937
### Is your feature request related to a problem or challenge?

When running a query with "high cardinality" grouping in DataFusion, the memory usage increases linearly with the number of groups (expected) but *also* with the number of cores. This is the root cause of @ychen7's observation that ClickBench q32 fails, as described in https://github.com/apache/arrow-datafusion/issues/5276#issuecomment-1504574207

To reproduce, get the ClickBench data (https://github.com/ClickHouse/ClickBench/tree/main#data-loading) and run this:

```sql
CREATE EXTERNAL TABLE hits STORED AS PARQUET LOCATION 'hits.parquet';

set datafusion.execution.target_partitions = 1;
SELECT "WatchID", "ClientIP", COUNT(*) AS c, SUM("IsRefresh"), AVG("ResolutionWidth") FROM hits GROUP BY "WatchID", "ClientIP" ORDER BY c DESC LIMIT 10;

set datafusion.execution.target_partitions = 4;
SELECT "WatchID", "ClientIP", COUNT(*) AS c, SUM("IsRefresh"), AVG("ResolutionWidth") FROM hits GROUP BY "WatchID", "ClientIP" ORDER BY c DESC LIMIT 10;
```

This is what the memory usage looks like:

*(memory usage chart)*

The reason for this behavior can be found in the plan and the multi-stage hash grouping that is done:

```
explain SELECT "WatchID", "ClientIP", COUNT(*) AS c, SUM("IsRefresh"), AVG("ResolutionWidth") FROM hits GROUP BY "WatchID", "ClientIP" ORDER BY c DESC LIMIT 10;
```

```text
physical_plan
GlobalLimitExec: skip=0, fetch=10
  SortPreservingMergeExec: [c@2 DESC]
    SortExec: fetch=10, expr=[c@2 DESC]
      ProjectionExec: expr=[WatchID@0 as WatchID, ClientIP@1 as ClientIP, COUNT(UInt8(1))@2 as c, SUM(hits.IsRefresh)@3 as SUM(hits.IsRefresh), AVG(hits.ResolutionWidth)@4 as AVG(hits.ResolutionWidth)]
        AggregateExec: mode=FinalPartitioned, gby=[WatchID@0 as WatchID, ClientIP@1 as ClientIP], aggr=[COUNT(UInt8(1)), SUM(hits.IsRefresh), AVG(hits.ResolutionWidth)]
          CoalesceBatchesExec: target_batch_size=8192
            RepartitionExec: partitioning=Hash([WatchID@0, ClientIP@1], 16), input_partitions=16
              AggregateExec: mode=Partial, gby=[WatchID@0 as WatchID, ClientIP@1 as ClientIP], aggr=[COUNT(UInt8(1)), SUM(hits.IsRefresh), AVG(hits.ResolutionWidth)]
                ParquetExec: file_groups={16 groups: [[Users/alamb/Software/clickbench_hits_compatible/hits.parquet:0..923748528], [Users/alamb/Software/clickbench_hits_compatible/hits.parquet:923748528..1847497056], [Users/alamb/Software/clickbench_hits_compatible/hits.parquet:1847497056..2771245584], [Users/alamb/Software/clickbench_hits_compatible/hits.parquet:2771245584..3694994112], [Users/alamb/Software/clickbench_hits_compatible/hits.parquet:3694994112..4618742640], ...]}, projection=[WatchID, ClientIP, IsRefresh, ResolutionWidth]
```

Specifically, since the groups are arbitrarily distributed in the files, the first `AggregateExec: mode=Partial` has to build a hash table that has entries for all groups.
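A toy model of why this happens (plain Rust, not DataFusion code; the group and row counts are made up for illustration): rows are spread round-robin across partitions, and each partition builds its own partial hash table, so every partition ends up tracking every distinct group.

```rust
use std::collections::HashMap;

// Toy model (not DataFusion code) of the Partial aggregation stage:
// input rows are spread round-robin across `partitions`, and each
// partition builds its own hash table keyed by the group value.
fn main() {
    // Illustrative numbers only.
    let num_groups = 1_000_000usize;
    let rows_per_group = 8usize;
    let partitions = 4usize;

    // One partial hash table per partition: group key -> COUNT(*).
    let mut partial_tables: Vec<HashMap<usize, u64>> = vec![HashMap::new(); partitions];

    // Distribute the rows round-robin, as an arbitrary scan order would.
    let mut next_partition = 0usize;
    for group in 0..num_groups {
        for _ in 0..rows_per_group {
            *partial_tables[next_partition % partitions]
                .entry(group)
                .or_insert(0) += 1;
            next_partition += 1;
        }
    }

    // Every partition ends up with an entry for every distinct group, so
    // the total number of hash table entries is partitions * num_groups.
    for (i, table) in partial_tables.iter().enumerate() {
        println!("partition {i}: {} group entries", table.len());
    }
    let total: usize = partial_tables.iter().map(|t| t.len()).sum();
    println!("total partial entries: {total} (distinct groups: {num_groups})");
}
```

Running this prints 1,000,000 entries for each of the 4 partial tables, i.e. the total number of entries (and hence memory) scales with the number of partitions, not just with the number of distinct groups.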
As the number of target partitions goes up, the number of `AggregateExec: mode=Partial` operators goes up too, and thus so does the number of copies of the group data. The `AggregateExec: mode=FinalPartitioned` operators each see only a distinct subset of the keys, so as the number of target partitions goes up there are more `AggregateExec: mode=FinalPartitioned` operators, each of which sees a smaller and smaller subset of the group keys.

In pictures:

```
                ▲                               ▲
                │                               │
    ┌───────────────────────┐       ┌───────────────────────┐
    │ GroupBy               │       │ GroupBy               │     4. Each AggregateMode::Final GroupBy
    │ (AggregateMode::Final)│       │ (AggregateMode::Final)│        has an entry for its subset of the
    └───────────────────────┘       └───────────────────────┘        groups (in this case, half the entries)
                ▲                               ▲
                │                               │
                └───────────────┬───────────────┘
                                │
                   ┌─────────────────────────┐                    3. Repartitioning by hash(group keys)
                   │       Repartition       │                       ensures that each distinct group key
                   │         HASH(x)         │                       now appears in exactly one partition
                   └─────────────────────────┘
                                ▲
                ┌───────────────┴───────────────┐
                │                               │
   ┌─────────────────────────┐     ┌─────────────────────────┐    2. Each AggregateMode::Partial GroupBy
   │         GroupBy         │     │         GroupBy         │       has an entry for *all* the groups
   │ (AggregateMode::Partial)│     │ (AggregateMode::Partial)│
   └─────────────────────────┘     └─────────────────────────┘
                ▲                               ▲
                │                               │
         ┌─────────────┐                 ┌─────────────┐          1. Since input data is arbitrarily or
         │    Input    │                 │    Input    │             RoundRobin distributed, each partition
         │ Partition 0 │                 │ Partition 1 │             likely has all distinct group keys
         └─────────────┘                 └─────────────┘
```
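To make step 3 concrete, here is a minimal, self-contained sketch of the routing property. The hash function below is Rust's `DefaultHasher`, not what DataFusion's `RepartitionExec` actually uses; the point is only that a given group key always lands in the same output partition.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Illustrative routing rule only: hash the group key and take it modulo
// the number of output partitions, so equal keys always route together.
fn output_partition<K: Hash>(key: &K, num_partitions: u64) -> u64 {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    hasher.finish() % num_partitions
}

fn main() {
    for key in [1u64, 2, 3, 4] {
        println!(
            "group key {key} is always routed to partition {}",
            output_partition(&key, 2)
        );
    }
}
```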
Some example data:

```
┌─────┐          ┌─────┐
│  1  │          │  3  │
├─────┤          ├─────┤
│  2  │          │  4  │      After repartitioning by
└─────┘          └─────┘      hash(group keys), each distinct
┌─────┐          ┌─────┐      group key now appears in exactly
│  1  │          │  3  │      one partition
├─────┤          ├─────┤
│  2  │          │  4  │
└─────┘          └─────┘

─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─

┌─────┐          ┌─────┐
│  2  │          │  2  │
├─────┤          ├─────┤
│  1  │          │  2  │
├─────┤          ├─────┤
│  3  │          │  3  │
├─────┤          ├─────┤
│  4  │          │  1  │
└─────┘          └─────┘      Input data is arbitrarily or
  ...              ...        RoundRobin distributed, each
┌─────┐          ┌─────┐      partition likely has all
│  1  │          │  4  │      distinct group keys
├─────┤          ├─────┤
│  4  │          │  3  │
├─────┤          ├─────┤
│  1  │          │  1  │
├─────┤          ├─────┤
│  4  │          │  3  │
└─────┘          └─────┘

group values     group values
in partition 0   in partition 1
```

### Describe the solution you'd like

TLDR: I would like to propose updating the `AggregateExec: mode=Partial` operators to emit the contents of their hash tables once they see more than some fixed number of groups (I think @mingmwang said DuckDB uses a value of `10,000` for this); a rough sketch of this idea is included at the end of this issue.

This approach bounds the memory usage (to some fixed constant * the target partitioning) and also should perform quite well.

In the literature I think this approach could be called "dynamic partitioning", as it switches approaches based on the actual cardinality of the groups in the dataset.

### Describe alternatives you've considered

One potential suggestion is simply to hash repartition the input to `AggregateExec: mode=Partial`. This approach would definitely reduce the memory requirements, but it would mean we would have to hash repartition *all* the input rows, so the number of input values that need to be hashed / copied would likely be **much** higher (at least as long as the group hasher and hash repartitioner can't share the hashes, which is the case today).

The current strategy actually works very well for low cardinality group bys because the `AggregateExec: mode=Partial` can reduce the intermediate result that needs to be hash repartitioned to a very small size.

### Additional context

We saw this in IOx while working on some tracing queries that look very similar to the ClickBench query, something like the following to get the top ten traces:

```sql
SELECT trace_id, max(time) FROM traces GROUP BY trace_id ORDER BY max(time) LIMIT 10;
```
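To make the proposal above concrete, here is a rough, hypothetical sketch of the threshold check. All names (`EMIT_THRESHOLD`, `emit_and_clear`, etc.) are made up for illustration; this is not the actual `AggregateExec` code, which works on Arrow batches and tracks multiple accumulators per group.

```rust
use std::collections::HashMap;

// Value @mingmwang mentioned DuckDB uses; purely illustrative here.
const EMIT_THRESHOLD: usize = 10_000;

/// Hypothetical sketch of a Partial group-by operator that caps its
/// in-memory hash table size by emitting partial results early.
struct PartialGroupBy {
    // group key -> partial COUNT(*) (real code tracks all accumulators)
    groups: HashMap<u64, u64>,
    // stand-in for batches sent downstream to the repartition stage
    emitted: Vec<(u64, u64)>,
}

impl PartialGroupBy {
    fn new() -> Self {
        Self { groups: HashMap::new(), emitted: Vec::new() }
    }

    /// Accumulate one input row for `key`.
    fn push(&mut self, key: u64) {
        *self.groups.entry(key).or_insert(0) += 1;

        // If the table has grown past the threshold, emit the current
        // partial aggregates downstream (to the hash repartition +
        // FinalPartitioned stage) and start over, bounding memory to
        // roughly EMIT_THRESHOLD entries per partition.
        if self.groups.len() >= EMIT_THRESHOLD {
            self.emit_and_clear();
        }
    }

    fn emit_and_clear(&mut self) {
        self.emitted.extend(self.groups.drain());
    }

    /// Flush whatever is left at end of input.
    fn finish(&mut self) {
        self.emit_and_clear();
    }
}

fn main() {
    let mut partial = PartialGroupBy::new();
    // High-cardinality input: every key is distinct.
    for key in 0..100_000u64 {
        partial.push(key);
    }
    partial.finish();
    println!("emitted {} partial group entries", partial.emitted.len());
    println!("in-memory table now holds {} entries", partial.groups.len());
}
```

Because the hash repartition + `AggregateExec: mode=FinalPartitioned` stage already merges partial results, emitting the same group key more than once from the partial stage is still correct; it just trades some extra repartitioned data for a bounded hash table.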
