gustavodemorais commented on code in PR #26591: URL: https://github.com/apache/flink/pull/26591#discussion_r2106329686
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ChangelogNormalizeTestPrograms.java:
##########
@@ -199,4 +200,87 @@ public class ChangelogNormalizeTestPrograms {
.build())
.runSql("INSERT INTO sink_t SELECT a, b, c FROM source_t
WHERE b < 10")
.build();
+
+ static final TableTestProgram UPSERT_SOURCE_WITH_FILTER_ON_WATERMARK =
+ TableTestProgram.of(
+ "changelog-normalize-upsert-filter-watermark",
+ "validates changelog normalize upsert with filter
using current_watermark")
+ .setupConfig(
+
ExecutionConfigOptions.TABLE_EXEC_SOURCE_CDC_EVENTS_DUPLICATE, true)
+ .setupTableSource(
+ SourceTestStep.newBuilder("source_t")
+ .addOption("changelog-mode", "I,UA,D")
+ .addSchema(
+ "a VARCHAR",
+ "b INT NOT NULL",
+ "c VARCHAR",
+ "d TIMESTAMP_LTZ(3)",
+ "WATERMARK FOR d AS d",
+ "PRIMARY KEY(a) NOT ENFORCED")
+ .producedBeforeRestore(
+ Row.ofKind(
+ RowKind.UPDATE_AFTER,
+ "one",
+ 1,
+ "a",
+ Instant.ofEpochMilli(1L)),
+ Row.ofKind(
+ RowKind.UPDATE_AFTER,
+ "one",
+ 2,
+ "b",
+ Instant.ofEpochMilli(1L)),
+ Row.ofKind(
+ RowKind.UPDATE_AFTER,
+ "one",
+ 12,
+ "b",
+ Instant.ofEpochMilli(1L)),
+ Row.ofKind(
+ RowKind.UPDATE_AFTER,
+ "one",
+ 13,
+ "b",
+ Instant.ofEpochMilli(1L)),
+ Row.ofKind(
+ RowKind.UPDATE_AFTER,
+ "three",
+ 3,
+ "cc",
+ Instant.ofEpochMilli(1L)))
+ .producedAfterRestore(
+ Row.ofKind(
+ RowKind.UPDATE_AFTER,
+ "one",
+ 15,
+ "aa",
+ Instant.ofEpochMilli(1L)),
+ Row.ofKind(
+ RowKind.DELETE,
+ "one",
+ 15,
+ "c",
+ Instant.ofEpochMilli(1L)),
+ Row.ofKind(
+ RowKind.DELETE,
+ "three",
+ 3,
+ "cc",
+ Instant.ofEpochMilli(1L)))
+ .build())
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink_t")
+ .addSchema(SINK_SCHEMA)
+ .consumedBeforeRestore(
+ "+I[one, 1, a]",
+ "-U[one, 1, a]",
+ "+U[one, 2, b]",
+ "-D[one, 2, b]",
+ "+I[three, 3, cc]")
+ .consumedAfterRestore("-D[three, 3, cc]")
+ .build())
+ .runSql(
+ "INSERT INTO sink_t SELECT a, b, c FROM source_t
WHERE b < 10 AND "
+ + "CURRENT_WATERMARK(d) IS NULL")
Review Comment:
It's always null for all the events here because we don't progress the
watermark, correct?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
