This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch hash_agg_spike
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git

commit 814b1fc29b8b7a30cc1a9786c514f8fe5fe67c5b
Author: Andrew Lamb <[email protected]>
AuthorDate: Sat Jul 1 04:59:30 2023 -0400

    Add comments
---
 benchmarks/Cargo.toml                              |   1 +
 benchmarks/src/bin/tpch.rs                         |   3 +
 .../core/src/physical_plan/aggregates/row_hash2.rs | 136 ++++++++++++++++-----
 3 files changed, 107 insertions(+), 33 deletions(-)

diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml
index 1d360739d8..f8ca0b2496 100644
--- a/benchmarks/Cargo.toml
+++ b/benchmarks/Cargo.toml
@@ -37,6 +37,7 @@ arrow = { workspace = true }
 datafusion = { path = "../datafusion/core", version = "27.0.0" }
 env_logger = "0.10"
 futures = "0.3"
+log = "^0.4"
 mimalloc = { version = "0.1", optional = true, default-features = false }
 num_cpus = "1.13.0"
 parquet = { workspace = true }
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index 32359dc1f8..d18dd38b9b 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -16,7 +16,9 @@
 // under the License.
 
 //! Benchmark derived from TPC-H. This is not an official TPC-H benchmark.
+use log::info;
 
+use arrow::util::pretty::pretty_format_batches;
 use datafusion::datasource::file_format::{csv::CsvFormat, FileFormat};
 use datafusion::datasource::{MemTable, TableProvider};
 use datafusion::error::{DataFusionError, Result};
@@ -235,6 +237,7 @@ async fn benchmark_query(
         let elapsed = start.elapsed(); //.as_secs_f64() * 1000.0;
         let ms = elapsed.as_secs_f64() * 1000.0;
         millis.push(ms);
+        info!("output:\n\n{}\n\n", pretty_format_batches(&result)?);
         let row_count = result.iter().map(|b| b.num_rows()).sum();
         println!(
             "Query {query_id} iteration {i} took {ms:.1} ms and returned 
{row_count} rows"
diff --git a/datafusion/core/src/physical_plan/aggregates/row_hash2.rs b/datafusion/core/src/physical_plan/aggregates/row_hash2.rs
index 2eb058d8c5..3e9dbfe0cf 100644
--- a/datafusion/core/src/physical_plan/aggregates/row_hash2.rs
+++ b/datafusion/core/src/physical_plan/aggregates/row_hash2.rs
@@ -58,66 +58,136 @@ pub(crate) enum ExecutionState {
 
 use super::AggregateExec;
 
-/// Grouping aggregate
+/// Hash based Grouping Aggregator
 ///
-/// For each aggregation entry, we use:
-/// - [Arrow-row] represents grouping keys for fast hash computation and comparison directly on raw bytes.
-/// - [GroupsAccumulator] to store per group aggregates
+/// # Design Goals
 ///
-/// The architecture is the following:
+/// This structure is designed so that as much as possible can be
+/// vectorized (done in a tight loop).
 ///
-/// TODO
+/// # Architecture
 ///
-/// [WordAligned]: datafusion_row::layout
+/// ```text
+///
+/// stores "group       stores group values,       internally stores aggregate
+///    indexes"          in arrow_row format         values, for all groups
+///
+/// ┌─────────────┐      ┌────────────┐    ┌──────────────┐       ┌──────────────┐
+/// │   ┌─────┐   │      │ ┌────────┐ │    │┌────────────┐│       │┌────────────┐│
+/// │   │  5  │   │ ┌────┼▶│  "A"   │ │    ││accumulator ││       ││accumulator ││
+/// │   ├─────┤   │ │    │ ├────────┤ │    ││     0      ││       ││     N      ││
+/// │   │  9  │   │ │    │ │  "Z"   │ │    ││ ┌────────┐ ││       ││ ┌────────┐ ││
+/// │   └─────┘   │ │    │ └────────┘ │    ││ │ state  │ ││       ││ │ state  │ ││
+/// │     ...     │ │    │            │    ││ │┌─────┐ │ ││  ...  ││ │┌─────┐ │ ││
+/// │   ┌─────┐   │ │    │    ...     │    ││ │├─────┤ │ ││       ││ │├─────┤ │ ││
+/// │   │  1  │───┼─┘    │            │    ││ │└─────┘ │ ││       ││ │└─────┘ │ ││
+/// │   ├─────┤   │      │            │    ││ │        │ ││       ││ │        │ ││
+/// │   │ 13  │───┼─┐    │ ┌────────┐ │    ││ │  ...   │ ││       ││ │  ...   │ ││
+/// │   └─────┘   │ └────┼▶│  "Q"   │ │    ││ │        │ ││       ││ │        │ ││
+/// └─────────────┘      │ └────────┘ │    ││ │┌─────┐ │ ││       ││ │┌─────┐ │ ││
+///                      │            │    ││ │└─────┘ │ ││       ││ │└─────┘ │ ││
+///                      └────────────┘    ││ └────────┘ ││       ││ └────────┘ ││
+///                                        │└────────────┘│       │└────────────┘│
+///                                        └──────────────┘       └──────────────┘
+///
+///       map            group_values                   accumulators
+///  (Hash Table)
+///
+///  ```
+///
+/// For example, given a query like `COUNT(x), SUM(y) ... GROUP BY z`,
+/// `group_values` will store the distinct values of `z`. There will
+/// be one accumulator for `COUNT(x)`, specialized for the data type
+/// of `x` and one accumulator for `SUM(y)`, specialized for the data
+/// type of `y`.
+///
+/// # Description
+///
+/// The hash table stores "group indices", one for each (distinct)
+/// group value.
+///
+/// The group values are stored in [`Self::group_values`] at the
+/// corresponding group index.
+///
+/// The accumulator state (e.g. partial sums) is managed and stored
+/// by a [`GroupsAccumulator`]. There is one accumulator per
+/// aggregate expression (COUNT, AVG, etc.) in the
+/// query. Internally, each `GroupsAccumulator` manages the state for
+/// multiple groups, and is passed `group_indexes` during update. Note
+/// that the accumulator state is not managed by this operator (e.g. in
+/// the hash table).
 pub(crate) struct GroupedHashAggregateStream2 {
     schema: SchemaRef,
     input: SendableRecordBatchStream,
     mode: AggregateMode,
 
     /// Accumulators, one for each `AggregateExpr` in the query
+    ///
+    /// For example, if the query has aggregates `SUM(x)` and
+    /// `COUNT(y)`, there will be two accumulators, each one
+    /// specialized for that particular aggregate and its input types
     accumulators: Vec<Box<dyn GroupsAccumulator>>,
-    /// Arguments expressionf or each accumulator
+
+    /// Argument expressions for each accumulator.
     aggregate_arguments: Vec<Vec<Arc<dyn PhysicalExpr>>>,
-    /// Filter expression to evaluate for each aggregate
+
+    /// Optional filter expression to evaluate, one for each
+    /// aggregate. If present, only those rows for which the filter
+    /// evaluates to true should be included in the aggregate results.
+    ///
+    /// For example, for an aggregate like `SUM(x) FILTER (WHERE x > 100)`,
+    /// the filter expression is `x > 100`.
     filter_expressions: Vec<Option<Arc<dyn PhysicalExpr>>>,
 
     /// Converter for each row
     row_converter: RowConverter,
+
+    /// GROUP BY expressions
     group_by: PhysicalGroupBy,
 
     /// The memory reservation for this grouping
     reservation: MemoryReservation,
 
-    /// Logically maps group values to a group_index `group_states`
+    /// Logically maps group values to a group_index in
+    /// [`Self::group_values`] and in each accumulator
     ///
     /// Uses the raw API of hashbrown to avoid actually storing the
-    /// keys in the table
+    /// keys (group values) in the table
     ///
     /// keys: u64 hashes of the GroupValue
-    /// values: (hash, index into `group_states`)
+    /// values: (hash, group_index)
     map: RawTable<(u64, usize)>,
 
-    /// The actual group by values, stored in arrow Row format
-    /// the index of group_by_values is the index
-    /// https://github.com/apache/arrow-rs/issues/4466
-    group_by_values: Vec<OwnedRow>,
+    /// The actual group by values, stored in arrow [`Row`] format.
+    /// `group_values[i]` holds the group value for group_index `i`.
+    ///
+    /// The row format is used to compare group keys quickly. This is
+    /// especially important for multi-column group keys.
+    ///
+    /// TODO: make this `Rows` (rather than `Vec<OwnedRow>`) to reduce
+    /// allocations once
+    /// <https://github.com/apache/arrow-rs/issues/4466> is available
+    group_values: Vec<OwnedRow>,
 
-    /// scratch space for the current Batch / Aggregate being
-    /// processed. Saved here to avoid reallocations
+    /// Scratch space for the current input batch being
+    /// processed. Reused across batches to avoid reallocations
     current_group_indices: Vec<usize>,
 
-    /// generating input/output?
+    /// Tracks whether this stream is reading input or generating output
     exec_state: ExecutionState,
 
+    /// Execution metrics
     baseline_metrics: BaselineMetrics,
 
+    /// Random state for creating hashes
     random_state: RandomState,
-    /// size to be used for resulting RecordBatches
+
+    /// max rows in output RecordBatches
     batch_size: usize,
 }
 
 impl GroupedHashAggregateStream2 {
-    /// Create a new GroupedHashAggregateStream
+    /// Create a new GroupedHashAggregateStream2
     pub fn new(
         agg: &AggregateExec,
         context: Arc<TaskContext>,
@@ -137,15 +207,14 @@ impl GroupedHashAggregateStream2 {
         let mut aggregate_exprs = vec![];
         let mut aggregate_arguments = vec![];
 
-        // The expressions to evaluate the batch, one vec of expressions per aggregation.
-        // Assuming create_schema() always puts group columns in front of aggregation columns, we set
-        // col_idx_base to the group expression count.
-
+        // The arguments for each aggregate, one vec of expressions
+        // per aggregation.
         let all_aggregate_expressions = aggregates::aggregate_expressions(
             &agg.aggr_expr,
             &agg.mode,
             agg_group_by.expr.len(),
         )?;
+
         let filter_expressions = match agg.mode {
             AggregateMode::Partial | AggregateMode::Single => agg_filter_expr,
             AggregateMode::Final | AggregateMode::FinalPartitioned => {
@@ -194,7 +263,7 @@ impl GroupedHashAggregateStream2 {
             group_by: agg_group_by,
             reservation,
             map,
-            group_by_values,
+            group_values: group_by_values,
             current_group_indices,
             exec_state,
             baseline_metrics,
@@ -204,7 +273,8 @@ impl GroupedHashAggregateStream2 {
     }
 }
 
-/// Crate a `GroupsAccumulator` for each of the aggregate_exprs to hold the aggregation state
+/// Create a [`GroupsAccumulator`] for each of the aggregate_exprs to
+/// hold the aggregation state
 fn create_accumulators(
     aggregate_exprs: Vec<Arc<dyn AggregateExpr>>,
 ) -> Result<Vec<Box<dyn GroupsAccumulator>>> {
@@ -326,7 +396,7 @@ impl GroupedHashAggregateStream2 {
                 // TODO update *allocated based on size of the row
                 // that was just pushed into
                 // aggr_state.group_by_values
-                group_rows.row(row) == self.group_by_values[*group_idx].row()
+                group_rows.row(row) == self.group_values[*group_idx].row()
             });
 
             let group_idx = match entry {
@@ -335,8 +405,8 @@ impl GroupedHashAggregateStream2 {
                 //  1.2 Need to create new entry for the group
                 None => {
                     // Add new entry to aggr_state and save newly created index
-                    let group_idx = self.group_by_values.len();
-                    self.group_by_values.push(group_rows.row(row).owned());
+                    let group_idx = self.group_values.len();
+                    self.group_values.push(group_rows.row(row).owned());
 
                     // for hasher function, use precomputed hash value
                     self.map.insert_accounted(
@@ -382,7 +452,7 @@ impl GroupedHashAggregateStream2 {
                 .zip(input_values.iter())
                 .zip(filter_values.iter());
 
-            let total_num_groups = self.group_by_values.len();
+            let total_num_groups = self.group_values.len();
 
             for ((acc, values), opt_filter) in t {
                 let acc_size_pre = acc.size();
@@ -424,13 +494,13 @@ impl GroupedHashAggregateStream2 {
 impl GroupedHashAggregateStream2 {
     /// Create an output RecordBatch with all group keys and accumulator states/values
     fn create_batch_from_map(&mut self) -> Result<RecordBatch> {
-        if self.group_by_values.is_empty() {
+        if self.group_values.is_empty() {
             let schema = self.schema.clone();
             return Ok(RecordBatch::new_empty(schema));
         }
 
         // First output rows are the groups
-        let groups_rows = self.group_by_values.iter().map(|owned_row| 
owned_row.row());
+        let groups_rows = self.group_values.iter().map(|owned_row| 
owned_row.row());
 
         let mut output: Vec<ArrayRef> = 
self.row_converter.convert_rows(groups_rows)?;
 

Reply via email to