This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 0f3889e7eec [FLINK-39313][table-planner] Fix NDU analyzer to detect non-deterministic functions in ChangelogNormalize filter conditions
0f3889e7eec is described below

commit 0f3889e7eec677723ceed92835414038d754a32c
Author: Gustavo de Morais <[email protected]>
AuthorDate: Thu Mar 26 16:16:59 2026 +0100

    [FLINK-39313][table-planner] Fix NDU analyzer to detect non-deterministic functions in ChangelogNormalize filter conditions
    
    This closes #27819.
---
 .../StreamNonDeterministicUpdatePlanVisitor.java   |  21 +++-
 .../NonDeterministicUpdateAnalyzerTest.java        |  83 +++++++++++++++
 ...tDeterministicFilterOnUpsertSourceIsAllowed.out | 117 +++++++++++++++++++++
 3 files changed, 219 insertions(+), 2 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/StreamNonDeterministicUpdatePlanVisitor.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/StreamNonDeterministicUpdatePlanVisitor.java
index c34dda9ea6b..99016be46fa 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/StreamNonDeterministicUpdatePlanVisitor.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/StreamNonDeterministicUpdatePlanVisitor.java
@@ -202,8 +202,10 @@ public class StreamNonDeterministicUpdatePlanVisitor {
         } else if (rel instanceof StreamPhysicalMLPredictTableFunction) {
             return visitMLPredictTableFunction(
                     (StreamPhysicalMLPredictTableFunction) rel, 
requireDeterminism);
-        } else if (rel instanceof StreamPhysicalChangelogNormalize
-                || rel instanceof StreamPhysicalDropUpdateBefore
+        } else if (rel instanceof StreamPhysicalChangelogNormalize) {
+            return visitChangelogNormalize(
+                    (StreamPhysicalChangelogNormalize) rel, 
requireDeterminism);
+        } else if (rel instanceof StreamPhysicalDropUpdateBefore
                 || rel instanceof StreamPhysicalMiniBatchAssigner
                 || rel instanceof StreamPhysicalUnion
                 || rel instanceof StreamPhysicalSort
@@ -342,6 +344,21 @@ public class StreamNonDeterministicUpdatePlanVisitor {
         return transmitDeterminismRequirement(predictTableFunction, 
NO_REQUIRED_DETERMINISM);
     }
 
+    /**
+     * ChangelogNormalize may have a filter condition pushed down by the optimizer. A
+     * non-deterministic filter (e.g. NOW()) can asymmetrically drop -U and +U records, corrupting
+     * the retract contract for downstream operators.
+     */
+    private StreamPhysicalRel visitChangelogNormalize(
+            final StreamPhysicalChangelogNormalize changelogNormalize,
+            final ImmutableBitSet requireDeterminism) {
+        final RexNode filterCondition = changelogNormalize.filterCondition();
+        if (filterCondition != null) {
+            checkNonDeterministicCondition(filterCondition, 
changelogNormalize);
+        }
+        return transmitDeterminismRequirement(changelogNormalize, 
requireDeterminism);
+    }
+
     private StreamPhysicalRel visitCorrelate(
             final StreamPhysicalCorrelateBase correlate, final ImmutableBitSet 
requireDeterminism) {
         if (inputInsertOnly(correlate) || requireDeterminism.isEmpty()) {
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/analyze/NonDeterministicUpdateAnalyzerTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/analyze/NonDeterministicUpdateAnalyzerTest.java
index 509bd759a3b..5c7d2db0291 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/analyze/NonDeterministicUpdateAnalyzerTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/analyze/NonDeterministicUpdateAnalyzerTest.java
@@ -35,6 +35,7 @@ import org.junit.jupiter.api.Test;
 
 import scala.Enumeration;
 
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static scala.runtime.BoxedUnit.UNIT;
@@ -424,4 +425,86 @@ class NonDeterministicUpdateAnalyzerTest extends 
TableTestBase {
 
         assertEquals(expectedOverAggNduErrorMsg, tableException.getMessage());
     }
+
+    @Test
+    void testNowFilterPushedIntoChangelogNormalizeOnUpsertSource() {
+        tEnv.getConfig()
+                .set(
+                        
OptimizerConfigOptions.TABLE_OPTIMIZER_NONDETERMINISTIC_UPDATE_STRATEGY,
+                        NonDeterministicUpdateStrategy.TRY_RESOLVE);
+
+        tEnv.executeSql(
+                "CREATE TEMPORARY TABLE upsert_src (\n"
+                        + "  a INT,\n"
+                        + "  b BIGINT,\n"
+                        + "  c STRING,\n"
+                        + "  ts TIMESTAMP(3),\n"
+                        + "  PRIMARY KEY (a) NOT ENFORCED\n"
+                        + ") WITH (\n"
+                        + "  'connector' = 'values',\n"
+                        + "  'changelog-mode' = 'I,UA,D'\n"
+                        + ")");
+
+        final String sql =
+                "INSERT INTO sink_with_pk SELECT a, b, c FROM upsert_src"
+                        + " WHERE ts >= NOW() - INTERVAL '90' DAY";
+
+        assertThatThrownBy(() -> util.verifyJsonPlan(sql))
+                .isInstanceOf(TableException.class)
+                .hasMessageContaining("non deterministic function")
+                .hasMessageContaining("NOW");
+    }
+
+    @Test
+    void 
testCurrentTimestampFilterPushedIntoChangelogNormalizeOnUpsertSource() {
+        tEnv.getConfig()
+                .set(
+                        
OptimizerConfigOptions.TABLE_OPTIMIZER_NONDETERMINISTIC_UPDATE_STRATEGY,
+                        NonDeterministicUpdateStrategy.TRY_RESOLVE);
+
+        tEnv.executeSql(
+                "CREATE TEMPORARY TABLE upsert_src2 (\n"
+                        + "  a INT,\n"
+                        + "  b BIGINT,\n"
+                        + "  c STRING,\n"
+                        + "  ts TIMESTAMP(3),\n"
+                        + "  PRIMARY KEY (a) NOT ENFORCED\n"
+                        + ") WITH (\n"
+                        + "  'connector' = 'values',\n"
+                        + "  'changelog-mode' = 'I,UA,D'\n"
+                        + ")");
+
+        final String sql =
+                "INSERT INTO sink_with_pk SELECT a, b, c FROM upsert_src2"
+                        + " WHERE ts >= CURRENT_TIMESTAMP - INTERVAL '90' DAY";
+
+        assertThatThrownBy(() -> util.verifyJsonPlan(sql))
+                .isInstanceOf(TableException.class)
+                .hasMessageContaining("non deterministic function")
+                .hasMessageContaining("CURRENT_TIMESTAMP");
+    }
+
+    @Test
+    void testDeterministicFilterOnUpsertSourceIsAllowed() {
+        tEnv.getConfig()
+                .set(
+                        
OptimizerConfigOptions.TABLE_OPTIMIZER_NONDETERMINISTIC_UPDATE_STRATEGY,
+                        NonDeterministicUpdateStrategy.TRY_RESOLVE);
+
+        tEnv.executeSql(
+                "CREATE TEMPORARY TABLE upsert_src3 (\n"
+                        + "  a INT,\n"
+                        + "  b BIGINT,\n"
+                        + "  c STRING,\n"
+                        + "  PRIMARY KEY (a) NOT ENFORCED\n"
+                        + ") WITH (\n"
+                        + "  'connector' = 'values',\n"
+                        + "  'changelog-mode' = 'I,UA,D'\n"
+                        + ")");
+
+        // Deterministic filter should compile without error
+        final String sql = "INSERT INTO sink_with_pk SELECT a, b, c FROM 
upsert_src3 WHERE b > 100";
+
+        util.verifyJsonPlan(sql);
+    }
 }
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/analyze/NonDeterministicUpdateAnalyzerTest_jsonplan/testDeterministicFilterOnUpsertSourceIsAllowed.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/analyze/NonDeterministicUpdateAnalyzerTest_jsonplan/testDeterministicFilterOnUpsertSourceIsAllowed.out
new file mode 100644
index 00000000000..d818ee13db9
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/analyze/NonDeterministicUpdateAnalyzerTest_jsonplan/testDeterministicFilterOnUpsertSourceIsAllowed.out
@@ -0,0 +1,117 @@
+{
+  "flinkVersion" : "",
+  "nodes" : [ {
+    "id" : 1,
+    "type" : "stream-exec-table-source-scan_2",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`upsert_src3`"
+      },
+      "abilities" : [ {
+        "type" : "FilterPushDown",
+        "predicates" : [ ]
+      } ]
+    },
+    "outputType" : "ROW<`a` INT NOT NULL, `b` BIGINT, `c` 
VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, upsert_src3, filter=[]]], fields=[a, b, c])"
+  }, {
+    "id" : 2,
+    "type" : "stream-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT NOT NULL, `b` BIGINT, `c` 
VARCHAR(2147483647)>",
+    "description" : "Exchange(distribution=[hash[a]])"
+  }, {
+    "id" : 3,
+    "type" : "stream-exec-changelog-normalize_1",
+    "configuration" : {
+      "table.exec.mini-batch.enabled" : "false",
+      "table.exec.mini-batch.size" : "-1"
+    },
+    "uniqueKeys" : [ 0 ],
+    "generateUpdateBefore" : false,
+    "state" : [ {
+      "index" : 0,
+      "ttl" : "0 ms",
+      "name" : "changelogNormalizeState"
+    } ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT NOT NULL, `b` BIGINT, `c` 
VARCHAR(2147483647)>",
+    "description" : "ChangelogNormalize(key=[a], condition=[(b > 100)])",
+    "filterCondition" : {
+      "kind" : "CALL",
+      "syntax" : "BINARY",
+      "internalName" : "$>$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 1,
+        "type" : "BIGINT"
+      }, {
+        "kind" : "LITERAL",
+        "value" : 100,
+        "type" : "INT NOT NULL"
+      } ],
+      "type" : "BOOLEAN"
+    }
+  }, {
+    "id" : 4,
+    "type" : "stream-exec-sink_2",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_with_pk`"
+      }
+    },
+    "inputChangelogMode" : [ "INSERT", "UPDATE_AFTER", "DELETE" ],
+    "inputUpsertKey" : [ 0 ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT NOT NULL, `b` BIGINT, `c` 
VARCHAR(2147483647)>",
+    "description" : 
"Sink(table=[default_catalog.default_database.sink_with_pk], fields=[a, b, c])"
+  } ],
+  "edges" : [ {
+    "source" : 1,
+    "target" : 2,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 2,
+    "target" : 3,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 3,
+    "target" : 4,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file

Reply via email to