This is an automated email from the ASF dual-hosted git repository.
leonard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 86b1f4ce2 [test][kv] Ensure ordered inserts for auto-increment ID tests (#2779)
86b1f4ce2 is described below
commit 86b1f4ce2e08110d9cb197b4b82be2c4ac791ad1
Author: Junbo Wang <[email protected]>
AuthorDate: Thu Mar 19 10:46:22 2026 +0800
[test][kv] Ensure ordered inserts for auto-increment ID tests (#2779)
This closes #2779.
---
.../fluss/flink/sink/FlinkTableSinkITCase.java | 59 ++++++++++++----------
1 file changed, 32 insertions(+), 27 deletions(-)
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
index 08b6b6c9f..d5d1d5c49 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
@@ -29,6 +29,7 @@ import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.row.InternalRow;
import org.apache.fluss.server.testutils.FlussClusterExtension;
+import org.apache.fluss.utils.types.Tuple2;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.Types;
@@ -1687,21 +1688,24 @@ abstract class FlinkTableSinkITCase extends AbstractTestBase {
+ ") with
('auto-increment.fields'='auto_increment_id')",
tableName));
- // Insert initial data specifying only id and amount
- tEnv.executeSql(
- String.format(
- "INSERT INTO %s (id, amount) VALUES "
- + "(1, 100), "
- + "(2, 200), "
- + "(3, 150), "
- + "(4, 250)",
- tableName))
- .await();
+ // Insert rows one by one to preserve auto-increment ID ordering.
+ // Partial-column INSERT ... VALUES is rewritten into a UNION ALL of Value Sources
+ // by Flink, whose runtime does not guarantee UNION ALL input ordering.
+ List<Tuple2<Integer, Integer>> inserts =
+ Arrays.asList(
+ Tuple2.of(1, 100), Tuple2.of(2, 200), Tuple2.of(3,
150), Tuple2.of(4, 250));
+ for (Tuple2<Integer, Integer> record : inserts) {
+ tEnv.executeSql(
+ String.format(
+ "INSERT INTO %s (id, amount) VALUES (%d,
%d)",
+ tableName, record.f0, record.f1))
+ .await();
+ }
+ // Update and insert more records
tEnv.executeSql(
String.format(
- "INSERT INTO %s (id, amount) VALUES " + "(3,
350), " + "(5, 500)",
- tableName))
+ "INSERT INTO %s (id, amount) VALUES (3, 350),
(5, 500)", tableName))
.await();
List<String> expectedResults =
@@ -1780,25 +1784,26 @@ abstract class FlinkTableSinkITCase extends AbstractTestBase {
+ ") with ('table.changelog.image' = 'wal',
'auto-increment.fields'='auto_increment_id')",
tableName));
- // Insert initial data
- tEnv.executeSql(
- String.format(
- "INSERT INTO %s (id, amount) VALUES "
- + "(1, 100), "
- + "(2, 200), "
- + "(3, 150), "
- + "(4, 250)",
- tableName))
- .await();
-
- // Use batch mode to update and delete records
+ // Insert rows one by one to preserve auto-increment ID ordering.
+ // Partial-column INSERT ... VALUES is rewritten into a UNION ALL of Value Sources
+ // by Flink, whose runtime does not guarantee UNION ALL input ordering.
+ List<Tuple2<Integer, Integer>> inserts =
+ Arrays.asList(
+ Tuple2.of(1, 100), Tuple2.of(2, 200), Tuple2.of(3,
150), Tuple2.of(4, 250));
+ for (Tuple2<Integer, Integer> record : inserts) {
+ tEnv.executeSql(
+ String.format(
+ "INSERT INTO %s (id, amount) VALUES (%d,
%d)",
+ tableName, record.f0, record.f1))
+ .await();
+ }
- // Upsert data, not support update/delete rows in table with auto-inc column for now.
+ // Upsert data - these don't need sequential execution since auto increment IDs
+ // are already assigned from initial inserts above. Updates reuse existing IDs.
// TODO: Support Batch Update
tEnv.executeSql(
String.format(
- "INSERT INTO %s (id, amount) VALUES " + "(1,
120), " + "(3, 180)",
- tableName))
+ "INSERT INTO %s (id, amount) VALUES (1, 120),
(3, 180)", tableName))
.await();
List<String> expectedResults =