This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 79cfadc459 [core] nested-update supports limit the input (#6309)
79cfadc459 is described below
commit 79cfadc45976accbbdb235a5ed62400a0922eba9
Author: yuzelin <[email protected]>
AuthorDate: Tue Sep 23 17:15:23 2025 +0800
[core] nested-update supports limit the input (#6309)
---
.../primary-key-table/merge-engine/aggregation.md | 4 +++
.../main/java/org/apache/paimon/CoreOptions.java | 9 +++++
.../compact/aggregate/FieldNestedUpdateAgg.java | 31 ++++++++++++++--
.../factory/FieldNestedUpdateAggFactory.java | 9 +++--
.../compact/aggregate/FieldAggregatorTest.java | 41 ++++++++++++++++++++--
5 files changed, 87 insertions(+), 7 deletions(-)
diff --git a/docs/content/primary-key-table/merge-engine/aggregation.md
b/docs/content/primary-key-table/merge-engine/aggregation.md
index 92e0f79f64..a575e48738 100644
--- a/docs/content/primary-key-table/merge-engine/aggregation.md
+++ b/docs/content/primary-key-table/merge-engine/aggregation.md
@@ -307,6 +307,10 @@ public static class BitmapContainsUDF extends
ScalarFunction {
Use `fields.<field-name>.nested-key=pk0,pk1,...` to specify the primary keys
of the nested table. If no keys, row will be appended to array<row>.
+ Use `fields.<field-name>.count-limit=<Integer>` to specify the maximum
number of rows in the nested table. When no nested-key, it will select data
+ sequentially up to limit; but if nested-key is specified, it cannot
guarantee the correctness of the aggregation result. This option can be used to
+ avoid abnormal input.
+
An example:
{{< tabs "nested_update-example" >}}
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index e2e141f141..ad7bb024f2 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -81,6 +81,8 @@ public class CoreOptions implements Serializable {
public static final String NESTED_KEY = "nested-key";
+ public static final String COUNT_LIMIT = "count-limit";
+
public static final String DISTINCT = "distinct";
public static final String LIST_AGG_DELIMITER = "list-agg-delimiter";
@@ -2189,6 +2191,13 @@ public class CoreOptions implements Serializable {
return
Arrays.stream(keyString.split(",")).map(String::trim).collect(Collectors.toList());
}
+ public int fieldNestedUpdateAggCountLimit(String fieldName) {
+ return options.get(
+ key(FIELDS_PREFIX + "." + fieldName + "." + COUNT_LIMIT)
+ .intType()
+ .defaultValue(Integer.MAX_VALUE));
+ }
+
public boolean fieldCollectAggDistinct(String fieldName) {
return options.get(
key(FIELDS_PREFIX + "." + fieldName + "." + DISTINCT)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldNestedUpdateAgg.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldNestedUpdateAgg.java
index 005bf7b17f..b2848b35b4 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldNestedUpdateAgg.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldNestedUpdateAgg.java
@@ -36,6 +36,7 @@ import java.util.Map;
import static org.apache.paimon.codegen.CodeGenUtils.newProjection;
import static org.apache.paimon.codegen.CodeGenUtils.newRecordEqualiser;
+import static org.apache.paimon.options.ConfigOptions.key;
import static org.apache.paimon.utils.Preconditions.checkNotNull;
/**
@@ -51,7 +52,10 @@ public class FieldNestedUpdateAgg extends FieldAggregator {
@Nullable private final Projection keyProjection;
@Nullable private final RecordEqualiser elementEqualiser;
- public FieldNestedUpdateAgg(String name, ArrayType dataType, List<String>
nestedKey) {
+ private final int countLimit;
+
+ public FieldNestedUpdateAgg(
+ String name, ArrayType dataType, List<String> nestedKey, int
countLimit) {
super(name, dataType);
RowType nestedType = (RowType) dataType.getElementType();
this.nestedFields = nestedType.getFieldCount();
@@ -62,6 +66,9 @@ public class FieldNestedUpdateAgg extends FieldAggregator {
this.keyProjection = newProjection(nestedType, nestedKey);
this.elementEqualiser = null;
}
+
+ // If deduplicate key is set, we don't guarantee that the result is
exactly right
+ this.countLimit = countLimit;
}
@Override
@@ -73,9 +80,15 @@ public class FieldNestedUpdateAgg extends FieldAggregator {
InternalArray acc = (InternalArray) accumulator;
InternalArray input = (InternalArray) inputField;
+ if (acc.size() >= countLimit) {
+ return accumulator;
+ }
+
+ int remainCount = countLimit - acc.size();
+
List<InternalRow> rows = new ArrayList<>(acc.size() + input.size());
addNonNullRows(acc, rows);
- addNonNullRows(input, rows);
+ addNonNullRows(input, rows, remainCount);
if (keyProjection != null) {
Map<BinaryRow, InternalRow> map = new HashMap<>();
@@ -141,4 +154,18 @@ public class FieldNestedUpdateAgg extends FieldAggregator {
rows.add(array.getRow(i, nestedFields));
}
}
+
+ private void addNonNullRows(InternalArray array, List<InternalRow> rows,
int remainSize) {
+ int count = 0;
+ for (int i = 0; i < array.size(); i++) {
+ if (count >= remainSize) {
+ return;
+ }
+ if (array.isNullAt(i)) {
+ continue;
+ }
+ rows.add(array.getRow(i, nestedFields));
+ count++;
+ }
+ }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldNestedUpdateAggFactory.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldNestedUpdateAggFactory.java
index 43d8eb429f..070931e011 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldNestedUpdateAggFactory.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldNestedUpdateAggFactory.java
@@ -36,7 +36,10 @@ public class FieldNestedUpdateAggFactory implements
FieldAggregatorFactory {
@Override
public FieldNestedUpdateAgg create(DataType fieldType, CoreOptions
options, String field) {
- return createFieldNestedUpdateAgg(fieldType,
options.fieldNestedUpdateAggNestedKey(field));
+ return createFieldNestedUpdateAgg(
+ fieldType,
+ options.fieldNestedUpdateAggNestedKey(field),
+ options.fieldNestedUpdateAggCountLimit(field));
}
@Override
@@ -45,7 +48,7 @@ public class FieldNestedUpdateAggFactory implements
FieldAggregatorFactory {
}
private FieldNestedUpdateAgg createFieldNestedUpdateAgg(
- DataType fieldType, List<String> nestedKey) {
+ DataType fieldType, List<String> nestedKey, int countLimit) {
if (nestedKey == null) {
nestedKey = Collections.emptyList();
}
@@ -56,6 +59,6 @@ public class FieldNestedUpdateAggFactory implements
FieldAggregatorFactory {
ArrayType arrayType = (ArrayType) fieldType;
checkArgument(arrayType.getElementType() instanceof RowType,
typeErrorMsg, fieldType);
- return new FieldNestedUpdateAgg(identifier(), arrayType, nestedKey);
+ return new FieldNestedUpdateAgg(identifier(), arrayType, nestedKey,
countLimit);
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java
index 166b4479c8..4e09b993d5 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java
@@ -345,7 +345,8 @@ public class FieldAggregatorTest {
DataTypes.FIELD(0, "k0",
DataTypes.INT()),
DataTypes.FIELD(1, "k1",
DataTypes.INT()),
DataTypes.FIELD(2, "v",
DataTypes.STRING()))),
- Arrays.asList("k0", "k1"));
+ Arrays.asList("k0", "k1"),
+ Integer.MAX_VALUE);
InternalArray accumulator;
InternalArray.ElementGetter elementGetter =
@@ -383,7 +384,8 @@ public class FieldAggregatorTest {
new FieldNestedUpdateAgg(
FieldNestedUpdateAggFactory.NAME,
DataTypes.ARRAY(elementRowType),
- Collections.emptyList());
+ Collections.emptyList(),
+ Integer.MAX_VALUE);
InternalArray accumulator = null;
InternalArray.ElementGetter elementGetter =
@@ -405,6 +407,41 @@ public class FieldAggregatorTest {
.containsExactlyInAnyOrderElementsOf(Collections.singletonList(row(0, 1, "B")));
}
+ @Test
+ public void testFieldNestedAppendAggWithCountLimit() {
+ DataType elementRowType =
+ DataTypes.ROW(
+ DataTypes.FIELD(0, "k0", DataTypes.INT()),
+ DataTypes.FIELD(1, "k1", DataTypes.INT()),
+ DataTypes.FIELD(2, "v", DataTypes.STRING()));
+ FieldNestedUpdateAgg agg =
+ new FieldNestedUpdateAgg(
+ FieldNestedUpdateAggFactory.NAME,
+ DataTypes.ARRAY(elementRowType),
+ Collections.emptyList(),
+ 2);
+
+ InternalArray accumulator = null;
+ InternalArray.ElementGetter elementGetter =
+ InternalArray.createElementGetter(elementRowType);
+
+ InternalRow current = row(0, 1, "B");
+ accumulator = (InternalArray) agg.agg(accumulator,
singletonArray(current));
+ assertThat(unnest(accumulator, elementGetter))
+
.containsExactlyInAnyOrderElementsOf(Collections.singletonList(row(0, 1, "B")));
+
+ current = row(0, 1, "b");
+ accumulator = (InternalArray) agg.agg(accumulator,
singletonArray(current));
+ assertThat(unnest(accumulator, elementGetter))
+ .containsExactlyInAnyOrderElementsOf(Arrays.asList(row(0, 1,
"B"), row(0, 1, "b")));
+
+ // count limit is 2, so the third element will be dropped
+ current = row(0, 1, "C");
+ accumulator = (InternalArray) agg.agg(accumulator,
singletonArray(current));
+ assertThat(unnest(accumulator, elementGetter))
+ .containsExactlyInAnyOrderElementsOf(Arrays.asList(row(0, 1,
"B"), row(0, 1, "b")));
+ }
+
private List<Object> unnest(InternalArray array,
InternalArray.ElementGetter elementGetter) {
return IntStream.range(0, array.size())
.mapToObj(i -> elementGetter.getElementOrNull(array, i))