This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-1.3 in repository https://gitbox.apache.org/repos/asf/paimon.git
commit 7b49031bf41bbe8133afc1ffa216e1dc58c58010 Author: yuzelin <[email protected]> AuthorDate: Tue Sep 23 17:15:23 2025 +0800 [core] nested-update supports limit the input (#6309) --- .../primary-key-table/merge-engine/aggregation.md | 4 +++ .../main/java/org/apache/paimon/CoreOptions.java | 9 +++++ .../compact/aggregate/FieldNestedUpdateAgg.java | 31 ++++++++++++++-- .../factory/FieldNestedUpdateAggFactory.java | 9 +++-- .../compact/aggregate/FieldAggregatorTest.java | 41 ++++++++++++++++++++-- 5 files changed, 87 insertions(+), 7 deletions(-) diff --git a/docs/content/primary-key-table/merge-engine/aggregation.md b/docs/content/primary-key-table/merge-engine/aggregation.md index 92e0f79f64..a575e48738 100644 --- a/docs/content/primary-key-table/merge-engine/aggregation.md +++ b/docs/content/primary-key-table/merge-engine/aggregation.md @@ -307,6 +307,10 @@ public static class BitmapContainsUDF extends ScalarFunction { Use `fields.<field-name>.nested-key=pk0,pk1,...` to specify the primary keys of the nested table. If no keys, row will be appended to array<row>. + Use `fields.<field-name>.count-limit=<Integer>` to specify the maximum number of rows in the nested table. When no nested-key is specified, it will select data + sequentially up to the limit; but if nested-key is specified, it cannot guarantee the correctness of the aggregation result. This option can be used to + avoid abnormal input. 
+ An example: {{< tabs "nested_update-example" >}} diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java index e2e141f141..ad7bb024f2 100644 --- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java @@ -81,6 +81,8 @@ public class CoreOptions implements Serializable { public static final String NESTED_KEY = "nested-key"; + public static final String COUNT_LIMIT = "count-limit"; + public static final String DISTINCT = "distinct"; public static final String LIST_AGG_DELIMITER = "list-agg-delimiter"; @@ -2189,6 +2191,13 @@ public class CoreOptions implements Serializable { return Arrays.stream(keyString.split(",")).map(String::trim).collect(Collectors.toList()); } + public int fieldNestedUpdateAggCountLimit(String fieldName) { + return options.get( + key(FIELDS_PREFIX + "." + fieldName + "." + COUNT_LIMIT) + .intType() + .defaultValue(Integer.MAX_VALUE)); + } + public boolean fieldCollectAggDistinct(String fieldName) { return options.get( key(FIELDS_PREFIX + "." + fieldName + "." 
+ DISTINCT) diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldNestedUpdateAgg.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldNestedUpdateAgg.java index 005bf7b17f..b2848b35b4 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldNestedUpdateAgg.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldNestedUpdateAgg.java @@ -36,6 +36,7 @@ import java.util.Map; import static org.apache.paimon.codegen.CodeGenUtils.newProjection; import static org.apache.paimon.codegen.CodeGenUtils.newRecordEqualiser; +import static org.apache.paimon.options.ConfigOptions.key; import static org.apache.paimon.utils.Preconditions.checkNotNull; /** @@ -51,7 +52,10 @@ public class FieldNestedUpdateAgg extends FieldAggregator { @Nullable private final Projection keyProjection; @Nullable private final RecordEqualiser elementEqualiser; - public FieldNestedUpdateAgg(String name, ArrayType dataType, List<String> nestedKey) { + private final int countLimit; + + public FieldNestedUpdateAgg( + String name, ArrayType dataType, List<String> nestedKey, int countLimit) { super(name, dataType); RowType nestedType = (RowType) dataType.getElementType(); this.nestedFields = nestedType.getFieldCount(); @@ -62,6 +66,9 @@ public class FieldNestedUpdateAgg extends FieldAggregator { this.keyProjection = newProjection(nestedType, nestedKey); this.elementEqualiser = null; } + + // If deduplicate key is set, we don't guarantee that the result is exactly right + this.countLimit = countLimit; } @Override @@ -73,9 +80,15 @@ public class FieldNestedUpdateAgg extends FieldAggregator { InternalArray acc = (InternalArray) accumulator; InternalArray input = (InternalArray) inputField; + if (acc.size() >= countLimit) { + return accumulator; + } + + int remainCount = countLimit - acc.size(); + List<InternalRow> rows = new ArrayList<>(acc.size() + input.size()); addNonNullRows(acc, 
rows); - addNonNullRows(input, rows); + addNonNullRows(input, rows, remainCount); if (keyProjection != null) { Map<BinaryRow, InternalRow> map = new HashMap<>(); @@ -141,4 +154,18 @@ public class FieldNestedUpdateAgg extends FieldAggregator { rows.add(array.getRow(i, nestedFields)); } } + + private void addNonNullRows(InternalArray array, List<InternalRow> rows, int remainSize) { + int count = 0; + for (int i = 0; i < array.size(); i++) { + if (count >= remainSize) { + return; + } + if (array.isNullAt(i)) { + continue; + } + rows.add(array.getRow(i, nestedFields)); + count++; + } + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldNestedUpdateAggFactory.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldNestedUpdateAggFactory.java index 43d8eb429f..070931e011 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldNestedUpdateAggFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldNestedUpdateAggFactory.java @@ -36,7 +36,10 @@ public class FieldNestedUpdateAggFactory implements FieldAggregatorFactory { @Override public FieldNestedUpdateAgg create(DataType fieldType, CoreOptions options, String field) { - return createFieldNestedUpdateAgg(fieldType, options.fieldNestedUpdateAggNestedKey(field)); + return createFieldNestedUpdateAgg( + fieldType, + options.fieldNestedUpdateAggNestedKey(field), + options.fieldNestedUpdateAggCountLimit(field)); } @Override @@ -45,7 +48,7 @@ public class FieldNestedUpdateAggFactory implements FieldAggregatorFactory { } private FieldNestedUpdateAgg createFieldNestedUpdateAgg( - DataType fieldType, List<String> nestedKey) { + DataType fieldType, List<String> nestedKey, int countLimit) { if (nestedKey == null) { nestedKey = Collections.emptyList(); } @@ -56,6 +59,6 @@ public class FieldNestedUpdateAggFactory implements FieldAggregatorFactory { ArrayType 
arrayType = (ArrayType) fieldType; checkArgument(arrayType.getElementType() instanceof RowType, typeErrorMsg, fieldType); - return new FieldNestedUpdateAgg(identifier(), arrayType, nestedKey); + return new FieldNestedUpdateAgg(identifier(), arrayType, nestedKey, countLimit); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java index 166b4479c8..4e09b993d5 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java @@ -345,7 +345,8 @@ public class FieldAggregatorTest { DataTypes.FIELD(0, "k0", DataTypes.INT()), DataTypes.FIELD(1, "k1", DataTypes.INT()), DataTypes.FIELD(2, "v", DataTypes.STRING()))), - Arrays.asList("k0", "k1")); + Arrays.asList("k0", "k1"), + Integer.MAX_VALUE); InternalArray accumulator; InternalArray.ElementGetter elementGetter = @@ -383,7 +384,8 @@ public class FieldAggregatorTest { new FieldNestedUpdateAgg( FieldNestedUpdateAggFactory.NAME, DataTypes.ARRAY(elementRowType), - Collections.emptyList()); + Collections.emptyList(), + Integer.MAX_VALUE); InternalArray accumulator = null; InternalArray.ElementGetter elementGetter = @@ -405,6 +407,41 @@ public class FieldAggregatorTest { .containsExactlyInAnyOrderElementsOf(Collections.singletonList(row(0, 1, "B"))); } + @Test + public void testFieldNestedAppendAggWithCountLimit() { + DataType elementRowType = + DataTypes.ROW( + DataTypes.FIELD(0, "k0", DataTypes.INT()), + DataTypes.FIELD(1, "k1", DataTypes.INT()), + DataTypes.FIELD(2, "v", DataTypes.STRING())); + FieldNestedUpdateAgg agg = + new FieldNestedUpdateAgg( + FieldNestedUpdateAggFactory.NAME, + DataTypes.ARRAY(elementRowType), + Collections.emptyList(), + 2); + + InternalArray accumulator = null; + InternalArray.ElementGetter 
elementGetter = + InternalArray.createElementGetter(elementRowType); + + InternalRow current = row(0, 1, "B"); + accumulator = (InternalArray) agg.agg(accumulator, singletonArray(current)); + assertThat(unnest(accumulator, elementGetter)) + .containsExactlyInAnyOrderElementsOf(Collections.singletonList(row(0, 1, "B"))); + + current = row(0, 1, "b"); + accumulator = (InternalArray) agg.agg(accumulator, singletonArray(current)); + assertThat(unnest(accumulator, elementGetter)) + .containsExactlyInAnyOrderElementsOf(Arrays.asList(row(0, 1, "B"), row(0, 1, "b"))); + + // count limit is 2, so the third element will be dropped + current = row(0, 1, "C"); + accumulator = (InternalArray) agg.agg(accumulator, singletonArray(current)); + assertThat(unnest(accumulator, elementGetter)) + .containsExactlyInAnyOrderElementsOf(Arrays.asList(row(0, 1, "B"), row(0, 1, "b"))); + } + private List<Object> unnest(InternalArray array, InternalArray.ElementGetter elementGetter) { return IntStream.range(0, array.size()) .mapToObj(i -> elementGetter.getElementOrNull(array, i))
