This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new bd9317e74b [core] Check that all fields with aggregate functions in
partial-update should be protected by sequence-group (#5034)
bd9317e74b is described below
commit bd9317e74be3755226ad5d92f4e207a1c1185722
Author: yuzelin <[email protected]>
AuthorDate: Sun Feb 9 19:56:10 2025 +0800
[core] Check that all fields with aggregate functions in partial-update
should be protected by sequence-group (#5034)
---
.../compact/PartialUpdateMergeFunction.java | 9 ++++--
.../compact/PartialUpdateMergeFunctionTest.java | 37 +++++++++++++++++++---
2 files changed, 39 insertions(+), 7 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
index a28ac52df4..3ce51127b1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
@@ -24,6 +24,7 @@ import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator;
import org.apache.paimon.mergetree.compact.aggregate.factory.FieldAggregatorFactory;
+import org.apache.paimon.mergetree.compact.aggregate.factory.FieldLastNonNullValueAggFactory;
import org.apache.paimon.mergetree.compact.aggregate.factory.FieldPrimaryKeyAggFactory;
import org.apache.paimon.options.Options;
import org.apache.paimon.types.DataField;
@@ -548,9 +549,13 @@ public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> {
String aggFuncName = getAggFuncName(options, fieldName);
if (aggFuncName != null) {
+ // last_non_null_value doesn't require sequence group
checkArgument(
- !fieldSeqComparators.isEmpty(),
- "Must use sequence group for aggregation functions.");
+ aggFuncName.equals(FieldLastNonNullValueAggFactory.NAME)
+ || fieldSeqComparators.containsKey(
+ fieldNames.indexOf(fieldName)),
+ "Must use sequence group for aggregation functions but not found for field %s.",
+ fieldName);
fieldAggregators.put(
i,
() ->
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunctionTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunctionTest.java
index 529110cabc..28625a9bf3 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunctionTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunctionTest.java
@@ -21,6 +21,7 @@ package org.apache.paimon.mergetree.compact;
import org.apache.paimon.KeyValue;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.options.Options;
+import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
@@ -31,6 +32,7 @@ import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
import org.junit.jupiter.api.Test;
import static org.apache.paimon.CoreOptions.FIELDS_DEFAULT_AGG_FUNC;
+import static org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -820,14 +822,39 @@ public class PartialUpdateMergeFunctionTest {
@Test
public void testAggregationWithoutSequenceGroup() {
- Options options = new Options();
- options.set("fields.f1.aggregate-function", "listagg");
- RowType rowType = RowType.of(DataTypes.INT(), DataTypes.INT());
+ RowType rowType =
+ RowType.of(
+ new DataType[] {
+ DataTypes.INT(),
+ DataTypes.INT(),
+ DataTypes.INT(),
+ DataTypes.INT(),
+ DataTypes.INT()
+ },
+ new String[] {"pk", "f0", "g0", "f1", "g1"});
+
+ Options options1 = new Options();
+ options1.set("fields.f0.aggregate-function", "listagg");
+ options1.set("fields.f1.aggregate-function", "listagg");
assertThatThrownBy(
() ->
PartialUpdateMergeFunction.factory(
- options, rowType, ImmutableList.of("f0")))
- .hasMessageContaining("Must use sequence group for aggregation functions");
+ options1, rowType, ImmutableList.of("pk")))
+ .satisfies(
+ anyCauseMatches(
+ IllegalArgumentException.class,
+ "Must use sequence group for aggregation functions but not found for field f0."));
+
+ Options options2 = new Options(options1.toMap());
+ options2.set("fields.g0.sequence-group", "f0");
+ assertThatThrownBy(
+ () ->
+ PartialUpdateMergeFunction.factory(
+ options2, rowType, ImmutableList.of("pk")))
+ .satisfies(
+ anyCauseMatches(
+ IllegalArgumentException.class,
+ "Must use sequence group for aggregation functions but not found for field f1."));
}
private void add(MergeFunction<KeyValue> function, Integer... f) {