kosiew commented on code in PR #22015:
URL: https://github.com/apache/datafusion/pull/22015#discussion_r3206020656
##########
datafusion/functions-aggregate/src/array_agg.rs:
##########
@@ -1935,4 +1991,293 @@ mod tests {
Ok(())
}
+
+ // ---- retract_batch tests ----
+
+ #[test]
+ fn retract_basic_sliding_window() -> Result<()> {
+ let mut acc = ArrayAggAccumulator::try_new(&DataType::Utf8, false)?;
+
+ // Simulate ROWS BETWEEN 1 PRECEDING AND CURRENT ROW over [A, B, C, D]
+ // Row 1: frame = [A]
+ acc.update_batch(&[data(["A"])])?;
+ assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["A"]);
+
+ // Row 2: frame = [A, B]
+ acc.update_batch(&[data(["B"])])?;
+ assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["A", "B"]);
+
+ // Row 3: frame = [B, C] — A leaves
+ acc.update_batch(&[data(["C"])])?;
+ acc.retract_batch(&[data(["A"])])?;
+ assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["B", "C"]);
+
+ // Row 4: frame = [C, D] — B leaves
+ acc.update_batch(&[data(["D"])])?;
+ acc.retract_batch(&[data(["B"])])?;
+ assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["C", "D"]);
+
+ Ok(())
+ }
+
+ #[test]
+ fn retract_multi_element_across_arrays() -> Result<()> {
+ let mut acc = ArrayAggAccumulator::try_new(&DataType::Utf8, false)?;
+
+ // First batch: 3 elements
+ acc.update_batch(&[data(["A", "B", "C"])])?;
+ // Second batch: 1 element
+ acc.update_batch(&[data(["D"])])?;
+
+ assert_eq!(
+ print_nulls(str_arr(acc.evaluate()?)?),
+ vec!["A", "B", "C", "D"]
+ );
+
+ // Partial retract from front array: A leaves
+ acc.retract_batch(&[data(["A"])])?;
+ assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["B", "C",
"D"]);
+
+ // Retract spanning two arrays: B, C (rest of first array) + D (second
array)
+ acc.retract_batch(&[data(["B", "C", "D"])])?;
+ let result = acc.evaluate()?;
+ assert!(
+ matches!(&result, ScalarValue::List(arr) if arr.is_null(0)),
+ "expected null list after full retract, got {result:?}"
+ );
+
+ Ok(())
+ }
+
+ #[test]
+ fn retract_with_nulls_preserved() -> Result<()> {
+ // ignore_nulls = false: NULLs are stored and counted for retract
+ let mut acc = ArrayAggAccumulator::try_new(&DataType::Utf8, false)?;
+
+ acc.update_batch(&[data([Some("A"), None, Some("C")])])?;
+ assert_eq!(
+ print_nulls(str_arr(acc.evaluate()?)?),
+ vec!["A", "NULL", "C"]
+ );
+
+ // Retract 2 elements: A and NULL both leave
+ acc.retract_batch(&[data([Some("A"), None])])?;
+ assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["C"]);
+
+ Ok(())
+ }
+
+ #[test]
+ fn retract_with_ignore_nulls() -> Result<()> {
+ // ignore_nulls = true: NULLs are NOT stored by update_batch,
+ // so retract must only count non-null values
+ let mut acc = ArrayAggAccumulator::try_new(&DataType::Utf8, true)?;
+
+ // update_batch with [A, NULL, C] → stores only [A, C] (NULL filtered)
+ acc.update_batch(&[data([Some("A"), None, Some("C")])])?;
+ assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["A", "C"]);
+
+ // retract_batch receives the original values including NULL: [A, NULL]
+ // But only 1 non-null value (A) should be retracted
+ acc.retract_batch(&[data([Some("A"), None])])?;
+ assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["C"]);
+
+ // retract_batch with [NULL, C] — only C (1 non-null) retracted
+ acc.retract_batch(&[data([None, Some("C")])])?;
+ let result = acc.evaluate()?;
+ assert!(
+ matches!(&result, ScalarValue::List(arr) if arr.is_null(0)),
+ "expected null list after full retract, got {result:?}"
+ );
+
+ Ok(())
+ }
+
+ #[test]
+ fn retract_ignore_nulls_all_nulls_batch() -> Result<()> {
+ // When ignore_nulls = true and retract batch is all NULLs, nothing is
retracted
+ let mut acc = ArrayAggAccumulator::try_new(&DataType::Utf8, true)?;
+
+ acc.update_batch(&[data([Some("A"), Some("B")])])?;
+ assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["A", "B"]);
+
+ // Retract batch of all NULLs: to_retract = 0, nothing changes
+ acc.retract_batch(&[data::<Option<&str>, 3>([None, None, None])])?;
+ assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["A", "B"]);
+
+ Ok(())
+ }
+
+ #[test]
+ fn retract_empty_accumulator() -> Result<()> {
+ let mut acc = ArrayAggAccumulator::try_new(&DataType::Utf8, false)?;
+
+ // Retract on empty accumulator should be a no-op
+ acc.retract_batch(&[data(["A"])])?;
+ let result = acc.evaluate()?;
+ assert!(
+ matches!(&result, ScalarValue::List(arr) if arr.is_null(0)),
+ "expected null list for empty accumulator, got {result:?}"
+ );
+
+ Ok(())
+ }
+
+ #[test]
+ fn retract_front_offset_partial_consume() -> Result<()> {
+ // Reproduces the RANGE BETWEEN 2 PRECEDING AND 2 FOLLOWING scenario:
+ // ts: 1, 2, 3, 4, 100
+ //
+ // Row 1 (ts=1): update [A,B,C] (3 elements, ts in [-1,3])
+ // Row 2 (ts=2): update [D] (ts=4 enters)
+ // Row 3 (ts=3): no change (same frame [0..4))
+ // Row 4 (ts=4): retract [A] (ts=1 leaves, partial consume)
+ // Row 5 (ts=100): retract [B,C,D] (3-element retract spanning arrays)
+ let mut acc = ArrayAggAccumulator::try_new(&DataType::Utf8, false)?;
+
+ // Row 1: update_batch(["A","B","C"])
+ acc.update_batch(&[data(["A", "B", "C"])])?;
+ assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["A", "B",
"C"]);
+
+ // Row 2: update_batch(["D"])
+ acc.update_batch(&[data(["D"])])?;
+ assert_eq!(
+ print_nulls(str_arr(acc.evaluate()?)?),
+ vec!["A", "B", "C", "D"]
+ );
+
+ // Row 4: retract_batch(["A"]) — partial consume, front_offset = 1
+ acc.retract_batch(&[data(["A"])])?;
+ assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["B", "C",
"D"]);
+
+ // Row 5: update_batch(["E"]), then retract_batch(["B","C","D"])
+ // retract spans: ["A","B","C"] (offset=1, 2 remaining) + ["D"] (1
element)
+ acc.update_batch(&[data(["E"])])?;
+ acc.retract_batch(&[data(["B", "C", "D"])])?;
+ assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["E"]);
+
+ Ok(())
+ }
+
+ #[test]
+ fn retract_update_after_full_drain() -> Result<()> {
+ // Verify accumulator works correctly after being fully drained
+ let mut acc = ArrayAggAccumulator::try_new(&DataType::Utf8, false)?;
+
+ acc.update_batch(&[data(["A", "B"])])?;
+ acc.retract_batch(&[data(["A", "B"])])?;
+
+ // Accumulator is empty now
+ let result = acc.evaluate()?;
+ assert!(
+ matches!(&result, ScalarValue::List(arr) if arr.is_null(0)),
+ "expected null list, got {result:?}"
+ );
+
+ // New values should work normally after drain
+ acc.update_batch(&[data(["X", "Y"])])?;
+ assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["X", "Y"]);
+
+ acc.retract_batch(&[data(["X"])])?;
+ assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["Y"]);
+
+ Ok(())
+ }
+
+ #[test]
+ fn retract_supports_retract_batch() -> Result<()> {
+ let acc = ArrayAggAccumulator::try_new(&DataType::Utf8, false)?;
+ assert!(acc.supports_retract_batch());
+
+ let acc_ignore = ArrayAggAccumulator::try_new(&DataType::Utf8, true)?;
+ assert!(acc_ignore.supports_retract_batch());
+
+ Ok(())
+ }
+
+ #[test]
+ fn retract_ignore_nulls_logical_vs_physical() -> Result<()> {
+ // Regression test: DictionaryArray where logical nulls differ from
physical nulls.
+ // Indices are all valid (physical null_count = 0) but some point to
null
+ // dictionary values (logical_null_count > 0).
+ use arrow::array::StringDictionaryBuilder;
+
+ let dict_type =
+ DataType::Dictionary(Box::new(DataType::Int32),
Box::new(DataType::Utf8));
+ let mut acc = ArrayAggAccumulator::try_new(&dict_type, true)?;
+
+ // Build a DictionaryArray: ["hello", NULL, "world", NULL]
+ // All indices valid → physical null_count = 0
+ // Indices 1,3 point to null → logical_null_count = 2
+ let mut builder =
StringDictionaryBuilder::<arrow::datatypes::Int32Type>::new();
+ builder.append_value("hello");
+ builder.append_null();
+ builder.append_value("world");
+ builder.append_null();
+ let dict_array: ArrayRef = Arc::new(builder.finish());
+
+ assert_eq!(dict_array.null_count(), 2);
Review Comment:
Thanks for adding this regression test. I think it needs one more tweak to
cover the original issue properly.
Right now this uses `StringDictionaryBuilder::append_null()`, which creates
null dictionary keys. Because of that, `dict_array.null_count()` and
`dict_array.logical_null_count()` are both `2`, so the test would still pass
with the old `val.len() - val.null_count()` behavior.
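Could you please construct a `DictionaryArray` (or a `RunArray`) whose physical
entries are all valid but reference null values instead? For example, build the
dictionary values with a null entry and pass explicit non-null keys to
`DictionaryArray::try_new`. It would also be helpful to assert that
`null_count() != logical_null_count()` before checking the retract behavior, so
the test clearly covers the regression.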
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]