twalthr commented on code in PR #25921:
URL: https://github.com/apache/flink/pull/25921#discussion_r1907011666
##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SourceWatermarkITCase.scala:
##########
@@ -305,4 +306,56 @@ class SourceWatermarkITCase extends StreamingTestBase {
assertThat(actualWatermark).isEqualTo(expectedWatermarkOutput)
assertThat(sink.getAppendResults.sorted).isEqualTo(expectedData.sorted)
}
+
+ @Test
+ def testWatermarkNotMovingBack(): Unit = {
+ val data = Seq(
+ row(1, LocalDateTime.parse("2024-01-01T00:00:00")),
+ row(3, LocalDateTime.parse("2024-01-03T00:00:00")),
+ row(2, LocalDateTime.parse("2024-01-02T00:00:00"))
+ )
+
+ val dataId = TestValuesTableFactory.registerData(data)
+
+ val ddl =
+ s"""
+ | CREATE Table VirtualTable (
+ | a INT,
+ | c TIMESTAMP(3),
+ | WATERMARK FOR c as c
+ | ) with (
+ | 'connector' = 'values',
+ | 'bounded' = 'false',
+ | 'scan.watermark.emit.strategy' = 'on-periodic',
+ | 'enable-watermark-push-down' = 'true',
+ | 'disable-lookup' = 'true',
+ | 'data-id' = '$dataId'
+ | )
+ |""".stripMargin
+
+ tEnv.executeSql(ddl)
+ tEnv.getConfig.set(CoreOptions.DEFAULT_PARALLELISM.key(), "1")
Review Comment:
```suggestion
tEnv.getConfig.set(CoreOptions.DEFAULT_PARALLELISM, 1)
```
##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SourceWatermarkITCase.scala:
##########
@@ -305,4 +306,56 @@ class SourceWatermarkITCase extends StreamingTestBase {
assertThat(actualWatermark).isEqualTo(expectedWatermarkOutput)
assertThat(sink.getAppendResults.sorted).isEqualTo(expectedData.sorted)
}
+
+ @Test
+ def testWatermarkNotMovingBack(): Unit = {
+ val data = Seq(
+ row(1, LocalDateTime.parse("2024-01-01T00:00:00")),
+ row(3, LocalDateTime.parse("2024-01-03T00:00:00")),
+ row(2, LocalDateTime.parse("2024-01-02T00:00:00"))
+ )
+
+ val dataId = TestValuesTableFactory.registerData(data)
+
+ val ddl =
+ s"""
+ | CREATE Table VirtualTable (
+ | a INT,
+ | c TIMESTAMP(3),
+ | WATERMARK FOR c as c
+ | ) with (
+ | 'connector' = 'values',
+ | 'bounded' = 'false',
+ | 'scan.watermark.emit.strategy' = 'on-periodic',
+ | 'enable-watermark-push-down' = 'true',
+ | 'disable-lookup' = 'true',
+ | 'data-id' = '$dataId'
+ | )
+ |""".stripMargin
+
+ tEnv.executeSql(ddl)
+ tEnv.getConfig.set(CoreOptions.DEFAULT_PARALLELISM.key(), "1")
+
+ val expectedWatermarkOutput = Seq("2024-01-01T00:00", "2024-01-03T00:00",
"2024-01-03T00:00")
Review Comment:
That is indeed a good question. Usually duplicate watermarks should be
filtered out by the framework? Is this due to our testing setup?
##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SourceWatermarkITCase.scala:
##########
@@ -305,4 +306,56 @@ class SourceWatermarkITCase extends StreamingTestBase {
assertThat(actualWatermark).isEqualTo(expectedWatermarkOutput)
assertThat(sink.getAppendResults.sorted).isEqualTo(expectedData.sorted)
}
+
+ @Test
+ def testWatermarkNotMovingBack(): Unit = {
+ val data = Seq(
+ row(1, LocalDateTime.parse("2024-01-01T00:00:00")),
+ row(3, LocalDateTime.parse("2024-01-03T00:00:00")),
+ row(2, LocalDateTime.parse("2024-01-02T00:00:00"))
+ )
+
+ val dataId = TestValuesTableFactory.registerData(data)
+
+ val ddl =
+ s"""
+ | CREATE Table VirtualTable (
+ | a INT,
+ | c TIMESTAMP(3),
+ | WATERMARK FOR c as c
Review Comment:
This test was intended for `WATERMARK FOR c as SOURCE_WATERMARK()`. Let's
implement this test in Java in a fresh class.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]