This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 0fa75ebf2 [flink] Allow partial updates for AGGREGATION merge engine
(#2888)
0fa75ebf2 is described below
commit 0fa75ebf2147615b1abea1da7a4e7881d9044823
Author: Yang Wang <[email protected]>
AuthorDate: Thu Mar 19 11:33:00 2026 +0800
[flink] Allow partial updates for AGGREGATION merge engine (#2888)
The connector-side validation in FlinkTableSink.getSinkRuntimeProvider()
was rejecting partial updates for ALL merge engines (mergeEngineType !=
null),
but the server-side AggregateRowMerger fully supports partial updates via
PartialAggregateRowMerger. Narrow the check to only block FIRST_ROW and
VERSIONED engines which genuinely do not support partial updates.
Add IT case for partial update on aggregation merge engine.
---
.../apache/fluss/flink/sink/FlinkTableSink.java | 2 +-
.../fluss/flink/sink/FlinkTableSinkITCase.java | 34 ++++++++++++++++++++++
2 files changed, 35 insertions(+), 1 deletion(-)
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkTableSink.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkTableSink.java
index fba180323..8035028de 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkTableSink.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkTableSink.java
@@ -160,7 +160,7 @@ public class FlinkTableSink
"Fluss table sink does not support partial updates
for table without primary key. Please make sure the "
+ "number of specified columns in INSERT
INTO matches columns of the Fluss table.");
}
- if (mergeEngineType != null) {
+ if (mergeEngineType != null && mergeEngineType !=
MergeEngineType.AGGREGATION) {
throw new ValidationException(
String.format(
"Table %s uses the '%s' merge engine which
does not support partial updates. Please make sure the "
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
index d5d1d5c49..02a8af15f 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
@@ -1438,6 +1438,40 @@ abstract class FlinkTableSinkITCase extends
AbstractTestBase {
assertResultsIgnoreOrder(rowIter, expectedRows, true);
}
+ @Test
+ void testPartialUpdateOnAggregationMergeEngine() throws Exception {
+ tEnv.executeSql(
+ "create table agg_partial_update ("
+ + "id int not null primary key not enforced, "
+ + "sum_a int, "
+ + "sum_b int"
+ + ") with ("
+ + "'table.merge-engine' = 'aggregation', "
+ + "'fields.sum_a.agg' = 'sum', "
+ + "'fields.sum_b.agg' = 'sum')");
+
+ // Full insert: all fields
+ tEnv.executeSql("INSERT INTO agg_partial_update VALUES (1, 100,
200)").await();
+
+ // Partial insert: only update sum_a
+ tEnv.executeSql("INSERT INTO agg_partial_update(id, sum_a) VALUES (1,
50)").await();
+
+ CloseableIterator<Row> rowIter =
+ tEnv.executeSql("SELECT * FROM agg_partial_update").collect();
+
+ List<String> expectedRows =
+ Arrays.asList("+I[1, 100, 200]", "-U[1, 100, 200]", "+U[1,
150, 200]");
+
+ assertResultsIgnoreOrder(rowIter, expectedRows, false);
+
+ // Partial insert on a new key (no prior full insert) — unspecified
columns should be null
+ tEnv.executeSql("INSERT INTO agg_partial_update(id, sum_a) VALUES (2,
77)").await();
+
+ expectedRows = Arrays.asList("+I[2, 77, null]");
+
+ assertResultsIgnoreOrder(rowIter, expectedRows, true);
+ }
+
private InsertAndExpectValues rowsToInsertInto(Collection<String>
partitions) {
List<String> insertValues = new ArrayList<>();
List<String> expectedValues = new ArrayList<>();