This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new d23e48fc1c Minor: Add some more doc comments to 
`BoundedAggregateStream` (#6881)
d23e48fc1c is described below

commit d23e48fc1cd0b988ebfca38b5d70516b33541999
Author: Andrew Lamb <[email protected]>
AuthorDate: Mon Jul 10 10:00:26 2023 -0400

    Minor: Add some more doc comments to `BoundedAggregateStream` (#6881)
    
    * Minor: Add docs to BoundedAggregateStream
    
    * Update datafusion/core/src/physical_plan/aggregates/mod.rs
    
    Co-authored-by: Mustafa Akur 
<[email protected]>
    
    ---------
    
    Co-authored-by: Mustafa Akur 
<[email protected]>
---
 datafusion/common/src/utils.rs                     |  7 +-
 .../aggregates/bounded_aggregate_stream.rs         | 84 ++++++++++++++--------
 .../core/src/physical_plan/aggregates/mod.rs       | 32 ++++++---
 3 files changed, 83 insertions(+), 40 deletions(-)

diff --git a/datafusion/common/src/utils.rs b/datafusion/common/src/utils.rs
index 2edcd07846..c324e2079d 100644
--- a/datafusion/common/src/utils.rs
+++ b/datafusion/common/src/utils.rs
@@ -177,9 +177,10 @@ where
     Ok(low)
 }
 
-/// This function finds the partition points according to `partition_columns`.
-/// If there are no sort columns, then the result will be a single element
-/// vector containing one partition range spanning all data.
+/// Given a list of 0 or more already sorted columns, finds the
+/// partition ranges that would partition equally across columns.
+///
+/// See [`lexicographical_partition_ranges`] for more details.
 pub fn evaluate_partition_ranges(
     num_rows: usize,
     partition_columns: &[SortColumn],
diff --git 
a/datafusion/core/src/physical_plan/aggregates/bounded_aggregate_stream.rs 
b/datafusion/core/src/physical_plan/aggregates/bounded_aggregate_stream.rs
index 4bbac3c4a5..a89ef3aaff 100644
--- a/datafusion/core/src/physical_plan/aggregates/bounded_aggregate_stream.rs
+++ b/datafusion/core/src/physical_plan/aggregates/bounded_aggregate_stream.rs
@@ -115,7 +115,9 @@ pub(crate) struct BoundedAggregateStream {
     /// first element in the array corresponds to normal accumulators
     /// second element in the array corresponds to row accumulators
     indices: [Vec<Range<usize>>; 2],
+    /// Information on how the input of this group is ordered
     aggregation_ordering: AggregationOrdering,
+    /// Has this stream finished producing output
     is_end: bool,
 }
 
@@ -275,11 +277,11 @@ impl Stream for BoundedAggregateStream {
                         }
                         // inner had error, return to caller
                         Some(Err(e)) => return Poll::Ready(Some(Err(e))),
-                        // inner is done, producing output
+                        // inner is done, switch to producing output
                         None => {
-                            for element in 
self.aggr_state.ordered_group_states.iter_mut()
-                            {
-                                element.status = GroupStatus::CanEmit;
+                            let states = 
self.aggr_state.ordered_group_states.iter_mut();
+                            for state in states {
+                                state.status = GroupStatus::CanEmit;
                             }
                             self.exec_state = ExecutionState::ProducingOutput;
                         }
@@ -297,6 +299,7 @@ impl Stream for BoundedAggregateStream {
                         Ok(Some(result)) => {
                             let batch = 
result.record_output(&self.baseline_metrics);
                             self.row_group_skip_position += batch.num_rows();
+                            // try to read more input
                             self.exec_state = ExecutionState::ReadingInput;
                             self.prune();
                             return Poll::Ready(Some(Ok(batch)));
@@ -325,18 +328,22 @@ impl RecordBatchStream for BoundedAggregateStream {
 /// indices for a group. This information is used when executing streaming
 /// GROUP BY calculations.
 struct GroupOrderInfo {
+    /// The group by key
     owned_row: OwnedRow,
+    /// the hash value of the group
     hash: u64,
+    /// the range of row indices in the input batch that belong to this group
     range: Range<usize>,
 }
 
 impl BoundedAggregateStream {
-    // Update the aggr_state according to group_by values (result of 
group_by_expressions) when group by
-    // expressions are fully ordered.
-    fn update_ordered_group_state(
+    /// Update the aggr_state hash table according to group_by values
+    /// (result of group_by_expressions) when group by expressions are
+    /// fully ordered.
+    fn update_fully_ordered_group_state(
         &mut self,
         group_values: &[ArrayRef],
-        per_group_indices: Vec<GroupOrderInfo>,
+        per_group_order_info: Vec<GroupOrderInfo>,
         allocated: &mut usize,
     ) -> Result<Vec<usize>> {
         // 1.1 construct the key from the group values
@@ -348,7 +355,7 @@ impl BoundedAggregateStream {
 
         let AggregationState {
             map: row_map,
-            ordered_group_states: row_group_states,
+            ordered_group_states,
             ..
         } = &mut self.aggr_state;
 
@@ -356,13 +363,13 @@ impl BoundedAggregateStream {
             owned_row,
             hash,
             range,
-        } in per_group_indices
+        } in per_group_order_info
         {
             let entry = row_map.get_mut(hash, |(_hash, group_idx)| {
                 // verify that a group that we are inserting with hash is
                 // actually the same key value as the group in
                 // existing_idx  (aka group_values @ row)
-                let ordered_group_state = &row_group_states[*group_idx];
+                let ordered_group_state = &ordered_group_states[*group_idx];
                 let group_state = &ordered_group_state.group_state;
                 owned_row.row() == group_state.group_by_values.row()
             });
@@ -370,7 +377,7 @@ impl BoundedAggregateStream {
             match entry {
                 // Existing entry for this group value
                 Some((_hash, group_idx)) => {
-                    let group_state = &mut 
row_group_states[*group_idx].group_state;
+                    let group_state = &mut 
ordered_group_states[*group_idx].group_state;
 
                     // 1.3
                     if group_state.indices.is_empty() {
@@ -385,6 +392,7 @@ impl BoundedAggregateStream {
                 None => {
                     let accumulator_set =
                         
aggregates::create_accumulators(&self.normal_aggr_expr)?;
+                    // Save the value of the ordering columns as 
Vec<ScalarValue>
                     let row = get_row_at_idx(group_values, range.start)?;
                     let ordered_columns = self
                         .aggregation_ordering
@@ -392,6 +400,7 @@ impl BoundedAggregateStream {
                         .iter()
                         .map(|idx| row[*idx].clone())
                         .collect::<Vec<_>>();
+
                     // Add new entry to group_states and save newly created 
index
                     let group_state = GroupState {
                         group_by_values: owned_row,
@@ -403,7 +412,7 @@ impl BoundedAggregateStream {
                         indices: (range.start as u32..range.end as u32)
                             .collect::<Vec<_>>(), // 1.3
                     };
-                    let group_idx = row_group_states.len();
+                    let group_idx = ordered_group_states.len();
 
                     // NOTE: do NOT include the `RowGroupState` struct size in 
here because this is captured by
                     // `group_states` (see allocation down below)
@@ -431,10 +440,10 @@ impl BoundedAggregateStream {
                     let ordered_group_state = OrderedGroupState {
                         group_state,
                         ordered_columns,
-                        status: GroupStatus::GroupProgress,
+                        status: GroupStatus::GroupInProgress,
                         hash,
                     };
-                    row_group_states.push_accounted(ordered_group_state, 
allocated);
+                    ordered_group_states.push_accounted(ordered_group_state, 
allocated);
 
                     groups_with_rows.push(group_idx);
                 }
@@ -538,7 +547,7 @@ impl BoundedAggregateStream {
                     let ordered_group_state = OrderedGroupState {
                         group_state,
                         ordered_columns,
-                        status: GroupStatus::GroupProgress,
+                        status: GroupStatus::GroupInProgress,
                         hash,
                     };
                     group_states.push_accounted(ordered_group_state, 
allocated);
@@ -707,6 +716,7 @@ impl BoundedAggregateStream {
 
         let row_converter_size_pre = self.row_converter.size();
         for group_values in &group_by_values {
+            // If the input is fully sorted on its grouping keys
             let groups_with_rows = if let AggregationOrdering {
                 mode: GroupByOrderMode::FullyOrdered,
                 order_indices,
@@ -727,8 +737,9 @@ impl BoundedAggregateStream {
                     })
                     .collect::<Vec<_>>();
                 let n_rows = group_rows.num_rows();
+                // determine the boundaries between groups
                 let ranges = evaluate_partition_ranges(n_rows, &sort_column)?;
-                let per_group_indices = ranges
+                let per_group_order_info = ranges
                     .into_iter()
                     .map(|range| GroupOrderInfo {
                         owned_row: group_rows.row(range.start).owned(),
@@ -736,9 +747,9 @@ impl BoundedAggregateStream {
                         range,
                     })
                     .collect::<Vec<_>>();
-                self.update_ordered_group_state(
+                self.update_fully_ordered_group_state(
                     group_values,
-                    per_group_indices,
+                    per_group_order_info,
                     &mut allocated,
                 )?
             } else {
@@ -835,22 +846,30 @@ impl BoundedAggregateStream {
     }
 }
 
+/// Tracks the state of the ordered grouping
 #[derive(Debug, PartialEq)]
 enum GroupStatus {
-    // `GroupProgress` means data for current group is not complete. New data 
may arrive.
-    GroupProgress,
-    // `CanEmit` means data for current group is completed. And its result can 
emitted.
+    /// Data for current group is not complete, and new data may yet
+    /// arrive.
+    GroupInProgress,
+    /// Data for current group is completed, and its result can emitted.
     CanEmit,
-    // Emitted means that result for the groups is outputted. Group can be 
pruned from state.
+    /// Result for the groups has been successfully emitted, and group
+    /// state can be pruned.
     Emitted,
 }
 
-/// The state that is built for each output group.
+/// Information about the order of the state that is built for each
+/// output group.
 #[derive(Debug)]
 pub struct OrderedGroupState {
+    /// Aggregate values
     group_state: GroupState,
+    /// The actual value of the ordered columns for this group
     ordered_columns: Vec<ScalarValue>,
+    /// Can we emit this group?
     status: GroupStatus,
+    /// Hash value of the group
     hash: u64,
 }
 
@@ -883,16 +902,23 @@ impl std::fmt::Debug for AggregationState {
 }
 
 impl BoundedAggregateStream {
-    /// Prune the groups from the `self.aggr_state.group_states` which are in
-    /// `GroupStatus::Emitted`(this status means that result of this group 
emitted/outputted already, and
-    /// we are sure that these groups cannot receive new rows.) status.
+    /// Prune the groups from `[Self::ordered_group_states]` which are in
+    /// [`GroupStatus::Emitted`].
+    ///
+    /// Emitted means that the result of this group has already been
+    /// emitted, and we are sure that these groups can not receive new
+    /// rows.
     fn prune(&mut self) {
+        // clear out emitted groups
         let n_partition = self.aggr_state.ordered_group_states.len();
         self.aggr_state
             .ordered_group_states
             .retain(|elem| elem.status != GroupStatus::Emitted);
+
         let n_partition_new = self.aggr_state.ordered_group_states.len();
         let n_pruned = n_partition - n_partition_new;
+
+        // update hash table with the new indexes of the remaining groups
         self.aggr_state.map.clear();
         for (idx, item) in 
self.aggr_state.ordered_group_states.iter().enumerate() {
             self.aggr_state
@@ -920,7 +946,9 @@ impl BoundedAggregateStream {
         );
         let group_state_chunk =
             &self.aggr_state.ordered_group_states[skip_items..end_idx];
-        // Consider only the groups that can be emitted. (The ones we are sure 
that will not receive new entry.)
+
+        // Consider only the groups that can be emitted. (The ones we
+        // are sure that will not receive new entry.)
         let group_state_chunk = group_state_chunk
             .iter()
             .filter(|item| item.status == GroupStatus::CanEmit)
diff --git a/datafusion/core/src/physical_plan/aggregates/mod.rs 
b/datafusion/core/src/physical_plan/aggregates/mod.rs
index 4bf5f66445..60d483d8c8 100644
--- a/datafusion/core/src/physical_plan/aggregates/mod.rs
+++ b/datafusion/core/src/physical_plan/aggregates/mod.rs
@@ -81,19 +81,33 @@ pub enum AggregateMode {
 }
 
 /// Group By expression modes
+///
+/// `PartiallyOrdered` and `FullyOrdered` are used to reason about
+/// when certain group by keys will never again be seen (and thus can
+/// be emitted by the grouping operator).
+///
+/// Specifically, each distinct combination of the relevant columns
+/// are contiguous in the input, and once a new combination is seen
+/// previous combinations are guaranteed never to appear again
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub enum GroupByOrderMode {
-    /// None of the expressions in the GROUP BY clause have an ordering.
+    /// The input is not (known to be) ordered by any of the
+    /// expressions in the GROUP BY clause.
     None,
-    /// Some of the expressions in the GROUP BY clause have an ordering.
-    // For example, if the input is ordered by a, b, c and we group by b, a, d;
-    // the mode will be `PartiallyOrdered` meaning a subset of group b, a, d
-    // defines a preset for the existing ordering, e.g a, b defines a preset.
+    /// The input is known to be ordered by a preset (prefix but
+    /// possibly reordered) of the expressions in the `GROUP BY` clause.
+    ///
+    /// For example, if the input is ordered by `a, b, c` and we group
+    /// by `b, a, d`, `PartiallyOrdered` means a subset of group `b,
+    /// a, d` defines a preset for the existing ordering, in this case
+    /// `a, b`.
     PartiallyOrdered,
-    /// All the expressions in the GROUP BY clause have orderings.
-    // For example, if the input is ordered by a, b, c, d and we group by b, a;
-    // the mode will be `Ordered` meaning a all of the of group b, d
-    // defines a preset for the existing ordering, e.g a, b defines a preset.
+    /// The input is known to be ordered by *all* the expressions in the
+    /// `GROUP BY` clause.
+    ///
+    /// For example, if the input is ordered by `a, b, c, d` and we group by 
b, a,
+    /// `Ordered` means that all of the of group by expressions appear
+    ///  as a preset for the existing ordering, in this case `a, b`.
     FullyOrdered,
 }
 

Reply via email to