This is an automated email from the ASF dual-hosted git repository.
yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 93d2623cd6 [core] Introduce 'nested_partial_update' agg func (#6924)
93d2623cd6 is described below
commit 93d2623cd6d79b4e883d58a2eb315d150168ccf1
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Dec 29 17:09:51 2025 +0800
[core] Introduce 'nested_partial_update' agg func (#6924)
---
.../primary-key-table/merge-engine/aggregation.md | 5 +
.../aggregate/FieldNestedPartialUpdateAgg.java | 108 +++++++++++++++++++++
.../FieldNestedPartialUpdateAggFactory.java | 58 +++++++++++
.../services/org.apache.paimon.factories.Factory | 1 +
.../compact/aggregate/FieldAggregatorTest.java | 45 ++++++++-
5 files changed, 216 insertions(+), 1 deletion(-)
diff --git a/docs/content/primary-key-table/merge-engine/aggregation.md
b/docs/content/primary-key-table/merge-engine/aggregation.md
index ed845eebfb..fd8414ca1d 100644
--- a/docs/content/primary-key-table/merge-engine/aggregation.md
+++ b/docs/content/primary-key-table/merge-engine/aggregation.md
@@ -376,6 +376,11 @@ public static class BitmapContainsUDF extends
ScalarFunction {
{{< /tabs >}}
+### nested_partial_update
+ The nested_partial_update function collects multiple rows into one
array<row> (a so-called 'nested table'). It only supports
+ the ARRAY<ROW> data type. You must use
`fields.<field-name>.nested-key=pk0,pk1,...` to specify the primary keys of the
+ nested table. Rows with the same nested key are merged by partial update: each
non-null column of a newer row overwrites the stored value, while null columns keep the stored value.
+
### collect
The collect function collects elements into an Array. You can set
`fields.<field-name>.distinct=true` to deduplicate elements.
It only supports ARRAY type.
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldNestedPartialUpdateAgg.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldNestedPartialUpdateAgg.java
new file mode 100644
index 0000000000..d8e058e839
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldNestedPartialUpdateAgg.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.mergetree.compact.aggregate;
+
+import org.apache.paimon.codegen.Projection;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.GenericArray;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalArray;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.InternalRow.FieldGetter;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.RowType;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.paimon.codegen.CodeGenUtils.newProjection;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/**
+ * Used to partial update a field which representing a nested table. The data
type of nested table
+ * field is {@code ARRAY<ROW>}.
+ */
+public class FieldNestedPartialUpdateAgg extends FieldAggregator {
+
+ private static final long serialVersionUID = 1L;
+
+ private final int nestedFields;
+ private final Projection keyProjection;
+ private final FieldGetter[] fieldGetters;
+
+ public FieldNestedPartialUpdateAgg(String name, ArrayType dataType,
List<String> nestedKey) {
+ super(name, dataType);
+ RowType nestedType = (RowType) dataType.getElementType();
+ this.nestedFields = nestedType.getFieldCount();
+ checkArgument(!nestedKey.isEmpty());
+ this.keyProjection = newProjection(nestedType, nestedKey);
+ this.fieldGetters = new FieldGetter[nestedFields];
+ for (int i = 0; i < nestedFields; i++) {
+ fieldGetters[i] =
InternalRow.createFieldGetter(nestedType.getTypeAt(i), i);
+ }
+ }
+
+ @Override
+ public Object agg(Object accumulator, Object inputField) {
+ if (accumulator == null || inputField == null) {
+ return accumulator == null ? inputField : accumulator;
+ }
+
+ InternalArray acc = (InternalArray) accumulator;
+ InternalArray input = (InternalArray) inputField;
+
+ List<InternalRow> rows = new ArrayList<>(acc.size() + input.size());
+ addNonNullRows(acc, rows);
+ addNonNullRows(input, rows);
+
+ if (keyProjection != null) {
+ Map<BinaryRow, GenericRow> map = new HashMap<>();
+ for (InternalRow row : rows) {
+ BinaryRow key = keyProjection.apply(row).copy();
+ GenericRow toUpdate = map.computeIfAbsent(key, k -> new
GenericRow(nestedFields));
+ partialUpdate(toUpdate, row);
+ }
+
+ rows = new ArrayList<>(map.values());
+ }
+
+ return new GenericArray(rows.toArray());
+ }
+
+ private void addNonNullRows(InternalArray array, List<InternalRow> rows) {
+ for (int i = 0; i < array.size(); i++) {
+ if (array.isNullAt(i)) {
+ continue;
+ }
+ rows.add(array.getRow(i, nestedFields));
+ }
+ }
+
+ private void partialUpdate(GenericRow toUpdate, InternalRow input) {
+ for (int i = 0; i < fieldGetters.length; i++) {
+ FieldGetter fieldGetter = fieldGetters[i];
+ Object field = fieldGetter.getFieldOrNull(input);
+ if (field != null) {
+ toUpdate.setField(i, field);
+ }
+ }
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldNestedPartialUpdateAggFactory.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldNestedPartialUpdateAggFactory.java
new file mode 100644
index 0000000000..80b899a8b2
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldNestedPartialUpdateAggFactory.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.mergetree.compact.aggregate.factory;
+
+import org.apache.paimon.CoreOptions;
+import
org.apache.paimon.mergetree.compact.aggregate.FieldNestedPartialUpdateAgg;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.RowType;
+
+import java.util.List;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/** Factory for #{@link FieldNestedPartialUpdateAgg}. */
+public class FieldNestedPartialUpdateAggFactory implements
FieldAggregatorFactory {
+
+ public static final String NAME = "nested_partial_update";
+
+ @Override
+ public FieldNestedPartialUpdateAgg create(
+ DataType fieldType, CoreOptions options, String field) {
+ return createFieldNestedPartialUpdateAgg(
+ fieldType, options.fieldNestedUpdateAggNestedKey(field));
+ }
+
+ @Override
+ public String identifier() {
+ return NAME;
+ }
+
+ private FieldNestedPartialUpdateAgg createFieldNestedPartialUpdateAgg(
+ DataType fieldType, List<String> nestedKey) {
+ checkArgument(!nestedKey.isEmpty());
+ String typeErrorMsg =
+ "Data type for nested table column must be 'Array<Row>' but
was '%s'.";
+ checkArgument(fieldType instanceof ArrayType, typeErrorMsg, fieldType);
+ ArrayType arrayType = (ArrayType) fieldType;
+ checkArgument(arrayType.getElementType() instanceof RowType,
typeErrorMsg, fieldType);
+ return new FieldNestedPartialUpdateAgg(identifier(), arrayType,
nestedKey);
+ }
+}
diff --git
a/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
b/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index 7f221e5172..3bfb3480de 100644
---
a/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++
b/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -30,6 +30,7 @@
org.apache.paimon.mergetree.compact.aggregate.factory.FieldMaxAggFactory
org.apache.paimon.mergetree.compact.aggregate.factory.FieldMergeMapAggFactory
org.apache.paimon.mergetree.compact.aggregate.factory.FieldMinAggFactory
org.apache.paimon.mergetree.compact.aggregate.factory.FieldNestedUpdateAggFactory
+org.apache.paimon.mergetree.compact.aggregate.factory.FieldNestedPartialUpdateAggFactory
org.apache.paimon.mergetree.compact.aggregate.factory.FieldPrimaryKeyAggFactory
org.apache.paimon.mergetree.compact.aggregate.factory.FieldProductAggFactory
org.apache.paimon.mergetree.compact.aggregate.factory.FieldRoaringBitmap32AggFactory
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java
index 0b838c3609..8541954be8 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java
@@ -40,6 +40,7 @@ import
org.apache.paimon.mergetree.compact.aggregate.factory.FieldListaggAggFact
import
org.apache.paimon.mergetree.compact.aggregate.factory.FieldMaxAggFactory;
import
org.apache.paimon.mergetree.compact.aggregate.factory.FieldMergeMapAggFactory;
import
org.apache.paimon.mergetree.compact.aggregate.factory.FieldMinAggFactory;
+import
org.apache.paimon.mergetree.compact.aggregate.factory.FieldNestedPartialUpdateAggFactory;
import
org.apache.paimon.mergetree.compact.aggregate.factory.FieldNestedUpdateAggFactory;
import
org.apache.paimon.mergetree.compact.aggregate.factory.FieldProductAggFactory;
import
org.apache.paimon.mergetree.compact.aggregate.factory.FieldRoaringBitmap32AggFactory;
@@ -626,7 +627,7 @@ public class FieldAggregatorTest {
return new GenericArray(new InternalRow[] {row});
}
- private InternalRow row(int k0, int k1, String v) {
+ private InternalRow row(Integer k0, Integer k1, String v) {
return GenericRow.of(k0, k1, BinaryString.fromString(v));
}
@@ -1154,6 +1155,48 @@ public class FieldAggregatorTest {
assertThat(agg).isEqualTo("test");
}
+ @Test
+ public void testFieldNestedPartialUpdateAgg() {
+ DataType elementRowType =
+ DataTypes.ROW(
+ DataTypes.FIELD(0, "k", DataTypes.INT()),
+ DataTypes.FIELD(1, "v1", DataTypes.INT()),
+ DataTypes.FIELD(2, "v2", DataTypes.STRING()));
+ FieldNestedPartialUpdateAgg agg =
+ new FieldNestedPartialUpdateAgg(
+ FieldNestedPartialUpdateAggFactory.NAME,
+ DataTypes.ARRAY(
+ DataTypes.ROW(
+ DataTypes.FIELD(0, "k",
DataTypes.INT()),
+ DataTypes.FIELD(1, "v1",
DataTypes.INT()),
+ DataTypes.FIELD(2, "v2",
DataTypes.STRING()))),
+ Collections.singletonList("k"));
+
+ InternalArray accumulator;
+ InternalArray.ElementGetter elementGetter =
+ InternalArray.createElementGetter(elementRowType);
+
+ InternalRow current = row(0, 0, null);
+ accumulator = (InternalArray) agg.agg(null, singletonArray(current));
+ assertThat(unnest(accumulator, elementGetter))
+
.containsExactlyInAnyOrderElementsOf(Collections.singletonList(current));
+
+ current = row(0, null, "A");
+ accumulator = (InternalArray) agg.agg(accumulator,
singletonArray(current));
+ assertThat(unnest(accumulator, elementGetter))
+
.containsExactlyInAnyOrderElementsOf(Collections.singletonList(row(0, 0, "A")));
+
+ current = row(0, 1, "B");
+ accumulator = (InternalArray) agg.agg(accumulator,
singletonArray(current));
+ assertThat(unnest(accumulator, elementGetter))
+
.containsExactlyInAnyOrderElementsOf(Collections.singletonList(row(0, 1, "B")));
+
+ current = row(1, 2, "C");
+ accumulator = (InternalArray) agg.agg(accumulator,
singletonArray(current));
+ assertThat(unnest(accumulator, elementGetter))
+ .containsExactlyInAnyOrderElementsOf(Arrays.asList(row(0, 1,
"B"), row(1, 2, "C")));
+ }
+
private Map<Object, Object> toMap(Object... kvs) {
Map<Object, Object> result = new HashMap<>();
for (int i = 0; i < kvs.length; i += 2) {