alamb opened a new issue #790: URL: https://github.com/apache/arrow-datafusion/issues/790
# Rationale

1. The current GroupByHash operator does not take NULL into account and thus produces incorrect answers when grouping on columns that contain NULL, as described in #782 and #781.
2. Without additional changes, adding support for NULL in grouping will likely both slow down group by hashing and increase the memory overhead per group (e.g. see #786).

Thus this ticket proposes to rearrange the GroupByHash code to be more efficient in both space and time, giving us a performance budget to add NULL support without an overall regression.

## Overview of Current GroupByHash

This section explains the current state of GroupByHash on master at https://github.com/apache/arrow-datafusion/commit/54163410da05e8e6c68af55d699bf6a89e229bb6

At a high level, the group by hash does the following for each input row, in a vectorized fashion:

1. Compute the group by key values (the expressions that appear in the GROUP BY clause)
2. Form a key out of the group by values
3. Find/Create an entry in the hash map of (key values) --> (accumulators)
4. Update the accumulators (one for each aggregate, such as COUNT, that appears in the query) with the arguments

When all the input has been processed, the hash table is drained, producing one row for each entry in the hash table, in the following manner:

```
(group key1, group key2, ...) (aggregate1, aggregate2, ...)
```

So for example, given a query such as

```sql
SELECT SUM(c1) FROM t GROUP BY k1, abs(k2)
```

This looks something like:

```
┌────┐ ┌────┐          ┌────┐ ┌───────┐          ┌─────────────┐
│ k1 │ │ k2 │─────────▶│ k1 │ │abs(k2)│─────────▶│ key 1234    │
│    │ │    │          │    │ │       │          │ key 23      │
│    │ │    │          │    │ │       │          │ ...         │
│    │ │    │          │    │ │       │          │ key 321     │
└────┘ └────┘          └────┘ └───────┘          └─────────────┘
input data             group by key values       one variable sized
                       (group_values)            hash table key per row

step 1: evaluate the group by expressions (k1, abs(k2))
step 2: create a variable sized hash table key for each row
```

The hash table that is formed looks something like this:

```
 hash table        "accumulators"
┌────────────┐     ┌───────────────────┬─────────────┬─────────────────┐
│  key 1234  │────▶│ Group Keys        │ Accumulator │ row_indices     │
│            │     │ Box[GroupByScalar]│ for SUM     │ Vec<u32>        │
│            │     │ formed key 1234   │             │ (scratch space) │
│    ...     │     └───────────────────┴─────────────┴─────────────────┘
│            │                         ...
│            │     ┌───────────────────┬─────────────┬─────────────────┐
│  key 321   │────▶│ Group Keys        │ Accumulator │ row_indices     │
│            │     │ Box[GroupByScalar]│ for SUM     │ Vec<u32>        │
│            │     │ formed key 321    │             │ (scratch space) │
└────────────┘     └───────────────────┴─────────────┴─────────────────┘

Step 3: the keys are used to find an entry in the hash table,
        which maps to the "accumulators"

NOTE: each entry in the hash table has
  1. the original group keys
  2. the accumulators
  3. row_indices scratch space
```

## Key Formation

The current state of the art, introduced in 93de66ae67a33764ac4029b0f825c415b9b2e92d / https://github.com/apache/arrow/pull/8863 by @Dandandan, is quite clever.
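A minimal sketch of this style of key packing, assuming only two column types: strings are written as their length (a little-endian `usize`) followed by their UTF-8 bytes, and fixed-width integers as their little-endian bytes. `KeyValue` and `pack_key` are hypothetical names for illustration; the real `create_key` reads values directly from Arrow arrays and covers many more types.

```rust
/// Simplified stand-in for one group-by value (hypothetical; not a DataFusion type).
/// Note that there is no variant for NULL.
enum KeyValue<'a> {
    Utf8(&'a str),
    UInt16(u16),
}

/// Packs the group-by values of one row into `key`, reusing its allocation.
fn pack_key(values: &[KeyValue<'_>], key: &mut Vec<u8>) {
    key.clear();
    for value in values {
        match value {
            KeyValue::Utf8(s) => {
                // length prefix (usize, little-endian), then the UTF-8 bytes
                key.extend_from_slice(&s.len().to_le_bytes());
                key.extend_from_slice(s.as_bytes());
            }
            // fixed-width values are written as their little-endian bytes
            KeyValue::UInt16(n) => key.extend_from_slice(&n.to_le_bytes()),
        }
    }
}

fn main() {
    let mut key = Vec::new();
    pack_key(&[KeyValue::Utf8("foo"), KeyValue::UInt16(0x1234)], &mut key);
    // On a 64-bit target: 8 (length) + 3 ("foo") + 2 (u16) = 13 bytes
    println!("{} bytes: {:02x?}", key.len(), key);
}
```

Nothing in this encoding can express a NULL value: every variant always writes bytes, and there is no marker that distinguishes a missing value from a present one.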
The code in [`create_key`](https://github.com/apache/arrow-datafusion/blob/30693df8961dca300306dfd0c8fca130375b50b3/datafusion/src/physical_plan/hash_aggregate.rs#L647) packs data from the group keys together into a single mutable `Vec<u8>`, which is then used as the key for the hash table.

For example, if the input row was:

```
{
  k1: "foo"
  k2: 0x1234 as u16
}
```

The resulting key is a 13 byte `Vec<u8>`: 11 bytes for "foo" (8 bytes for the length + 3 bytes for "foo") and 2 bytes for 0x1234, a 16 bit integer:

```
┌──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┐
│03│00│00│00│00│00│00│00│"f│"o│"o│34│12│
└──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┘
 0  1  2  3  4  5  6  7  8  9  10 11 12   byte offset
└───── string len ─────┘└─"foo"─┘└────┘
      (usize, le)                0x1234 (u16, le)
```

However, there are at least a few downsides of this approach:

1. There is no way to represent NULL, as mentioned by @Dandandan on https://github.com/apache/arrow-datafusion/issues/782#issuecomment-887698068
2. The data for each group key value is currently stored twice (once in the `Vec<u8>` key and once in the values as a `GroupByScalar` used to produce the output), resulting in memory overhead, especially for variable length (e.g. string) values

# Proposal

Modeled after what I think @Dandandan is suggesting in https://github.com/apache/arrow-datafusion/pull/786#issuecomment-888229063:

The hash table would not store the key or aggregate accumulators directly, but instead would map "signatures" computed from the group by keys to lists of offsets into a mutable storage area that contains the values and aggregates.

The "signature" is simply a hash of the group key values (and their validity bits). The term "signature" is used to avoid confusion with the hash used in the hash table.
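A minimal sketch of the proposed signature-to-offsets layout, under simplifying assumptions: two hypothetical group columns (`k1` as a nullable string and `k2` as a nullable `i64` standing in for `abs(k2)`), a single SUM accumulator stored as an `i64`, `Option` standing in for a value plus its validity bit, std's `DefaultHasher` for the signature, and a plain `Vec<usize>` where the proposal suggests a `SmallVec`. `GroupStorage`, `GroupByHash`, `signature`, and `update` are illustrative names, not DataFusion APIs.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

/// Computes a fixed-length signature for one row's group key. Hashing the
/// `Option`s covers both the validity bit (Some/None) and the value itself.
fn signature(k1: Option<&str>, k2: Option<i64>) -> u64 {
    let mut hasher = DefaultHasher::new();
    (k1, k2).hash(&mut hasher);
    hasher.finish()
}

/// The "mutable storage" area: one Vec per group key column (None = NULL)
/// plus one SUM accumulator per group, all indexed by the same group offset.
#[derive(Default)]
struct GroupStorage {
    k1: Vec<Option<String>>,
    k2: Vec<Option<i64>>,
    sums: Vec<i64>,
}

#[derive(Default)]
struct GroupByHash {
    /// signature -> offsets into `storage` (the proposal would use a SmallVec here)
    table: HashMap<u64, Vec<usize>>,
    storage: GroupStorage,
}

impl GroupByHash {
    /// Finds or creates the group for (k1, k2) and adds `value` to its SUM.
    fn update(&mut self, k1: Option<&str>, k2: Option<i64>, value: i64) {
        let Self { table, storage } = self;
        let offsets = table.entry(signature(k1, k2)).or_default();
        // Different groups may share a signature, so compare the stored values;
        // NULL (None) matches only NULL.
        let existing = offsets
            .iter()
            .copied()
            .find(|&i| storage.k1[i].as_deref() == k1 && storage.k2[i] == k2);
        let group = match existing {
            Some(i) => i,
            None => {
                let i = storage.sums.len();
                storage.k1.push(k1.map(String::from));
                storage.k2.push(k2);
                storage.sums.push(0);
                offsets.push(i);
                i
            }
        };
        storage.sums[group] += value;
    }
}

fn main() {
    // (k1, abs(k2), c1) rows; (D,1) appears twice, so its SUM is 5 + 6 = 11
    let rows = [
        (Some("D"), Some(1), 5),
        (Some("C"), Some(3), 3),
        (Some("A"), Some(1), 27),
        (Some("D"), Some(2), 2),
        (None, Some(0), 4),
        (Some("D"), Some(1), 6),
    ];
    let mut agg = GroupByHash::default();
    for (k1, k2, c1) in rows {
        agg.update(k1, k2, c1);
    }
    // Groups are stored in first-seen order: (D,1)=11, (C,3)=3, (A,1)=27, (D,2)=2, (NULL,0)=4
    let s = &agg.storage;
    for i in 0..s.sums.len() {
        println!("{:?} {:?} SUM={}", s.k1[i], s.k2[i], s.sums[i]);
    }
}
```

Because a signature is only a hash, two different groups can share one, which is why each table entry holds a list of offsets and the stored values are compared before a group is reused. The equality check treats NULL as equal only to NULL, so a NULL `k1` and an empty-string `k1` form distinct groups.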
The signature would be computed as a `u64` (or similar) directly from the group by key values:

```
┌────┐ ┌───────┐          ┌─────────────┐
│ k1 │ │abs(k2)│─────────▶│ 0x1133      │
│    │ │       │          │ 0x432A      │
│    │ │       │          │ ...         │
│    │ │       │          │ 0x432A      │
└────┘ └───────┘          └─────────────┘
group by key values       one FIXED LENGTH signature
(group_values)            (e.g. u64) per row

Step 2: create a FIXED LENGTH signature (e.g. u64) by hashing
        the values in the group by key
```

The hash table composition would also be different. Each entry is a `SmallVec` (a vector that stores a small number of elements inline, avoiding a heap allocation in the common case) containing a list of indices into a "mutable storage" area:

```
   hashtable                                 mutable storage
┌──────────────┐    ┌───────┐             ┌────────┬─────────┬──────────────┐
│ 0x1133       │───▶│ [1]   │─ ─ ─ ─ ─ ─ ▶│ k1     │ abs(k2) │ Accumulators │
│              │    └───────┘             │ values │ values  │ for SUM      │
│ ...          │                          │ (and   │ (and    │              │
│              │    ┌───────┐             │ valid  │ valid   │              │
│ 0x432A       │───▶│ [2,4] │─ ─ ─ ─ ─ ─ ▶│ mask)  │ mask)   │              │
└──────────────┘    └───────┘             └────────┴─────────┴──────────────┘
 keys are gby       values are lists of
 key signatures     offsets into storage
                    (SmallVec)
```

The mutable storage area contains:

1. A `Vec` of `ScalarValue`s for each group key column
2. The `Vec` of accumulators for each group

For example, this is how (logically) this mutable storage would work:

```
  k1   valid  abs(k2) valid       SUM
┌─────┬─────┐ ┌─────┬─────┐      ┌─────┐
│ "D" │  t  │ │  1  │  t  │      │ 11  │
├─────┼─────┤ ├─────┼─────┤      ├─────┤
│ "C" │  t  │ │  3  │  t  │      │  3  │
├─────┼─────┤ ├─────┼─────┤      ├─────┤
│ "A" │  t  │ │  1  │  t  │      │ 27  │
├─────┼─────┤ ├─────┼─────┤      ├─────┤
│ "D" │  t  │ │  2  │  t  │      │  2  │
├─────┼─────┤ ├─────┼─────┤      ├─────┤
│ ""  │  f  │ │  0  │  t  │      │  4  │
└─────┴─────┘ └─────┴─────┘      └─────┘
 group by key storage            accumulator
 (5 groups)                      for SUM (aggregates)

Example showing how groups (D,1), (C,3), (A,1), (D,2), (NULL,0) are stored
```

I propose using `Vec<ScalarValue>` to store the group key values in the mutable area, as there is no equivalent of a mutable `Array` in arrow-rs yet (though I think there is [`MutablePrimitiveArray`](https://github.com/jorgecarleitao/arrow2/blob/b0f9da6f2feb35580871186a594cd4bbb7fe23f3/src/array/primitive/mutable.rs) in arrow2). If/when we get access to a mutable array in DataFusion, we can potentially switch to using that representation for the mutable storage area, which would likely take less memory for some data types and also allow for faster output generation.

# Alternatives considered

One alternative that would require fewer changes, but be slightly slower, would be to append a validity bitmap to the end of both the keys and the values in the hash table. For example:

```
Alternate Design

 hash table        "accumulators"
┌────────────┐     ┌───────────────────┬─────────────┬─────────────────┬──────────────┐
│  key 1234  │────▶│ Group Keys        │ Accumulator │ row_indices     │ Null Bitmask │
│            │     │ Box[GroupByScalar]│ for SUM     │ Vec<u32>        │ (NEW)        │
│            │     │ formed key 1234   │             │ (scratch space) │              │
│    ...     │     └───────────────────┴─────────────┴─────────────────┴──────────────┘
│            │                         ...
│            │     ┌───────────────────┬─────────────┬─────────────────┬──────────────┐
│  key 321   │────▶│ Group Keys        │ Accumulator │ row_indices     │ Null Bitmask │
│            │     │ Box[GroupByScalar]│ for SUM     │ Vec<u32>        │ (NEW)        │
│            │     │ formed key 321    │             │ (scratch space) │              │
└────────────┘     └───────────────────┴─────────────┴─────────────────┴──────────────┘
```

And the keys would have a null bitmask appended to the end:

```
{ k1: "foo", k2: 0x1234u16 }

┌──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──┐
│03│00│00│00│00│00│00│00│"f│"o│"o│34│12│00│◀── new null bitmask at the
└──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┘    end of each key
 0  1  2  3  4  5  6  7  8  9  10 11 12 13   byte offset
└───── string len ─────┘└─"foo"─┘└────┘└─┘
      (usize, le)                0x1234 (u16, le)
```
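A minimal sketch of the alternate key encoding, under these assumptions: two nullable column types (strings and `u16`), one bit per group column packed into whole bytes, a set bit marking NULL (consistent with the all-zero bitmask shown for the non-NULL example row), and a NULL value contributing no bytes of its own (an assumption; writing a placeholder value would also work). `NullableKeyValue` and `pack_key_with_nulls` are hypothetical names.

```rust
/// Simplified nullable group-by value (hypothetical): `None` means NULL.
enum NullableKeyValue<'a> {
    Utf8(Option<&'a str>),
    UInt16(Option<u16>),
}

/// Packs non-NULL values as in the current key format, then appends a null
/// bitmask with one bit per column (bit set = NULL), rounded up to whole bytes.
fn pack_key_with_nulls(values: &[NullableKeyValue<'_>], key: &mut Vec<u8>) {
    key.clear();
    let mut null_mask = vec![0u8; (values.len() + 7) / 8];
    for (i, value) in values.iter().enumerate() {
        match value {
            NullableKeyValue::Utf8(Some(s)) => {
                key.extend_from_slice(&s.len().to_le_bytes());
                key.extend_from_slice(s.as_bytes());
            }
            NullableKeyValue::UInt16(Some(n)) => key.extend_from_slice(&n.to_le_bytes()),
            // assumption: a NULL value writes no bytes; only its mask bit is set
            NullableKeyValue::Utf8(None) | NullableKeyValue::UInt16(None) => {
                null_mask[i / 8] |= 1u8 << (i % 8);
            }
        }
    }
    key.extend_from_slice(&null_mask);
}

fn main() {
    let mut key = Vec::new();
    let row = [NullableKeyValue::Utf8(Some("foo")), NullableKeyValue::UInt16(Some(0x1234))];
    pack_key_with_nulls(&row, &mut key);
    // the value bytes (13 on a 64-bit target) plus one all-zero bitmask byte
    println!("{:02x?}", key);

    let null_row = [NullableKeyValue::Utf8(None), NullableKeyValue::UInt16(Some(0))];
    pack_key_with_nulls(&null_row, &mut key);
    // key is now the two u16 bytes followed by a bitmask with bit 0 set: 0x00, 0x00, 0x01
    println!("{:02x?}", key);
}
```

Since the column order is fixed and strings carry a length prefix, the trailing mask is enough to keep keys such as `(NULL, 0)` and `("", 0)` distinct, at the cost of one extra byte per key for up to eight group columns.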