This is an automated email from the ASF dual-hosted git repository.

yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 93d2623cd6 [core] Introduce 'nested_partial_update' agg func (#6924)
93d2623cd6 is described below

commit 93d2623cd6d79b4e883d58a2eb315d150168ccf1
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Dec 29 17:09:51 2025 +0800

    [core] Introduce 'nested_partial_update' agg func (#6924)
---
 .../primary-key-table/merge-engine/aggregation.md  |   5 +
 .../aggregate/FieldNestedPartialUpdateAgg.java     | 108 +++++++++++++++++++++
 .../FieldNestedPartialUpdateAggFactory.java        |  58 +++++++++++
 .../services/org.apache.paimon.factories.Factory   |   1 +
 .../compact/aggregate/FieldAggregatorTest.java     |  45 ++++++++-
 5 files changed, 216 insertions(+), 1 deletion(-)

diff --git a/docs/content/primary-key-table/merge-engine/aggregation.md 
b/docs/content/primary-key-table/merge-engine/aggregation.md
index ed845eebfb..fd8414ca1d 100644
--- a/docs/content/primary-key-table/merge-engine/aggregation.md
+++ b/docs/content/primary-key-table/merge-engine/aggregation.md
@@ -376,6 +376,11 @@ public static class BitmapContainsUDF extends 
ScalarFunction {
 
   {{< /tabs >}}
 
+### nested_partial_update
+  The nested_partial_update function collects multiple rows into one array<row> (a so-called 'nested table'). It only
+  supports the ARRAY<ROW> data type. You need to use `fields.<field-name>.nested-key=pk0,pk1,...` to specify the primary
+  keys of the nested table. Rows with the same nested key are merged by partial update: each non-null column value of a
+  later row overwrites the existing value, while null column values leave the existing value unchanged.
+
 ### collect
   The collect function collects elements into an Array. You can set 
`fields.<field-name>.distinct=true` to deduplicate elements.
   It only supports ARRAY type.
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldNestedPartialUpdateAgg.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldNestedPartialUpdateAgg.java
new file mode 100644
index 0000000000..d8e058e839
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldNestedPartialUpdateAgg.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.mergetree.compact.aggregate;
+
+import org.apache.paimon.codegen.Projection;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.GenericArray;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalArray;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.InternalRow.FieldGetter;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.RowType;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.paimon.codegen.CodeGenUtils.newProjection;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/**
+ * Aggregator that partially updates a field representing a nested table. The data type of the
+ * nested table field is {@code ARRAY<ROW>}.
+ *
+ * <p>Rows of the accumulator and of the input are grouped by the nested key. Within one group the
+ * rows are applied in order (accumulator rows first, then input rows): every non-null column value
+ * overwrites the value kept so far, while null column values leave the kept value untouched.
+ */
+public class FieldNestedPartialUpdateAgg extends FieldAggregator {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Number of columns in each nested row. */
+    private final int nestedFields;
+
+    /** Projects a nested row onto its nested-key columns. */
+    private final Projection keyProjection;
+
+    /** One getter per nested column, indexed by column position. */
+    private final FieldGetter[] fieldGetters;
+
+    /**
+     * Creates the aggregator.
+     *
+     * @param name name passed to the parent {@link FieldAggregator}
+     * @param dataType type of the aggregated field; its element type is cast to {@link RowType}
+     * @param nestedKey names of the nested-row columns that identify a row; must not be empty
+     * @throws IllegalArgumentException if {@code nestedKey} is empty
+     */
+    public FieldNestedPartialUpdateAgg(String name, ArrayType dataType, List<String> nestedKey) {
+        super(name, dataType);
+        checkArgument(
+                !nestedKey.isEmpty(), "Nested key of aggregator '%s' must not be empty.", name);
+        RowType nestedType = (RowType) dataType.getElementType();
+        this.nestedFields = nestedType.getFieldCount();
+        this.keyProjection = newProjection(nestedType, nestedKey);
+        this.fieldGetters = new FieldGetter[nestedFields];
+        for (int i = 0; i < nestedFields; i++) {
+            fieldGetters[i] = InternalRow.createFieldGetter(nestedType.getTypeAt(i), i);
+        }
+    }
+
+    /**
+     * Merges the rows of {@code inputField} into the rows of {@code accumulator}.
+     *
+     * <p>When one side is {@code null} the other side is returned unchanged, so the rows inside it
+     * are not merged with each other by this call.
+     *
+     * @param accumulator current value, an {@link InternalArray} of rows, or {@code null}
+     * @param inputField incoming value, an {@link InternalArray} of rows, or {@code null}
+     * @return a new {@link GenericArray} holding one merged row per distinct nested key, or the
+     *     non-null argument (possibly {@code null}) if either argument is {@code null}
+     */
+    @Override
+    public Object agg(Object accumulator, Object inputField) {
+        if (accumulator == null || inputField == null) {
+            return accumulator == null ? inputField : accumulator;
+        }
+
+        InternalArray acc = (InternalArray) accumulator;
+        InternalArray input = (InternalArray) inputField;
+
+        // Accumulator rows come first so that input rows overwrite them during the merge.
+        List<InternalRow> rows = new ArrayList<>(acc.size() + input.size());
+        addNonNullRows(acc, rows);
+        addNonNullRows(input, rows);
+
+        // keyProjection is always set by the constructor, so the merge is unconditional.
+        Map<BinaryRow, GenericRow> merged = new HashMap<>();
+        for (InternalRow row : rows) {
+            // copy() detaches the key from the projection result, which is presumably reused
+            // between apply() calls -- TODO confirm against Projection.
+            BinaryRow key = keyProjection.apply(row).copy();
+            GenericRow target = merged.computeIfAbsent(key, k -> new GenericRow(nestedFields));
+            partialUpdate(target, row);
+        }
+
+        return new GenericArray(merged.values().toArray());
+    }
+
+    /** Appends every non-null element of {@code array} to {@code rows}, keeping array order. */
+    private void addNonNullRows(InternalArray array, List<InternalRow> rows) {
+        for (int i = 0; i < array.size(); i++) {
+            if (array.isNullAt(i)) {
+                continue;
+            }
+            rows.add(array.getRow(i, nestedFields));
+        }
+    }
+
+    /** Copies every non-null column of {@code input} into the same position of {@code toUpdate}. */
+    private void partialUpdate(GenericRow toUpdate, InternalRow input) {
+        for (int i = 0; i < fieldGetters.length; i++) {
+            Object field = fieldGetters[i].getFieldOrNull(input);
+            if (field != null) {
+                toUpdate.setField(i, field);
+            }
+        }
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldNestedPartialUpdateAggFactory.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldNestedPartialUpdateAggFactory.java
new file mode 100644
index 0000000000..80b899a8b2
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldNestedPartialUpdateAggFactory.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.mergetree.compact.aggregate.factory;
+
+import org.apache.paimon.CoreOptions;
+import 
org.apache.paimon.mergetree.compact.aggregate.FieldNestedPartialUpdateAgg;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.RowType;
+
+import java.util.List;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/** Factory for {@link FieldNestedPartialUpdateAgg}. */
+public class FieldNestedPartialUpdateAggFactory implements FieldAggregatorFactory {
+
+    /** Identifier under which this aggregate function is registered. */
+    public static final String NAME = "nested_partial_update";
+
+    /**
+     * Creates the aggregator for {@code field}, reading its nested key from {@code options}.
+     *
+     * @param fieldType type of the aggregated field; must be an {@link ArrayType} whose element
+     *     type is a {@link RowType}
+     * @param options table options that supply the nested key of the field
+     * @param field name of the aggregated field
+     * @throws IllegalArgumentException if the nested key is empty or the type is not an array of
+     *     rows
+     */
+    @Override
+    public FieldNestedPartialUpdateAgg create(
+            DataType fieldType, CoreOptions options, String field) {
+        return createFieldNestedPartialUpdateAgg(
+                fieldType, options.fieldNestedUpdateAggNestedKey(field), field);
+    }
+
+    @Override
+    public String identifier() {
+        return NAME;
+    }
+
+    /** Validates the nested key and the field type, then builds the aggregator. */
+    private FieldNestedPartialUpdateAgg createFieldNestedPartialUpdateAgg(
+            DataType fieldType, List<String> nestedKey, String field) {
+        // Name the missing option so a misconfigured table fails with an actionable message.
+        checkArgument(
+                !nestedKey.isEmpty(),
+                "Aggregate function '%s' requires option 'fields.%s.nested-key' to be set.",
+                NAME,
+                field);
+        String typeErrorMsg =
+                "Data type for nested table column must be 'Array<Row>' but was '%s'.";
+        checkArgument(fieldType instanceof ArrayType, typeErrorMsg, fieldType);
+        ArrayType arrayType = (ArrayType) fieldType;
+        checkArgument(arrayType.getElementType() instanceof RowType, typeErrorMsg, fieldType);
+        return new FieldNestedPartialUpdateAgg(identifier(), arrayType, nestedKey);
+    }
+}
diff --git 
a/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
 
b/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index 7f221e5172..3bfb3480de 100644
--- 
a/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ 
b/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -30,6 +30,7 @@ 
org.apache.paimon.mergetree.compact.aggregate.factory.FieldMaxAggFactory
 org.apache.paimon.mergetree.compact.aggregate.factory.FieldMergeMapAggFactory
 org.apache.paimon.mergetree.compact.aggregate.factory.FieldMinAggFactory
 
org.apache.paimon.mergetree.compact.aggregate.factory.FieldNestedUpdateAggFactory
+org.apache.paimon.mergetree.compact.aggregate.factory.FieldNestedPartialUpdateAggFactory
 org.apache.paimon.mergetree.compact.aggregate.factory.FieldPrimaryKeyAggFactory
 org.apache.paimon.mergetree.compact.aggregate.factory.FieldProductAggFactory
 
org.apache.paimon.mergetree.compact.aggregate.factory.FieldRoaringBitmap32AggFactory
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java
index 0b838c3609..8541954be8 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java
@@ -40,6 +40,7 @@ import 
org.apache.paimon.mergetree.compact.aggregate.factory.FieldListaggAggFact
 import 
org.apache.paimon.mergetree.compact.aggregate.factory.FieldMaxAggFactory;
 import 
org.apache.paimon.mergetree.compact.aggregate.factory.FieldMergeMapAggFactory;
 import 
org.apache.paimon.mergetree.compact.aggregate.factory.FieldMinAggFactory;
+import 
org.apache.paimon.mergetree.compact.aggregate.factory.FieldNestedPartialUpdateAggFactory;
 import 
org.apache.paimon.mergetree.compact.aggregate.factory.FieldNestedUpdateAggFactory;
 import 
org.apache.paimon.mergetree.compact.aggregate.factory.FieldProductAggFactory;
 import 
org.apache.paimon.mergetree.compact.aggregate.factory.FieldRoaringBitmap32AggFactory;
@@ -626,7 +627,7 @@ public class FieldAggregatorTest {
         return new GenericArray(new InternalRow[] {row});
     }
 
-    private InternalRow row(int k0, int k1, String v) {
+    private InternalRow row(Integer k0, Integer k1, String v) {
         return GenericRow.of(k0, k1, BinaryString.fromString(v));
     }
 
@@ -1154,6 +1155,48 @@ public class FieldAggregatorTest {
         assertThat(agg).isEqualTo("test");
     }
 
+    @Test
+    public void testFieldNestedPartialUpdateAgg() {
+        // Nested row layout: key column "k" followed by two value columns.
+        DataType nestedRowType =
+                DataTypes.ROW(
+                        DataTypes.FIELD(0, "k", DataTypes.INT()),
+                        DataTypes.FIELD(1, "v1", DataTypes.INT()),
+                        DataTypes.FIELD(2, "v2", DataTypes.STRING()));
+        InternalArray.ElementGetter getter = InternalArray.createElementGetter(nestedRowType);
+        FieldNestedPartialUpdateAgg agg =
+                new FieldNestedPartialUpdateAgg(
+                        FieldNestedPartialUpdateAggFactory.NAME,
+                        DataTypes.ARRAY(nestedRowType),
+                        Collections.singletonList("k"));
+
+        // A null accumulator yields the input rows as they are.
+        InternalRow first = row(0, 0, null);
+        InternalArray state = (InternalArray) agg.agg(null, singletonArray(first));
+        assertThat(unnest(state, getter))
+                .containsExactlyInAnyOrderElementsOf(Collections.singletonList(first));
+
+        // Same key with a null v1: only v2 is filled in, v1 keeps its previous value.
+        state = (InternalArray) agg.agg(state, singletonArray(row(0, null, "A")));
+        assertThat(unnest(state, getter))
+                .containsExactlyInAnyOrderElementsOf(Collections.singletonList(row(0, 0, "A")));
+
+        // Same key with both values set: both columns are overwritten.
+        state = (InternalArray) agg.agg(state, singletonArray(row(0, 1, "B")));
+        assertThat(unnest(state, getter))
+                .containsExactlyInAnyOrderElementsOf(Collections.singletonList(row(0, 1, "B")));
+
+        // A new key adds a second nested row next to the existing one.
+        state = (InternalArray) agg.agg(state, singletonArray(row(1, 2, "C")));
+        assertThat(unnest(state, getter))
+                .containsExactlyInAnyOrderElementsOf(Arrays.asList(row(0, 1, "B"), row(1, 2, "C")));
+    }
+
     private Map<Object, Object> toMap(Object... kvs) {
         Map<Object, Object> result = new HashMap<>();
         for (int i = 0; i < kvs.length; i += 2) {

Reply via email to