This is an automated email from the ASF dual-hosted git repository.
hongshun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 71d00e40b [test][flink] Fix Unstable test FlinkTableSinkITCase.testVersionMergeEngineWithTypeTimestampLTZ9 (#2139)
71d00e40b is described below
commit 71d00e40bdb1b03425a2f686c4f611cd1d75a130
Author: Hongshun Wang <[email protected]>
AuthorDate: Fri Dec 19 16:28:20 2025 +0800
[test][flink] Fix Unstable test FlinkTableSinkITCase.testVersionMergeEngineWithTypeTimestampLTZ9 (#2139)
---
.../fluss/flink/sink/FlinkTableSinkITCase.java | 55 +++++++++++++---------
1 file changed, 32 insertions(+), 23 deletions(-)
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
index a22316d70..c8aeaaae8 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
@@ -1151,32 +1151,35 @@ abstract class FlinkTableSinkITCase extends AbstractTestBase {
@Test
void testVersionMergeEngineWithTypeTimestamp() throws Exception {
tEnv.executeSql(
- "create table merge_engine_with_version (a int not null
primary key not enforced,"
- + " b string, ts TIMESTAMP(3))
with('table.merge-engine' = 'versioned',"
- + "'table.merge-engine.versioned.ver-column' = 'ts')");
-
+ "create table merge_engine_with_version (a int not
null primary key not enforced,"
+ + " b string, ts TIMESTAMP(3))
with('table.merge-engine' = 'versioned',"
+ + "'table.merge-engine.versioned.ver-column' =
'ts')")
+ .await();
// insert once
tEnv.executeSql(
"INSERT INTO merge_engine_with_version (a, b, ts)
VALUES "
+ "(1, 'v1', TIMESTAMP '2024-12-27
12:00:00.123'), "
+ "(2, 'v2', TIMESTAMP '2024-12-27
12:00:00.123'), "
- + "(1, 'v11', TIMESTAMP '2024-12-27
11:59:59.123'), "
- + "(3, 'v3', TIMESTAMP '2024-12-27
12:00:00.123'),"
- + "(3, 'v33', TIMESTAMP '2024-12-27
12:00:00.123');")
+ + "(3, 'v3', TIMESTAMP '2024-12-27
12:00:00.123');")
.await();
-
CloseableIterator<Row> rowIter =
tEnv.executeSql("select * from
merge_engine_with_version").collect();
-
- // id=1 not update, but id=3 updated
List<String> expectedRows =
Arrays.asList(
"+I[1, v1, 2024-12-27T12:00:00.123]",
"+I[2, v2, 2024-12-27T12:00:00.123]",
- "+I[3, v3, 2024-12-27T12:00:00.123]",
+ "+I[3, v3, 2024-12-27T12:00:00.123]");
+ assertResultsIgnoreOrder(rowIter, expectedRows, false);
+
+ // insert again. id=1 not update, but id=3 updated
+ tEnv.executeSql(
+ "INSERT INTO merge_engine_with_version (a, b, ts) VALUES "
+ + "(1, 'v11', TIMESTAMP '2024-12-27 11:59:59.123'), "
+ + "(3, 'v33', TIMESTAMP '2024-12-27 12:00:00.123');");
+ expectedRows =
+ Arrays.asList(
"-U[3, v3, 2024-12-27T12:00:00.123]",
"+U[3, v33, 2024-12-27T12:00:00.123]");
-
assertResultsIgnoreOrder(rowIter, expectedRows, true);
}
@@ -1185,30 +1188,36 @@ abstract class FlinkTableSinkITCase extends AbstractTestBase {
tEnv.getConfig().set("table.local-time-zone", "UTC");
tEnv.executeSql(
- "create table merge_engine_with_version (a int not null
primary key not enforced,"
- + " b string, ts TIMESTAMP(9) WITH LOCAL TIME ZONE )
with("
- + "'table.merge-engine' = 'versioned',"
- + "'table.merge-engine.versioned.ver-column' = 'ts')");
+ "create table merge_engine_with_version (a int not
null primary key not enforced,"
+ + " b string, ts TIMESTAMP(9) WITH LOCAL TIME
ZONE ) with("
+ + "'table.merge-engine' = 'versioned',"
+ + "'table.merge-engine.versioned.ver-column' =
'ts')")
+ .await();
// insert once
tEnv.executeSql(
"INSERT INTO merge_engine_with_version (a, b, ts)
VALUES "
+ "(1, 'v1', CAST(TIMESTAMP '2024-12-27
12:00:00.123456789' AS TIMESTAMP(9) WITH LOCAL TIME ZONE)), "
+ "(2, 'v2', CAST(TIMESTAMP '2024-12-27
12:00:00.123456789' AS TIMESTAMP(9) WITH LOCAL TIME ZONE)), "
- + "(1, 'v11', CAST(TIMESTAMP '2024-12-27
12:00:00.123456788' AS TIMESTAMP(9) WITH LOCAL TIME ZONE)), "
- + "(3, 'v3', CAST(TIMESTAMP '2024-12-27
12:00:00.123456789' AS TIMESTAMP(9) WITH LOCAL TIME ZONE)), "
- + "(3, 'v33', CAST(TIMESTAMP '2024-12-27
12:00:00.123456789' AS TIMESTAMP(9) WITH LOCAL TIME ZONE));")
+ + "(3, 'v3', CAST(TIMESTAMP '2024-12-27
12:00:00.123456789' AS TIMESTAMP(9) WITH LOCAL TIME ZONE));")
.await();
-
CloseableIterator<Row> rowIter =
tEnv.executeSql("select * from
merge_engine_with_version").collect();
-
- // id=1 not update, but id=3 updated
List<String> expectedRows =
Arrays.asList(
"+I[1, v1, 2024-12-27T12:00:00.123456789Z]",
"+I[2, v2, 2024-12-27T12:00:00.123456789Z]",
- "+I[3, v3, 2024-12-27T12:00:00.123456789Z]",
+ "+I[3, v3, 2024-12-27T12:00:00.123456789Z]");
+ assertResultsIgnoreOrder(rowIter, expectedRows, false);
+
+ // insert again. id=1 not update, but id=3 updated
+ tEnv.executeSql(
+ "INSERT INTO merge_engine_with_version (a, b, ts)
VALUES "
+ + "(1, 'v11', CAST(TIMESTAMP '2024-12-27
12:00:00.123456788' AS TIMESTAMP(9) WITH LOCAL TIME ZONE)), "
+ + "(3, 'v33', CAST(TIMESTAMP '2024-12-27
12:00:00.123456789' AS TIMESTAMP(9) WITH LOCAL TIME ZONE));")
+ .await();
+ expectedRows =
+ Arrays.asList(
"-U[3, v3, 2024-12-27T12:00:00.123456789Z]",
"+U[3, v33, 2024-12-27T12:00:00.123456789Z]");