This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 58d2bca9c3 [core] Fix that AggregateMergeFunction handles multiple sequence fields mistakenly (#5065)
58d2bca9c3 is described below

commit 58d2bca9c38326c341d59deabe14bb1500cf4038
Author: yuzelin <[email protected]>
AuthorDate: Wed Feb 12 17:23:17 2025 +0800

    [core] Fix that AggregateMergeFunction handles multiple sequence fields mistakenly (#5065)
---
 .../compact/aggregate/AggregateMergeFunction.java    |  5 +++--
 .../apache/paimon/flink/BatchFileStoreITCase.java    | 20 ++++++++++++++++++++
 2 files changed, 23 insertions(+), 2 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunction.java
index ca380d2778..9af786ae6c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunction.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunction.java
@@ -26,6 +26,7 @@ import org.apache.paimon.mergetree.compact.MergeFunction;
 import org.apache.paimon.mergetree.compact.MergeFunctionFactory;
 import 
org.apache.paimon.mergetree.compact.aggregate.factory.FieldAggregatorFactory;
 import 
org.apache.paimon.mergetree.compact.aggregate.factory.FieldLastNonNullValueAggFactory;
+import 
org.apache.paimon.mergetree.compact.aggregate.factory.FieldLastValueAggFactory;
 import 
org.apache.paimon.mergetree.compact.aggregate.factory.FieldPrimaryKeyAggFactory;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.types.DataType;
@@ -149,8 +150,8 @@ public class AggregateMergeFunction implements MergeFunction<KeyValue> {
 
         private String getAggFuncName(String fieldName, List<String> 
sequenceFields) {
             if (sequenceFields.contains(fieldName)) {
-                // no agg for sequence fields, use last_non_null_value to do 
cover
-                return FieldLastNonNullValueAggFactory.NAME;
+                // no agg for sequence fields, use last_value to do cover
+                return FieldLastValueAggFactory.NAME;
             }
 
             if (primaryKeys.contains(fieldName)) {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index 486bfcb69b..e92a8a30b4 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -682,6 +682,26 @@ public class BatchFileStoreITCase extends CatalogITCaseBase {
         }
     }
 
+    @Test
+    public void testAggregationWithNullSequenceField() {
+        sql(
+                "CREATE TABLE test ("
+                        + "  pk INT PRIMARY KEY NOT ENFORCED,"
+                        + "  v STRING,"
+                        + "  s0 INT,"
+                        + "  s1 INT"
+                        + ") WITH ("
+                        + "  'merge-engine' = 'aggregation',"
+                        + "  'sequence.field' = 's0,s1')");
+
+        sql(
+                "INSERT INTO test VALUES (1, 'A1', CAST (NULL AS INT), 1), (1, 
'A2', 1, CAST (NULL AS INT))");
+        assertThat(sql("SELECT * FROM test")).containsExactly(Row.of(1, "A2", 
1, null));
+
+        sql("INSERT INTO test VALUES (1, 'A3', 1, 0)");
+        assertThat(sql("SELECT * FROM test")).containsExactly(Row.of(1, "A3", 
1, 0));
+    }
+
     private void validateCount1PushDown(String sql) {
         Transformation<?> transformation = AbstractTestBase.translate(tEnv, 
sql);
         while (!transformation.getInputs().isEmpty()) {

Reply via email to