This is an automated email from the ASF dual-hosted git repository. snuyanzin pushed a commit to branch release-2.2 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 41455ec73f58012e2f1e22dc0beec42a324882d3 Author: Sergey Nuyanzin <[email protected]> AuthorDate: Tue Nov 25 09:55:28 2025 +0100 [FLINK-38720][table] Nested nullability might lead to class cast `RexCall` cannot be cast to class `RexFieldAccess` --- .../apache/calcite/sql2rel/SqlToRelConverter.java | 54 +++++++--- .../exec/common/JoinSemanticTestPrograms.java | 113 +++++++++++++++++++++ .../plan/nodes/exec/common/JoinTestPrograms.java | 60 ----------- .../plan/nodes/exec/stream/JoinSemanticTests.java | 6 +- 4 files changed, 155 insertions(+), 78 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java index b226cc57014..4f00abc858b 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java @@ -245,19 +245,21 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * <p>FLINK modifications are at lines * * <ol> - * <li>Added in FLINK-29081, FLINK-28682, FLINK-33395: Lines 686 ~ 703 - * <li>Added in Flink-24024: Lines 1453 ~ 1459 - * <li>Added in Flink-24024: Lines 1473 ~ 1512 - * <li>Added in Flink-37269: Lines 2250 ~ 2272 - * <li>Added in FLINK-28682: Lines 2383 ~ 2400 - * <li>Added in FLINK-28682: Lines 2437 ~ 2465 - * <li>Added in FLINK-32474: Lines 2522 ~ 2524 - * <li>Added in FLINK-32474: Lines 2528 ~ 2530 - * <li>Added in FLINK-32474: Lines 2546 ~ 2548 - * <li>Added in CALCITE-7217: Lines 2587 ~ 2595, should be dropped with upgrade to Calcite 1.41.0 - * <li>Added in FLINK-32474: Lines 2970 ~ 2982 - * <li>Added in FLINK-32474: Lines 3083 ~ 3117 - * <li>Added in FLINK-34312: Lines 5947 ~ 5958 + * <li>Added in FLINK-29081, FLINK-28682, FLINK-33395: Lines 688 ~ 705 + * <li>Added in FLINK-24024: Lines 1455 ~ 1461 + * <li>Added in FLINK-24024: Lines 1475 ~ 1514 + * <li>Added in FLINK-37269: Lines 2252 ~ 2274 + * <li>Added in FLINK-28682: Lines 2385 ~ 2402 + * <li>Added in FLINK-28682: Lines 2439 ~ 2467 + * <li>Added in FLINK-32474: Lines 2524 ~ 2526 + * <li>Added in FLINK-32474: Lines 2530 ~ 2532 + * <li>Added in FLINK-32474: Lines 2548 ~ 2550 + * <li>Added in CALCITE-7217: Lines 2589 ~ 2597, should be dropped with upgrade to Calcite 1.41.0 + * <li>Added in FLINK-32474: Lines 2972 ~ 2984 + * <li>Added in FLINK-32474: Lines 3085 ~ 3119 + * <li>Added in FLINK-38720: Lines 4579 ~ 4585 + * <li>Added in FLINK-38720: Lines 4591 ~ 4607 + * <li>Added in FLINK-34312: Lines 5971 ~ 5982 * </ol> * * <p>In official extension point (i.e. {@link #convertExtendedExpression(SqlNode, Blackboard)}): @@ -4574,14 +4576,34 @@ public class SqlToRelConverter { } if (e0.left instanceof RexCorrelVariable) { - assert e instanceof RexFieldAccess; - final RexNode prev = - bb.mapCorrelateToRex.put(((RexCorrelVariable) e0.left).id, (RexFieldAccess) e); + // ----- FLINK MODIFICATION BEGIN ----- + // adjust the type to account for nulls introduced by FlinkRexBuilder#makeFieldAccess + final RexFieldAccess rfa = adjustRexFieldAccess(e); + final RexNode prev = bb.mapCorrelateToRex.put(((RexCorrelVariable) e0.left).id, rfa); + // ----- FLINK MODIFICATION END ----- assert prev == null; } return e; } + // ----- FLINK MODIFICATION BEGIN ----- + private RexFieldAccess adjustRexFieldAccess(RexNode rexNode) { + // Either RexFieldAccess or CAST of RexFieldAccess to nullable + assert rexNode instanceof RexFieldAccess + || rexNode instanceof RexCall + && rexNode.getKind() == SqlKind.CAST + && ((RexCall) rexNode).getOperands().size() == 1 + && ((RexCall) rexNode).getOperands().get(0) instanceof RexFieldAccess; + + if (rexNode instanceof RexFieldAccess) { + return (RexFieldAccess) rexNode; + } else { + return (RexFieldAccess) ((RexCall) rexNode).getOperands().get(0); + } + } + + // ----- FLINK MODIFICATION END ----- + /** * Adjusts the type of a reference to an input field to account for nulls introduced by outer * joins; and adjusts the offset to match the physical implementation. diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/JoinSemanticTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/JoinSemanticTestPrograms.java new file mode 100644 index 00000000000..b687f5d4a18 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/JoinSemanticTestPrograms.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.common; + +import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecJoin; +import org.apache.flink.table.test.program.SinkTestStep; +import org.apache.flink.table.test.program.SourceTestStep; +import org.apache.flink.table.test.program.TableTestProgram; +import org.apache.flink.types.Row; +import org.apache.flink.types.RowKind; + +import java.util.stream.IntStream; + +/** {@link TableTestProgram} definitions for semantic testing {@link StreamExecJoin}. */ +public class JoinSemanticTestPrograms { + public static final TableTestProgram OUTER_JOIN_CHANGELOG_TEST = + TableTestProgram.of("join-duplicate-emission-bug", "bug with CTE and left join") + .setupTableSource( + SourceTestStep.newBuilder("upsert_table_with_duplicates") + .addSchema( + "`execution_plan_id` VARCHAR(2147483647) NOT NULL", + "`workflow_id` VARCHAR(2147483647) NOT NULL", + "`event_section_id` VARCHAR(2147483647) NOT NULL", + "CONSTRAINT `PRIMARY` PRIMARY KEY (`execution_plan_id`, `event_section_id`) NOT ENFORCED") + .addOption("changelog-mode", "I, UA,D") + .producedValues( + IntStream.range(0, 13) + .mapToObj( + i -> + Row.ofKind( + RowKind.UPDATE_AFTER, + "section_id_1", + "section_id_2", + "section_id_3")) + .toArray(Row[]::new)) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink") + .addSchema("event_element_id STRING", "cnt BIGINT") + .testMaterializedData() + .consumedValues(Row.of("pk-1", 1), Row.of("pk-2", 1)) + .build()) + .runSql( + "INSERT INTO sink WITH\n" + + " section_detail as (\n" + + " SELECT s.event_section_id\n" + + " \n" + + " FROM upsert_table_with_duplicates s\n" + + " ),\n" + + "\n" + + " event_element as (\n" + + " SELECT\n" + + " ed.id as event_element_id\n" + + " FROM (\n" + + " SELECT\n" + + " 'pk-2' id,\n" + + " 'section_id_3' section_id\n" + + " UNION ALL\n" + + " SELECT\n" + + " 'pk-1' id,\n" + + " 'section_id_3' section_id\n" + + " ) ed \n" + + " LEFT JOIN\n" + + " section_detail as s\n" + + " ON s.event_section_id = ed.section_id\n" + + " )\n" + + "\n" + + "SELECT event_element_id, COUNT(*) cnt\n" + + "FROM event_element\n" + + "GROUP BY event_element_id") + .build(); + + public static final TableTestProgram ANTI_JOIN_ON_NESTED = + TableTestProgram.of("anti-join-on-nested", "anti join on nested fields") + .setupTableSource( + SourceTestStep.newBuilder("source_t1") + .addSchema("`ext` ROW<`nested` STRING NOT NULL>") + .producedValues(Row.of(Row.of("test_same"))) + .build()) + .setupTableSource( + SourceTestStep.newBuilder("source_t2") + .addSchema( + "`ext` ROW<`nested` ROW<`nested1` ROW<`nested2` STRING NOT NULL>>>") + .producedValues( + Row.of(Row.of(Row.of(Row.of("test_diff")))), + Row.of(Row.of(Row.of(Row.of("test_same"))))) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink_t") + .addSchema("output STRING") + .consumedValues("+I[test_diff]") + .build()) + .runSql( + "INSERT INTO sink_t SELECT t2.ext.nested.nested1.nested2 FROM source_t2 t2 WHERE" + + " NOT EXISTS (SELECT 1 FROM source_t1 t1 WHERE t1.ext.nested = t2.ext.nested.nested1.nested2)") + .build(); +} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/JoinTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/JoinTestPrograms.java index 6f41ef4ff1e..02d3e92b711 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/JoinTestPrograms.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/JoinTestPrograms.java @@ -23,9 +23,6 @@ import org.apache.flink.table.test.program.SinkTestStep; import org.apache.flink.table.test.program.SourceTestStep; import org.apache.flink.table.test.program.TableTestProgram; import org.apache.flink.types.Row; -import org.apache.flink.types.RowKind; - -import java.util.stream.IntStream; /** {@link TableTestProgram} definitions for testing {@link StreamExecJoin}. */ public class JoinTestPrograms { @@ -45,63 +42,6 @@ public class JoinTestPrograms { public static final TableTestProgram JOIN_WITH_STATE_TTL_HINT; public static final TableTestProgram SEMI_ANTI_JOIN_WITH_LITERAL_AGG; - public static final TableTestProgram OUTER_JOIN_CHANGELOG_TEST = - TableTestProgram.of("join-duplicate-emission-bug", "bug with CTE and left join") - .setupTableSource( - SourceTestStep.newBuilder("upsert_table_with_duplicates") - .addSchema( - "`execution_plan_id` VARCHAR(2147483647) NOT NULL", - "`workflow_id` VARCHAR(2147483647) NOT NULL", - "`event_section_id` VARCHAR(2147483647) NOT NULL", - "CONSTRAINT `PRIMARY` PRIMARY KEY (`execution_plan_id`, `event_section_id`) NOT ENFORCED") - .addOption("changelog-mode", "I, UA,D") - .producedValues( - IntStream.range(0, 13) - .mapToObj( - i -> - Row.ofKind( - RowKind.UPDATE_AFTER, - "section_id_1", - "section_id_2", - "section_id_3")) - .toArray(Row[]::new)) - .build()) - .setupTableSink( - SinkTestStep.newBuilder("sink") - .addSchema("event_element_id STRING", "cnt BIGINT") - .testMaterializedData() - .consumedValues(Row.of("pk-1", 1), Row.of("pk-2", 1)) - .build()) - .runSql( - "INSERT INTO sink WITH\n" - + " section_detail as (\n" - + " SELECT s.event_section_id\n" - + " \n" - + " FROM upsert_table_with_duplicates s\n" - + " ),\n" - + "\n" - + " event_element as (\n" - + " SELECT\n" - + " ed.id as event_element_id\n" - + " FROM (\n" - + " SELECT\n" - + " 'pk-2' id,\n" - + " 'section_id_3' section_id\n" - + " UNION ALL\n" - + " SELECT\n" - + " 'pk-1' id,\n" - + " 'section_id_3' section_id\n" - + " ) ed \n" - + " LEFT JOIN\n" - + " section_detail as s\n" - + " ON s.event_section_id = ed.section_id\n" - + " )\n" - + "\n" - + "SELECT event_element_id, COUNT(*) cnt\n" - + "FROM event_element\n" - + "GROUP BY event_element_id") - .build(); - static final SourceTestStep EMPLOYEE = SourceTestStep.newBuilder("EMPLOYEE") .addSchema("deptno int", "salary bigint", "name varchar") diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinSemanticTests.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinSemanticTests.java index b7970e3aa3f..2501a7e6261 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinSemanticTests.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinSemanticTests.java @@ -18,7 +18,7 @@ package org.apache.flink.table.planner.plan.nodes.exec.stream; -import org.apache.flink.table.planner.plan.nodes.exec.common.JoinTestPrograms; +import org.apache.flink.table.planner.plan.nodes.exec.common.JoinSemanticTestPrograms; import org.apache.flink.table.planner.plan.nodes.exec.testutils.SemanticTestBase; import org.apache.flink.table.test.program.TableTestProgram; @@ -28,6 +28,8 @@ import java.util.List; public class JoinSemanticTests extends SemanticTestBase { @Override public List<TableTestProgram> programs() { - return List.of(JoinTestPrograms.OUTER_JOIN_CHANGELOG_TEST); + return List.of( + JoinSemanticTestPrograms.OUTER_JOIN_CHANGELOG_TEST, + JoinSemanticTestPrograms.ANTI_JOIN_ON_NESTED); } }
