tustvold commented on code in PR #7016:
URL: https://github.com/apache/arrow-datafusion/pull/7016#discussion_r1267407414
##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -385,102 +492,25 @@ impl RecordBatchStream for GroupedHashAggregateStream {
}
impl GroupedHashAggregateStream {
- /// Calculates the group indices for each input row of
- /// `group_values`.
- ///
- /// At the return of this function,
- /// `self.scratch_space.current_group_indices` has the same number
- /// of entries as each array in `group_values` and holds the
- /// correct group_index for that row.
- ///
- /// This is one of the core hot loops in the algorithm
- fn update_group_state(
- &mut self,
- group_values: &[ArrayRef],
- allocated: &mut usize,
- ) -> Result<()> {
- // Convert the group keys into the row format
- // Avoid reallocation when https://github.com/apache/arrow-rs/issues/4479 is available
- let group_rows = self.row_converter.convert_columns(group_values)?;
- let n_rows = group_rows.num_rows();
-
- // track memory used
- let group_values_size_pre = self.group_values.size();
- let scratch_size_pre = self.scratch_space.size();
-
- // tracks to which group each of the input rows belongs
- let group_indices = &mut self.scratch_space.current_group_indices;
- group_indices.clear();
-
- // 1.1 Calculate the group keys for the group values
- let batch_hashes = &mut self.scratch_space.hashes_buffer;
- batch_hashes.clear();
- batch_hashes.resize(n_rows, 0);
- create_hashes(group_values, &self.random_state, batch_hashes)?;
-
- for (row, &hash) in batch_hashes.iter().enumerate() {
- let entry = self.map.get_mut(hash, |(_hash, group_idx)| {
- // verify that a group that we are inserting with hash is
- // actually the same key value as the group in
- // existing_idx (aka group_values @ row)
- group_rows.row(row) == self.group_values.row(*group_idx)
- });
-
- let group_idx = match entry {
- // Existing group_index for this group value
- Some((_hash, group_idx)) => *group_idx,
- // 1.2 Need to create new entry for the group
- None => {
- // Add new entry to aggr_state and save newly created index
- let group_idx = self.group_values.num_rows();
- self.group_values.push(group_rows.row(row));
-
- // for hasher function, use precomputed hash value
- self.map.insert_accounted(
- (hash, group_idx),
- |(hash, _group_index)| *hash,
- allocated,
- );
- group_idx
- }
- };
- group_indices.push(group_idx);
- }
-
- // account for memory growth in scratch space
- *allocated += self.scratch_space.size();
Review Comment:
I couldn't see a reason to keep tracking the delta rather than just
calling `try_resize` at the end.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]