Dandandan commented on code in PR #11943: URL: https://github.com/apache/datafusion/pull/11943#discussion_r1730273664
########## datafusion/functions-aggregate-common/src/aggregate/groups_accumulator.rs: ########## @@ -367,6 +379,289 @@ impl<T> VecAllocExt for Vec<T> { } } +pub trait EmitToExt { + /// Removes the number of rows from `v` required to emit the right + /// number of rows, returning a `Vec` with elements taken, and the + /// remaining values in `v`. + /// + /// This avoids copying if Self::All + fn take_needed<T>(&self, v: &mut Vec<T>) -> Vec<T>; + + /// Removes the number of rows from `blocks` required to emit, + /// returning a `Vec` with elements taken. + /// + /// The detailed behavior in different emissions: + /// - For Emit::CurrentBlock, the first block will be taken and return. + /// - For Emit::All and Emit::First, it will be only supported in `GroupStatesMode::Flat`, + /// similar as `take_needed`. + fn take_needed_from_blocks<T>( + &self, + blocks: &mut VecBlocks<T>, + mode: GroupStatesMode, + ) -> Vec<T>; +} + +impl EmitToExt for EmitTo { + fn take_needed<T>(&self, v: &mut Vec<T>) -> Vec<T> { + match self { + Self::All => { + // Take the entire vector, leave new (empty) vector + std::mem::take(v) + } + Self::First(n) => { + let split_at = min(v.len(), *n); + // get end n+1,.. values into t + let mut t = v.split_off(split_at); + // leave n+1,.. in v + std::mem::swap(v, &mut t); + t + } + Self::NextBlock(_) => unreachable!( + "can not support blocked emission in take_needed, you should use take_needed_from_blocks" + ), + } + } + + fn take_needed_from_blocks<T>( + &self, + blocks: &mut VecBlocks<T>, + mode: GroupStatesMode, + ) -> Vec<T> { + match self { + Self::All => { + debug_assert!(matches!(mode, GroupStatesMode::Flat)); + blocks.pop_first_block().unwrap_or_default() + } + Self::First(n) => { + debug_assert!(matches!(mode, GroupStatesMode::Flat)); + + let block = blocks.current_mut().unwrap(); + let split_at = min(block.len(), *n); + + // get end n+1,.. values into t + let mut t = block.split_off(split_at); + // leave n+1,.. 
in v + std::mem::swap(block, &mut t); + t + } + Self::NextBlock(_) => { + debug_assert!(matches!(mode, GroupStatesMode::Blocked(_))); + blocks.pop_first_block().unwrap_or_default() + } + } + } +} + +/// Blocked style group index used in blocked mode group values and accumulators +/// +/// Parts in index: +/// - High 32 bits represent `block_id` +/// - Low 32 bits represent `block_offset` +#[derive(Debug, Clone, Copy)] +pub struct BlockedGroupIndex { + pub block_id: usize, + pub block_offset: usize, +} + +impl BlockedGroupIndex { + pub fn new(group_index: usize) -> Self { + let block_id = + ((group_index as u64 >> 32) & BLOCKED_INDEX_LOW_32_BITS_MASK) as usize; + let block_offset = + ((group_index as u64) & BLOCKED_INDEX_LOW_32_BITS_MASK) as usize; + + Self { + block_id, + block_offset, + } + } + + pub fn new_from_parts(block_id: usize, block_offset: usize) -> Self { + Self { + block_id, + block_offset, + } + } + + pub fn as_packed_index(&self) -> usize { + ((((self.block_id as u64) << 32) & BLOCKED_INDEX_HIGH_32_BITS_MASK) Review Comment: I think the mask is a no-op in this case, so it seems better to remove it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org