alamb commented on code in PR #15667:
URL: https://github.com/apache/datafusion/pull/15667#discussion_r2072364948
##########
datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/nulls.rs:
##########
@@ -193,6 +193,14 @@ pub fn set_nulls_dyn(input: &dyn Array, nulls:
Option<NullBuffer>) -> Result<Arr
))
}
}
+ DataType::Struct(_) => unsafe {
+ let input = input.as_struct();
+ Arc::new(StructArray::new_unchecked(
Review Comment:
can you please add a safety note here (probably can just reuse the same as
above)
##########
datafusion/common/src/scalar/mod.rs:
##########
@@ -616,7 +616,20 @@ fn partial_cmp_list(arr1: &dyn Array, arr2: &dyn Array) ->
Option<Ordering> {
Some(arr1.len().cmp(&arr2.len()))
}
-fn partial_cmp_struct(s1: &Arc<StructArray>, s2: &Arc<StructArray>) ->
Option<Ordering> {
+fn expand_struct_columns<'a>(array: &'a StructArray, columns: &mut Vec<&'a
ArrayRef>) {
Review Comment:
I think a more standard name for this operation is "flatten" but since this
is an internal function I think it is fine too
##########
datafusion/functions-aggregate/src/min_max.rs:
##########
@@ -619,6 +625,45 @@ fn min_batch(values: &ArrayRef) -> Result<ScalarValue> {
})
}
+fn min_max_batch_struct(array: &ArrayRef, ordering: Ordering) ->
Result<ScalarValue> {
+ if array.len() == array.null_count() {
+ return ScalarValue::try_from(array.data_type());
+ }
+ let mut extreme = ScalarValue::try_from_array(array, 0)?;
+ for i in 1..array.len() {
+ let current = ScalarValue::try_from_array(array, i)?;
+ if current.is_null() {
+ continue;
+ }
+ if extreme.is_null() {
+ extreme = current;
+ continue;
+ }
+ if let Some(cmp) = extreme.partial_cmp(&current) {
+ if cmp == ordering {
+ extreme = current;
+ }
+ }
+ }
+ // use deep_clone to free array reference
+ Ok(extreme.deep_clone())
+}
+
+macro_rules! min_max_struct {
+ ($VALUE:expr, $DELTA:expr, $OP:ident) => {{
+ if $VALUE.is_null() {
+ $DELTA.clone()
+ } else if $DELTA.is_null() {
+ $VALUE.clone()
+ } else {
+ match $VALUE.partial_cmp(&$DELTA) {
+ Some(choose_min_max!($OP)) => $DELTA.clone(),
Review Comment:
should this also use deep_clone?
##########
datafusion/functions-aggregate/src/min_max.rs:
##########
@@ -619,6 +625,45 @@ fn min_batch(values: &ArrayRef) -> Result<ScalarValue> {
})
}
+fn min_max_batch_struct(array: &ArrayRef, ordering: Ordering) ->
Result<ScalarValue> {
+ if array.len() == array.null_count() {
+ return ScalarValue::try_from(array.data_type());
+ }
+ let mut extreme = ScalarValue::try_from_array(array, 0)?;
+ for i in 1..array.len() {
+ let current = ScalarValue::try_from_array(array, i)?;
+ if current.is_null() {
+ continue;
+ }
+ if extreme.is_null() {
+ extreme = current;
+ continue;
+ }
+ if let Some(cmp) = extreme.partial_cmp(&current) {
+ if cmp == ordering {
+ extreme = current;
+ }
+ }
+ }
+ // use deep_clone to free array reference
+ Ok(extreme.deep_clone())
+}
+
+macro_rules! min_max_struct {
+ ($VALUE:expr, $DELTA:expr, $OP:ident) => {{
+ if $VALUE.is_null() {
+ $DELTA.clone()
+ } else if $DELTA.is_null() {
+ $VALUE.clone()
+ } else {
+ match $VALUE.partial_cmp(&$DELTA) {
+ Some(choose_min_max!($OP)) => $DELTA.clone(),
+ _ => $VALUE.clone(),
+ }
+ }
+ }};
+}
+
/// dynamically-typed max(array) -> ScalarValue
pub fn max_batch(values: &ArrayRef) -> Result<ScalarValue> {
Review Comment:
I wonder if we could potentially remove the duplicated code in this function
by calling into MinAccumulator / max accumulator as necessary. I can't remember
why there is a second copy of this code
This might be something good to do as a follow on PR
##########
datafusion/sqllogictest/test_files/aggregate.slt:
##########
@@ -6923,3 +6923,61 @@ select c2, count(*) from test WHERE 1 = 1 group by c2;
4 1
5 1
6 1
+
+# Min/Max struct
+query ?? rowsort
+WITH t AS (SELECT i as c1, i + 1 as c2 FROM generate_series(1, 10) t(i))
+SELECT MIN(c), MAX(c) FROM (SELECT STRUCT(c1 AS 'a', c2 AS 'b') AS c FROM t)
+----
+{a: 1, b: 2} {a: 10, b: 11}
+
+# Min/Max struct with NULL
+query ?? rowsort
+WITH t AS (SELECT i as c1, i + 1 as c2 FROM generate_series(1, 10) t(i))
+SELECT MIN(c), MAX(c) FROM (SELECT CASE WHEN c1 % 2 == 0 THEN STRUCT(c1 AS
'a', c2 AS 'b') ELSE NULL END AS c FROM t)
+----
+{a: 2, b: 3} {a: 10, b: 11}
+
+# Min/Max struct with two recordbatch
+query ?? rowsort
+SELECT MIN(c), MAX(c) FROM (SELECT STRUCT(1 as 'a', 2 as 'b') AS c UNION
SELECT STRUCT(3 as 'a', 4 as 'b') AS c )
+----
+{a: 1, b: 2} {a: 3, b: 4}
+
+# Min/Max struct empty
+query ?? rowsort
+SELECT MIN(c), MAX(c) FROM (SELECT * FROM (SELECT STRUCT(1 as 'a', 2 as 'b')
AS c) LIMIT 0)
+----
+NULL NULL
+
+# Min/Max group struct
+query I?? rowsort
+WITH t AS (SELECT i as c1, i + 1 as c2 FROM generate_series(1, 10) t(i))
+SELECT key, MIN(c), MAX(c) FROM (SELECT STRUCT(c1 AS 'a', c2 AS 'b') AS c, (c1
% 2) AS key FROM t) GROUP BY key
+----
+0 {a: 2, b: 3} {a: 10, b: 11}
+1 {a: 1, b: 2} {a: 9, b: 10}
+
+# Min/Max group struct with NULL
+query I?? rowsort
+WITH t AS (SELECT i as c1, i + 1 as c2 FROM generate_series(1, 10) t(i))
+SELECT key, MIN(c), MAX(c) FROM (SELECT CASE WHEN c1 % 2 == 0 THEN STRUCT(c1
AS 'a', c2 AS 'b') ELSE NULL END AS c, (c1 % 2) AS key FROM t) GROUP BY key
+----
+0 {a: 2, b: 3} {a: 10, b: 11}
+1 NULL NULL
+
+# Min/Max group struct with NULL
+query I?? rowsort
+WITH t AS (SELECT i as c1, i + 1 as c2 FROM generate_series(1, 10) t(i))
+SELECT key, MIN(c), MAX(c) FROM (SELECT CASE WHEN c1 % 3 == 0 THEN STRUCT(c1
AS 'a', c2 AS 'b') ELSE NULL END AS c, (c1 % 2) AS key FROM t) GROUP BY key
+----
+0 {a: 6, b: 7} {a: 6, b: 7}
+1 {a: 3, b: 4} {a: 9, b: 10}
+
+# Min/Max struct empty
+query ?? rowsort
+WITH t AS (SELECT i as c1, i + 1 as c2 FROM generate_series(1, 10) t(i))
+SELECT MIN(c), MAX(c) FROM (SELECT STRUCT(c1 AS 'a', c2 AS 'b') AS c, (c1 % 2)
AS key FROM t LIMIT 0) GROUP BY key
+----
+
Review Comment:
Can you please add additional tests for:
1. A struct with just one field (all the tests in this file so far have 2
fields)
2. Structs where the first field is equal, so the comparison has to go to
the second field
For 2 I am thinking something like this (where the first two fields are
equal)
```sql
> select min(column1),max(column1) from (
values
({"a":1,"b":2,"c":3}),
({"a":1,"b":2,"c":4})
);
+--------------------+--------------------+
| min(column1) | max(column1) |
+--------------------+--------------------+
| {a: 1, b: 2, c: 3} | {a: 1, b: 2, c: 4} |
+--------------------+--------------------+
1 row(s) fetched.
Elapsed 0.020 seconds.
```
##########
datafusion/common/src/scalar/mod.rs:
##########
@@ -3415,6 +3434,58 @@ impl ScalarValue {
.map(|sv| sv.size() - size_of_val(sv))
.sum::<usize>()
}
+
+ /// Performs a deep clone of the ScalarValue, creating new copies of all
nested data structures.
+ /// This is different from the standard `clone()` which may share data
through `Arc`.
+ /// Aggregation functions like `max` will cost a lot of memory if the data
is not cloned.
+ pub fn deep_clone(&self) -> Self {
Review Comment:
I think `deep_clone` is a fine name. Another one might be `force_clone` or
something to indicate it is cloning the underlying data (not just array refs)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]