This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.0
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit 231718c6fc7f88003579d61940231e611ab71645
Author: yuzelin <[email protected]>
AuthorDate: Wed Jan 15 11:25:41 2025 +0800

    [core] Fix that sequence group fields are mistakenly aggregated by default aggregator in partial update (#4897)
---
 .../mergetree/compact/PartialUpdateMergeFunction.java  | 14 +++++++++++---
 .../org/apache/paimon/flink/PartialUpdateITCase.java   | 18 ++++++++++++++++++
 2 files changed, 29 insertions(+), 3 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
index ab25794129..357461f74d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
@@ -184,6 +184,7 @@ public class PartialUpdateMergeFunction implements 
MergeFunction<KeyValue> {
                             row.setField(
                                     fieldIndex, 
getters[fieldIndex].getFieldOrNull(kv.value()));
                         }
+                        continue;
                     }
                     row.setField(
                             i, aggregator == null ? field : 
aggregator.agg(accumulator, field));
@@ -304,6 +305,7 @@ public class PartialUpdateMergeFunction implements 
MergeFunction<KeyValue> {
             List<String> fieldNames = rowType.getFieldNames();
             this.fieldSeqComparators = new HashMap<>();
             Map<String, Integer> sequenceGroupMap = new HashMap<>();
+            List<String> allSequenceFields = new ArrayList<>();
             for (Map.Entry<String, String> entry : options.toMap().entrySet()) 
{
                 String k = entry.getKey();
                 String v = entry.getValue();
@@ -318,6 +320,7 @@ public class PartialUpdateMergeFunction implements 
MergeFunction<KeyValue> {
                                                     .split(FIELDS_SEPARATOR))
                                     .map(fieldName -> 
validateFieldName(fieldName, fieldNames))
                                     .collect(Collectors.toList());
+                    allSequenceFields.addAll(sequenceFields);
 
                     Supplier<FieldsComparator> userDefinedSeqComparator =
                             () -> UserDefinedSeqComparator.create(rowType, 
sequenceFields, true);
@@ -347,7 +350,8 @@ public class PartialUpdateMergeFunction implements 
MergeFunction<KeyValue> {
                 }
             }
             this.fieldAggregators =
-                    createFieldAggregators(rowType, primaryKeys, new 
CoreOptions(options));
+                    createFieldAggregators(
+                            rowType, primaryKeys, allSequenceFields, new 
CoreOptions(options));
             if (!fieldAggregators.isEmpty() && fieldSeqComparators.isEmpty()) {
                 throw new IllegalArgumentException(
                         "Must use sequence group for aggregation functions.");
@@ -514,7 +518,10 @@ public class PartialUpdateMergeFunction implements 
MergeFunction<KeyValue> {
          * @return The aggregators for each column.
          */
         private Map<Integer, Supplier<FieldAggregator>> createFieldAggregators(
-                RowType rowType, List<String> primaryKeys, CoreOptions 
options) {
+                RowType rowType,
+                List<String> primaryKeys,
+                List<String> allSequenceFields,
+                CoreOptions options) {
 
             List<String> fieldNames = rowType.getFieldNames();
             List<DataType> fieldTypes = rowType.getFieldTypes();
@@ -539,7 +546,8 @@ public class PartialUpdateMergeFunction implements 
MergeFunction<KeyValue> {
                                             isPrimaryKey,
                                             options,
                                             fieldName));
-                } else if (defaultAggFunc != null) {
+                } else if (defaultAggFunc != null && 
!allSequenceFields.contains(fieldName)) {
+                    // no agg for sequence fields
                     fieldAggregators.put(
                             i,
                             () ->
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
index be2d6b3433..c52fa42e2a 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
@@ -723,4 +723,22 @@ public class PartialUpdateITCase extends CatalogITCaseBase 
{
                         Row.ofKind(RowKind.UPDATE_AFTER, 1, "A", "apache"));
         iterator.close();
     }
+
+    @Test
+    public void testSequenceGroupWithDefaultAgg() {
+        sql(
+                "CREATE TABLE seq_default_agg ("
+                        + " pk INT PRIMARY KEY NOT ENFORCED,"
+                        + " seq INT,"
+                        + " v INT) WITH ("
+                        + " 'merge-engine'='partial-update',"
+                        + " 'fields.seq.sequence-group'='v',"
+                        + " 'fields.default-aggregate-function'='sum'"
+                        + ")");
+
+        sql("INSERT INTO seq_default_agg VALUES (0, 1, 1)");
+        sql("INSERT INTO seq_default_agg VALUES (0, 2, 2)");
+
+        assertThat(sql("SELECT * FROM seq_default_agg")).containsExactly(Row.of(0, 2, 3));
+    }
 }

Reply via email to