alamb opened a new issue #790:
URL: https://github.com/apache/arrow-datafusion/issues/790


   # Rationale
   
   1. The current GroupByHash operator does not take NULL into account and thus 
produces incorrect answers when grouping on columns that contain NULL, as 
described in #782 and #781.
   2. Without additional changes, adding support for NULL in grouping will 
likely both slow down group by hashing as well as increase the memory overhead 
per group (e.g. see #786)
   
   This ticket therefore proposes rearranging the GroupByHash code to be more 
efficient in both space and time, giving us a performance budget to add NULL 
support without an overall regression.
   
   ## Overview of Current GroupByHash
   
   This section explains the current state of GroupByHash on master at 
https://github.com/apache/arrow-datafusion/commit/54163410da05e8e6c68af55d699bf6a89e229bb6
   
   At a high level, the group by hash does the following for each input row, in 
a vectorized fashion:
   1. Compute the group by key values (the expressions that appear in the GROUP 
BY clause)
   2. Form a key out of the group by values
   3. Find/Create an entry in hash map of (key values) --> (accumulators)
   4. Update the accumulators (one for each aggregate, such as COUNT that 
appears in the query) with the arguments
   
   When all the input has been processed, the hash table is drained, 
producing one row for each entry in the hash table, in the following manner:
   
   ```
   (group key1, group key2, ...) (aggregate1, aggregate2, ...)
   ```
   
   So for example, given a query such as
   ```sql
   SELECT SUM(c1)
   FROM t
   GROUP BY k1, abs(k2)
   ```
   
   This looks something like
   
   
   ```
    ┌────┐ ┌────┐               ┌────┐ ┌────┐         ┌─────────────┐
    │    │ │    │               │    │ │    │────────▶│  key 1234   │
    │    │ │    │               │    │ │    │         └─────────────┘
    │    │ │    │               │    │ │    │
    │    │ │    │               │    │ │    │         ┌─────────────┐
    │    │ │    │               │    │ │    │────────▶│   key 23    │
    │    │ │    │               │    │ │abs │         └─────────────┘
    │ k1 │ │ k2 │──────────────▶│ k1 │ │(k2)│
    │    │ │    │               │    │ │    │
    │    │ │    │               │    │ │    │        ...
    │    │ │    │               │    │ │    │
    │    │ │    │               │    │ │    │
    │    │ │    │               │    │ │    │         ┌─────────────┐
    │    │ │    │               │    │ │    │────────▶│   key 321   │
    └────┘ └────┘               └────┘ └────┘         └─────────────┘
                                 group by key
     input data                     values
                     step 1:    (group_values)        step 2:
                    evaluate               create a variable sized hash
                    gby exprs                 table key for each row
   ```
   
   The hash table that is formed looks something like this:
   
   ```
   ┌───────────────┐         ┌──────────────────┬────────────┬─────────────────┐
   │┌─────────────┐│         │    Group Keys    │Accumulator │   row_indices   │
   ││  key 1234   ├┼────────▶│Box[GroupByScalar]│  for SUM   │    Vec<u32>     │
   │└─────────────┘│         │ formed key 1234  │            │ (scratch space) │
   │               │         └──────────────────┴────────────┴─────────────────┘
   │               │
   │     ...       │
   │               │                  ...
   │               │
   │               │
   │               │
   │               │         ┌──────────────────┬────────────┬─────────────────┐
   │┌─────────────┐│         │    Group Keys    │Accumulator │   row_indices   │
   ││   key 321   │├────────▶│Box[GroupByScalar]│  for SUM   │    Vec<u32>     │
   │└─────────────┘│         │  formed key 321  │            │ (scratch space) │
   └───────────────┘         └──────────────────┴────────────┴─────────────────┘
     hash table
   "accumulators"
   
   
              Step 3:                   NOTE:  Each entry in the hash table has
    The keys are used to find an        1. The original group keys
      entry in the hash table           2. The accumulators
       which then are mapped            3. row_indices scratch space
   ```
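
To make the entry layout above concrete, here is a minimal sketch of the current design using only the standard library. All names here (`GroupValue`, `GroupState`, `update_batch`, `drain`) are hypothetical stand-ins rather than the actual DataFusion types, the group key is fixed to one string column and one integer column, and SUM is reduced to a plain `i64`.

```rust
use std::collections::HashMap;

/// Simplified stand-in for `GroupByScalar` (hypothetical subset of variants).
#[derive(Debug, Clone, PartialEq)]
enum GroupValue {
    Utf8(String),
    Int64(i64),
}

/// One hash table entry: original group keys, accumulator, scratch space.
struct GroupState {
    group_keys: Box<[GroupValue]>,
    sum: i64,              // stand-in for the SUM accumulator
    row_indices: Vec<u32>, // rows of the current batch that fall in this group
}

/// Processes one batch of already evaluated rows: (k1, abs(k2), c1).
fn update_batch(table: &mut HashMap<Vec<u8>, GroupState>, rows: &[(String, i64, i64)]) {
    // Form a variable sized key per row, then find or create the entry.
    for (row, (k1, k2, _)) in rows.iter().enumerate() {
        let mut key = Vec::new();
        key.extend_from_slice(&(k1.len() as u64).to_le_bytes());
        key.extend_from_slice(k1.as_bytes());
        key.extend_from_slice(&k2.to_le_bytes());
        table
            .entry(key)
            .or_insert_with(|| GroupState {
                group_keys: vec![GroupValue::Utf8(k1.clone()), GroupValue::Int64(*k2)]
                    .into_boxed_slice(),
                sum: 0,
                row_indices: Vec::new(),
            })
            .row_indices
            .push(row as u32);
    }
    // Update each accumulator from its collected rows, then reset the scratch space.
    for state in table.values_mut() {
        for &row in &state.row_indices {
            state.sum += rows[row as usize].2;
        }
        state.row_indices.clear();
    }
}

/// Drains the table into (group keys, aggregate) output rows.
fn drain(table: HashMap<Vec<u8>, GroupState>) -> Vec<(Vec<GroupValue>, i64)> {
    table
        .into_values()
        .map(|s| (s.group_keys.into_vec(), s.sum))
        .collect()
}
```

Note that the group values live both in the `Vec<u8>` key and in `group_keys`, which is the duplication discussed under Key Formation.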
   
   
   ## Key Formation
   
   The current state of the art, introduced in  
93de66ae67a33764ac4029b0f825c415b9b2e92d /  
https://github.com/apache/arrow/pull/8863 by @Dandandan, is quite clever. The 
code in 
[`create_key`](https://github.com/apache/arrow-datafusion/blob/30693df8961dca300306dfd0c8fca130375b50b3/datafusion/src/physical_plan/hash_aggregate.rs#L647) 
packs data from the group keys together into a single mutable `Vec`, which is 
then used as the key for the hash table.
   
   For example, if the input row was:
   ```
   {
     k1: "foo"
     k2: 0x1234 as u16
   }
   ```
   
   The resulting key is a 13 byte `Vec`: 11 bytes for "foo" (8 bytes for the 
length + 3 bytes for "foo") and 2 bytes for 0x1234, a 16 bit integer:
   
   ```
                           │        │
        string len                      0x1234
       (as usize le)       │  "foo" │    as le
   ┌──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┐
   │03│00│00│00│00│00│00│00│"f│"o│"o│34│12│
   └──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┘   byte
    0  1  2  3  4  5  6  7 │8  9  10│11 12   offset
   
                           │        │
   
   ```
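
The packing can be sketched in a few lines. This is a simplified illustration, not the real `create_key`: the function below is hypothetical, handles only a (string, `u16`) key, and writes the length as a `u64`, which matches `usize` only on 64-bit targets. Little-endian encoding puts the least significant byte first.

```rust
/// Hypothetical, simplified key packing for a (Utf8, UInt16) group key.
/// The caller passes a reusable buffer, mirroring the single mutable `Vec`.
fn create_key(k1: &str, k2: u16, key: &mut Vec<u8>) {
    key.clear();
    // Length prefix: 8 bytes, little-endian (assumes a 64-bit `usize`).
    key.extend_from_slice(&(k1.len() as u64).to_le_bytes());
    // The string bytes themselves.
    key.extend_from_slice(k1.as_bytes());
    // The fixed width value: 2 bytes, little-endian.
    key.extend_from_slice(&k2.to_le_bytes());
}
```

Calling `create_key("foo", 0x1234, &mut key)` leaves 13 bytes in `key`: 8 for the length, 3 for the string, and 2 for the integer. There is no byte left over to say whether either value was NULL.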
   
   However, there are at least a few downsides of this approach:
   1. There is no way to represent NULL as mentioned by @Dandandan on 
https://github.com/apache/arrow-datafusion/issues/782#issuecomment-887698068
   2. The data for each group key value is currently stored twice -- once in 
the `Vec` key and once in the values as a `GroupByScalar` used to produce the 
output -- resulting in memory overhead, especially for variable length (e.g. 
string) values
   
   # Proposal
   
   Modeled after what I think @Dandandan  is suggesting in 
https://github.com/apache/arrow-datafusion/pull/786#issuecomment-888229063:
   
   The HashTable would not store the key or aggregate accumulators directly, 
but instead would map "signatures" computed from the group by keys to list 
offsets in a mutable storage area that contained the values and aggregates.
   
   The "signature" is simply a hash of the values (and validity bit) of the 
group key values. The term "signature" is used to avoid confusion with the hash 
used in the hash table. It would be computed as a `u64` or similar directly 
from the group by key values.
   ```
   ┌────┐ ┌────┐         ┌─────────────┐
   │    │ │    │────────▶│   0x1133    │
   │    │ │    │         └─────────────┘
   │    │ │    │         ┌─────────────┐
   │    │ │    │────────▶│   0x432A    │
   │    │ │    │         └─────────────┘
   │    │ │abs │
   │ k1 │ │(k2)│
   │    │ │    │         ...
   │    │ │    │
   │    │ │    │
   │    │ │    │
   │    │ │    │         ┌─────────────┐
   │    │ │    │────────▶│   0x432A    │
   └────┘ └────┘         └─────────────┘
    group by key
       values
   (group_values)
   
   Step 2: Create a FIXED LENGTH signature
     (e.g. u64) by hashing the values in
               the group by key
   ```
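
One way to compute such a signature, shown here as a sketch only: hash each group value as an `Option`, so that the validity (Some vs. None) feeds into the hash along with the value. The `signature` function and the choice of `DefaultHasher` are assumptions for illustration; the proposal does not prescribe a particular hash function.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical signature for a (k1, abs(k2)) group key.
/// `None` models NULL; hashing an `Option` mixes in its discriminant,
/// which plays the role of the validity bit.
fn signature(k1: Option<&str>, k2: Option<i64>) -> u64 {
    let mut hasher = DefaultHasher::new();
    k1.hash(&mut hasher);
    k2.hash(&mut hasher);
    hasher.finish()
}
```

Because the signature is fixed length, two different group keys can share one signature, which is why the hash table values described next are lists of offsets rather than a single offset.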
   
   The hashtable composition would be different. Each entry is a `SmallVec` 
(non-allocating Vec) containing a list of indices into a "mutable storage" area
   ```
   ┌───────────────┐
   │┌─────────────┐│    ┌───────┐                       ┌──────┐┌──────┐ ┌────────────┐
   ││   0x1133    ├┼───▶│  [1]  │─ ─ ─ ─ ─ ─            │      ││      │ │            │
   │└─────────────┘│    └───────┘           └ ─ ─ ─ ─ ─ │      ││      │ │            │
   │               │                                    │  k1  ││ abs  │ │            │
   │               │                         ─ ─ ─ ─ ─ ▶│values││ (k2) │ │            │
   │     ...       │                        │           │      ││values│ │Accumulators│
   │               │       ...                          │ (and ││      │ │  for SUM   │
   │               │                        │           │valid ││ (and │ │            │
   │               │                         ─ ─ ─ ─ ─ ▶│mask) ││valid │ │            │
   │               │                        │           │      ││mask) │ │            │
   │               │                                    │      ││      │ │            │
   │┌─────────────┐│    ┌───────┐           │           │      ││      │ │            │
   ││   0x432A    │├───▶│ [2,4] │─ ─ ─ ─ ─ ─            └──────┘└──────┘ └────────────┘
   │└─────────────┘│    └───────┘
   └───────────────┘               values are lists
    keys are gby key               (SmallVec) of         mutable storage
    signatures                     offsets into
                                   storage tables
   hashtable
   ```
   
   The mutable storage area contains:
   1. A `Vec` of `ScalarValue`s for each group key column
   2. The `Vec` of accumulators for each grouping
   
   For example, this is how (logically) this mutable storage would work:
   
   ```
           valid          valid
            bit            bit
           mask           mask
    ┌────┐┌────┐   ┌────┐┌────┐   ┌────┐
    │"D" ││ t  │   │ 1  ││ t  │   │ 11 │
    ├────┤├────┤   ├────┤├────┤   ├────┤
    │"C" ││ t  │   │ 3  ││ t  │   │ 3  │
    ├────┤├────┤   ├────┤├────┤   ├────┤
    │"A" ││ t  │   │ 1  ││ t  │   │ 27 │
    ├────┤├────┤   ├────┤├────┤   ├────┤
    │"D" ││ t  │   │ 2  ││ t  │   │ 2  │
    ├────┤├────┤   ├────┤├────┤   ├────┤
    │ "" ││ f  │   │ 0  ││ t  │   │ 4  │
    └────┘└────┘   └────┘└────┘   └────┘
   
      group by key storage      Accumulator
           (5 groups)             for SUM
                               (aggregates)
   
   
      Example showing how groups
     (D,1), (C,3), (A,1), (D,2),
         (NULL, 0) are stored
   ```
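
Putting the pieces together, the proposed layout could be sketched as below. This is an illustration under several assumptions, not a proposed implementation: the names `GroupStorage`, `GroupByHash`, and `update` are hypothetical, `Option<String>` / `Option<i64>` stand in for `ScalarValue` plus a validity bit, a plain `Vec<usize>` stands in for `SmallVec`, and SUM is a plain `i64`.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

/// Mutable storage area: one column per group key plus the accumulators.
/// Row `i` of every column belongs to group `i`.
#[derive(Default)]
struct GroupStorage {
    k1: Vec<Option<String>>, // `None` models NULL
    k2: Vec<Option<i64>>,
    sums: Vec<i64>,
}

#[derive(Default)]
struct GroupByHash {
    /// signature -> offsets into `storage` (stand-in for SmallVec).
    table: HashMap<u64, Vec<usize>>,
    storage: GroupStorage,
}

impl GroupByHash {
    /// Adds one input row (k1, k2, value) to the matching group's SUM.
    fn update(&mut self, k1: Option<&str>, k2: Option<i64>, value: i64) {
        // Fixed length signature computed from the values and their validity.
        let mut hasher = DefaultHasher::new();
        k1.hash(&mut hasher);
        k2.hash(&mut hasher);
        let sig = hasher.finish();

        let storage = &mut self.storage;
        let offsets = self.table.entry(sig).or_default();
        // Signatures can collide, so compare against the stored group values.
        let found = offsets
            .iter()
            .copied()
            .find(|&i| storage.k1[i].as_deref() == k1 && storage.k2[i] == k2);
        let idx = match found {
            Some(i) => i,
            None => {
                // New group: append to every storage column, record its offset.
                storage.k1.push(k1.map(str::to_string));
                storage.k2.push(k2);
                storage.sums.push(0);
                offsets.push(storage.sums.len() - 1);
                storage.sums.len() - 1
            }
        };
        storage.sums[idx] += value;
    }
}
```

In this sketch each group's key values are stored exactly once (in the storage columns), and producing output amounts to walking the columns in order.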
   
   I propose using `Vec<ScalarValue>` to store the group key values in the 
mutable area as there is no equivalent of a mutable `Array` in arrow-rs yet 
(though I think there is 
[`MutablePrimitiveArray`](https://github.com/jorgecarleitao/arrow2/blob/b0f9da6f2feb35580871186a594cd4bbb7fe23f3/src/array/primitive/mutable.rs)
 in arrow2). If/when we get access to a mutable array in datafusion, we can 
potentially switch to using that representation for the mutable storage area, 
which would likely both take less memory for some data types and allow for 
faster output generation.
   
   # Alternatives considered
   
   One alternative that would require fewer changes but be slightly slower 
would be to append a validity bitmap to the end of both the keys and values in 
the hash table. For example:
   
   ```
         Alternate Design
    ┌───────────────┐         ┌──────────────────┬────────────┬─────────────────╦ ═ ═ ═ ═ ═ ═ ═ ═ ╗
    │┌─────────────┐│         │    Group Keys    │Accumulator │   row_indices   │  Null Bitmask
    ││  key 1234   ├┼────────▶│Box[GroupByScalar]│  for SUM   │    Vec<u32>     ║      (NEW)      ║
    │└─────────────┘│         │ formed key 1234  │            │ (scratch space) │
    │               │         └──────────────────┴────────────┴─────────────────╩ ═ ═ ═ ═ ═ ═ ═ ═ ╝
    │               │
    │     ...       │
    │               │                  ...
    │               │
    │               │
    │               │
    │               │         ┌──────────────────┬────────────┬─────────────────╦ ═ ═ ═ ═ ═ ═ ═ ═ ╗
    │┌─────────────┐│         │    Group Keys    │Accumulator │   row_indices   │  Null Bitmask
    ││  key 3211   │├────────▶│Box[GroupByScalar]│  for SUM   │    Vec<u32>     ║      (NEW)      ║
    │└─────────────┘│         │ formed key 3211  │            │ (scratch space) │
    └───────────────┘         └──────────────────┴────────────┴─────────────────╩ ═ ═ ═ ═ ═ ═ ═ ═ ╝
   ```
   
   And the keys would have a null bitmask appended on the end:
   ```
                                              │        │
                           string len                      0x1234
                          (as usize le)       │  "foo" │    as le
   {                  ┌──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──╦ ═
     k1: "foo"        │03│00│00│00│00│00│00│00│"f│"o│"o│34│12│00║◀ ─ ─
     k2: 0x1234u16    └──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──╩ ═      │
   }                   0  1  2  3  4  5  6  7 │8  9  10│11 12 13
                                                                      │
                                              │        │
                                                                      │
   
                                                               New bitmask at
                                                                end of each
                                                                    key
   
   ```
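
A sketch of what this alternative key could look like in code. The function name and the encoding details are assumptions made for illustration: a set bit means the corresponding column is NULL, a NULL column contributes no value bytes, and the length prefix is written as a `u64` (matching `usize` on 64-bit targets). With at most 8 group columns the bitmask fits in the single trailing byte shown in the diagram.

```rust
/// Hypothetical key packing with a trailing null bitmask byte.
/// Bit 0 is set if k1 is NULL, bit 1 is set if k2 is NULL.
fn create_key_with_nulls(k1: Option<&str>, k2: Option<u16>, key: &mut Vec<u8>) {
    key.clear();
    let mut null_mask: u8 = 0;
    match k1 {
        Some(s) => {
            key.extend_from_slice(&(s.len() as u64).to_le_bytes());
            key.extend_from_slice(s.as_bytes());
        }
        // Assumption: NULL values write no bytes, only a mask bit.
        None => null_mask |= 1 << 0,
    }
    match k2 {
        Some(v) => key.extend_from_slice(&v.to_le_bytes()),
        None => null_mask |= 1 << 1,
    }
    // The bitmask goes last, after all the value bytes.
    key.push(null_mask);
}
```

For the non-NULL row `("foo", 0x1234)` this yields the 14 byte key from the diagram, ending in `00`. A NULL `k1` and an empty string `k1` produce different keys because their masks and lengths differ.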
   

