This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 0fa75ebf2 [flink] Allow partial updates for AGGREGATION merge engine (#2888)
0fa75ebf2 is described below

commit 0fa75ebf2147615b1abea1da7a4e7881d9044823
Author: Yang Wang <[email protected]>
AuthorDate: Thu Mar 19 11:33:00 2026 +0800

    [flink] Allow partial updates for AGGREGATION merge engine (#2888)
    
    The connector-side validation in FlinkTableSink.getSinkRuntimeProvider()
    was rejecting partial updates for ALL merge engines (mergeEngineType != null),
    but the server-side AggregateRowMerger fully supports partial updates via
    PartialAggregateRowMerger. Narrow the check to only block FIRST_ROW and
    VERSIONED engines which genuinely do not support partial updates.
    
    Add IT case for partial update on aggregation merge engine.
---
 .../apache/fluss/flink/sink/FlinkTableSink.java    |  2 +-
 .../fluss/flink/sink/FlinkTableSinkITCase.java     | 34 ++++++++++++++++++++++
 2 files changed, 35 insertions(+), 1 deletion(-)

diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkTableSink.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkTableSink.java
index fba180323..8035028de 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkTableSink.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkTableSink.java
@@ -160,7 +160,7 @@ public class FlinkTableSink
                             "Fluss table sink does not support partial updates 
for table without primary key. Please make sure the "
                                     + "number of specified columns in INSERT 
INTO matches columns of the Fluss table.");
                 }
-                if (mergeEngineType != null) {
+                if (mergeEngineType != null && mergeEngineType != 
MergeEngineType.AGGREGATION) {
                     throw new ValidationException(
                             String.format(
                                     "Table %s uses the '%s' merge engine which 
does not support partial updates. Please make sure the "
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
index d5d1d5c49..02a8af15f 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
@@ -1438,6 +1438,40 @@ abstract class FlinkTableSinkITCase extends AbstractTestBase {
         assertResultsIgnoreOrder(rowIter, expectedRows, true);
     }
 
+    @Test
+    void testPartialUpdateOnAggregationMergeEngine() throws Exception {
+        tEnv.executeSql(
+                "create table agg_partial_update ("
+                        + "id int not null primary key not enforced, "
+                        + "sum_a int, "
+                        + "sum_b int"
+                        + ") with ("
+                        + "'table.merge-engine' = 'aggregation', "
+                        + "'fields.sum_a.agg' = 'sum', "
+                        + "'fields.sum_b.agg' = 'sum')");
+
+        // Full insert: all fields
+        tEnv.executeSql("INSERT INTO agg_partial_update VALUES (1, 100, 
200)").await();
+
+        // Partial insert: only update sum_a
+        tEnv.executeSql("INSERT INTO agg_partial_update(id, sum_a) VALUES (1, 
50)").await();
+
+        CloseableIterator<Row> rowIter =
+                tEnv.executeSql("SELECT * FROM agg_partial_update").collect();
+
+        List<String> expectedRows =
+                Arrays.asList("+I[1, 100, 200]", "-U[1, 100, 200]", "+U[1, 
150, 200]");
+
+        assertResultsIgnoreOrder(rowIter, expectedRows, false);
+
+        // Partial insert on a new key (no prior full insert) — unspecified 
columns should be null
+        tEnv.executeSql("INSERT INTO agg_partial_update(id, sum_a) VALUES (2, 
77)").await();
+
+        expectedRows = Arrays.asList("+I[2, 77, null]");
+
+        assertResultsIgnoreOrder(rowIter, expectedRows, true);
+    }
+
     private InsertAndExpectValues rowsToInsertInto(Collection<String> 
partitions) {
         List<String> insertValues = new ArrayList<>();
         List<String> expectedValues = new ArrayList<>();

Reply via email to