This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.3
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit 7b49031bf41bbe8133afc1ffa216e1dc58c58010
Author: yuzelin <[email protected]>
AuthorDate: Tue Sep 23 17:15:23 2025 +0800

    [core] nested-update supports limit the input (#6309)
---
 .../primary-key-table/merge-engine/aggregation.md  |  4 +++
 .../main/java/org/apache/paimon/CoreOptions.java   |  9 +++++
 .../compact/aggregate/FieldNestedUpdateAgg.java    | 31 ++++++++++++++--
 .../factory/FieldNestedUpdateAggFactory.java       |  9 +++--
 .../compact/aggregate/FieldAggregatorTest.java     | 41 ++++++++++++++++++++--
 5 files changed, 87 insertions(+), 7 deletions(-)

diff --git a/docs/content/primary-key-table/merge-engine/aggregation.md 
b/docs/content/primary-key-table/merge-engine/aggregation.md
index 92e0f79f64..a575e48738 100644
--- a/docs/content/primary-key-table/merge-engine/aggregation.md
+++ b/docs/content/primary-key-table/merge-engine/aggregation.md
@@ -307,6 +307,10 @@ public static class BitmapContainsUDF extends 
ScalarFunction {
 
   Use `fields.<field-name>.nested-key=pk0,pk1,...` to specify the primary keys 
of the nested table. If no keys, row will be appended to array<row>.
 
+  Use `fields.<field-name>.count-limit=<Integer>` to specify the maximum 
number of rows in the nested table. When no nested-key is set, rows are kept
+  sequentially up to the limit; but if nested-key is specified, the 
correctness of the aggregation result cannot be guaranteed. This option can be used to
+  guard against abnormal input.
+
   An example:
 
   {{< tabs "nested_update-example" >}}
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index e2e141f141..ad7bb024f2 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -81,6 +81,8 @@ public class CoreOptions implements Serializable {
 
     public static final String NESTED_KEY = "nested-key";
 
+    public static final String COUNT_LIMIT = "count-limit";
+
     public static final String DISTINCT = "distinct";
 
     public static final String LIST_AGG_DELIMITER = "list-agg-delimiter";
@@ -2189,6 +2191,13 @@ public class CoreOptions implements Serializable {
         return 
Arrays.stream(keyString.split(",")).map(String::trim).collect(Collectors.toList());
     }
 
+    public int fieldNestedUpdateAggCountLimit(String fieldName) {
+        return options.get(
+                key(FIELDS_PREFIX + "." + fieldName + "." + COUNT_LIMIT)
+                        .intType()
+                        .defaultValue(Integer.MAX_VALUE));
+    }
+
     public boolean fieldCollectAggDistinct(String fieldName) {
         return options.get(
                 key(FIELDS_PREFIX + "." + fieldName + "." + DISTINCT)
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldNestedUpdateAgg.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldNestedUpdateAgg.java
index 005bf7b17f..b2848b35b4 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldNestedUpdateAgg.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldNestedUpdateAgg.java
@@ -36,6 +36,7 @@ import java.util.Map;
 
 import static org.apache.paimon.codegen.CodeGenUtils.newProjection;
 import static org.apache.paimon.codegen.CodeGenUtils.newRecordEqualiser;
+import static org.apache.paimon.options.ConfigOptions.key;
 import static org.apache.paimon.utils.Preconditions.checkNotNull;
 
 /**
@@ -51,7 +52,10 @@ public class FieldNestedUpdateAgg extends FieldAggregator {
     @Nullable private final Projection keyProjection;
     @Nullable private final RecordEqualiser elementEqualiser;
 
-    public FieldNestedUpdateAgg(String name, ArrayType dataType, List<String> 
nestedKey) {
+    private final int countLimit;
+
+    public FieldNestedUpdateAgg(
+            String name, ArrayType dataType, List<String> nestedKey, int 
countLimit) {
         super(name, dataType);
         RowType nestedType = (RowType) dataType.getElementType();
         this.nestedFields = nestedType.getFieldCount();
@@ -62,6 +66,9 @@ public class FieldNestedUpdateAgg extends FieldAggregator {
             this.keyProjection = newProjection(nestedType, nestedKey);
             this.elementEqualiser = null;
         }
+
+        // If deduplicate key is set, we don't guarantee that the result is 
exactly right
+        this.countLimit = countLimit;
     }
 
     @Override
@@ -73,9 +80,15 @@ public class FieldNestedUpdateAgg extends FieldAggregator {
         InternalArray acc = (InternalArray) accumulator;
         InternalArray input = (InternalArray) inputField;
 
+        if (acc.size() >= countLimit) {
+            return accumulator;
+        }
+
+        int remainCount = countLimit - acc.size();
+
         List<InternalRow> rows = new ArrayList<>(acc.size() + input.size());
         addNonNullRows(acc, rows);
-        addNonNullRows(input, rows);
+        addNonNullRows(input, rows, remainCount);
 
         if (keyProjection != null) {
             Map<BinaryRow, InternalRow> map = new HashMap<>();
@@ -141,4 +154,18 @@ public class FieldNestedUpdateAgg extends FieldAggregator {
             rows.add(array.getRow(i, nestedFields));
         }
     }
+
+    private void addNonNullRows(InternalArray array, List<InternalRow> rows, 
int remainSize) {
+        int count = 0;
+        for (int i = 0; i < array.size(); i++) {
+            if (count >= remainSize) {
+                return;
+            }
+            if (array.isNullAt(i)) {
+                continue;
+            }
+            rows.add(array.getRow(i, nestedFields));
+            count++;
+        }
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldNestedUpdateAggFactory.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldNestedUpdateAggFactory.java
index 43d8eb429f..070931e011 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldNestedUpdateAggFactory.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldNestedUpdateAggFactory.java
@@ -36,7 +36,10 @@ public class FieldNestedUpdateAggFactory implements 
FieldAggregatorFactory {
 
     @Override
     public FieldNestedUpdateAgg create(DataType fieldType, CoreOptions 
options, String field) {
-        return createFieldNestedUpdateAgg(fieldType, 
options.fieldNestedUpdateAggNestedKey(field));
+        return createFieldNestedUpdateAgg(
+                fieldType,
+                options.fieldNestedUpdateAggNestedKey(field),
+                options.fieldNestedUpdateAggCountLimit(field));
     }
 
     @Override
@@ -45,7 +48,7 @@ public class FieldNestedUpdateAggFactory implements 
FieldAggregatorFactory {
     }
 
     private FieldNestedUpdateAgg createFieldNestedUpdateAgg(
-            DataType fieldType, List<String> nestedKey) {
+            DataType fieldType, List<String> nestedKey, int countLimit) {
         if (nestedKey == null) {
             nestedKey = Collections.emptyList();
         }
@@ -56,6 +59,6 @@ public class FieldNestedUpdateAggFactory implements 
FieldAggregatorFactory {
         ArrayType arrayType = (ArrayType) fieldType;
         checkArgument(arrayType.getElementType() instanceof RowType, 
typeErrorMsg, fieldType);
 
-        return new FieldNestedUpdateAgg(identifier(), arrayType, nestedKey);
+        return new FieldNestedUpdateAgg(identifier(), arrayType, nestedKey, 
countLimit);
     }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java
index 166b4479c8..4e09b993d5 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java
@@ -345,7 +345,8 @@ public class FieldAggregatorTest {
                                         DataTypes.FIELD(0, "k0", 
DataTypes.INT()),
                                         DataTypes.FIELD(1, "k1", 
DataTypes.INT()),
                                         DataTypes.FIELD(2, "v", 
DataTypes.STRING()))),
-                        Arrays.asList("k0", "k1"));
+                        Arrays.asList("k0", "k1"),
+                        Integer.MAX_VALUE);
 
         InternalArray accumulator;
         InternalArray.ElementGetter elementGetter =
@@ -383,7 +384,8 @@ public class FieldAggregatorTest {
                 new FieldNestedUpdateAgg(
                         FieldNestedUpdateAggFactory.NAME,
                         DataTypes.ARRAY(elementRowType),
-                        Collections.emptyList());
+                        Collections.emptyList(),
+                        Integer.MAX_VALUE);
 
         InternalArray accumulator = null;
         InternalArray.ElementGetter elementGetter =
@@ -405,6 +407,41 @@ public class FieldAggregatorTest {
                 
.containsExactlyInAnyOrderElementsOf(Collections.singletonList(row(0, 1, "B")));
     }
 
+    @Test
+    public void testFieldNestedAppendAggWithCountLimit() {
+        DataType elementRowType =
+                DataTypes.ROW(
+                        DataTypes.FIELD(0, "k0", DataTypes.INT()),
+                        DataTypes.FIELD(1, "k1", DataTypes.INT()),
+                        DataTypes.FIELD(2, "v", DataTypes.STRING()));
+        FieldNestedUpdateAgg agg =
+                new FieldNestedUpdateAgg(
+                        FieldNestedUpdateAggFactory.NAME,
+                        DataTypes.ARRAY(elementRowType),
+                        Collections.emptyList(),
+                        2);
+
+        InternalArray accumulator = null;
+        InternalArray.ElementGetter elementGetter =
+                InternalArray.createElementGetter(elementRowType);
+
+        InternalRow current = row(0, 1, "B");
+        accumulator = (InternalArray) agg.agg(accumulator, 
singletonArray(current));
+        assertThat(unnest(accumulator, elementGetter))
+                
.containsExactlyInAnyOrderElementsOf(Collections.singletonList(row(0, 1, "B")));
+
+        current = row(0, 1, "b");
+        accumulator = (InternalArray) agg.agg(accumulator, 
singletonArray(current));
+        assertThat(unnest(accumulator, elementGetter))
+                .containsExactlyInAnyOrderElementsOf(Arrays.asList(row(0, 1, 
"B"), row(0, 1, "b")));
+
+        // count limit is 2, so the third element will be dropped
+        current = row(0, 1, "C");
+        accumulator = (InternalArray) agg.agg(accumulator, 
singletonArray(current));
+        assertThat(unnest(accumulator, elementGetter))
+                .containsExactlyInAnyOrderElementsOf(Arrays.asList(row(0, 1, 
"B"), row(0, 1, "b")));
+    }
+
     private List<Object> unnest(InternalArray array, 
InternalArray.ElementGetter elementGetter) {
         return IntStream.range(0, array.size())
                 .mapToObj(i -> elementGetter.getElementOrNull(array, i))

Reply via email to