alamb opened a new issue, #6937:
URL: https://github.com/apache/arrow-datafusion/issues/6937

   ### Is your feature request related to a problem or challenge?
   
   
   When running a query with "high cardinality" grouping in DataFusion, the memory usage increases linearly not only with the number of groups (expected) but *also* with the number of cores.
   
   This is the root cause of @ychen7's observation that ClickBench q32 fails, as reported in https://github.com/apache/arrow-datafusion/issues/5276#issuecomment-1504574207
   
   To reproduce, get the ClickBench data from https://github.com/ClickHouse/ClickBench/tree/main#data-loading and run the following:
   
   ```sql
   CREATE EXTERNAL TABLE hits
   STORED AS PARQUET
   LOCATION 'hits.parquet';
   
   set datafusion.execution.target_partitions = 1;
   SELECT "WatchID", "ClientIP", COUNT(*) AS c, SUM("IsRefresh"), 
AVG("ResolutionWidth") FROM hits GROUP BY "WatchID", "ClientIP" ORDER BY c DESC 
LIMIT 10;
   
   set datafusion.execution.target_partitions = 4;
   SELECT "WatchID", "ClientIP", COUNT(*) AS c, SUM("IsRefresh"), 
AVG("ResolutionWidth") FROM hits GROUP BY "WatchID", "ClientIP" ORDER BY c DESC 
LIMIT 10;
   ```
   
   This is what the memory usage looks like:
   ![Screenshot 2023-07-12 at 4 07 10 PM](https://github.com/apache/arrow-datafusion/assets/490673/8c0a7163-1f9c-4cde-a34d-8ac357c68a47)
   
   
   The reason for this behavior can be found in the plan and the multi-stage 
hash grouping that is done:
   
   ```sql
   explain SELECT "WatchID", "ClientIP", COUNT(*) AS c, SUM("IsRefresh"), AVG("ResolutionWidth") FROM hits GROUP BY "WatchID", "ClientIP" ORDER BY c DESC LIMIT 10;
   ```
   
   
   ```text
   GlobalLimitExec: skip=0, fetch=10
     SortPreservingMergeExec: [c@2 DESC]
       SortExec: fetch=10, expr=[c@2 DESC]
         ProjectionExec: expr=[WatchID@0 as WatchID, ClientIP@1 as ClientIP, COUNT(UInt8(1))@2 as c, SUM(hits.IsRefresh)@3 as SUM(hits.IsRefresh), AVG(hits.ResolutionWidth)@4 as AVG(hits.ResolutionWidth)]
           AggregateExec: mode=FinalPartitioned, gby=[WatchID@0 as WatchID, ClientIP@1 as ClientIP], aggr=[COUNT(UInt8(1)), SUM(hits.IsRefresh), AVG(hits.ResolutionWidth)]
             CoalesceBatchesExec: target_batch_size=8192
               RepartitionExec: partitioning=Hash([WatchID@0, ClientIP@1], 16), input_partitions=16
                 AggregateExec: mode=Partial, gby=[WatchID@0 as WatchID, ClientIP@1 as ClientIP], aggr=[COUNT(UInt8(1)), SUM(hits.IsRefresh), AVG(hits.ResolutionWidth)]
                   ParquetExec: file_groups={16 groups: [[Users/alamb/Software/clickbench_hits_compatible/hits.parquet:0..923748528], [Users/alamb/Software/clickbench_hits_compatible/hits.parquet:923748528..1847497056], [Users/alamb/Software/clickbench_hits_compatible/hits.parquet:1847497056..2771245584], [Users/alamb/Software/clickbench_hits_compatible/hits.parquet:2771245584..3694994112], [Users/alamb/Software/clickbench_hits_compatible/hits.parquet:3694994112..4618742640], ...]}, projection=[WatchID, ClientIP, IsRefresh, ResolutionWidth]
   ```
   
   Specifically, since the groups are arbitrarily distributed across the file, each first-stage `AggregateExec: mode=Partial` has to build a hash table with entries for *all* the groups. As the number of target partitions goes up, the number of `AggregateExec: mode=Partial` instances goes up too, and thus so does the number of copies of the group data.
   
   Each `AggregateExec: mode=FinalPartitioned` only sees a distinct subset of the keys, so as the number of target partitions goes up there are more `AggregateExec: mode=FinalPartitioned` instances, each of which sees a smaller and smaller subset of the group keys.
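
   In other words, with `N` distinct groups and `P` target partitions, the `Partial` stage holds roughly `P * N` group entries in total (each partition's hash table ends up with nearly all `N` groups), while the `FinalPartitioned` stage holds only `N` entries in total, split across the `P` partitions. All of the memory growth with core count comes from the first stage.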
   
   In pictures:
   
   ```
                  ▲                          ▲
                  │                          │
                  │                          │
                  │                          │
                  │                          │
                  │                          │
      ┌───────────────────────┐  ┌───────────────────────┐       4. Each AggregateMode::Final
      │GroupBy                │  │GroupBy                │       GroupBy has an entry for its
      │(AggregateMode::Final) │  │(AggregateMode::Final) │       subset of groups (in this case
      │                       │  │                       │       that means half the entries)
      └───────────────────────┘  └───────────────────────┘
                  ▲                          ▲
                  │                          │
                  └─────────────┬────────────┘
                                │
                                │
                                │
                   ┌─────────────────────────┐                   3. Repartitioning by hash(group
                   │       Repartition       │                   keys) ensures that each distinct
                   │         HASH(x)         │                   group key now appears in exactly
                   └─────────────────────────┘                   one partition
                                ▲
                                │
                ┌───────────────┴─────────────┐
                │                             │
                │                             │
   ┌─────────────────────────┐  ┌──────────────────────────┐     2. Each AggregateMode::Partial
   │         GroupBy         │  │         GroupBy          │     GroupBy has an entry for *all*
   │(AggregateMode::Partial) │  │ (AggregateMode::Partial) │     the groups
   └─────────────────────────┘  └──────────────────────────┘
                ▲                             ▲
                │                            ┌┘
                │                            │
           .─────────.                  .─────────.
        ,─'           '─.            ,─'           '─.
       ;      Input      :          ;      Input      :          1. Since input data is
       :   Partition 0   ;          :   Partition 1   ;          arbitrarily or RoundRobin
        ╲               ╱            ╲               ╱           distributed, each partition
         '─.         ,─'              '─.         ,─'            likely has all distinct
            `───────'                    `───────'               group keys
   ```
   
   Some example data:
   
   ```
                 ┌─────┐                ┌─────┐
                 │  1  │                │  3  │
                 ├─────┤                ├─────┤
                 │  2  │                │  4  │                After repartitioning by
                 └─────┘                └─────┘                hash(group keys), each distinct
                 ┌─────┐                ┌─────┐                group key now appears in exactly
                 │  1  │                │  3  │                one partition
                 ├─────┤                ├─────┤
                 │  2  │                │  4  │
                 └─────┘                └─────┘


   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─


                 ┌─────┐                ┌─────┐
                 │  2  │                │  2  │
                 ├─────┤                ├─────┤
                 │  1  │                │  2  │
                 ├─────┤                ├─────┤
                 │  3  │                │  3  │
                 ├─────┤                ├─────┤
                 │  4  │                │  1  │
                 └─────┘                └─────┘                Input data is arbitrarily or
                   ...                    ...                  RoundRobin distributed, each
                 ┌─────┐                ┌─────┐                partition likely has all
                 │  1  │                │  4  │                distinct group keys
                 ├─────┤                ├─────┤
                 │  4  │                │  3  │
                 ├─────┤                ├─────┤
                 │  1  │                │  1  │
                 ├─────┤                ├─────┤
                 │  4  │                │  3  │
                 └─────┘                └─────┘

             group values           group values
             in partition 0         in partition 1
   ```
   
   ### Describe the solution you'd like
   
   TLDR: I would like to propose updating `AggregateExec: mode=Partial` to emit the contents of its hash table whenever it sees more than some fixed number of groups (I think @mingmwang said DuckDB uses a value of `10,000` for this).
   
   This approach bounds the memory usage (to some fixed constant * the number of target partitions) and should also perform quite well.
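
   For example, with a `10,000` group threshold and 16 target partitions, the `Partial` stage would hold at most on the order of `16 * 10,000 = 160,000` in-flight group entries, independent of the dataset's actual cardinality.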
   
   In the literature I think this approach could be called "dynamic 
partitioning" as it switches approaches based on the actual cardinality of the 
groups in the dataset
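
   To make the idea concrete, here is a minimal sketch in Rust of what a flush-and-emit partial aggregate could look like. Everything here (`PartialCount`, `MAX_PARTIAL_GROUPS`, `update_batch`) is hypothetical and simplified to a single `COUNT(*)`; it is not DataFusion's actual `AggregateExec` code:

   ```rust
   use std::collections::HashMap;

   /// Hypothetical flush threshold (the `10,000` figure is the value
   /// DuckDB reportedly uses, per the discussion above).
   const MAX_PARTIAL_GROUPS: usize = 10_000;

   /// Minimal model of a first-stage (`mode=Partial`) COUNT(*) aggregation
   /// that emits its hash table early instead of growing without bound.
   struct PartialCount {
       /// group key -> running count
       groups: HashMap<u64, u64>,
   }

   impl PartialCount {
       fn new() -> Self {
           Self { groups: HashMap::new() }
       }

       /// Accumulate one batch of group keys. Returns `Some(partial rows)`
       /// once the table exceeds the threshold; the caller would forward
       /// those rows through the hash repartitioner to the
       /// `FinalPartitioned` stage, which merges duplicate keys emitted
       /// across flushes.
       fn update_batch(&mut self, keys: &[u64]) -> Option<Vec<(u64, u64)>> {
           for &k in keys {
               *self.groups.entry(k).or_insert(0) += 1;
           }
           if self.groups.len() > MAX_PARTIAL_GROUPS {
               // Emit and reset: per-partition memory stays bounded at
               // roughly MAX_PARTIAL_GROUPS entries.
               Some(self.groups.drain().collect())
           } else {
               None
           }
       }
   }

   fn main() {
       let mut agg = PartialCount::new();
       // Feed batches: most calls return None; once the table passes the
       // threshold, a batch of partial counts is flushed downstream.
       for batch in [[1u64, 2, 1], [3, 1, 2]] {
           if let Some(flushed) = agg.update_batch(&batch) {
               println!("flushed {} partial groups", flushed.len());
           }
       }
   }
   ```

   This stays correct for the same reason partial aggregation works at all: the `FinalPartitioned` stage already merges partial states, so a group key that appears in several flushes is folded back together downstream.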
   
   
   
   ### Describe alternatives you've considered
   
   One alternative that might be suggested is to simply hash repartition the input *before* the `AggregateExec: mode=Partial`.

   This approach would definitely reduce the memory requirements, but it would mean hash repartitioning *all* of the input rows, so the number of input values that need to be hashed / copied would likely be **much** higher (at least as long as the group hasher and the hash repartitioner can't share hashes, which is the case today: each row would be hashed once for repartitioning and again for grouping).
   
   The current strategy actually works very well for low cardinality group bys, because the `AggregateExec: mode=Partial` can shrink the intermediate result that needs to be hash repartitioned down to a very small size.
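
   For example, with only 10 distinct group values, each `Partial` aggregate reduces an arbitrarily large input partition to at most 10 rows before anything needs to be repartitioned.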
   
   ### Additional context
   
   We saw this in IOx while working on some tracing queries that look very similar to the ClickBench query, something like the following to get the top ten traces:
   
   ```sql
   SELECT trace_id, max(time)
   FROM traces
   GROUP BY trace_id
   ORDER BY max(time)
   LIMIT 10;
   ```

