This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 0f3889e7eec [FLINK-39313][table-planner] Fix NDU analyzer to detect
non-deterministic functions in ChangelogNormalize filter conditions
0f3889e7eec is described below
commit 0f3889e7eec677723ceed92835414038d754a32c
Author: Gustavo de Morais <[email protected]>
AuthorDate: Thu Mar 26 16:16:59 2026 +0100
[FLINK-39313][table-planner] Fix NDU analyzer to detect non-deterministic
functions in ChangelogNormalize filter conditions
This closes #27819.
---
.../StreamNonDeterministicUpdatePlanVisitor.java | 21 +++-
.../NonDeterministicUpdateAnalyzerTest.java | 83 +++++++++++++++
...tDeterministicFilterOnUpsertSourceIsAllowed.out | 117 +++++++++++++++++++++
3 files changed, 219 insertions(+), 2 deletions(-)
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/StreamNonDeterministicUpdatePlanVisitor.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/StreamNonDeterministicUpdatePlanVisitor.java
index c34dda9ea6b..99016be46fa 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/StreamNonDeterministicUpdatePlanVisitor.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/StreamNonDeterministicUpdatePlanVisitor.java
@@ -202,8 +202,10 @@ public class StreamNonDeterministicUpdatePlanVisitor {
} else if (rel instanceof StreamPhysicalMLPredictTableFunction) {
return visitMLPredictTableFunction(
(StreamPhysicalMLPredictTableFunction) rel,
requireDeterminism);
- } else if (rel instanceof StreamPhysicalChangelogNormalize
- || rel instanceof StreamPhysicalDropUpdateBefore
+ } else if (rel instanceof StreamPhysicalChangelogNormalize) {
+ return visitChangelogNormalize(
+ (StreamPhysicalChangelogNormalize) rel,
requireDeterminism);
+ } else if (rel instanceof StreamPhysicalDropUpdateBefore
|| rel instanceof StreamPhysicalMiniBatchAssigner
|| rel instanceof StreamPhysicalUnion
|| rel instanceof StreamPhysicalSort
@@ -342,6 +344,21 @@ public class StreamNonDeterministicUpdatePlanVisitor {
return transmitDeterminismRequirement(predictTableFunction,
NO_REQUIRED_DETERMINISM);
}
+ /**
+ * ChangelogNormalize may have a filter condition pushed down by the optimizer. A
+ * non-deterministic filter (e.g. NOW()) can asymmetrically drop -U and +U records, corrupting
+ * the retract contract for downstream operators.
+ */
+ private StreamPhysicalRel visitChangelogNormalize(
+ final StreamPhysicalChangelogNormalize changelogNormalize,
+ final ImmutableBitSet requireDeterminism) {
+ final RexNode filterCondition = changelogNormalize.filterCondition();
+ if (filterCondition != null) {
+ checkNonDeterministicCondition(filterCondition,
changelogNormalize);
+ }
+ return transmitDeterminismRequirement(changelogNormalize,
requireDeterminism);
+ }
+
private StreamPhysicalRel visitCorrelate(
final StreamPhysicalCorrelateBase correlate, final ImmutableBitSet
requireDeterminism) {
if (inputInsertOnly(correlate) || requireDeterminism.isEmpty()) {
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/analyze/NonDeterministicUpdateAnalyzerTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/analyze/NonDeterministicUpdateAnalyzerTest.java
index 509bd759a3b..5c7d2db0291 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/analyze/NonDeterministicUpdateAnalyzerTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/analyze/NonDeterministicUpdateAnalyzerTest.java
@@ -35,6 +35,7 @@ import org.junit.jupiter.api.Test;
import scala.Enumeration;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static scala.runtime.BoxedUnit.UNIT;
@@ -424,4 +425,86 @@ class NonDeterministicUpdateAnalyzerTest extends
TableTestBase {
assertEquals(expectedOverAggNduErrorMsg, tableException.getMessage());
}
+
+ @Test
+ void testNowFilterPushedIntoChangelogNormalizeOnUpsertSource() {
+ tEnv.getConfig()
+ .set(
+
OptimizerConfigOptions.TABLE_OPTIMIZER_NONDETERMINISTIC_UPDATE_STRATEGY,
+ NonDeterministicUpdateStrategy.TRY_RESOLVE);
+
+ tEnv.executeSql(
+ "CREATE TEMPORARY TABLE upsert_src (\n"
+ + " a INT,\n"
+ + " b BIGINT,\n"
+ + " c STRING,\n"
+ + " ts TIMESTAMP(3),\n"
+ + " PRIMARY KEY (a) NOT ENFORCED\n"
+ + ") WITH (\n"
+ + " 'connector' = 'values',\n"
+ + " 'changelog-mode' = 'I,UA,D'\n"
+ + ")");
+
+ final String sql =
+ "INSERT INTO sink_with_pk SELECT a, b, c FROM upsert_src"
+ + " WHERE ts >= NOW() - INTERVAL '90' DAY";
+
+ assertThatThrownBy(() -> util.verifyJsonPlan(sql))
+ .isInstanceOf(TableException.class)
+ .hasMessageContaining("non deterministic function")
+ .hasMessageContaining("NOW");
+ }
+
+ @Test
+ void
testCurrentTimestampFilterPushedIntoChangelogNormalizeOnUpsertSource() {
+ tEnv.getConfig()
+ .set(
+
OptimizerConfigOptions.TABLE_OPTIMIZER_NONDETERMINISTIC_UPDATE_STRATEGY,
+ NonDeterministicUpdateStrategy.TRY_RESOLVE);
+
+ tEnv.executeSql(
+ "CREATE TEMPORARY TABLE upsert_src2 (\n"
+ + " a INT,\n"
+ + " b BIGINT,\n"
+ + " c STRING,\n"
+ + " ts TIMESTAMP(3),\n"
+ + " PRIMARY KEY (a) NOT ENFORCED\n"
+ + ") WITH (\n"
+ + " 'connector' = 'values',\n"
+ + " 'changelog-mode' = 'I,UA,D'\n"
+ + ")");
+
+ final String sql =
+ "INSERT INTO sink_with_pk SELECT a, b, c FROM upsert_src2"
+ + " WHERE ts >= CURRENT_TIMESTAMP - INTERVAL '90' DAY";
+
+ assertThatThrownBy(() -> util.verifyJsonPlan(sql))
+ .isInstanceOf(TableException.class)
+ .hasMessageContaining("non deterministic function")
+ .hasMessageContaining("CURRENT_TIMESTAMP");
+ }
+
+ @Test
+ void testDeterministicFilterOnUpsertSourceIsAllowed() {
+ tEnv.getConfig()
+ .set(
+
OptimizerConfigOptions.TABLE_OPTIMIZER_NONDETERMINISTIC_UPDATE_STRATEGY,
+ NonDeterministicUpdateStrategy.TRY_RESOLVE);
+
+ tEnv.executeSql(
+ "CREATE TEMPORARY TABLE upsert_src3 (\n"
+ + " a INT,\n"
+ + " b BIGINT,\n"
+ + " c STRING,\n"
+ + " PRIMARY KEY (a) NOT ENFORCED\n"
+ + ") WITH (\n"
+ + " 'connector' = 'values',\n"
+ + " 'changelog-mode' = 'I,UA,D'\n"
+ + ")");
+
+ // Deterministic filter should compile without error
+ final String sql = "INSERT INTO sink_with_pk SELECT a, b, c FROM
upsert_src3 WHERE b > 100";
+
+ util.verifyJsonPlan(sql);
+ }
}
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/analyze/NonDeterministicUpdateAnalyzerTest_jsonplan/testDeterministicFilterOnUpsertSourceIsAllowed.out
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/analyze/NonDeterministicUpdateAnalyzerTest_jsonplan/testDeterministicFilterOnUpsertSourceIsAllowed.out
new file mode 100644
index 00000000000..d818ee13db9
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/analyze/NonDeterministicUpdateAnalyzerTest_jsonplan/testDeterministicFilterOnUpsertSourceIsAllowed.out
@@ -0,0 +1,117 @@
+{
+ "flinkVersion" : "",
+ "nodes" : [ {
+ "id" : 1,
+ "type" : "stream-exec-table-source-scan_2",
+ "scanTableSource" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`upsert_src3`"
+ },
+ "abilities" : [ {
+ "type" : "FilterPushDown",
+ "predicates" : [ ]
+ } ]
+ },
+ "outputType" : "ROW<`a` INT NOT NULL, `b` BIGINT, `c`
VARCHAR(2147483647)>",
+ "description" : "TableSourceScan(table=[[default_catalog,
default_database, upsert_src3, filter=[]]], fields=[a, b, c])"
+ }, {
+ "id" : 2,
+ "type" : "stream-exec-exchange_1",
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "HASH",
+ "keys" : [ 0 ]
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` INT NOT NULL, `b` BIGINT, `c`
VARCHAR(2147483647)>",
+ "description" : "Exchange(distribution=[hash[a]])"
+ }, {
+ "id" : 3,
+ "type" : "stream-exec-changelog-normalize_1",
+ "configuration" : {
+ "table.exec.mini-batch.enabled" : "false",
+ "table.exec.mini-batch.size" : "-1"
+ },
+ "uniqueKeys" : [ 0 ],
+ "generateUpdateBefore" : false,
+ "state" : [ {
+ "index" : 0,
+ "ttl" : "0 ms",
+ "name" : "changelogNormalizeState"
+ } ],
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` INT NOT NULL, `b` BIGINT, `c`
VARCHAR(2147483647)>",
+ "description" : "ChangelogNormalize(key=[a], condition=[(b > 100)])",
+ "filterCondition" : {
+ "kind" : "CALL",
+ "syntax" : "BINARY",
+ "internalName" : "$>$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "BIGINT"
+ }, {
+ "kind" : "LITERAL",
+ "value" : 100,
+ "type" : "INT NOT NULL"
+ } ],
+ "type" : "BOOLEAN"
+ }
+ }, {
+ "id" : 4,
+ "type" : "stream-exec-sink_2",
+ "configuration" : {
+ "table.exec.sink.keyed-shuffle" : "AUTO",
+ "table.exec.sink.not-null-enforcer" : "ERROR",
+ "table.exec.sink.rowtime-inserter" : "ENABLED",
+ "table.exec.sink.type-length-enforcer" : "IGNORE",
+ "table.exec.sink.upsert-materialize" : "AUTO"
+ },
+ "dynamicTableSink" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`sink_with_pk`"
+ }
+ },
+ "inputChangelogMode" : [ "INSERT", "UPDATE_AFTER", "DELETE" ],
+ "inputUpsertKey" : [ 0 ],
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` INT NOT NULL, `b` BIGINT, `c`
VARCHAR(2147483647)>",
+ "description" :
"Sink(table=[default_catalog.default_database.sink_with_pk], fields=[a, b, c])"
+ } ],
+ "edges" : [ {
+ "source" : 1,
+ "target" : 2,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 2,
+ "target" : 3,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 3,
+ "target" : 4,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ } ]
+}
\ No newline at end of file