martin-g commented on code in PR #20504:
URL: https://github.com/apache/datafusion/pull/20504#discussion_r2845681154
##########
datafusion/functions-aggregate/src/array_agg.rs:
##########
@@ -414,6 +435,293 @@ impl Accumulator for ArrayAggAccumulator {
}
}
+#[derive(Debug)]
+struct ArrayAggGroupsAccumulator {
+ datatype: DataType,
+ ignore_nulls: bool,
+ /// Input batches received via `update_batch`.
+ batches: Vec<ArrayRef>,
+ /// Per-group list of `(batch_index, row_index)` pairs into `batches`.
+ indices: Vec<Vec<(u32, u32)>>,
+ /// Number of index entries referencing each batch.
+ batch_refcounts: Vec<u32>,
+ /// Per-group array chunks from `merge_batch`.
+ merged: Vec<Vec<ArrayRef>>,
+}
+
+impl ArrayAggGroupsAccumulator {
+ fn new(datatype: DataType, ignore_nulls: bool) -> Self {
+ Self {
+ datatype,
+ ignore_nulls,
+ batches: Vec::new(),
+ indices: Vec::new(),
+ batch_refcounts: Vec::new(),
+ merged: Vec::new(),
+ }
+ }
+}
+
+impl GroupsAccumulator for ArrayAggGroupsAccumulator {
+ /// Store references to each input batch and record per-group
+ /// `(batch_index, row_index)` pairs. Materialization is deferred
+ /// to `evaluate`, which minimizes the work done per-batch.
+ fn update_batch(
+ &mut self,
+ values: &[ArrayRef],
+ group_indices: &[usize],
+ opt_filter: Option<&BooleanArray>,
+ total_num_groups: usize,
+ ) -> Result<()> {
+ assert_eq!(values.len(), 1, "single argument to update_batch");
+ let input = &values[0];
+
+ self.indices.resize_with(total_num_groups, Vec::new);
+ self.merged.resize_with(total_num_groups, Vec::new);
+
+ let nulls = if self.ignore_nulls {
+ input.logical_nulls()
+ } else {
+ None
+ };
+
+ let batch_idx = self.batches.len();
+ let mut batch_pushed = false;
+
+ for (row_idx, &group_idx) in group_indices.iter().enumerate() {
+ // Skip filtered rows
+ if let Some(filter) = opt_filter
+ && (filter.is_null(row_idx) || !filter.value(row_idx))
+ {
+ continue;
+ }
+
+ // Skip null values when ignore_nulls is set
+ if let Some(ref nulls) = nulls
+ && nulls.is_null(row_idx)
+ {
+ continue;
+ }
+
+ if !batch_pushed {
+ self.batches.push(Arc::clone(input));
+ self.batch_refcounts.push(0);
+ batch_pushed = true;
+ }
+ self.batch_refcounts[batch_idx] += 1;
+ self.indices[group_idx].push((batch_idx as u32, row_idx as u32));
+ }
+
+ Ok(())
+ }
+
+ fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef> {
+ let emit_indices = emit_to.take_needed(&mut self.indices);
+ let emit_merged = emit_to.take_needed(&mut self.merged);
+ let num_groups = emit_indices.len();
+
+ let mut offsets = Vec::<i32>::with_capacity(num_groups + 1);
+ offsets.push(0);
+ let mut nulls_builder = NullBufferBuilder::new(num_groups);
+ let mut cur_offset = 0i32;
+
+ // Build ListArray offsets and nulls: groups with no elements
+ // are null, others occupy offsets[i]..offsets[i+1] in the
+ // flat values array.
+ for (group_indices, group_merged) in
emit_indices.iter().zip(emit_merged.iter()) {
+ let merged_len = group_merged.iter().map(|a|
a.len()).sum::<usize>();
+ let total_len = group_indices.len() + merged_len;
+
+ if total_len == 0 {
+ nulls_builder.append_null();
+ } else {
+ nulls_builder.append_non_null();
+ }
+
+ cur_offset += total_len as i32;
+ offsets.push(cur_offset);
+ }
+
+ let total_rows = cur_offset as usize;
+
+ // Build the flat values array for the output ListArray using
+ // `interleave`. `interleave` takes a list of source arrays and a list
+ // of (source_index, row_index) pairs, and gathers the referenced
+ // elements into a single output array.
+ //
+ // We assemble two inputs:
+ // 1. `sources`: all arrays that contain data we need — the
+ // input batches (from update_batch) followed by the merged
+ // chunks (from merge_batch), flattened in group order.
+ // 2. `interleave_indices`: for each group, the (source, row)
+ // pairs that select that group's elements from `sources`.
+ // Batch-indexed rows come first, then merged chunks.
+ //
+ // The resulting array is used directly as the ListArray's values
+ // buffer, with the offsets computed above slicing it into per-group
+ // lists.
+ let flat_values = if total_rows == 0 {
+ new_empty_array(&self.datatype)
+ } else {
+ let mut sources: Vec<&dyn Array> = Vec::new();
+ for batch in &self.batches {
+ sources.push(batch.as_ref());
+ }
+ let merged_source_start = sources.len();
+ for group_merged in &emit_merged {
+ for chunk in group_merged {
+ sources.push(chunk.as_ref());
+ }
+ }
+
+ let mut interleave_indices: Vec<(usize, usize)> =
+ Vec::with_capacity(total_rows);
+ let mut merged_src_idx = merged_source_start;
+ for (group_indices, group_merged) in
+ emit_indices.iter().zip(emit_merged.iter())
+ {
+ for &(batch_idx, row_idx) in group_indices {
+ interleave_indices.push((batch_idx as usize, row_idx as
usize));
+ }
+ for chunk in group_merged {
+ for row in 0..chunk.len() {
+ interleave_indices.push((merged_src_idx, row));
+ }
+ merged_src_idx += 1;
+ }
+ }
+
+ arrow::compute::interleave(&sources, &interleave_indices)?
+ };
+
+ // Release batch references that are no longer needed.
+ if emit_to == EmitTo::All {
+ self.batches.clear();
+ self.batch_refcounts.clear();
+ } else {
+ // EmitTo::First(n): remaining groups still reference
+ // `self.batches` by positional index, so we can't remove entries
+ // without invalidating those indices. Instead, when a batch's
+ // refcount reaches zero, we replace it with an empty array to free
+ // the payload buffers while keeping indices stable.
+ //
+ // Repeated partial emits can leave "tombstone" empty batches, so
we
+ // retain some vector slots until a later `EmitTo::All` reset. We
+ // could compact more aggressively (e.g., by rewriting batch
+ // indices), but it is probably not worth the cost in practice.
+ let empty = new_empty_array(&self.datatype);
+ for group_indices in &emit_indices {
+ for &(batch_idx, _) in group_indices {
+ let rc = &mut self.batch_refcounts[batch_idx as usize];
+ *rc -= 1;
+ if *rc == 0 {
+ self.batches[batch_idx as usize] = Arc::clone(&empty);
+ }
+ }
+ }
+ }
+
+ let offsets = OffsetBuffer::new(ScalarBuffer::from(offsets));
+ let field = Arc::new(Field::new_list_field(self.datatype.clone(),
true));
+ let result = ListArray::new(field, offsets, flat_values,
nulls_builder.finish());
+
+ Ok(Arc::new(result))
+ }
+
+ fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
+ Ok(vec![self.evaluate(emit_to)?])
+ }
+
+ fn merge_batch(
+ &mut self,
+ values: &[ArrayRef],
+ group_indices: &[usize],
+ _opt_filter: Option<&BooleanArray>,
+ total_num_groups: usize,
+ ) -> Result<()> {
+ assert_eq!(values.len(), 1, "one argument to merge_batch");
+ let input_list = values[0].as_list::<i32>();
+
+ self.indices.resize_with(total_num_groups, Vec::new);
+ self.merged.resize_with(total_num_groups, Vec::new);
+
+ for (row_idx, &group_idx) in group_indices.iter().enumerate() {
+ if input_list.is_null(row_idx) {
+ continue;
+ }
+ let list_value = input_list.value(row_idx);
+ if !list_value.is_empty() {
+ self.merged[group_idx].push(list_value);
+ }
+ }
+
+ Ok(())
+ }
+
+ fn convert_to_state(
+ &self,
+ values: &[ArrayRef],
+ opt_filter: Option<&BooleanArray>,
+ ) -> Result<Vec<ArrayRef>> {
+ assert_eq!(values.len(), 1, "one argument to convert_to_state");
+
+ let input = &values[0];
+
+ // Each row becomes a 1-element list: offsets are [0, 1, 2, ..., n].
+ let offsets = OffsetBuffer::from_repeated_length(1, input.len());
+
+ // Filtered rows become null list entries, which merge_batch
+ // will skip.
+ let filter_nulls = opt_filter.and_then(filter_to_nulls);
+
+ // With ignore_nulls, null values also become null list entries.
Without
+ // ignore_nulls, null values stay as [NULL] so merge_batch retains
them.
+ let nulls = if self.ignore_nulls {
+ let logical = input.logical_nulls();
+ NullBuffer::union(filter_nulls.as_ref(), logical.as_ref())
+ } else {
+ filter_nulls
+ };
+
+ let field = Arc::new(Field::new_list_field(self.datatype.clone(),
true));
+ let list_array = ListArray::new(field, offsets, Arc::clone(input),
nulls);
+
+ Ok(vec![Arc::new(list_array)])
+ }
+
+ fn supports_convert_to_state(&self) -> bool {
+ true
+ }
+
+ fn size(&self) -> usize {
+ self.batches
+ .iter()
+ .map(|arr| arr.get_array_memory_size())
Review Comment:
This uses `arr.get_array_memory_size()`, while further down (line 716) the
code uses `arr.to_data().get_slice_memory_size().unwrap_or_default()`.
The two should be consistent; I believe the latter is the correct one.
##########
datafusion/functions-aggregate/src/array_agg.rs:
##########
@@ -414,6 +435,293 @@ impl Accumulator for ArrayAggAccumulator {
}
}
+#[derive(Debug)]
+struct ArrayAggGroupsAccumulator {
+ datatype: DataType,
+ ignore_nulls: bool,
+ /// Input batches received via `update_batch`.
+ batches: Vec<ArrayRef>,
+ /// Per-group list of `(batch_index, row_index)` pairs into `batches`.
+ indices: Vec<Vec<(u32, u32)>>,
+ /// Number of index entries referencing each batch.
+ batch_refcounts: Vec<u32>,
+ /// Per-group array chunks from `merge_batch`.
+ merged: Vec<Vec<ArrayRef>>,
+}
+
+impl ArrayAggGroupsAccumulator {
+ fn new(datatype: DataType, ignore_nulls: bool) -> Self {
+ Self {
+ datatype,
+ ignore_nulls,
+ batches: Vec::new(),
+ indices: Vec::new(),
+ batch_refcounts: Vec::new(),
+ merged: Vec::new(),
+ }
+ }
+}
+
+impl GroupsAccumulator for ArrayAggGroupsAccumulator {
+ /// Store references to each input batch and record per-group
+ /// `(batch_index, row_index)` pairs. Materialization is deferred
+ /// to `evaluate`, which minimizes the work done per-batch.
+ fn update_batch(
+ &mut self,
+ values: &[ArrayRef],
+ group_indices: &[usize],
+ opt_filter: Option<&BooleanArray>,
+ total_num_groups: usize,
+ ) -> Result<()> {
+ assert_eq!(values.len(), 1, "single argument to update_batch");
Review Comment:
```suggestion
assert_eq_or_internal_err!(values.len(), 1, "single argument to update_batch");
```
##########
datafusion/functions-aggregate/src/array_agg.rs:
##########
@@ -414,6 +435,293 @@ impl Accumulator for ArrayAggAccumulator {
}
}
+#[derive(Debug)]
+struct ArrayAggGroupsAccumulator {
+ datatype: DataType,
+ ignore_nulls: bool,
+ /// Input batches received via `update_batch`.
+ batches: Vec<ArrayRef>,
+ /// Per-group list of `(batch_index, row_index)` pairs into `batches`.
+ indices: Vec<Vec<(u32, u32)>>,
+ /// Number of index entries referencing each batch.
+ batch_refcounts: Vec<u32>,
+ /// Per-group array chunks from `merge_batch`.
+ merged: Vec<Vec<ArrayRef>>,
+}
+
+impl ArrayAggGroupsAccumulator {
+ fn new(datatype: DataType, ignore_nulls: bool) -> Self {
+ Self {
+ datatype,
+ ignore_nulls,
+ batches: Vec::new(),
+ indices: Vec::new(),
+ batch_refcounts: Vec::new(),
+ merged: Vec::new(),
+ }
+ }
+}
+
+impl GroupsAccumulator for ArrayAggGroupsAccumulator {
+ /// Store references to each input batch and record per-group
+ /// `(batch_index, row_index)` pairs. Materialization is deferred
+ /// to `evaluate`, which minimizes the work done per-batch.
+ fn update_batch(
+ &mut self,
+ values: &[ArrayRef],
+ group_indices: &[usize],
+ opt_filter: Option<&BooleanArray>,
+ total_num_groups: usize,
+ ) -> Result<()> {
+ assert_eq!(values.len(), 1, "single argument to update_batch");
+ let input = &values[0];
+
+ self.indices.resize_with(total_num_groups, Vec::new);
+ self.merged.resize_with(total_num_groups, Vec::new);
+
+ let nulls = if self.ignore_nulls {
+ input.logical_nulls()
+ } else {
+ None
+ };
+
+ let batch_idx = self.batches.len();
+ let mut batch_pushed = false;
+
+ for (row_idx, &group_idx) in group_indices.iter().enumerate() {
+ // Skip filtered rows
+ if let Some(filter) = opt_filter
+ && (filter.is_null(row_idx) || !filter.value(row_idx))
+ {
+ continue;
+ }
+
+ // Skip null values when ignore_nulls is set
+ if let Some(ref nulls) = nulls
+ && nulls.is_null(row_idx)
+ {
+ continue;
+ }
+
+ if !batch_pushed {
+ self.batches.push(Arc::clone(input));
+ self.batch_refcounts.push(0);
+ batch_pushed = true;
+ }
+ self.batch_refcounts[batch_idx] += 1;
+ self.indices[group_idx].push((batch_idx as u32, row_idx as u32));
+ }
+
+ Ok(())
+ }
+
+ fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef> {
+ let emit_indices = emit_to.take_needed(&mut self.indices);
+ let emit_merged = emit_to.take_needed(&mut self.merged);
+ let num_groups = emit_indices.len();
+
+ let mut offsets = Vec::<i32>::with_capacity(num_groups + 1);
+ offsets.push(0);
+ let mut nulls_builder = NullBufferBuilder::new(num_groups);
+ let mut cur_offset = 0i32;
+
+ // Build ListArray offsets and nulls: groups with no elements
+ // are null, others occupy offsets[i]..offsets[i+1] in the
+ // flat values array.
+ for (group_indices, group_merged) in
emit_indices.iter().zip(emit_merged.iter()) {
+ let merged_len = group_merged.iter().map(|a|
a.len()).sum::<usize>();
+ let total_len = group_indices.len() + merged_len;
+
+ if total_len == 0 {
+ nulls_builder.append_null();
+ } else {
+ nulls_builder.append_non_null();
+ }
+
+ cur_offset += total_len as i32;
Review Comment:
nit: Should this use `checked_add()`?
It is unlikely to overflow `i32::MAX`, but mixing integer widths with
silent arithmetic and `as` casts may hide a real problem.
##########
datafusion/functions-aggregate/src/array_agg.rs:
##########
@@ -414,6 +435,293 @@ impl Accumulator for ArrayAggAccumulator {
}
}
+#[derive(Debug)]
+struct ArrayAggGroupsAccumulator {
+ datatype: DataType,
+ ignore_nulls: bool,
+ /// Input batches received via `update_batch`.
+ batches: Vec<ArrayRef>,
+ /// Per-group list of `(batch_index, row_index)` pairs into `batches`.
+ indices: Vec<Vec<(u32, u32)>>,
+ /// Number of index entries referencing each batch.
+ batch_refcounts: Vec<u32>,
+ /// Per-group array chunks from `merge_batch`.
+ merged: Vec<Vec<ArrayRef>>,
+}
+
+impl ArrayAggGroupsAccumulator {
+ fn new(datatype: DataType, ignore_nulls: bool) -> Self {
+ Self {
+ datatype,
+ ignore_nulls,
+ batches: Vec::new(),
+ indices: Vec::new(),
+ batch_refcounts: Vec::new(),
+ merged: Vec::new(),
+ }
+ }
+}
+
+impl GroupsAccumulator for ArrayAggGroupsAccumulator {
+ /// Store references to each input batch and record per-group
+ /// `(batch_index, row_index)` pairs. Materialization is deferred
+ /// to `evaluate`, which minimizes the work done per-batch.
+ fn update_batch(
+ &mut self,
+ values: &[ArrayRef],
+ group_indices: &[usize],
+ opt_filter: Option<&BooleanArray>,
+ total_num_groups: usize,
+ ) -> Result<()> {
+ assert_eq!(values.len(), 1, "single argument to update_batch");
+ let input = &values[0];
+
+ self.indices.resize_with(total_num_groups, Vec::new);
+ self.merged.resize_with(total_num_groups, Vec::new);
+
+ let nulls = if self.ignore_nulls {
+ input.logical_nulls()
+ } else {
+ None
+ };
+
+ let batch_idx = self.batches.len();
+ let mut batch_pushed = false;
+
+ for (row_idx, &group_idx) in group_indices.iter().enumerate() {
+ // Skip filtered rows
+ if let Some(filter) = opt_filter
+ && (filter.is_null(row_idx) || !filter.value(row_idx))
+ {
+ continue;
+ }
+
+ // Skip null values when ignore_nulls is set
+ if let Some(ref nulls) = nulls
+ && nulls.is_null(row_idx)
+ {
+ continue;
+ }
+
+ if !batch_pushed {
+ self.batches.push(Arc::clone(input));
+ self.batch_refcounts.push(0);
+ batch_pushed = true;
+ }
+ self.batch_refcounts[batch_idx] += 1;
+ self.indices[group_idx].push((batch_idx as u32, row_idx as u32));
+ }
+
+ Ok(())
+ }
+
+ fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef> {
+ let emit_indices = emit_to.take_needed(&mut self.indices);
+ let emit_merged = emit_to.take_needed(&mut self.merged);
+ let num_groups = emit_indices.len();
+
+ let mut offsets = Vec::<i32>::with_capacity(num_groups + 1);
+ offsets.push(0);
+ let mut nulls_builder = NullBufferBuilder::new(num_groups);
+ let mut cur_offset = 0i32;
+
+ // Build ListArray offsets and nulls: groups with no elements
+ // are null, others occupy offsets[i]..offsets[i+1] in the
+ // flat values array.
+ for (group_indices, group_merged) in
emit_indices.iter().zip(emit_merged.iter()) {
+ let merged_len = group_merged.iter().map(|a|
a.len()).sum::<usize>();
+ let total_len = group_indices.len() + merged_len;
+
+ if total_len == 0 {
+ nulls_builder.append_null();
+ } else {
+ nulls_builder.append_non_null();
+ }
+
+ cur_offset += total_len as i32;
+ offsets.push(cur_offset);
+ }
+
+ let total_rows = cur_offset as usize;
+
+ // Build the flat values array for the output ListArray using
+ // `interleave`. `interleave` takes a list of source arrays and a list
+ // of (source_index, row_index) pairs, and gathers the referenced
+ // elements into a single output array.
+ //
+ // We assemble two inputs:
+ // 1. `sources`: all arrays that contain data we need — the
+ // input batches (from update_batch) followed by the merged
+ // chunks (from merge_batch), flattened in group order.
+ // 2. `interleave_indices`: for each group, the (source, row)
+ // pairs that select that group's elements from `sources`.
+ // Batch-indexed rows come first, then merged chunks.
+ //
+ // The resulting array is used directly as the ListArray's values
+ // buffer, with the offsets computed above slicing it into per-group
+ // lists.
+ let flat_values = if total_rows == 0 {
+ new_empty_array(&self.datatype)
+ } else {
+ let mut sources: Vec<&dyn Array> = Vec::new();
+ for batch in &self.batches {
+ sources.push(batch.as_ref());
+ }
+ let merged_source_start = sources.len();
+ for group_merged in &emit_merged {
+ for chunk in group_merged {
+ sources.push(chunk.as_ref());
+ }
+ }
+
+ let mut interleave_indices: Vec<(usize, usize)> =
+ Vec::with_capacity(total_rows);
+ let mut merged_src_idx = merged_source_start;
+ for (group_indices, group_merged) in
+ emit_indices.iter().zip(emit_merged.iter())
+ {
+ for &(batch_idx, row_idx) in group_indices {
+ interleave_indices.push((batch_idx as usize, row_idx as
usize));
+ }
+ for chunk in group_merged {
+ for row in 0..chunk.len() {
+ interleave_indices.push((merged_src_idx, row));
+ }
+ merged_src_idx += 1;
+ }
+ }
+
+ arrow::compute::interleave(&sources, &interleave_indices)?
+ };
+
+ // Release batch references that are no longer needed.
+ if emit_to == EmitTo::All {
+ self.batches.clear();
+ self.batch_refcounts.clear();
+ } else {
+ // EmitTo::First(n): remaining groups still reference
+ // `self.batches` by positional index, so we can't remove entries
+ // without invalidating those indices. Instead, when a batch's
+ // refcount reaches zero, we replace it with an empty array to free
+ // the payload buffers while keeping indices stable.
+ //
+ // Repeated partial emits can leave "tombstone" empty batches, so
we
+ // retain some vector slots until a later `EmitTo::All` reset. We
+ // could compact more aggressively (e.g., by rewriting batch
+ // indices), but it is probably not worth the cost in practice.
+ let empty = new_empty_array(&self.datatype);
+ for group_indices in &emit_indices {
+ for &(batch_idx, _) in group_indices {
+ let rc = &mut self.batch_refcounts[batch_idx as usize];
+ *rc -= 1;
+ if *rc == 0 {
+ self.batches[batch_idx as usize] = Arc::clone(&empty);
+ }
+ }
+ }
+ }
+
+ let offsets = OffsetBuffer::new(ScalarBuffer::from(offsets));
+ let field = Arc::new(Field::new_list_field(self.datatype.clone(),
true));
+ let result = ListArray::new(field, offsets, flat_values,
nulls_builder.finish());
+
+ Ok(Arc::new(result))
+ }
+
+ fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
+ Ok(vec![self.evaluate(emit_to)?])
+ }
+
+ fn merge_batch(
+ &mut self,
+ values: &[ArrayRef],
+ group_indices: &[usize],
+ _opt_filter: Option<&BooleanArray>,
+ total_num_groups: usize,
+ ) -> Result<()> {
+ assert_eq!(values.len(), 1, "one argument to merge_batch");
+ let input_list = values[0].as_list::<i32>();
+
+ self.indices.resize_with(total_num_groups, Vec::new);
+ self.merged.resize_with(total_num_groups, Vec::new);
+
+ for (row_idx, &group_idx) in group_indices.iter().enumerate() {
+ if input_list.is_null(row_idx) {
+ continue;
+ }
+ let list_value = input_list.value(row_idx);
+ if !list_value.is_empty() {
+ self.merged[group_idx].push(list_value);
+ }
+ }
+
+ Ok(())
+ }
+
+ fn convert_to_state(
+ &self,
+ values: &[ArrayRef],
+ opt_filter: Option<&BooleanArray>,
+ ) -> Result<Vec<ArrayRef>> {
+ assert_eq!(values.len(), 1, "one argument to convert_to_state");
Review Comment:
```suggestion
assert_eq_or_internal_err!(values.len(), 1, "one argument to convert_to_state");
```
##########
datafusion/functions-aggregate/src/array_agg.rs:
##########
@@ -414,6 +435,293 @@ impl Accumulator for ArrayAggAccumulator {
}
}
+#[derive(Debug)]
+struct ArrayAggGroupsAccumulator {
+ datatype: DataType,
+ ignore_nulls: bool,
+ /// Input batches received via `update_batch`.
+ batches: Vec<ArrayRef>,
+ /// Per-group list of `(batch_index, row_index)` pairs into `batches`.
+ indices: Vec<Vec<(u32, u32)>>,
+ /// Number of index entries referencing each batch.
+ batch_refcounts: Vec<u32>,
+ /// Per-group array chunks from `merge_batch`.
+ merged: Vec<Vec<ArrayRef>>,
+}
+
+impl ArrayAggGroupsAccumulator {
+ fn new(datatype: DataType, ignore_nulls: bool) -> Self {
+ Self {
+ datatype,
+ ignore_nulls,
+ batches: Vec::new(),
+ indices: Vec::new(),
+ batch_refcounts: Vec::new(),
+ merged: Vec::new(),
+ }
+ }
+}
+
+impl GroupsAccumulator for ArrayAggGroupsAccumulator {
+ /// Store references to each input batch and record per-group
+ /// `(batch_index, row_index)` pairs. Materialization is deferred
+ /// to `evaluate`, which minimizes the work done per-batch.
+ fn update_batch(
+ &mut self,
+ values: &[ArrayRef],
+ group_indices: &[usize],
+ opt_filter: Option<&BooleanArray>,
+ total_num_groups: usize,
+ ) -> Result<()> {
+ assert_eq!(values.len(), 1, "single argument to update_batch");
+ let input = &values[0];
+
+ self.indices.resize_with(total_num_groups, Vec::new);
+ self.merged.resize_with(total_num_groups, Vec::new);
+
+ let nulls = if self.ignore_nulls {
+ input.logical_nulls()
+ } else {
+ None
+ };
+
+ let batch_idx = self.batches.len();
+ let mut batch_pushed = false;
+
+ for (row_idx, &group_idx) in group_indices.iter().enumerate() {
+ // Skip filtered rows
+ if let Some(filter) = opt_filter
+ && (filter.is_null(row_idx) || !filter.value(row_idx))
+ {
+ continue;
+ }
+
+ // Skip null values when ignore_nulls is set
+ if let Some(ref nulls) = nulls
+ && nulls.is_null(row_idx)
+ {
+ continue;
+ }
+
+ if !batch_pushed {
+ self.batches.push(Arc::clone(input));
+ self.batch_refcounts.push(0);
+ batch_pushed = true;
+ }
+ self.batch_refcounts[batch_idx] += 1;
+ self.indices[group_idx].push((batch_idx as u32, row_idx as u32));
+ }
+
+ Ok(())
+ }
+
+ fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef> {
+ let emit_indices = emit_to.take_needed(&mut self.indices);
+ let emit_merged = emit_to.take_needed(&mut self.merged);
+ let num_groups = emit_indices.len();
+
+ let mut offsets = Vec::<i32>::with_capacity(num_groups + 1);
+ offsets.push(0);
+ let mut nulls_builder = NullBufferBuilder::new(num_groups);
+ let mut cur_offset = 0i32;
+
+ // Build ListArray offsets and nulls: groups with no elements
+ // are null, others occupy offsets[i]..offsets[i+1] in the
+ // flat values array.
+ for (group_indices, group_merged) in
emit_indices.iter().zip(emit_merged.iter()) {
+ let merged_len = group_merged.iter().map(|a|
a.len()).sum::<usize>();
+ let total_len = group_indices.len() + merged_len;
+
+ if total_len == 0 {
+ nulls_builder.append_null();
+ } else {
+ nulls_builder.append_non_null();
+ }
+
+ cur_offset += total_len as i32;
+ offsets.push(cur_offset);
+ }
+
+ let total_rows = cur_offset as usize;
+
+ // Build the flat values array for the output ListArray using
+ // `interleave`. `interleave` takes a list of source arrays and a list
+ // of (source_index, row_index) pairs, and gathers the referenced
+ // elements into a single output array.
+ //
+ // We assemble two inputs:
+ // 1. `sources`: all arrays that contain data we need — the
+ // input batches (from update_batch) followed by the merged
+ // chunks (from merge_batch), flattened in group order.
+ // 2. `interleave_indices`: for each group, the (source, row)
+ // pairs that select that group's elements from `sources`.
+ // Batch-indexed rows come first, then merged chunks.
+ //
+ // The resulting array is used directly as the ListArray's values
+ // buffer, with the offsets computed above slicing it into per-group
+ // lists.
+ let flat_values = if total_rows == 0 {
+ new_empty_array(&self.datatype)
+ } else {
+ let mut sources: Vec<&dyn Array> = Vec::new();
+ for batch in &self.batches {
+ sources.push(batch.as_ref());
+ }
+ let merged_source_start = sources.len();
+ for group_merged in &emit_merged {
+ for chunk in group_merged {
+ sources.push(chunk.as_ref());
+ }
+ }
+
+ let mut interleave_indices: Vec<(usize, usize)> =
+ Vec::with_capacity(total_rows);
+ let mut merged_src_idx = merged_source_start;
+ for (group_indices, group_merged) in
+ emit_indices.iter().zip(emit_merged.iter())
+ {
+ for &(batch_idx, row_idx) in group_indices {
+ interleave_indices.push((batch_idx as usize, row_idx as
usize));
+ }
+ for chunk in group_merged {
+ for row in 0..chunk.len() {
+ interleave_indices.push((merged_src_idx, row));
+ }
+ merged_src_idx += 1;
+ }
+ }
+
+ arrow::compute::interleave(&sources, &interleave_indices)?
+ };
+
+ // Release batch references that are no longer needed.
+ if emit_to == EmitTo::All {
+ self.batches.clear();
+ self.batch_refcounts.clear();
+ } else {
+ // EmitTo::First(n): remaining groups still reference
+ // `self.batches` by positional index, so we can't remove entries
+ // without invalidating those indices. Instead, when a batch's
+ // refcount reaches zero, we replace it with an empty array to free
+ // the payload buffers while keeping indices stable.
+ //
+ // Repeated partial emits can leave "tombstone" empty batches, so
we
+ // retain some vector slots until a later `EmitTo::All` reset. We
+ // could compact more aggressively (e.g., by rewriting batch
+ // indices), but it is probably not worth the cost in practice.
+ let empty = new_empty_array(&self.datatype);
+ for group_indices in &emit_indices {
+ for &(batch_idx, _) in group_indices {
+ let rc = &mut self.batch_refcounts[batch_idx as usize];
+ *rc -= 1;
+ if *rc == 0 {
+ self.batches[batch_idx as usize] = Arc::clone(&empty);
+ }
+ }
+ }
+ }
+
+ let offsets = OffsetBuffer::new(ScalarBuffer::from(offsets));
+ let field = Arc::new(Field::new_list_field(self.datatype.clone(),
true));
+ let result = ListArray::new(field, offsets, flat_values,
nulls_builder.finish());
+
+ Ok(Arc::new(result))
+ }
+
+ fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
+ Ok(vec![self.evaluate(emit_to)?])
+ }
+
+ fn merge_batch(
+ &mut self,
+ values: &[ArrayRef],
+ group_indices: &[usize],
+ _opt_filter: Option<&BooleanArray>,
+ total_num_groups: usize,
+ ) -> Result<()> {
+ assert_eq!(values.len(), 1, "one argument to merge_batch");
Review Comment:
```suggestion
assert_eq_or_internal_err!(values.len(), 1, "one argument to merge_batch");
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]