This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 86b1f4ce2 [test][kv] Ensure ordered inserts for auto-increment ID tests (#2779)
86b1f4ce2 is described below

commit 86b1f4ce2e08110d9cb197b4b82be2c4ac791ad1
Author: Junbo Wang <[email protected]>
AuthorDate: Thu Mar 19 10:46:22 2026 +0800

    [test][kv] Ensure ordered inserts for auto-increment ID tests (#2779)
    
    This closes #2779.
---
 .../fluss/flink/sink/FlinkTableSinkITCase.java     | 59 ++++++++++++----------
 1 file changed, 32 insertions(+), 27 deletions(-)

diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
index 08b6b6c9f..d5d1d5c49 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
@@ -29,6 +29,7 @@ import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.row.InternalRow;
 import org.apache.fluss.server.testutils.FlussClusterExtension;
+import org.apache.fluss.utils.types.Tuple2;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.common.typeinfo.Types;
@@ -1687,21 +1688,24 @@ abstract class FlinkTableSinkITCase extends AbstractTestBase {
                                 + ") with 
('auto-increment.fields'='auto_increment_id')",
                         tableName));
 
-        // Insert initial data specifying only id and amount
-        tEnv.executeSql(
-                        String.format(
-                                "INSERT INTO %s (id, amount) VALUES "
-                                        + "(1, 100), "
-                                        + "(2, 200), "
-                                        + "(3, 150), "
-                                        + "(4, 250)",
-                                tableName))
-                .await();
+        // Insert rows one by one to preserve auto-increment ID ordering.
+        // Partial-column INSERT ... VALUES is rewritten into a UNION ALL of Value Sources
+        // by Flink, whose runtime does not guarantee UNION ALL input ordering.
+        List<Tuple2<Integer, Integer>> inserts =
+                Arrays.asList(
+                        Tuple2.of(1, 100), Tuple2.of(2, 200), Tuple2.of(3, 
150), Tuple2.of(4, 250));
+        for (Tuple2<Integer, Integer> record : inserts) {
+            tEnv.executeSql(
+                            String.format(
+                                    "INSERT INTO %s (id, amount) VALUES (%d, 
%d)",
+                                    tableName, record.f0, record.f1))
+                    .await();
+        }
 
+        // Update and insert more records
         tEnv.executeSql(
                         String.format(
-                                "INSERT INTO %s (id, amount) VALUES " + "(3, 
350), " + "(5, 500)",
-                                tableName))
+                                "INSERT INTO %s (id, amount) VALUES (3, 350), 
(5, 500)", tableName))
                 .await();
 
         List<String> expectedResults =
@@ -1780,25 +1784,26 @@ abstract class FlinkTableSinkITCase extends AbstractTestBase {
                                 + ") with ('table.changelog.image' = 'wal', 
'auto-increment.fields'='auto_increment_id')",
                         tableName));
 
-        // Insert initial data
-        tEnv.executeSql(
-                        String.format(
-                                "INSERT INTO %s (id, amount) VALUES "
-                                        + "(1, 100), "
-                                        + "(2, 200), "
-                                        + "(3, 150), "
-                                        + "(4, 250)",
-                                tableName))
-                .await();
-
-        // Use batch mode to update and delete records
+        // Insert rows one by one to preserve auto-increment ID ordering.
+        // Partial-column INSERT ... VALUES is rewritten into a UNION ALL of Value Sources
+        // by Flink, whose runtime does not guarantee UNION ALL input ordering.
+        List<Tuple2<Integer, Integer>> inserts =
+                Arrays.asList(
+                        Tuple2.of(1, 100), Tuple2.of(2, 200), Tuple2.of(3, 
150), Tuple2.of(4, 250));
+        for (Tuple2<Integer, Integer> record : inserts) {
+            tEnv.executeSql(
+                            String.format(
+                                    "INSERT INTO %s (id, amount) VALUES (%d, 
%d)",
+                                    tableName, record.f0, record.f1))
+                    .await();
+        }
 
-        // Upsert data, not support update/delete rows in table with auto-inc column for now.
+        // Upsert data - these don't need sequential execution since auto increment IDs
+        // are already assigned from initial inserts above. Updates reuse existing IDs.
         // TODO: Support Batch Update
         tEnv.executeSql(
                         String.format(
-                                "INSERT INTO %s (id, amount) VALUES " + "(1, 
120), " + "(3, 180)",
-                                tableName))
+                                "INSERT INTO %s (id, amount) VALUES (1, 120), 
(3, 180)", tableName))
                 .await();
 
         List<String> expectedResults =

Reply via email to