This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 3a23bb2531 perf: Optimize `array_agg()` using `GroupsAccumulator`
(#20504)
3a23bb2531 is described below
commit 3a23bb2531e9276607e71836e1c78ae2214085e9
Author: Neil Conway <[email protected]>
AuthorDate: Sat Feb 28 08:21:11 2026 -0500
perf: Optimize `array_agg()` using `GroupsAccumulator` (#20504)
## Which issue does this PR close?
- Closes #20465.
- Closes #17446.
## Rationale for this change
This PR optimizes the performance of `array_agg()` by adding support for
the `GroupsAccumulator` API.
The design tries to minimize the amount of per-batch work done in
`update_batch()`: we store a reference to the batch, and a `(group_idx,
row_idx)` pair for each row. In `evaluate()`, we assemble all the
requested output with a single `interleave` call.
This turns out to be significantly faster, because we copy much less
data and the result assembly can be vectorized more effectively. For
example, on a benchmark with 5000 groups and 5000 int64 values per
group, this approach is roughly 190x faster than the previous approach.
Releasing memory after a partial emit is a little more involved than in
the previous approach, but with some determination it is still possible.
## What changes are included in this PR?
* Implement the `GroupsAccumulator` API for `array_agg()`
* Add benchmark for `array_agg` of a named struct over a dict, following
the workload in #17446
* Add unit tests
* Improve SLT test coverage
* Remove a redundant SLT test
## Are these changes tested?
Yes, and benchmarked.
## Are there any user-facing changes?
No.
## AI usage
Iterated with the help of multiple AI tools; I've reviewed and
understand the resulting code.
---
datafusion/core/benches/aggregate_query_sql.rs | 11 +
datafusion/core/benches/data_utils/mod.rs | 18 +-
.../src/aggregate/groups_accumulator/nulls.rs | 2 +-
datafusion/functions-aggregate/src/array_agg.rs | 718 ++++++++++++++++++++-
.../test_files/aggregate_skip_partial.slt | 41 +-
5 files changed, 773 insertions(+), 17 deletions(-)
diff --git a/datafusion/core/benches/aggregate_query_sql.rs
b/datafusion/core/benches/aggregate_query_sql.rs
index b47512e5e9..402ac9c717 100644
--- a/datafusion/core/benches/aggregate_query_sql.rs
+++ b/datafusion/core/benches/aggregate_query_sql.rs
@@ -284,6 +284,17 @@ fn criterion_benchmark(c: &mut Criterion) {
)
})
});
+
+ c.bench_function("array_agg_struct_query_group_by_mid_groups", |b| {
+ b.iter(|| {
+ query(
+ ctx.clone(),
+ &rt,
+ "SELECT u64_mid, array_agg(named_struct('market', dict10,
'price', f64)) \
+ FROM t GROUP BY u64_mid",
+ )
+ })
+ });
}
criterion_group!(benches, criterion_benchmark);
diff --git a/datafusion/core/benches/data_utils/mod.rs
b/datafusion/core/benches/data_utils/mod.rs
index a30ada4205..728c6490c7 100644
--- a/datafusion/core/benches/data_utils/mod.rs
+++ b/datafusion/core/benches/data_utils/mod.rs
@@ -20,8 +20,9 @@
use arrow::array::{
ArrayRef, Float32Array, Float64Array, RecordBatch, StringArray,
StringViewBuilder,
UInt64Array,
- builder::{Int64Builder, StringBuilder},
+ builder::{Int64Builder, StringBuilder, StringDictionaryBuilder},
};
+use arrow::datatypes::Int32Type;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::datasource::MemTable;
use datafusion::error::Result;
@@ -65,6 +66,11 @@ pub fn create_schema() -> Schema {
// Integers randomly selected from a narrow range of values such that
// there are a few distinct values, but they are repeated often.
Field::new("u64_narrow", DataType::UInt64, false),
+ Field::new(
+ "dict10",
+ DataType::Dictionary(Box::new(DataType::Int32),
Box::new(DataType::Utf8)),
+ true,
+ ),
])
}
@@ -109,6 +115,15 @@ fn create_record_batch(
.map(|_| rng.random_range(0..10))
.collect::<Vec<_>>();
+ let mut dict_builder = StringDictionaryBuilder::<Int32Type>::new();
+ for _ in 0..batch_size {
+ if rng.random::<f64>() > 0.9 {
+ dict_builder.append_null();
+ } else {
+ dict_builder.append_value(format!("market_{}",
rng.random_range(0..10)));
+ }
+ }
+
RecordBatch::try_new(
schema,
vec![
@@ -118,6 +133,7 @@ fn create_record_batch(
Arc::new(UInt64Array::from(integer_values_wide)),
Arc::new(UInt64Array::from(integer_values_mid)),
Arc::new(UInt64Array::from(integer_values_narrow)),
+ Arc::new(dict_builder.finish()),
],
)
.unwrap()
diff --git
a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/nulls.rs
b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/nulls.rs
index 74d361cf25..435560721c 100644
---
a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/nulls.rs
+++
b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/nulls.rs
@@ -44,7 +44,7 @@ pub fn set_nulls<T: ArrowNumericType + Send>(
/// The `NullBuffer` is
/// * `true` (representing valid) for values that were `true` in filter
/// * `false` (representing null) for values that were `false` or `null` in
filter
-fn filter_to_nulls(filter: &BooleanArray) -> Option<NullBuffer> {
+pub fn filter_to_nulls(filter: &BooleanArray) -> Option<NullBuffer> {
let (filter_bools, filter_nulls) = filter.clone().into_parts();
let filter_bools = NullBuffer::from(filter_bools);
NullBuffer::union(Some(&filter_bools), filter_nulls.as_ref())
diff --git a/datafusion/functions-aggregate/src/array_agg.rs
b/datafusion/functions-aggregate/src/array_agg.rs
index c07958a858..cd4cb9b19f 100644
--- a/datafusion/functions-aggregate/src/array_agg.rs
+++ b/datafusion/functions-aggregate/src/array_agg.rs
@@ -23,8 +23,10 @@ use std::mem::{size_of, size_of_val, take};
use std::sync::Arc;
use arrow::array::{
- Array, ArrayRef, AsArray, BooleanArray, ListArray, StructArray,
new_empty_array,
+ Array, ArrayRef, AsArray, BooleanArray, ListArray, NullBufferBuilder,
StructArray,
+ UInt32Array, new_empty_array,
};
+use arrow::buffer::{NullBuffer, OffsetBuffer, ScalarBuffer};
use arrow::compute::{SortOptions, filter};
use arrow::datatypes::{DataType, Field, FieldRef, Fields};
@@ -36,8 +38,10 @@ use datafusion_common::{Result, ScalarValue,
assert_eq_or_internal_err, exec_err
use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs};
use datafusion_expr::utils::format_state_name;
use datafusion_expr::{
- Accumulator, AggregateUDFImpl, Documentation, Signature, Volatility,
+ Accumulator, AggregateUDFImpl, Documentation, EmitTo, GroupsAccumulator,
Signature,
+ Volatility,
};
+use
datafusion_functions_aggregate_common::aggregate::groups_accumulator::nulls::filter_to_nulls;
use datafusion_functions_aggregate_common::merge_arrays::merge_ordered_arrays;
use datafusion_functions_aggregate_common::order::AggregateOrderSensitivity;
use datafusion_functions_aggregate_common::utils::ordering_fields;
@@ -228,6 +232,23 @@ impl AggregateUDFImpl for ArrayAgg {
datafusion_expr::ReversedUDAF::Reversed(array_agg_udaf())
}
+ fn groups_accumulator_supported(&self, args: AccumulatorArgs) -> bool {
+ !args.is_distinct && args.order_bys.is_empty()
+ }
+
+ fn create_groups_accumulator(
+ &self,
+ args: AccumulatorArgs,
+ ) -> Result<Box<dyn GroupsAccumulator>> {
+ let field = &args.expr_fields[0];
+ let data_type = field.data_type().clone();
+ let ignore_nulls = args.ignore_nulls && field.is_nullable();
+ Ok(Box::new(ArrayAggGroupsAccumulator::new(
+ data_type,
+ ignore_nulls,
+ )))
+ }
+
fn supports_null_handling_clause(&self) -> bool {
true
}
@@ -414,6 +435,331 @@ impl Accumulator for ArrayAggAccumulator {
}
}
+#[derive(Debug)]
+struct ArrayAggGroupsAccumulator {
+ datatype: DataType,
+ ignore_nulls: bool,
+ /// Source arrays — input arrays (from update_batch) or list backing
+ /// arrays (from merge_batch).
+ batches: Vec<ArrayRef>,
+ /// Per-batch list of (group_idx, row_idx) pairs.
+ batch_entries: Vec<Vec<(u32, u32)>>,
+ /// Total number of groups tracked.
+ num_groups: usize,
+}
+
+impl ArrayAggGroupsAccumulator {
+ fn new(datatype: DataType, ignore_nulls: bool) -> Self {
+ Self {
+ datatype,
+ ignore_nulls,
+ batches: Vec::new(),
+ batch_entries: Vec::new(),
+ num_groups: 0,
+ }
+ }
+
+ fn clear_state(&mut self) {
+ // `size()` measures Vec capacity rather than len, so allocate new
+ // buffers instead of using `clear()`.
+ self.batches = Vec::new();
+ self.batch_entries = Vec::new();
+ self.num_groups = 0;
+ }
+
+ fn compact_retained_state(&mut self, emit_groups: usize) -> Result<()> {
+ // EmitTo::First is used to recover from memory pressure. Simply
+ // removing emitted entries in place is not enough because mixed
batches
+ // would continue to pin their original Array arrays, even if only a
few
+ // retained rows remain.
+ //
+ // Rebuild the retained state from scratch so fully emitted batches are
+ // dropped, mixed batches are compacted to arrays containing only the
+ // surviving rows, and retained metadata is right-sized.
+ let emit_groups = emit_groups as u32;
+ let old_batches = take(&mut self.batches);
+ let old_batch_entries = take(&mut self.batch_entries);
+
+ let mut batches = Vec::new();
+ let mut batch_entries = Vec::new();
+
+ for (batch, entries) in old_batches.into_iter().zip(old_batch_entries)
{
+ let retained_len = entries.iter().filter(|(g, _)| *g >=
emit_groups).count();
+
+ if retained_len == 0 {
+ continue;
+ }
+
+ if retained_len == entries.len() {
+ // Nothing was emitted from this batch, so we keep the existing
+ // array and only renumber the remaining group IDs so that they
+ // start from 0.
+ let mut retained_entries = entries;
+ for (g, _) in &mut retained_entries {
+ *g -= emit_groups;
+ }
+ retained_entries.shrink_to_fit();
+ batches.push(batch);
+ batch_entries.push(retained_entries);
+ continue;
+ }
+
+ let mut retained_entries = Vec::with_capacity(retained_len);
+ let mut retained_rows = Vec::with_capacity(retained_len);
+
+ for (g, r) in entries {
+ if g >= emit_groups {
+ // Compute the new `(group_idx, row_idx)` pair for a
+ // retained row. `group_idx` is renumbered to start from
+ // 0, and `row_idx` points into the new dense batch we are
+ // building.
+ retained_entries.push((g - emit_groups,
retained_rows.len() as u32));
+ retained_rows.push(r);
+ }
+ }
+
+ debug_assert_eq!(retained_entries.len(), retained_len);
+ debug_assert_eq!(retained_rows.len(), retained_len);
+
+ let batch = if retained_len == batch.len() {
+ batch
+ } else {
+ // Compact mixed batches so retained rows no longer pin the
+ // original array.
+ let retained_rows = UInt32Array::from(retained_rows);
+ arrow::compute::take(batch.as_ref(), &retained_rows, None)?
+ };
+
+ batches.push(batch);
+ batch_entries.push(retained_entries);
+ }
+
+ self.batches = batches;
+ self.batch_entries = batch_entries;
+ self.num_groups -= emit_groups as usize;
+
+ Ok(())
+ }
+}
+
+impl GroupsAccumulator for ArrayAggGroupsAccumulator {
+ /// Store a reference to the input batch, plus a `(group_idx, row_idx)`
pair
+ /// for every row.
+ fn update_batch(
+ &mut self,
+ values: &[ArrayRef],
+ group_indices: &[usize],
+ opt_filter: Option<&BooleanArray>,
+ total_num_groups: usize,
+ ) -> Result<()> {
+ assert_eq!(values.len(), 1, "single argument to update_batch");
+ let input = &values[0];
+
+ self.num_groups = self.num_groups.max(total_num_groups);
+
+ let nulls = if self.ignore_nulls {
+ input.logical_nulls()
+ } else {
+ None
+ };
+
+ let mut entries = Vec::new();
+
+ for (row_idx, &group_idx) in group_indices.iter().enumerate() {
+ // Skip filtered rows
+ if let Some(filter) = opt_filter
+ && (filter.is_null(row_idx) || !filter.value(row_idx))
+ {
+ continue;
+ }
+
+ // Skip null values when ignore_nulls is set
+ if let Some(ref nulls) = nulls
+ && nulls.is_null(row_idx)
+ {
+ continue;
+ }
+
+ entries.push((group_idx as u32, row_idx as u32));
+ }
+
+ // We only need to record the batch if it was non-empty.
+ if !entries.is_empty() {
+ self.batches.push(Arc::clone(input));
+ self.batch_entries.push(entries);
+ }
+
+ Ok(())
+ }
+
+ /// Produce a `ListArray` ordered by group index: the list at
+ /// position N contains the aggregated values for group N.
+ ///
+ /// Uses a counting sort to rearrange the stored `(group, row)`
+ /// entries into group order, then calls `interleave` to gather
+ /// the values into a flat array that backs the output `ListArray`.
+ fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef> {
+ let emit_groups = match emit_to {
+ EmitTo::All => self.num_groups,
+ EmitTo::First(n) => n,
+ };
+
+ // Step 1: Count entries per group. For EmitTo::First(n), only groups
+ // 0..n are counted; the rest are retained to be emitted in the future.
+ let mut counts = vec![0u32; emit_groups];
+ for entries in &self.batch_entries {
+ for &(g, _) in entries {
+ let g = g as usize;
+ if g < emit_groups {
+ counts[g] += 1;
+ }
+ }
+ }
+
+ // Step 2: Do a prefix sum over the counts and use it to build
ListArray
+ // offsets, null buffer, and write positions for the counting sort.
+ let mut offsets = Vec::<i32>::with_capacity(emit_groups + 1);
+ offsets.push(0);
+ let mut nulls_builder = NullBufferBuilder::new(emit_groups);
+ let mut write_positions = Vec::with_capacity(emit_groups);
+ let mut cur_offset = 0u32;
+ for &count in &counts {
+ if count == 0 {
+ nulls_builder.append_null();
+ } else {
+ nulls_builder.append_non_null();
+ }
+ write_positions.push(cur_offset);
+ cur_offset += count;
+ offsets.push(cur_offset as i32);
+ }
+ let total_rows = cur_offset as usize;
+
+ // Step 3: Scatter entries into group order using the counting sort.
The
+ // batch index is implicit from the outer loop position.
+ let flat_values = if total_rows == 0 {
+ new_empty_array(&self.datatype)
+ } else {
+ let mut interleave_indices = vec![(0usize, 0usize); total_rows];
+ for (batch_idx, entries) in self.batch_entries.iter().enumerate() {
+ for &(g, r) in entries {
+ let g = g as usize;
+ if g < emit_groups {
+ let wp = write_positions[g] as usize;
+ interleave_indices[wp] = (batch_idx, r as usize);
+ write_positions[g] += 1;
+ }
+ }
+ }
+
+ let sources: Vec<&dyn Array> =
+ self.batches.iter().map(|b| b.as_ref()).collect();
+ arrow::compute::interleave(&sources, &interleave_indices)?
+ };
+
+ // Step 4: Release state for emitted groups.
+ match emit_to {
+ EmitTo::All => self.clear_state(),
+ EmitTo::First(_) => self.compact_retained_state(emit_groups)?,
+ }
+
+ let offsets = OffsetBuffer::new(ScalarBuffer::from(offsets));
+ let field = Arc::new(Field::new_list_field(self.datatype.clone(),
true));
+ let result = ListArray::new(field, offsets, flat_values,
nulls_builder.finish());
+
+ Ok(Arc::new(result))
+ }
+
+ fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
+ Ok(vec![self.evaluate(emit_to)?])
+ }
+
+ fn merge_batch(
+ &mut self,
+ values: &[ArrayRef],
+ group_indices: &[usize],
+ _opt_filter: Option<&BooleanArray>,
+ total_num_groups: usize,
+ ) -> Result<()> {
+ assert_eq!(values.len(), 1, "one argument to merge_batch");
+ let input_list = values[0].as_list::<i32>();
+
+ self.num_groups = self.num_groups.max(total_num_groups);
+
+ // Push the ListArray's backing values array as a single batch.
+ let list_values = input_list.values();
+ let list_offsets = input_list.offsets();
+
+ let mut entries = Vec::new();
+
+ for (row_idx, &group_idx) in group_indices.iter().enumerate() {
+ if input_list.is_null(row_idx) {
+ continue;
+ }
+ let start = list_offsets[row_idx] as u32;
+ let end = list_offsets[row_idx + 1] as u32;
+ for pos in start..end {
+ entries.push((group_idx as u32, pos));
+ }
+ }
+
+ if !entries.is_empty() {
+ self.batches.push(Arc::clone(list_values));
+ self.batch_entries.push(entries);
+ }
+
+ Ok(())
+ }
+
+ fn convert_to_state(
+ &self,
+ values: &[ArrayRef],
+ opt_filter: Option<&BooleanArray>,
+ ) -> Result<Vec<ArrayRef>> {
+ assert_eq!(values.len(), 1, "one argument to convert_to_state");
+
+ let input = &values[0];
+
+ // Each row becomes a 1-element list: offsets are [0, 1, 2, ..., n].
+ let offsets = OffsetBuffer::from_repeated_length(1, input.len());
+
+ // Filtered rows become null list entries, which merge_batch will skip.
+ let filter_nulls = opt_filter.and_then(filter_to_nulls);
+
+ // With ignore_nulls, null values also become null list entries.
Without
+ // ignore_nulls, null values stay as [NULL] so merge_batch retains
them.
+ let nulls = if self.ignore_nulls {
+ let logical = input.logical_nulls();
+ NullBuffer::union(filter_nulls.as_ref(), logical.as_ref())
+ } else {
+ filter_nulls
+ };
+
+ let field = Arc::new(Field::new_list_field(self.datatype.clone(),
true));
+ let list_array = ListArray::new(field, offsets, Arc::clone(input),
nulls);
+
+ Ok(vec![Arc::new(list_array)])
+ }
+
+ fn supports_convert_to_state(&self) -> bool {
+ true
+ }
+
+ fn size(&self) -> usize {
+ self.batches
+ .iter()
+ .map(|arr|
arr.to_data().get_slice_memory_size().unwrap_or_default())
+ .sum::<usize>()
+ + self.batches.capacity() * size_of::<ArrayRef>()
+ + self
+ .batch_entries
+ .iter()
+ .map(|e| e.capacity() * size_of::<(u32, u32)>())
+ .sum::<usize>()
+ + self.batch_entries.capacity() * size_of::<Vec<(u32, u32)>>()
+ }
+}
+
#[derive(Debug)]
pub struct DistinctArrayAggAccumulator {
values: HashSet<ScalarValue>,
@@ -1227,4 +1573,372 @@ mod tests {
acc1.merge_batch(&intermediate_state)?;
Ok(acc1)
}
+
+ // ---- GroupsAccumulator tests ----
+
+ use arrow::array::Int32Array;
+
+ fn list_array_to_i32_vecs(list: &ListArray) ->
Vec<Option<Vec<Option<i32>>>> {
+ (0..list.len())
+ .map(|i| {
+ if list.is_null(i) {
+ None
+ } else {
+ let arr = list.value(i);
+ let vals: Vec<Option<i32>> = arr
+ .as_any()
+ .downcast_ref::<Int32Array>()
+ .unwrap()
+ .iter()
+ .collect();
+ Some(vals)
+ }
+ })
+ .collect()
+ }
+
+ fn eval_i32_lists(
+ acc: &mut ArrayAggGroupsAccumulator,
+ emit_to: EmitTo,
+ ) -> Result<Vec<Option<Vec<Option<i32>>>>> {
+ let result = acc.evaluate(emit_to)?;
+ Ok(list_array_to_i32_vecs(result.as_list::<i32>()))
+ }
+
+ #[test]
+ fn groups_accumulator_multiple_batches() -> Result<()> {
+ let mut acc = ArrayAggGroupsAccumulator::new(DataType::Int32, false);
+
+ // First batch
+ let values: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
+ acc.update_batch(&[values], &[0, 1, 0], None, 2)?;
+
+ // Second batch
+ let values: ArrayRef = Arc::new(Int32Array::from(vec![4, 5]));
+ acc.update_batch(&[values], &[1, 0], None, 2)?;
+
+ let vals = eval_i32_lists(&mut acc, EmitTo::All)?;
+ assert_eq!(vals[0], Some(vec![Some(1), Some(3), Some(5)]));
+ assert_eq!(vals[1], Some(vec![Some(2), Some(4)]));
+
+ Ok(())
+ }
+
+ #[test]
+ fn groups_accumulator_emit_first() -> Result<()> {
+ let mut acc = ArrayAggGroupsAccumulator::new(DataType::Int32, false);
+
+ let values: ArrayRef = Arc::new(Int32Array::from(vec![10, 20, 30]));
+ acc.update_batch(&[values], &[0, 1, 2], None, 3)?;
+
+ // Emit first 2 groups
+ let vals = eval_i32_lists(&mut acc, EmitTo::First(2))?;
+ assert_eq!(vals.len(), 2);
+ assert_eq!(vals[0], Some(vec![Some(10)]));
+ assert_eq!(vals[1], Some(vec![Some(20)]));
+
+ // Remaining group (was index 2, now shifted to 0)
+ let vals = eval_i32_lists(&mut acc, EmitTo::All)?;
+ assert_eq!(vals.len(), 1);
+ assert_eq!(vals[0], Some(vec![Some(30)]));
+
+ Ok(())
+ }
+
+ #[test]
+ fn groups_accumulator_emit_first_frees_batches() -> Result<()> {
+ // Batch 0 has rows only for group 0; batch 1 has rows for
+ // both groups. After emitting group 0, batch 0 should be
+ // dropped entirely and batch 1 should be compacted to the
+ // retained row(s).
+ let mut acc = ArrayAggGroupsAccumulator::new(DataType::Int32, false);
+
+ let batch0: ArrayRef = Arc::new(Int32Array::from(vec![10, 20]));
+ acc.update_batch(&[batch0], &[0, 0], None, 2)?;
+
+ let batch1: ArrayRef = Arc::new(Int32Array::from(vec![30, 40]));
+ acc.update_batch(&[batch1], &[0, 1], None, 2)?;
+
+ assert_eq!(acc.batches.len(), 2);
+ assert!(!acc.batches[0].is_empty());
+ assert!(!acc.batches[1].is_empty());
+
+ // Emit group 0. Batch 0 is only referenced by group 0, so it
+ // should be removed. Batch 1 is mixed, so it should be compacted
+ // to contain only the retained row for group 1.
+ let vals = eval_i32_lists(&mut acc, EmitTo::First(1))?;
+ assert_eq!(vals[0], Some(vec![Some(10), Some(20), Some(30)]));
+
+ assert_eq!(acc.batches.len(), 1);
+ let retained = acc.batches[0]
+ .as_any()
+ .downcast_ref::<Int32Array>()
+ .unwrap();
+ assert_eq!(retained.values(), &[40]);
+ assert_eq!(acc.batch_entries, vec![vec![(0, 0)]]);
+
+ // Emit remaining group 1
+ let vals = eval_i32_lists(&mut acc, EmitTo::All)?;
+ assert_eq!(vals[0], Some(vec![Some(40)]));
+
+ assert!(acc.batches.is_empty());
+ assert_eq!(acc.size(), 0);
+
+ Ok(())
+ }
+
+ #[test]
+ fn groups_accumulator_emit_first_compacts_mixed_batches() -> Result<()> {
+ let mut acc = ArrayAggGroupsAccumulator::new(DataType::Int32, false);
+
+ let batch: ArrayRef = Arc::new(Int32Array::from(vec![10, 20, 30, 40]));
+ acc.update_batch(&[batch], &[0, 1, 0, 1], None, 2)?;
+
+ let size_before = acc.size();
+ let vals = eval_i32_lists(&mut acc, EmitTo::First(1))?;
+ assert_eq!(vals[0], Some(vec![Some(10), Some(30)]));
+
+ assert_eq!(acc.num_groups, 1);
+ assert_eq!(acc.batches.len(), 1);
+ let retained = acc.batches[0]
+ .as_any()
+ .downcast_ref::<Int32Array>()
+ .unwrap();
+ assert_eq!(retained.values(), &[20, 40]);
+ assert_eq!(acc.batch_entries, vec![vec![(0, 0), (0, 1)]]);
+ assert!(acc.size() < size_before);
+
+ let vals = eval_i32_lists(&mut acc, EmitTo::All)?;
+ assert_eq!(vals[0], Some(vec![Some(20), Some(40)]));
+ assert_eq!(acc.size(), 0);
+
+ Ok(())
+ }
+
+ #[test]
+ fn groups_accumulator_emit_all_releases_capacity() -> Result<()> {
+ let mut acc = ArrayAggGroupsAccumulator::new(DataType::Int32, false);
+
+ let batch: ArrayRef = Arc::new(Int32Array::from_iter_values(0..64));
+ acc.update_batch(
+ &[batch],
+ &(0..64).map(|i| i % 4).collect::<Vec<_>>(),
+ None,
+ 4,
+ )?;
+
+ assert!(acc.size() > 0);
+ let _ = eval_i32_lists(&mut acc, EmitTo::All)?;
+
+ assert_eq!(acc.size(), 0);
+ assert_eq!(acc.batches.capacity(), 0);
+ assert_eq!(acc.batch_entries.capacity(), 0);
+
+ Ok(())
+ }
+
+ #[test]
+ fn groups_accumulator_null_groups() -> Result<()> {
+ // Groups that never receive values should produce null
+ let mut acc = ArrayAggGroupsAccumulator::new(DataType::Int32, false);
+
+ let values: ArrayRef = Arc::new(Int32Array::from(vec![1]));
+ // Only group 0 gets a value, groups 1 and 2 are empty
+ acc.update_batch(&[values], &[0], None, 3)?;
+
+ let vals = eval_i32_lists(&mut acc, EmitTo::All)?;
+ assert_eq!(vals, vec![Some(vec![Some(1)]), None, None]);
+
+ Ok(())
+ }
+
+ #[test]
+ fn groups_accumulator_ignore_nulls() -> Result<()> {
+ let mut acc = ArrayAggGroupsAccumulator::new(DataType::Int32, true);
+
+ let values: ArrayRef =
+ Arc::new(Int32Array::from(vec![Some(1), None, Some(3), None]));
+ acc.update_batch(&[values], &[0, 0, 1, 1], None, 2)?;
+
+ let vals = eval_i32_lists(&mut acc, EmitTo::All)?;
+ // Group 0: only non-null value is 1
+ assert_eq!(vals[0], Some(vec![Some(1)]));
+ // Group 1: only non-null value is 3
+ assert_eq!(vals[1], Some(vec![Some(3)]));
+
+ Ok(())
+ }
+
+ #[test]
+ fn groups_accumulator_opt_filter() -> Result<()> {
+ let mut acc = ArrayAggGroupsAccumulator::new(DataType::Int32, false);
+
+ let values: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
+ // Use a mix of false and null to filter out rows — both should
+ // be skipped.
+ let filter = BooleanArray::from(vec![Some(true), None, Some(true),
Some(false)]);
+ acc.update_batch(&[values], &[0, 0, 1, 1], Some(&filter), 2)?;
+
+ let vals = eval_i32_lists(&mut acc, EmitTo::All)?;
+ assert_eq!(vals[0], Some(vec![Some(1)])); // row 1 filtered (null)
+ assert_eq!(vals[1], Some(vec![Some(3)])); // row 3 filtered (false)
+
+ Ok(())
+ }
+
+ #[test]
+ fn groups_accumulator_state_merge_roundtrip() -> Result<()> {
+ // Accumulator 1: update_batch, then merge, then update_batch again.
+ // Verifies that values appear in chronological insertion order.
+ let mut acc1 = ArrayAggGroupsAccumulator::new(DataType::Int32, false);
+ let values: ArrayRef = Arc::new(Int32Array::from(vec![1, 2]));
+ acc1.update_batch(&[values], &[0, 1], None, 2)?;
+
+ // Accumulator 2
+ let mut acc2 = ArrayAggGroupsAccumulator::new(DataType::Int32, false);
+ let values: ArrayRef = Arc::new(Int32Array::from(vec![3, 4]));
+ acc2.update_batch(&[values], &[0, 1], None, 2)?;
+
+ // Merge acc2's state into acc1
+ let state = acc2.state(EmitTo::All)?;
+ acc1.merge_batch(&state, &[0, 1], None, 2)?;
+
+ // Another update_batch on acc1 after the merge
+ let values: ArrayRef = Arc::new(Int32Array::from(vec![5, 6]));
+ acc1.update_batch(&[values], &[0, 1], None, 2)?;
+
+ // Each group's values in insertion order:
+ // group 0: update(1), merge(3), update(5) → [1, 3, 5]
+ // group 1: update(2), merge(4), update(6) → [2, 4, 6]
+ let vals = eval_i32_lists(&mut acc1, EmitTo::All)?;
+ assert_eq!(vals[0], Some(vec![Some(1), Some(3), Some(5)]));
+ assert_eq!(vals[1], Some(vec![Some(2), Some(4), Some(6)]));
+
+ Ok(())
+ }
+
+ #[test]
+ fn groups_accumulator_convert_to_state() -> Result<()> {
+ let acc = ArrayAggGroupsAccumulator::new(DataType::Int32, false);
+
+ let values: ArrayRef = Arc::new(Int32Array::from(vec![Some(10), None,
Some(30)]));
+ let state = acc.convert_to_state(&[values], None)?;
+
+ assert_eq!(state.len(), 1);
+ let vals = list_array_to_i32_vecs(state[0].as_list::<i32>());
+ assert_eq!(
+ vals,
+ vec![
+ Some(vec![Some(10)]),
+ Some(vec![None]), // null preserved inside list, not promoted
+ Some(vec![Some(30)]),
+ ]
+ );
+
+ Ok(())
+ }
+
+ #[test]
+ fn groups_accumulator_convert_to_state_with_filter() -> Result<()> {
+ let acc = ArrayAggGroupsAccumulator::new(DataType::Int32, false);
+
+ let values: ArrayRef = Arc::new(Int32Array::from(vec![10, 20, 30]));
+ let filter = BooleanArray::from(vec![true, false, true]);
+ let state = acc.convert_to_state(&[values], Some(&filter))?;
+
+ let vals = list_array_to_i32_vecs(state[0].as_list::<i32>());
+ assert_eq!(
+ vals,
+ vec![
+ Some(vec![Some(10)]),
+ None, // filtered
+ Some(vec![Some(30)]),
+ ]
+ );
+
+ Ok(())
+ }
+
+ #[test]
+ fn groups_accumulator_convert_to_state_merge_preserves_nulls() ->
Result<()> {
+ // Verifies that null values survive the convert_to_state ->
merge_batch
+ // round-trip when ignore_nulls is false (default null handling).
+ let acc = ArrayAggGroupsAccumulator::new(DataType::Int32, false);
+
+ let values: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None,
Some(3)]));
+ let state = acc.convert_to_state(&[values], None)?;
+
+ // Feed state into a new accumulator via merge_batch
+ let mut acc2 = ArrayAggGroupsAccumulator::new(DataType::Int32, false);
+ acc2.merge_batch(&state, &[0, 0, 1], None, 2)?;
+
+ // Group 0 received rows 0 ([1]) and 1 ([NULL]) → [1, NULL]
+ let vals = eval_i32_lists(&mut acc2, EmitTo::All)?;
+ assert_eq!(vals[0], Some(vec![Some(1), None]));
+ // Group 1 received row 2 ([3]) → [3]
+ assert_eq!(vals[1], Some(vec![Some(3)]));
+
+ Ok(())
+ }
+
+ #[test]
+ fn groups_accumulator_convert_to_state_merge_ignore_nulls() -> Result<()> {
+ // Verifies that null values are dropped in the convert_to_state ->
+ // merge_batch round-trip when ignore_nulls is true.
+ let acc = ArrayAggGroupsAccumulator::new(DataType::Int32, true);
+
+ let values: ArrayRef =
+ Arc::new(Int32Array::from(vec![Some(1), None, Some(3), None]));
+ let state = acc.convert_to_state(&[values], None)?;
+
+ let list = state[0].as_list::<i32>();
+ // Rows 0 and 2 are valid lists; rows 1 and 3 are null list entries
+ assert!(!list.is_null(0));
+ assert!(list.is_null(1));
+ assert!(!list.is_null(2));
+ assert!(list.is_null(3));
+
+ // Feed state into a new accumulator via merge_batch
+ let mut acc2 = ArrayAggGroupsAccumulator::new(DataType::Int32, true);
+ acc2.merge_batch(&state, &[0, 0, 1, 1], None, 2)?;
+
+ // Group 0: received [1] and null (skipped) → [1]
+ let vals = eval_i32_lists(&mut acc2, EmitTo::All)?;
+ assert_eq!(vals[0], Some(vec![Some(1)]));
+ // Group 1: received [3] and null (skipped) → [3]
+ assert_eq!(vals[1], Some(vec![Some(3)]));
+
+ Ok(())
+ }
+
+ #[test]
+ fn groups_accumulator_all_groups_empty() -> Result<()> {
+ let mut acc = ArrayAggGroupsAccumulator::new(DataType::Int32, false);
+
+ // Create groups but don't add any values (all filtered out)
+ let values: ArrayRef = Arc::new(Int32Array::from(vec![1, 2]));
+ let filter = BooleanArray::from(vec![false, false]);
+ acc.update_batch(&[values], &[0, 1], Some(&filter), 2)?;
+
+ let vals = eval_i32_lists(&mut acc, EmitTo::All)?;
+ assert_eq!(vals, vec![None, None]);
+
+ Ok(())
+ }
+
+ #[test]
+ fn groups_accumulator_ignore_nulls_all_null_group() -> Result<()> {
+ // When ignore_nulls is true and a group receives only nulls,
+ // it should produce a null output
+ let mut acc = ArrayAggGroupsAccumulator::new(DataType::Int32, true);
+
+ let values: ArrayRef = Arc::new(Int32Array::from(vec![None, Some(1),
None]));
+ acc.update_batch(&[values], &[0, 1, 0], None, 2)?;
+
+ let vals = eval_i32_lists(&mut acc, EmitTo::All)?;
+ assert_eq!(vals[0], None); // group 0 got only nulls, all filtered
+ assert_eq!(vals[1], Some(vec![Some(1)])); // group 1 got value 1
+
+ Ok(())
+ }
}
diff --git a/datafusion/sqllogictest/test_files/aggregate_skip_partial.slt
b/datafusion/sqllogictest/test_files/aggregate_skip_partial.slt
index 0885a6a7d6..c16a6f4424 100644
--- a/datafusion/sqllogictest/test_files/aggregate_skip_partial.slt
+++ b/datafusion/sqllogictest/test_files/aggregate_skip_partial.slt
@@ -175,6 +175,21 @@ GROUP BY 1, 2 ORDER BY 1 LIMIT 5;
-2117946883 d 1 0 0 0
-2098805236 c 1 0 0 0
+query IT????
+SELECT c5, c1,
+ ARRAY_AGG(c3),
+ ARRAY_AGG(CASE WHEN c1 = 'a' THEN c3 ELSE NULL END),
+ ARRAY_AGG(c3) FILTER (WHERE c1 = 'b'),
+ ARRAY_AGG(CASE WHEN c1 = 'a' THEN c3 ELSE NULL END) FILTER (WHERE c1 =
'b')
+FROM aggregate_test_100
+GROUP BY 1, 2 ORDER BY 1 LIMIT 5;
+----
+-2141999138 c [-2] [NULL] NULL NULL
+-2141451704 a [-72] [-72] NULL NULL
+-2138770630 b [63] [NULL] [63] [NULL]
+-2117946883 d [-59] [NULL] NULL NULL
+-2098805236 c [22] [NULL] NULL NULL
+
# Regression test for https://github.com/apache/datafusion/issues/11846
query TBBBB rowsort
select v1, bool_or(v2), bool_and(v2), bool_or(v3), bool_and(v3)
@@ -244,6 +259,19 @@ SELECT c2, count(c1), count(c5), count(c11) FROM
aggregate_test_100 GROUP BY c2
4 23 23 23
5 14 14 14
+# Test array_agg; we sort the output to ensure deterministic results
+query I??
+SELECT c2,
+ array_sort(array_agg(c5)),
+ array_sort(array_agg(c3) FILTER (WHERE c3 > 0))
+FROM aggregate_test_100 GROUP BY c2 ORDER BY c2;
+----
+1 [-1991133944, -1882293856, -1448995523, -1383162419, -1339586153,
-1331533190, -1176490478, -1143802338, -928766616, -644225469, -335410409,
383352709, 431378678, 794623392, 994303988, 1171968280, 1188089983, 1213926989,
1325868318, 1413111008, 2106705285, 2143473091] [12, 29, 36, 38, 41, 54, 57,
70, 71, 83, 103, 120, 125]
+2 [-2138770630, -1927628110, -1908480893, -1899175111, -1808210365,
-1660426473, -1222533990, -1090239422, -1011669561, -800561771, -587831330,
-537142430, -168758331, -108973366, 49866617, 370975815, 439738328, 715235348,
1354539333, 1593800404, 2033001162, 2053379412] [1, 29, 31, 45, 49, 52, 52, 63,
68, 93, 97, 113, 122]
+3 [-2141999138, -2141451704, -2098805236, -1302295658, -903316089, -421042466,
-382483011, -346989627, 141218956, 240273900, 397430452, 670497898, 912707948,
1299719633, 1337043149, 1436496767, 1489733240, 1738331255, 2030965207] [13,
13, 14, 17, 17, 22, 71, 73, 77, 97, 104, 112, 123]
+4 [-1885422396, -1813935549, -1009656194, -673237643, -237425046, -4229382,
61035129, 427197269, 434021400, 659422734, 702611616, 762932956, 852509237,
1282464673, 1423957796, 1544188174, 1579876740, 1902023838, 1991172974,
1993193190, 2047637360, 2051224722, 2064155045] [3, 5, 17, 30, 47, 55, 65, 73,
74, 96, 97, 102, 123]
+5 [-2117946883, -842693467, -629486480, -467659022, -134213907, 41423756,
586844478, 623103518, 706441268, 1188285940, 1689098844, 1824882165,
1955646088, 2025611582] [36, 62, 64, 68, 118]
+
# Test min / max for int / float
query IIIRR
SELECT c2, min(c5), max(c5), min(c11), max(c11) FROM aggregate_test_100 GROUP
BY c2 ORDER BY c2;
@@ -389,19 +417,6 @@ c 2.666666666667 0.425241138254
d 2.444444444444 0.541519476308
e 3 0.505440263521
-# FIXME: add bool_and(v3) column when issue fixed
-# ISSUE https://github.com/apache/datafusion/issues/11846
-query TBBB rowsort
-select v1, bool_or(v2), bool_and(v2), bool_or(v3)
-from aggregate_test_100_bool
-group by v1
-----
-a true false true
-b true false true
-c true false false
-d true false false
-e true false NULL
-
query TBBB rowsort
select v1,
bool_or(v2) FILTER (WHERE v1 = 'a' OR v1 = 'c' OR v1 = 'e'),
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]