This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-1.0 in repository https://gitbox.apache.org/repos/asf/paimon.git
commit 231718c6fc7f88003579d61940231e611ab71645 Author: yuzelin <[email protected]> AuthorDate: Wed Jan 15 11:25:41 2025 +0800 [core] Fix that sequence group fields are mistakenly aggregated by default aggregator in partial update (#4897) --- .../mergetree/compact/PartialUpdateMergeFunction.java | 14 +++++++++++--- .../org/apache/paimon/flink/PartialUpdateITCase.java | 18 ++++++++++++++++++ 2 files changed, 29 insertions(+), 3 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java index ab25794129..357461f74d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java @@ -184,6 +184,7 @@ public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> { row.setField( fieldIndex, getters[fieldIndex].getFieldOrNull(kv.value())); } + continue; } row.setField( i, aggregator == null ? field : aggregator.agg(accumulator, field)); @@ -304,6 +305,7 @@ public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> { List<String> fieldNames = rowType.getFieldNames(); this.fieldSeqComparators = new HashMap<>(); Map<String, Integer> sequenceGroupMap = new HashMap<>(); + List<String> allSequenceFields = new ArrayList<>(); for (Map.Entry<String, String> entry : options.toMap().entrySet()) { String k = entry.getKey(); String v = entry.getValue(); @@ -318,6 +320,7 @@ public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> { .split(FIELDS_SEPARATOR)) .map(fieldName -> validateFieldName(fieldName, fieldNames)) .collect(Collectors.toList()); + allSequenceFields.addAll(sequenceFields); Supplier<FieldsComparator> userDefinedSeqComparator = () -> UserDefinedSeqComparator.create(rowType, sequenceFields, true); @@ -347,7 +350,8 @@ public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> { } } this.fieldAggregators = - createFieldAggregators(rowType, primaryKeys, new CoreOptions(options)); + createFieldAggregators( + rowType, primaryKeys, allSequenceFields, new CoreOptions(options)); if (!fieldAggregators.isEmpty() && fieldSeqComparators.isEmpty()) { throw new IllegalArgumentException( "Must use sequence group for aggregation functions."); @@ -514,7 +518,10 @@ public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> { * @return The aggregators for each column. */ private Map<Integer, Supplier<FieldAggregator>> createFieldAggregators( - RowType rowType, List<String> primaryKeys, CoreOptions options) { + RowType rowType, + List<String> primaryKeys, + List<String> allSequenceFields, + CoreOptions options) { List<String> fieldNames = rowType.getFieldNames(); List<DataType> fieldTypes = rowType.getFieldTypes(); @@ -539,7 +546,8 @@ public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> { isPrimaryKey, options, fieldName)); - } else if (defaultAggFunc != null) { + } else if (defaultAggFunc != null && !allSequenceFields.contains(fieldName)) { + // no agg for sequence fields fieldAggregators.put( i, () -> diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java index be2d6b3433..c52fa42e2a 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java @@ -723,4 +723,22 @@ public class PartialUpdateITCase extends CatalogITCaseBase { Row.ofKind(RowKind.UPDATE_AFTER, 1, "A", "apache")); iterator.close(); } + + @Test + public void testSequenceGroupWithDefaultAgg() { + sql( + "CREATE TABLE seq_default_agg (" + + " pk INT PRIMARY KEY NOT ENFORCED," + + " seq INT," + + " v INT) WITH (" + + " 'merge-engine'='partial-update'," + + " 'fields.seq.sequence-group'='v'," + + " 'fields.default-aggregate-function'='sum'" + + ")"); + + sql("INSERT INTO seq_default_agg VALUES (0, 1, 1)"); + sql("INSERT INTO seq_default_agg VALUES (0, 2, 2)"); + + assertThat(sql("SELECT * FROM seq_default_agg")).containsExactly(Row.of(0, 2, 3)); + } }
