This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch release-2.2
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 41455ec73f58012e2f1e22dc0beec42a324882d3
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Tue Nov 25 09:55:28 2025 +0100

    [FLINK-38720][table] Nested nullability might lead to a ClassCastException: `RexCall` cannot be cast to class `RexFieldAccess`
---
 .../apache/calcite/sql2rel/SqlToRelConverter.java  |  54 +++++++---
 .../exec/common/JoinSemanticTestPrograms.java      | 113 +++++++++++++++++++++
 .../plan/nodes/exec/common/JoinTestPrograms.java   |  60 -----------
 .../plan/nodes/exec/stream/JoinSemanticTests.java  |   6 +-
 4 files changed, 155 insertions(+), 78 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
index b226cc57014..4f00abc858b 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
@@ -245,19 +245,21 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * <p>FLINK modifications are at lines
  *
  * <ol>
- *   <li>Added in FLINK-29081, FLINK-28682, FLINK-33395: Lines 686 ~ 703
- *   <li>Added in Flink-24024: Lines 1453 ~ 1459
- *   <li>Added in Flink-24024: Lines 1473 ~ 1512
- *   <li>Added in Flink-37269: Lines 2250 ~ 2272
- *   <li>Added in FLINK-28682: Lines 2383 ~ 2400
- *   <li>Added in FLINK-28682: Lines 2437 ~ 2465
- *   <li>Added in FLINK-32474: Lines 2522 ~ 2524
- *   <li>Added in FLINK-32474: Lines 2528 ~ 2530
- *   <li>Added in FLINK-32474: Lines 2546 ~ 2548
- *   <li>Added in CALCITE-7217: Lines 2587 ~ 2595, should be dropped with 
upgrade to Calcite 1.41.0
- *   <li>Added in FLINK-32474: Lines 2970 ~ 2982
- *   <li>Added in FLINK-32474: Lines 3083 ~ 3117
- *   <li>Added in FLINK-34312: Lines 5947 ~ 5958
+ *   <li>Added in FLINK-29081, FLINK-28682, FLINK-33395: Lines 688 ~ 705
+ *   <li>Added in FLINK-24024: Lines 1455 ~ 1461
+ *   <li>Added in FLINK-24024: Lines 1475 ~ 1514
+ *   <li>Added in FLINK-37269: Lines 2252 ~ 2274
+ *   <li>Added in FLINK-28682: Lines 2385 ~ 2402
+ *   <li>Added in FLINK-28682: Lines 2439 ~ 2467
+ *   <li>Added in FLINK-32474: Lines 2524 ~ 2526
+ *   <li>Added in FLINK-32474: Lines 2530 ~ 2532
+ *   <li>Added in FLINK-32474: Lines 2548 ~ 2550
+ *   <li>Added in CALCITE-7217: Lines 2589 ~ 2597, should be dropped with 
upgrade to Calcite 1.41.0
+ *   <li>Added in FLINK-32474: Lines 2972 ~ 2984
+ *   <li>Added in FLINK-32474: Lines 3085 ~ 3119
+ *   <li>Added in FLINK-38720: Lines 4579 ~ 4585
+ *   <li>Added in FLINK-38720: Lines 4591 ~ 4607
+ *   <li>Added in FLINK-34312: Lines 5971 ~ 5982
  * </ol>
  *
  * <p>In official extension point (i.e. {@link 
#convertExtendedExpression(SqlNode, Blackboard)}):
@@ -4574,14 +4576,34 @@ public class SqlToRelConverter {
         }
 
         if (e0.left instanceof RexCorrelVariable) {
-            assert e instanceof RexFieldAccess;
-            final RexNode prev =
-                    bb.mapCorrelateToRex.put(((RexCorrelVariable) e0.left).id, 
(RexFieldAccess) e);
+            // ----- FLINK MODIFICATION BEGIN -----
+            // unwrap the CAST to nullable introduced by FlinkRexBuilder#makeFieldAccess
+            final RexFieldAccess rfa = adjustRexFieldAccess(e);
+            final RexNode prev = bb.mapCorrelateToRex.put(((RexCorrelVariable) 
e0.left).id, rfa);
+            // ----- FLINK MODIFICATION END -----
             assert prev == null;
         }
         return e;
     }
 
+    // ----- FLINK MODIFICATION BEGIN -----
+    private RexFieldAccess adjustRexFieldAccess(RexNode rexNode) {
+        // Either RexFieldAccess or CAST of RexFieldAccess to nullable
+        assert rexNode instanceof RexFieldAccess
+                || rexNode instanceof RexCall
+                        && rexNode.getKind() == SqlKind.CAST
+                        && ((RexCall) rexNode).getOperands().size() == 1
+                        && ((RexCall) rexNode).getOperands().get(0) instanceof 
RexFieldAccess;
+
+        if (rexNode instanceof RexFieldAccess) {
+            return (RexFieldAccess) rexNode;
+        } else {
+            return (RexFieldAccess) ((RexCall) rexNode).getOperands().get(0);
+        }
+    }
+
+    // ----- FLINK MODIFICATION END -----
+
     /**
      * Adjusts the type of a reference to an input field to account for nulls 
introduced by outer
      * joins; and adjusts the offset to match the physical implementation.
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/JoinSemanticTestPrograms.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/JoinSemanticTestPrograms.java
new file mode 100644
index 00000000000..b687f5d4a18
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/JoinSemanticTestPrograms.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.common;
+
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecJoin;
+import org.apache.flink.table.test.program.SinkTestStep;
+import org.apache.flink.table.test.program.SourceTestStep;
+import org.apache.flink.table.test.program.TableTestProgram;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+
+import java.util.stream.IntStream;
+
+/** {@link TableTestProgram} definitions for semantic testing {@link 
StreamExecJoin}. */
+public class JoinSemanticTestPrograms {
+    public static final TableTestProgram OUTER_JOIN_CHANGELOG_TEST =
+            TableTestProgram.of("join-duplicate-emission-bug", "bug with CTE 
and left join")
+                    .setupTableSource(
+                            
SourceTestStep.newBuilder("upsert_table_with_duplicates")
+                                    .addSchema(
+                                            "`execution_plan_id` 
VARCHAR(2147483647) NOT NULL",
+                                            "`workflow_id` VARCHAR(2147483647) 
NOT NULL",
+                                            "`event_section_id` 
VARCHAR(2147483647) NOT NULL",
+                                            "CONSTRAINT `PRIMARY` PRIMARY KEY 
(`execution_plan_id`, `event_section_id`) NOT ENFORCED")
+                                    .addOption("changelog-mode", "I, UA,D")
+                                    .producedValues(
+                                            IntStream.range(0, 13)
+                                                    .mapToObj(
+                                                            i ->
+                                                                    Row.ofKind(
+                                                                            
RowKind.UPDATE_AFTER,
+                                                                            
"section_id_1",
+                                                                            
"section_id_2",
+                                                                            
"section_id_3"))
+                                                    .toArray(Row[]::new))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema("event_element_id STRING", "cnt 
BIGINT")
+                                    .testMaterializedData()
+                                    .consumedValues(Row.of("pk-1", 1), 
Row.of("pk-2", 1))
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink WITH\n"
+                                    + "    section_detail as (\n"
+                                    + "        SELECT s.event_section_id\n"
+                                    + "        \n"
+                                    + "        FROM 
upsert_table_with_duplicates s\n"
+                                    + "    ),\n"
+                                    + "\n"
+                                    + "    event_element as (\n"
+                                    + "        SELECT\n"
+                                    + "            ed.id as event_element_id\n"
+                                    + "        FROM (\n"
+                                    + "          SELECT\n"
+                                    + "                 'pk-2' id,\n"
+                                    + "                 'section_id_3' 
section_id\n"
+                                    + "           UNION ALL\n"
+                                    + "          SELECT\n"
+                                    + "                 'pk-1' id,\n"
+                                    + "                 'section_id_3' 
section_id\n"
+                                    + "        ) ed  \n"
+                                    + "        LEFT JOIN\n"
+                                    + "            section_detail as s\n"
+                                    + "            ON s.event_section_id = 
ed.section_id\n"
+                                    + "    )\n"
+                                    + "\n"
+                                    + "SELECT  event_element_id, COUNT(*) 
cnt\n"
+                                    + "FROM event_element\n"
+                                    + "GROUP BY event_element_id")
+                    .build();
+
+    public static final TableTestProgram ANTI_JOIN_ON_NESTED =
+            TableTestProgram.of("anti-join-on-nested", "anti join on nested 
fields")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t1")
+                                    .addSchema("`ext` ROW<`nested` STRING NOT 
NULL>")
+                                    
.producedValues(Row.of(Row.of("test_same")))
+                                    .build())
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t2")
+                                    .addSchema(
+                                            "`ext` ROW<`nested` ROW<`nested1` 
ROW<`nested2` STRING NOT NULL>>>")
+                                    .producedValues(
+                                            
Row.of(Row.of(Row.of(Row.of("test_diff")))),
+                                            
Row.of(Row.of(Row.of(Row.of("test_same")))))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema("output STRING")
+                                    .consumedValues("+I[test_diff]")
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink_t SELECT 
t2.ext.nested.nested1.nested2 FROM source_t2 t2 WHERE"
+                                    + " NOT EXISTS (SELECT 1 FROM source_t1 t1 
WHERE t1.ext.nested = t2.ext.nested.nested1.nested2)")
+                    .build();
+}
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/JoinTestPrograms.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/JoinTestPrograms.java
index 6f41ef4ff1e..02d3e92b711 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/JoinTestPrograms.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/JoinTestPrograms.java
@@ -23,9 +23,6 @@ import org.apache.flink.table.test.program.SinkTestStep;
 import org.apache.flink.table.test.program.SourceTestStep;
 import org.apache.flink.table.test.program.TableTestProgram;
 import org.apache.flink.types.Row;
-import org.apache.flink.types.RowKind;
-
-import java.util.stream.IntStream;
 
 /** {@link TableTestProgram} definitions for testing {@link StreamExecJoin}. */
 public class JoinTestPrograms {
@@ -45,63 +42,6 @@ public class JoinTestPrograms {
     public static final TableTestProgram JOIN_WITH_STATE_TTL_HINT;
     public static final TableTestProgram SEMI_ANTI_JOIN_WITH_LITERAL_AGG;
 
-    public static final TableTestProgram OUTER_JOIN_CHANGELOG_TEST =
-            TableTestProgram.of("join-duplicate-emission-bug", "bug with CTE 
and left join")
-                    .setupTableSource(
-                            
SourceTestStep.newBuilder("upsert_table_with_duplicates")
-                                    .addSchema(
-                                            "`execution_plan_id` 
VARCHAR(2147483647) NOT NULL",
-                                            "`workflow_id` VARCHAR(2147483647) 
NOT NULL",
-                                            "`event_section_id` 
VARCHAR(2147483647) NOT NULL",
-                                            "CONSTRAINT `PRIMARY` PRIMARY KEY 
(`execution_plan_id`, `event_section_id`) NOT ENFORCED")
-                                    .addOption("changelog-mode", "I, UA,D")
-                                    .producedValues(
-                                            IntStream.range(0, 13)
-                                                    .mapToObj(
-                                                            i ->
-                                                                    Row.ofKind(
-                                                                            
RowKind.UPDATE_AFTER,
-                                                                            
"section_id_1",
-                                                                            
"section_id_2",
-                                                                            
"section_id_3"))
-                                                    .toArray(Row[]::new))
-                                    .build())
-                    .setupTableSink(
-                            SinkTestStep.newBuilder("sink")
-                                    .addSchema("event_element_id STRING", "cnt 
BIGINT")
-                                    .testMaterializedData()
-                                    .consumedValues(Row.of("pk-1", 1), 
Row.of("pk-2", 1))
-                                    .build())
-                    .runSql(
-                            "INSERT INTO sink WITH\n"
-                                    + "    section_detail as (\n"
-                                    + "        SELECT s.event_section_id\n"
-                                    + "        \n"
-                                    + "        FROM 
upsert_table_with_duplicates s\n"
-                                    + "    ),\n"
-                                    + "\n"
-                                    + "    event_element as (\n"
-                                    + "        SELECT\n"
-                                    + "            ed.id as event_element_id\n"
-                                    + "        FROM (\n"
-                                    + "          SELECT\n"
-                                    + "                 'pk-2' id,\n"
-                                    + "                 'section_id_3' 
section_id\n"
-                                    + "           UNION ALL\n"
-                                    + "          SELECT\n"
-                                    + "                 'pk-1' id,\n"
-                                    + "                 'section_id_3' 
section_id\n"
-                                    + "        ) ed  \n"
-                                    + "        LEFT JOIN\n"
-                                    + "            section_detail as s\n"
-                                    + "            ON s.event_section_id = 
ed.section_id\n"
-                                    + "    )\n"
-                                    + "\n"
-                                    + "SELECT  event_element_id, COUNT(*) 
cnt\n"
-                                    + "FROM event_element\n"
-                                    + "GROUP BY event_element_id")
-                    .build();
-
     static final SourceTestStep EMPLOYEE =
             SourceTestStep.newBuilder("EMPLOYEE")
                     .addSchema("deptno int", "salary bigint", "name varchar")
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinSemanticTests.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinSemanticTests.java
index b7970e3aa3f..2501a7e6261 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinSemanticTests.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinSemanticTests.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.stream;
 
-import org.apache.flink.table.planner.plan.nodes.exec.common.JoinTestPrograms;
+import 
org.apache.flink.table.planner.plan.nodes.exec.common.JoinSemanticTestPrograms;
 import 
org.apache.flink.table.planner.plan.nodes.exec.testutils.SemanticTestBase;
 import org.apache.flink.table.test.program.TableTestProgram;
 
@@ -28,6 +28,8 @@ import java.util.List;
 public class JoinSemanticTests extends SemanticTestBase {
     @Override
     public List<TableTestProgram> programs() {
-        return List.of(JoinTestPrograms.OUTER_JOIN_CHANGELOG_TEST);
+        return List.of(
+                JoinSemanticTestPrograms.OUTER_JOIN_CHANGELOG_TEST,
+                JoinSemanticTestPrograms.ANTI_JOIN_ON_NESTED);
     }
 }

Reply via email to