This is an automated email from the ASF dual-hosted git repository. dheres pushed a commit to branch hash_agg_spike in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
commit 814b1fc29b8b7a30cc1a9786c514f8fe5fe67c5b Author: Andrew Lamb <[email protected]> AuthorDate: Sat Jul 1 04:59:30 2023 -0400 Add comments --- benchmarks/Cargo.toml | 1 + benchmarks/src/bin/tpch.rs | 3 + .../core/src/physical_plan/aggregates/row_hash2.rs | 136 ++++++++++++++++----- 3 files changed, 107 insertions(+), 33 deletions(-) diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml index 1d360739d8..f8ca0b2496 100644 --- a/benchmarks/Cargo.toml +++ b/benchmarks/Cargo.toml @@ -37,6 +37,7 @@ arrow = { workspace = true } datafusion = { path = "../datafusion/core", version = "27.0.0" } env_logger = "0.10" futures = "0.3" +log = "^0.4" mimalloc = { version = "0.1", optional = true, default-features = false } num_cpus = "1.13.0" parquet = { workspace = true } diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs index 32359dc1f8..d18dd38b9b 100644 --- a/benchmarks/src/bin/tpch.rs +++ b/benchmarks/src/bin/tpch.rs @@ -16,7 +16,9 @@ // under the License. //! Benchmark derived from TPC-H. This is not an official TPC-H benchmark. 
+use log::info; +use arrow::util::pretty::pretty_format_batches; use datafusion::datasource::file_format::{csv::CsvFormat, FileFormat}; use datafusion::datasource::{MemTable, TableProvider}; use datafusion::error::{DataFusionError, Result}; @@ -235,6 +237,7 @@ async fn benchmark_query( let elapsed = start.elapsed(); //.as_secs_f64() * 1000.0; let ms = elapsed.as_secs_f64() * 1000.0; millis.push(ms); + info!("output:\n\n{}\n\n", pretty_format_batches(&result)?); let row_count = result.iter().map(|b| b.num_rows()).sum(); println!( "Query {query_id} iteration {i} took {ms:.1} ms and returned {row_count} rows" diff --git a/datafusion/core/src/physical_plan/aggregates/row_hash2.rs b/datafusion/core/src/physical_plan/aggregates/row_hash2.rs index 2eb058d8c5..3e9dbfe0cf 100644 --- a/datafusion/core/src/physical_plan/aggregates/row_hash2.rs +++ b/datafusion/core/src/physical_plan/aggregates/row_hash2.rs @@ -58,66 +58,136 @@ pub(crate) enum ExecutionState { use super::AggregateExec; -/// Grouping aggregate +/// Hash based Grouping Aggregator /// -/// For each aggregation entry, we use: -/// - [Arrow-row] represents grouping keys for fast hash computation and comparison directly on raw bytes. 
-/// - [GroupsAccumulator] to store per group aggregates +/// # Design Goals /// -/// The architecture is the following: +/// This structure is designed so that much can be vectorized (done in +/// a tight loop) as possible /// -/// TODO +/// # Architecture /// -/// [WordAligned]: datafusion_row::layout +/// ```text +/// +/// stores "group stores group values, internally stores aggregate +/// indexes" in arrow_row format values, for all groups +/// +/// ┌─────────────┐ ┌────────────┐ ┌──────────────┐ ┌──────────────┐ +/// │ ┌─────┐ │ │ ┌────────┐ │ │┌────────────┐│ │┌────────────┐│ +/// │ │ 5 │ │ ┌────┼▶│ "A" │ │ ││accumulator ││ ││accumulator ││ +/// │ ├─────┤ │ │ │ ├────────┤ │ ││ 0 ││ ││ N ││ +/// │ │ 9 │ │ │ │ │ "Z" │ │ ││ ┌────────┐ ││ ││ ┌────────┐ ││ +/// │ └─────┘ │ │ │ └────────┘ │ ││ │ state │ ││ ││ │ state │ ││ +/// │ ... │ │ │ │ ││ │┌─────┐ │ ││ ... ││ │┌─────┐ │ ││ +/// │ ┌─────┐ │ │ │ ... │ ││ │├─────┤ │ ││ ││ │├─────┤ │ ││ +/// │ │ 1 │───┼─┘ │ │ ││ │└─────┘ │ ││ ││ │└─────┘ │ ││ +/// │ ├─────┤ │ │ │ ││ │ │ ││ ││ │ │ ││ +/// │ │ 13 │───┼─┐ │ ┌────────┐ │ ││ │ ... │ ││ ││ │ ... │ ││ +/// │ └─────┘ │ └────┼▶│ "Q" │ │ ││ │ │ ││ ││ │ │ ││ +/// └─────────────┘ │ └────────┘ │ ││ │┌─────┐ │ ││ ││ │┌─────┐ │ ││ +/// │ │ ││ │└─────┘ │ ││ ││ │└─────┘ │ ││ +/// └────────────┘ ││ └────────┘ ││ ││ └────────┘ ││ +/// │└────────────┘│ │└────────────┘│ +/// └──────────────┘ └──────────────┘ +/// +/// map group_values accumulators +/// (Hash Table) +/// +/// ``` +/// +/// For example, given a query like `COUNT(x), SUM(y) ... GROUP BY z`, +/// `group_values` will store the distinct values of `z`. There will +/// be one accumulator for `COUNT(x)`, specialized for the data type +/// of `x` and one accumulator for `SUM(y)`, specialized for the data +/// type of `y`. +/// +/// # Description +/// +/// The hash table stores "group indices", one for each (distinct) +/// group value. 
+/// +/// The group values are stored in [`Self::group_values`] at the +/// corresponding group index. +/// +/// The accumulator state (e.g. partial sums) is managed and stored +/// by a [`GroupsAccumulator`] accumulator. There is one accumulator +/// per aggregate expression (COUNT, AVG, etc.) in the +/// query. Internally, each `GroupsAccumulator` manages the state for +/// multiple groups, and is passed `group_indexes` during update. Note +/// that the accumulator state is not managed by this operator (e.g. in the +/// hash table). pub(crate) struct GroupedHashAggregateStream2 { schema: SchemaRef, input: SendableRecordBatchStream, mode: AggregateMode, /// Accumulators, one for each `AggregateExpr` in the query + /// + /// For example, if the query has aggregates, `SUM(x)`, + /// `COUNT(y)`, there will be two accumulators, each one + /// specialized for that particular aggregate and its input types accumulators: Vec<Box<dyn GroupsAccumulator>>, - /// Arguments expressionf or each accumulator + + /// Arguments for each accumulator. aggregate_arguments: Vec<Vec<Arc<dyn PhysicalExpr>>>, - /// Filter expression to evaluate for each aggregate + + /// Optional filter expression to evaluate, one for each + /// aggregate. If present, only those rows for which the filter + /// evaluates to true should be included in the aggregate results. + /// + /// For example, for an aggregate like `SUM(x) FILTER (WHERE x > 100)`, + /// the filter expression is `x > 100`.
filter_expressions: Vec<Option<Arc<dyn PhysicalExpr>>>, /// Converter for each row row_converter: RowConverter, + + /// GROUP BY expressions group_by: PhysicalGroupBy, /// The memory reservation for this grouping reservation: MemoryReservation, - /// Logically maps group values to a group_index `group_states` + /// Logically maps group values to a group_index in + /// [`Self::group_values`] and in each accumulator /// /// Uses the raw API of hashbrown to avoid actually storing the - /// keys in the table + /// keys (group values) in the table /// /// keys: u64 hashes of the GroupValue - /// values: (hash, index into `group_states`) + /// values: (hash, group_index) map: RawTable<(u64, usize)>, - /// The actual group by values, stored in arrow Row format - /// the index of group_by_values is the index - /// https://github.com/apache/arrow-rs/issues/4466 - group_by_values: Vec<OwnedRow>, + /// The actual group by values, stored in arrow [`Row`] format: + /// `group_values[i]` holds the group value for group_index `i`. + /// + /// The row format is used to compare group keys quickly. This is + /// especially important for multi-column group keys. + /// + /// TODO: make this `Rows` (rather than `Vec<OwnedRow>`) to reduce + /// allocations once + /// https://github.com/apache/arrow-rs/issues/4466 is available + group_values: Vec<OwnedRow>, - /// scratch space for the current Batch / Aggregate being - /// processed. Saved here to avoid reallocations + /// Scratch space for the current input batch being + /// processed. Reused across batches to avoid reallocations current_group_indices: Vec<usize>, - /// generating input/output? + /// Tracks if this stream is generating input/output?
exec_state: ExecutionState, + /// Execution metrics baseline_metrics: BaselineMetrics, + /// Random state for creating hashes random_state: RandomState, - /// size to be used for resulting RecordBatches + + /// Maximum number of rows in each output RecordBatch batch_size: usize, } impl GroupedHashAggregateStream2 { - /// Create a new GroupedHashAggregateStream + /// Create a new GroupedHashAggregateStream2 pub fn new( agg: &AggregateExec, context: Arc<TaskContext>, @@ -137,15 +207,14 @@ impl GroupedHashAggregateStream2 { let mut aggregate_exprs = vec![]; let mut aggregate_arguments = vec![]; - // The expressions to evaluate the batch, one vec of expressions per aggregation. - // Assuming create_schema() always puts group columns in front of aggregation columns, we set - // col_idx_base to the group expression count. - + // The arguments for each aggregate, one vec of expressions + // per aggregation. let all_aggregate_expressions = aggregates::aggregate_expressions( &agg.aggr_expr, &agg.mode, agg_group_by.expr.len(), )?; + let filter_expressions = match agg.mode { AggregateMode::Partial | AggregateMode::Single => agg_filter_expr, AggregateMode::Final | AggregateMode::FinalPartitioned => { @@ -194,7 +263,7 @@ impl GroupedHashAggregateStream2 { group_by: agg_group_by, reservation, map, - group_by_values, + group_values: group_by_values, current_group_indices, exec_state, baseline_metrics, @@ -204,7 +273,8 @@ impl GroupedHashAggregateStream2 { } } -/// Crate a `GroupsAccumulator` for each of the aggregate_exprs to hold the aggregation state +/// Create a [`GroupsAccumulator`] for each of the aggregate_exprs to +/// hold the aggregation state fn create_accumulators( aggregate_exprs: Vec<Arc<dyn AggregateExpr>>, ) -> Result<Vec<Box<dyn GroupsAccumulator>>> { @@ -326,7 +396,7 @@ impl GroupedHashAggregateStream2 { // TODO update *allocated based on size of the row // that was just pushed into // aggr_state.group_by_values - group_rows.row(row) == self.group_by_values[*group_idx].row() +
group_rows.row(row) == self.group_values[*group_idx].row() }); let group_idx = match entry { @@ -335,8 +405,8 @@ impl GroupedHashAggregateStream2 { // 1.2 Need to create new entry for the group None => { // Add new entry to aggr_state and save newly created index - let group_idx = self.group_by_values.len(); - self.group_by_values.push(group_rows.row(row).owned()); + let group_idx = self.group_values.len(); + self.group_values.push(group_rows.row(row).owned()); // for hasher function, use precomputed hash value self.map.insert_accounted( @@ -382,7 +452,7 @@ impl GroupedHashAggregateStream2 { .zip(input_values.iter()) .zip(filter_values.iter()); - let total_num_groups = self.group_by_values.len(); + let total_num_groups = self.group_values.len(); for ((acc, values), opt_filter) in t { let acc_size_pre = acc.size(); @@ -424,13 +494,13 @@ impl GroupedHashAggregateStream2 { impl GroupedHashAggregateStream2 { /// Create an output RecordBatch with all group keys and accumulator states/values fn create_batch_from_map(&mut self) -> Result<RecordBatch> { - if self.group_by_values.is_empty() { + if self.group_values.is_empty() { let schema = self.schema.clone(); return Ok(RecordBatch::new_empty(schema)); } // First output rows are the groups - let groups_rows = self.group_by_values.iter().map(|owned_row| owned_row.row()); + let groups_rows = self.group_values.iter().map(|owned_row| owned_row.row()); let mut output: Vec<ArrayRef> = self.row_converter.convert_rows(groups_rows)?;
