This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new d23e48fc1c Minor: Add some more doc comments to
`BoundedAggregateStream` (#6881)
d23e48fc1c is described below
commit d23e48fc1cd0b988ebfca38b5d70516b33541999
Author: Andrew Lamb <[email protected]>
AuthorDate: Mon Jul 10 10:00:26 2023 -0400
Minor: Add some more doc comments to `BoundedAggregateStream` (#6881)
* Minor: Add docs to BoundedAggregateStream
* Update datafusion/core/src/physical_plan/aggregates/mod.rs
Co-authored-by: Mustafa Akur
<[email protected]>
---------
Co-authored-by: Mustafa Akur
<[email protected]>
---
datafusion/common/src/utils.rs | 7 +-
.../aggregates/bounded_aggregate_stream.rs | 84 ++++++++++++++--------
.../core/src/physical_plan/aggregates/mod.rs | 32 ++++++---
3 files changed, 83 insertions(+), 40 deletions(-)
diff --git a/datafusion/common/src/utils.rs b/datafusion/common/src/utils.rs
index 2edcd07846..c324e2079d 100644
--- a/datafusion/common/src/utils.rs
+++ b/datafusion/common/src/utils.rs
@@ -177,9 +177,10 @@ where
Ok(low)
}
-/// This function finds the partition points according to `partition_columns`.
-/// If there are no sort columns, then the result will be a single element
-/// vector containing one partition range spanning all data.
+/// Given a list of 0 or more already sorted columns, finds the
+/// partition ranges that would partition equally across columns.
+///
+/// See [`lexicographical_partition_ranges`] for more details.
pub fn evaluate_partition_ranges(
num_rows: usize,
partition_columns: &[SortColumn],
diff --git
a/datafusion/core/src/physical_plan/aggregates/bounded_aggregate_stream.rs
b/datafusion/core/src/physical_plan/aggregates/bounded_aggregate_stream.rs
index 4bbac3c4a5..a89ef3aaff 100644
--- a/datafusion/core/src/physical_plan/aggregates/bounded_aggregate_stream.rs
+++ b/datafusion/core/src/physical_plan/aggregates/bounded_aggregate_stream.rs
@@ -115,7 +115,9 @@ pub(crate) struct BoundedAggregateStream {
/// first element in the array corresponds to normal accumulators
/// second element in the array corresponds to row accumulators
indices: [Vec<Range<usize>>; 2],
+ /// Information on how the input of this group is ordered
aggregation_ordering: AggregationOrdering,
+ /// Has this stream finished producing output
is_end: bool,
}
@@ -275,11 +277,11 @@ impl Stream for BoundedAggregateStream {
}
// inner had error, return to caller
Some(Err(e)) => return Poll::Ready(Some(Err(e))),
- // inner is done, producing output
+ // inner is done, switch to producing output
None => {
- for element in
self.aggr_state.ordered_group_states.iter_mut()
- {
- element.status = GroupStatus::CanEmit;
+ let states =
self.aggr_state.ordered_group_states.iter_mut();
+ for state in states {
+ state.status = GroupStatus::CanEmit;
}
self.exec_state = ExecutionState::ProducingOutput;
}
@@ -297,6 +299,7 @@ impl Stream for BoundedAggregateStream {
Ok(Some(result)) => {
let batch =
result.record_output(&self.baseline_metrics);
self.row_group_skip_position += batch.num_rows();
+ // try to read more input
self.exec_state = ExecutionState::ReadingInput;
self.prune();
return Poll::Ready(Some(Ok(batch)));
@@ -325,18 +328,22 @@ impl RecordBatchStream for BoundedAggregateStream {
/// indices for a group. This information is used when executing streaming
/// GROUP BY calculations.
struct GroupOrderInfo {
+ /// The group by key
owned_row: OwnedRow,
+ /// the hash value of the group
hash: u64,
+ /// the range of row indices in the input batch that belong to this group
range: Range<usize>,
}
impl BoundedAggregateStream {
- // Update the aggr_state according to group_by values (result of
group_by_expressions) when group by
- // expressions are fully ordered.
- fn update_ordered_group_state(
+ /// Update the aggr_state hash table according to group_by values
+ /// (result of group_by_expressions) when group by expressions are
+ /// fully ordered.
+ fn update_fully_ordered_group_state(
&mut self,
group_values: &[ArrayRef],
- per_group_indices: Vec<GroupOrderInfo>,
+ per_group_order_info: Vec<GroupOrderInfo>,
allocated: &mut usize,
) -> Result<Vec<usize>> {
// 1.1 construct the key from the group values
@@ -348,7 +355,7 @@ impl BoundedAggregateStream {
let AggregationState {
map: row_map,
- ordered_group_states: row_group_states,
+ ordered_group_states,
..
} = &mut self.aggr_state;
@@ -356,13 +363,13 @@ impl BoundedAggregateStream {
owned_row,
hash,
range,
- } in per_group_indices
+ } in per_group_order_info
{
let entry = row_map.get_mut(hash, |(_hash, group_idx)| {
// verify that a group that we are inserting with hash is
// actually the same key value as the group in
// existing_idx (aka group_values @ row)
- let ordered_group_state = &row_group_states[*group_idx];
+ let ordered_group_state = &ordered_group_states[*group_idx];
let group_state = &ordered_group_state.group_state;
owned_row.row() == group_state.group_by_values.row()
});
@@ -370,7 +377,7 @@ impl BoundedAggregateStream {
match entry {
// Existing entry for this group value
Some((_hash, group_idx)) => {
- let group_state = &mut
row_group_states[*group_idx].group_state;
+ let group_state = &mut
ordered_group_states[*group_idx].group_state;
// 1.3
if group_state.indices.is_empty() {
@@ -385,6 +392,7 @@ impl BoundedAggregateStream {
None => {
let accumulator_set =
aggregates::create_accumulators(&self.normal_aggr_expr)?;
+ // Save the value of the ordering columns as
Vec<ScalarValue>
let row = get_row_at_idx(group_values, range.start)?;
let ordered_columns = self
.aggregation_ordering
@@ -392,6 +400,7 @@ impl BoundedAggregateStream {
.iter()
.map(|idx| row[*idx].clone())
.collect::<Vec<_>>();
+
// Add new entry to group_states and save newly created
index
let group_state = GroupState {
group_by_values: owned_row,
@@ -403,7 +412,7 @@ impl BoundedAggregateStream {
indices: (range.start as u32..range.end as u32)
.collect::<Vec<_>>(), // 1.3
};
- let group_idx = row_group_states.len();
+ let group_idx = ordered_group_states.len();
// NOTE: do NOT include the `RowGroupState` struct size in
here because this is captured by
// `group_states` (see allocation down below)
@@ -431,10 +440,10 @@ impl BoundedAggregateStream {
let ordered_group_state = OrderedGroupState {
group_state,
ordered_columns,
- status: GroupStatus::GroupProgress,
+ status: GroupStatus::GroupInProgress,
hash,
};
- row_group_states.push_accounted(ordered_group_state,
allocated);
+ ordered_group_states.push_accounted(ordered_group_state,
allocated);
groups_with_rows.push(group_idx);
}
@@ -538,7 +547,7 @@ impl BoundedAggregateStream {
let ordered_group_state = OrderedGroupState {
group_state,
ordered_columns,
- status: GroupStatus::GroupProgress,
+ status: GroupStatus::GroupInProgress,
hash,
};
group_states.push_accounted(ordered_group_state,
allocated);
@@ -707,6 +716,7 @@ impl BoundedAggregateStream {
let row_converter_size_pre = self.row_converter.size();
for group_values in &group_by_values {
+ // If the input is fully sorted on its grouping keys
let groups_with_rows = if let AggregationOrdering {
mode: GroupByOrderMode::FullyOrdered,
order_indices,
@@ -727,8 +737,9 @@ impl BoundedAggregateStream {
})
.collect::<Vec<_>>();
let n_rows = group_rows.num_rows();
+ // determine the boundaries between groups
let ranges = evaluate_partition_ranges(n_rows, &sort_column)?;
- let per_group_indices = ranges
+ let per_group_order_info = ranges
.into_iter()
.map(|range| GroupOrderInfo {
owned_row: group_rows.row(range.start).owned(),
@@ -736,9 +747,9 @@ impl BoundedAggregateStream {
range,
})
.collect::<Vec<_>>();
- self.update_ordered_group_state(
+ self.update_fully_ordered_group_state(
group_values,
- per_group_indices,
+ per_group_order_info,
&mut allocated,
)?
} else {
@@ -835,22 +846,30 @@ impl BoundedAggregateStream {
}
}
+/// Tracks the state of the ordered grouping
#[derive(Debug, PartialEq)]
enum GroupStatus {
- // `GroupProgress` means data for current group is not complete. New data
may arrive.
- GroupProgress,
- // `CanEmit` means data for current group is completed. And its result can
emitted.
+ /// Data for current group is not complete, and new data may yet
+ /// arrive.
+ GroupInProgress,
+ /// Data for current group is completed, and its result can be emitted.
CanEmit,
- // Emitted means that result for the groups is outputted. Group can be
pruned from state.
+ /// Result for the group has been successfully emitted, and group
+ /// state can be pruned.
Emitted,
}
-/// The state that is built for each output group.
+/// Information about the order of the state that is built for each
+/// output group.
#[derive(Debug)]
pub struct OrderedGroupState {
+ /// Aggregate values
group_state: GroupState,
+ /// The actual value of the ordered columns for this group
ordered_columns: Vec<ScalarValue>,
+ /// Can we emit this group?
status: GroupStatus,
+ /// Hash value of the group
hash: u64,
}
@@ -883,16 +902,23 @@ impl std::fmt::Debug for AggregationState {
}
impl BoundedAggregateStream {
- /// Prune the groups from the `self.aggr_state.group_states` which are in
- /// `GroupStatus::Emitted`(this status means that result of this group
emitted/outputted already, and
- /// we are sure that these groups cannot receive new rows.) status.
+ /// Prune the groups from [`Self::ordered_group_states`] which are in
+ /// [`GroupStatus::Emitted`].
+ ///
+ /// Emitted means that the result of this group has already been
+ /// emitted, and we are sure that these groups can not receive new
+ /// rows.
fn prune(&mut self) {
+ // clear out emitted groups
let n_partition = self.aggr_state.ordered_group_states.len();
self.aggr_state
.ordered_group_states
.retain(|elem| elem.status != GroupStatus::Emitted);
+
let n_partition_new = self.aggr_state.ordered_group_states.len();
let n_pruned = n_partition - n_partition_new;
+
+ // update hash table with the new indexes of the remaining groups
self.aggr_state.map.clear();
for (idx, item) in
self.aggr_state.ordered_group_states.iter().enumerate() {
self.aggr_state
@@ -920,7 +946,9 @@ impl BoundedAggregateStream {
);
let group_state_chunk =
&self.aggr_state.ordered_group_states[skip_items..end_idx];
- // Consider only the groups that can be emitted. (The ones we are sure
that will not receive new entry.)
+
+ // Consider only the groups that can be emitted. (The ones we
+ // are sure that will not receive a new entry.)
let group_state_chunk = group_state_chunk
.iter()
.filter(|item| item.status == GroupStatus::CanEmit)
diff --git a/datafusion/core/src/physical_plan/aggregates/mod.rs
b/datafusion/core/src/physical_plan/aggregates/mod.rs
index 4bf5f66445..60d483d8c8 100644
--- a/datafusion/core/src/physical_plan/aggregates/mod.rs
+++ b/datafusion/core/src/physical_plan/aggregates/mod.rs
@@ -81,19 +81,33 @@ pub enum AggregateMode {
}
/// Group By expression modes
+///
+/// `PartiallyOrdered` and `FullyOrdered` are used to reason about
+/// when certain group by keys will never again be seen (and thus can
+/// be emitted by the grouping operator).
+///
+/// Specifically, each distinct combination of the relevant columns
+/// are contiguous in the input, and once a new combination is seen
+/// previous combinations are guaranteed never to appear again
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum GroupByOrderMode {
- /// None of the expressions in the GROUP BY clause have an ordering.
+ /// The input is not (known to be) ordered by any of the
+ /// expressions in the GROUP BY clause.
None,
- /// Some of the expressions in the GROUP BY clause have an ordering.
- // For example, if the input is ordered by a, b, c and we group by b, a, d;
- // the mode will be `PartiallyOrdered` meaning a subset of group b, a, d
- // defines a preset for the existing ordering, e.g a, b defines a preset.
+ /// The input is known to be ordered by a preset (prefix but
+ /// possibly reordered) of the expressions in the `GROUP BY` clause.
+ ///
+ /// For example, if the input is ordered by `a, b, c` and we group
+ /// by `b, a, d`, `PartiallyOrdered` means a subset of group `b,
+ /// a, d` defines a preset for the existing ordering, in this case
+ /// `a, b`.
PartiallyOrdered,
- /// All the expressions in the GROUP BY clause have orderings.
- // For example, if the input is ordered by a, b, c, d and we group by b, a;
- // the mode will be `Ordered` meaning a all of the of group b, d
- // defines a preset for the existing ordering, e.g a, b defines a preset.
+ /// The input is known to be ordered by *all* the expressions in the
+ /// `GROUP BY` clause.
+ ///
+ /// For example, if the input is ordered by `a, b, c, d` and we group by
+ /// `b, a`, `Ordered` means that all of the group by expressions appear
+ /// as a preset for the existing ordering, in this case `a, b`.
FullyOrdered,
}