This is an automated email from the ASF dual-hosted git repository. snuyanzin pushed a commit to branch release-2.2 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 6290979fa1a95d6b7e19c465ec03dfefcbd63c9d Author: Sergey Nuyanzin <[email protected]> AuthorDate: Mon Nov 10 18:17:37 2025 +0100 [FLINK-33217][table] `UNNEST` fails on `LEFT JOIN` with `NOT NULL` type in array --- .../plan/rules/logical/LogicalUnnestRule.java | 37 ++++- .../batch/BatchPhysicalCorrelateRule.scala | 4 +- .../table/planner/plan/batch/sql/UnnestTest.xml | 138 +++++++++++++++++++ .../plan/rules/logical/LogicalUnnestRuleTest.xml | 150 +++++++++++++++++++++ .../table/planner/plan/stream/sql/UnnestTest.xml | 138 +++++++++++++++++++ .../table/planner/plan/common/UnnestTestBase.scala | 52 ++++++- 6 files changed, 516 insertions(+), 3 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/LogicalUnnestRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/LogicalUnnestRule.java index b9eb4f24f11..a0805e4b110 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/LogicalUnnestRule.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/LogicalUnnestRule.java @@ -33,17 +33,20 @@ import org.apache.calcite.plan.RelRule; import org.apache.calcite.plan.hep.HepRelVertex; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.Correlate; +import org.apache.calcite.rel.core.JoinRelType; import org.apache.calcite.rel.core.Uncollect; import org.apache.calcite.rel.logical.LogicalCorrelate; import org.apache.calcite.rel.logical.LogicalFilter; import org.apache.calcite.rel.logical.LogicalProject; import org.apache.calcite.rel.logical.LogicalTableFunctionScan; import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexBuilder; import org.apache.calcite.rex.RexNode; import org.immutables.value.Value; import java.util.Collections; import java.util.Map; +import java.util.stream.Collectors; import static 
org.apache.flink.table.types.logical.utils.LogicalTypeUtils.toRowType; @@ -103,7 +106,10 @@ public class LogicalUnnestRule extends RelRule<LogicalUnnestRule.LogicalUnnestRu relNode = convert(getRel(hepRelVertex), correlate); } if (relNode instanceof LogicalProject) { - LogicalProject logicalProject = (LogicalProject) relNode; + final LogicalProject logicalProject = + correlate.getJoinType() == JoinRelType.LEFT + ? getLogicalProjectWithAdjustedNullability((LogicalProject) relNode) + : (LogicalProject) relNode; return logicalProject.copy( logicalProject.getTraitSet(), ImmutableList.of(convert(getRel(logicalProject.getInput()), correlate))); @@ -161,6 +167,35 @@ public class LogicalUnnestRule extends RelRule<LogicalUnnestRule.LogicalUnnestRu return rel; } + /** + * If unnesting type is {@code NOT NULL} however at the same time {@code LEFT JOIN} makes it + * nullable, this method adjusts nullability by inserting extra {@code CAST}. + */ + private LogicalProject getLogicalProjectWithAdjustedNullability(LogicalProject logicalProject) { + final RelOptCluster cluster = logicalProject.getCluster(); + FlinkTypeFactory typeFactory = (FlinkTypeFactory) cluster.getTypeFactory(); + RexBuilder rexBuilder = cluster.getRexBuilder(); + final RelDataType rowType = logicalProject.getRowType(); + return logicalProject.copy( + logicalProject.getTraitSet(), + logicalProject.getInput(), + logicalProject.getProjects().stream() + .map( + t -> { + if (t.getType().isNullable()) { + return t; + } + return rexBuilder.makeCast( + createNullableType(typeFactory, t.getType()), t); + }) + .collect(Collectors.toList()), + rowType.isNullable() ? rowType : createNullableType(typeFactory, rowType)); + } + + private static RelDataType createNullableType(FlinkTypeFactory typeFactory, RelDataType type) { + return typeFactory.createTypeWithNullability(type, true); + } + /** Rule configuration. 
*/ @Value.Immutable(singleton = false) public interface LogicalUnnestRuleConfig extends RelRule.Config { diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalCorrelateRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalCorrelateRule.scala index 9f714f2057c..471f40cce18 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalCorrelateRule.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalCorrelateRule.scala @@ -64,7 +64,9 @@ class BatchPhysicalCorrelateRule(config: Config) extends ConverterRule(config) { case calc: FlinkLogicalCalc => convertToCorrelate( calc.getInput.asInstanceOf[RelSubset].getOriginal, - Some(calc.getProgram.expandLocalRef(calc.getProgram.getCondition))) + if (calc.getProgram.getCondition == null) None + else Some(calc.getProgram.expandLocalRef(calc.getProgram.getCondition)) + ) case scan: FlinkLogicalTableFunctionScan => new BatchPhysicalCorrelate( diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/UnnestTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/UnnestTest.xml index 2b8cfaaffc1..c0c753fbe87 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/UnnestTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/UnnestTest.xml @@ -128,6 +128,144 @@ Calc(select=[a, f0 AS s]) +- Sort(orderBy=[a ASC]) +- Calc(select=[a, b], where=[(a < 5)]) +- BoundedStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c]) +]]> + </Resource> + </TestCase> + <TestCase name="testNullMismatchCrossJoin"> + <Resource 
name="sql"> + <![CDATA[SELECT bd_name FROM nested_not_null CROSS JOIN UNNEST(nested_not_null.business_data) AS exploded_bd(bd_name)]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(bd_name=[$3]) ++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0}]) + :- LogicalTableScan(table=[[default_catalog, default_database, nested_not_null, source: [CollectionTableSource(business_data, nested, nested_array)]]]) + +- LogicalProject(bd_name=[$0]) + +- Uncollect + +- LogicalProject(business_data=[$cor0.business_data]) + +- LogicalValues(tuples=[[{ 0 }]]) +]]> + </Resource> + <Resource name="optimized rel plan"> + <![CDATA[ +Calc(select=[f0 AS bd_name]) ++- Correlate(invocation=[$UNNEST_ROWS$1($cor0.business_data)], correlate=[table($UNNEST_ROWS$1($cor0.business_data))], select=[business_data,nested,nested_array,f0], rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data, RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested, RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array, VARCHAR(2147483647) f0)], joinType=[INNER]) + +- LegacyTableSourceScan(table=[[default_catalog, default_database, nested_not_null, source: [CollectionTableSource(business_data, nested, nested_array)]]], fields=[business_data, nested, nested_array]) +]]> + </Resource> + </TestCase> + <TestCase name="testNullMismatchLeftJoin"> + <Resource name="sql"> + <![CDATA[SELECT bd_name FROM nested_not_null LEFT JOIN UNNEST(nested_not_null.business_data) AS exploded_bd(bd_name) ON TRUE]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(bd_name=[$3]) ++- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{0}]) + :- LogicalTableScan(table=[[default_catalog, default_database, nested_not_null, source: [CollectionTableSource(business_data, nested, nested_array)]]]) + +- LogicalProject(bd_name=[$0]) + +- Uncollect + +- LogicalProject(business_data=[$cor0.business_data]) + +- LogicalValues(tuples=[[{ 
0 }]]) +]]> + </Resource> + <Resource name="optimized rel plan"> + <![CDATA[ +Calc(select=[bd_name]) ++- Correlate(invocation=[$UNNEST_ROWS$1($cor0.business_data)], correlate=[table($UNNEST_ROWS$1($cor0.business_data))], select=[business_data,nested,nested_array,bd_name], rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data, RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested, RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array, VARCHAR(2147483647) bd_name)], joinType=[LEFT]) + +- LegacyTableSourceScan(table=[[default_catalog, default_database, nested_not_null, source: [CollectionTableSource(business_data, nested, nested_array)]]], fields=[business_data, nested, nested_array]) +]]> + </Resource> + </TestCase> + <TestCase name="testNullMismatchLeftJoinOnNested"> + <Resource name="sql"> + <![CDATA[SELECT bd_name FROM nested_not_null LEFT JOIN UNNEST(nested_not_null.nested.data) AS exploded_bd(bd_name) ON TRUE]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(bd_name=[$3]) ++- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{1}]) + :- LogicalTableScan(table=[[default_catalog, default_database, nested_not_null, source: [CollectionTableSource(business_data, nested, nested_array)]]]) + +- LogicalProject(bd_name=[$0]) + +- Uncollect + +- LogicalProject(data=[$cor0.nested.data]) + +- LogicalValues(tuples=[[{ 0 }]]) +]]> + </Resource> + <Resource name="optimized rel plan"> + <![CDATA[ +Calc(select=[bd_name]) ++- Correlate(invocation=[$UNNEST_ROWS$1($cor0.nested.data)], correlate=[table($UNNEST_ROWS$1($cor0.nested.data))], select=[business_data,nested,nested_array,bd_name], rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data, RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested, RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array, VARCHAR(2147483647) bd_name)], joinType=[LEFT]) + +- LegacyTableSourceScan(table=[[default_catalog, 
default_database, nested_not_null, source: [CollectionTableSource(business_data, nested, nested_array)]]], fields=[business_data, nested, nested_array]) +]]> + </Resource> + </TestCase> + <TestCase name="testNullMismatchLeftJoinOnNestedArray"> + <Resource name="sql"> + <![CDATA[SELECT bd_name FROM nested_not_null LEFT JOIN UNNEST(nested_not_null.nested_array[0].data) AS exploded_bd(bd_name) ON TRUE]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(bd_name=[$3]) ++- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{2}]) + :- LogicalTableScan(table=[[default_catalog, default_database, nested_not_null, source: [CollectionTableSource(business_data, nested, nested_array)]]]) + +- LogicalProject(bd_name=[$0]) + +- Uncollect + +- LogicalProject(EXPR$0=[ITEM($cor0.nested_array, 0).data]) + +- LogicalValues(tuples=[[{ 0 }]]) +]]> + </Resource> + <Resource name="optimized rel plan"> + <![CDATA[ +Calc(select=[bd_name]) ++- Correlate(invocation=[$UNNEST_ROWS$1(ITEM($cor0.nested_array, 0).data)], correlate=[table($UNNEST_ROWS$1(ITEM($cor0.nested_array, 0).data))], select=[business_data,nested,nested_array,bd_name], rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data, RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested, RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array, VARCHAR(2147483647) bd_name)], joinType=[LEFT]) + +- LegacyTableSourceScan(table=[[default_catalog, default_database, nested_not_null, source: [CollectionTableSource(business_data, nested, nested_array)]]], fields=[business_data, nested, nested_array]) +]]> + </Resource> + </TestCase> + <TestCase name="testNullMismatchNaturalJoin"> + <Resource name="sql"> + <![CDATA[SELECT bd_name FROM nested_not_null NATURAL JOIN UNNEST(nested_not_null.business_data) AS exploded_bd(bd_name)]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(bd_name=[$3]) ++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{0}]) + :- LogicalTableScan(table=[[default_catalog, default_database, nested_not_null, source: [CollectionTableSource(business_data, nested, nested_array)]]]) + +- LogicalProject(bd_name=[$0]) + +- Uncollect + +- LogicalProject(business_data=[$cor0.business_data]) + +- LogicalValues(tuples=[[{ 0 }]]) +]]> + </Resource> + <Resource name="optimized rel plan"> + <![CDATA[ +Calc(select=[f0 AS bd_name]) ++- Correlate(invocation=[$UNNEST_ROWS$1($cor0.business_data)], correlate=[table($UNNEST_ROWS$1($cor0.business_data))], select=[business_data,nested,nested_array,f0], rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data, RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested, RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array, VARCHAR(2147483647) f0)], joinType=[INNER]) + +- LegacyTableSourceScan(table=[[default_catalog, default_database, nested_not_null, source: [CollectionTableSource(business_data, nested, nested_array)]]], fields=[business_data, nested, nested_array]) +]]> + </Resource> + </TestCase> + <TestCase name="testNullMismatchNaturalJoinOnNested"> + <Resource name="sql"> + <![CDATA[SELECT bd_name FROM nested_not_null NATURAL JOIN UNNEST(nested_not_null.nested.data) AS exploded_bd(bd_name)]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(bd_name=[$3]) ++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{1}]) + :- LogicalTableScan(table=[[default_catalog, default_database, nested_not_null, source: [CollectionTableSource(business_data, nested, nested_array)]]]) + +- LogicalProject(bd_name=[$0]) + +- Uncollect + +- LogicalProject(data=[$cor0.nested.data]) + +- LogicalValues(tuples=[[{ 0 }]]) +]]> + </Resource> + <Resource name="optimized rel plan"> + <![CDATA[ +Calc(select=[f0 AS bd_name]) ++- Correlate(invocation=[$UNNEST_ROWS$1($cor0.nested.data)], correlate=[table($UNNEST_ROWS$1($cor0.nested.data))], select=[business_data,nested,nested_array,f0], 
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data, RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested, RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array, VARCHAR(2147483647) f0)], joinType=[INNER]) + +- LegacyTableSourceScan(table=[[default_catalog, default_database, nested_not_null, source: [CollectionTableSource(business_data, nested, nested_array)]]], fields=[business_data, nested, nested_array]) ]]> </Resource> </TestCase> diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/LogicalUnnestRuleTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/LogicalUnnestRuleTest.xml index 6c744e43ca0..930a6d94bf5 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/LogicalUnnestRuleTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/LogicalUnnestRuleTest.xml @@ -133,6 +133,156 @@ LogicalProject(a=[$0], s=[$2]) : +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) +- LogicalProject(s=[$0]) +- LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.set)], rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0)]) +]]> + </Resource> + </TestCase> + <TestCase name="testNullMismatchCrossJoin"> + <Resource name="sql"> + <![CDATA[SELECT bd_name FROM nested_not_null CROSS JOIN UNNEST(nested_not_null.business_data) AS exploded_bd(bd_name)]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(bd_name=[$3]) ++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0}]) + :- LogicalTableScan(table=[[default_catalog, default_database, nested_not_null, source: [CollectionTableSource(business_data, nested, nested_array)]]]) + +- LogicalProject(bd_name=[$0]) + +- Uncollect + +- LogicalProject(business_data=[$cor0.business_data]) + +- 
LogicalValues(tuples=[[{ 0 }]]) +]]> + </Resource> + <Resource name="optimized rel plan"> + <![CDATA[ +LogicalProject(bd_name=[$3]) ++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0}]) + :- LogicalTableScan(table=[[default_catalog, default_database, nested_not_null, source: [CollectionTableSource(business_data, nested, nested_array)]]]) + +- LogicalProject(bd_name=[$0]) + +- LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.business_data)], rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0)]) +]]> + </Resource> + </TestCase> + <TestCase name="testNullMismatchLeftJoin"> + <Resource name="sql"> + <![CDATA[SELECT bd_name FROM nested_not_null LEFT JOIN UNNEST(nested_not_null.business_data) AS exploded_bd(bd_name) ON TRUE]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(bd_name=[$3]) ++- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{0}]) + :- LogicalTableScan(table=[[default_catalog, default_database, nested_not_null, source: [CollectionTableSource(business_data, nested, nested_array)]]]) + +- LogicalProject(bd_name=[$0]) + +- Uncollect + +- LogicalProject(business_data=[$cor0.business_data]) + +- LogicalValues(tuples=[[{ 0 }]]) +]]> + </Resource> + <Resource name="optimized rel plan"> + <![CDATA[ +LogicalProject(bd_name=[$3]) ++- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{0}]) + :- LogicalTableScan(table=[[default_catalog, default_database, nested_not_null, source: [CollectionTableSource(business_data, nested, nested_array)]]]) + +- LogicalProject(bd_name=[CAST($0):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"]) + +- LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.business_data)], rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0)]) +]]> + </Resource> + </TestCase> + <TestCase name="testNullMismatchLeftJoinOnNested"> + <Resource name="sql"> + <![CDATA[SELECT bd_name FROM nested_not_null LEFT JOIN UNNEST(nested_not_null.nested.data) 
AS exploded_bd(bd_name) ON TRUE]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(bd_name=[$3]) ++- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{1}]) + :- LogicalTableScan(table=[[default_catalog, default_database, nested_not_null, source: [CollectionTableSource(business_data, nested, nested_array)]]]) + +- LogicalProject(bd_name=[$0]) + +- Uncollect + +- LogicalProject(data=[$cor0.nested.data]) + +- LogicalValues(tuples=[[{ 0 }]]) +]]> + </Resource> + <Resource name="optimized rel plan"> + <![CDATA[ +LogicalProject(bd_name=[$3]) ++- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{1}]) + :- LogicalTableScan(table=[[default_catalog, default_database, nested_not_null, source: [CollectionTableSource(business_data, nested, nested_array)]]]) + +- LogicalProject(bd_name=[CAST($0):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"]) + +- LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.nested.data)], rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0)]) +]]> + </Resource> + </TestCase> + <TestCase name="testNullMismatchLeftJoinOnNestedArray"> + <Resource name="sql"> + <![CDATA[SELECT bd_name FROM nested_not_null LEFT JOIN UNNEST(nested_not_null.nested_array[0].data) AS exploded_bd(bd_name) ON TRUE]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(bd_name=[$3]) ++- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{2}]) + :- LogicalTableScan(table=[[default_catalog, default_database, nested_not_null, source: [CollectionTableSource(business_data, nested, nested_array)]]]) + +- LogicalProject(bd_name=[$0]) + +- Uncollect + +- LogicalProject(EXPR$0=[ITEM($cor0.nested_array, 0).data]) + +- LogicalValues(tuples=[[{ 0 }]]) +]]> + </Resource> + <Resource name="optimized rel plan"> + <![CDATA[ +LogicalProject(bd_name=[$3]) ++- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{2}]) + :- LogicalTableScan(table=[[default_catalog, 
default_database, nested_not_null, source: [CollectionTableSource(business_data, nested, nested_array)]]]) + +- LogicalProject(bd_name=[CAST($0):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"]) + +- LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1(ITEM($cor0.nested_array, 0).data)], rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0)]) +]]> + </Resource> + </TestCase> + <TestCase name="testNullMismatchNaturalJoin"> + <Resource name="sql"> + <![CDATA[SELECT bd_name FROM nested_not_null NATURAL JOIN UNNEST(nested_not_null.business_data) AS exploded_bd(bd_name)]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(bd_name=[$3]) ++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0}]) + :- LogicalTableScan(table=[[default_catalog, default_database, nested_not_null, source: [CollectionTableSource(business_data, nested, nested_array)]]]) + +- LogicalProject(bd_name=[$0]) + +- Uncollect + +- LogicalProject(business_data=[$cor0.business_data]) + +- LogicalValues(tuples=[[{ 0 }]]) +]]> + </Resource> + <Resource name="optimized rel plan"> + <![CDATA[ +LogicalProject(bd_name=[$3]) ++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0}]) + :- LogicalTableScan(table=[[default_catalog, default_database, nested_not_null, source: [CollectionTableSource(business_data, nested, nested_array)]]]) + +- LogicalProject(bd_name=[$0]) + +- LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.business_data)], rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0)]) +]]> + </Resource> + </TestCase> + <TestCase name="testNullMismatchNaturalJoinOnNested"> + <Resource name="sql"> + <![CDATA[SELECT bd_name FROM nested_not_null NATURAL JOIN UNNEST(nested_not_null.nested.data) AS exploded_bd(bd_name)]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(bd_name=[$3]) ++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{1}]) + :- LogicalTableScan(table=[[default_catalog, 
default_database, nested_not_null, source: [CollectionTableSource(business_data, nested, nested_array)]]]) + +- LogicalProject(bd_name=[$0]) + +- Uncollect + +- LogicalProject(data=[$cor0.nested.data]) + +- LogicalValues(tuples=[[{ 0 }]]) +]]> + </Resource> + <Resource name="optimized rel plan"> + <![CDATA[ +LogicalProject(bd_name=[$3]) ++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{1}]) + :- LogicalTableScan(table=[[default_catalog, default_database, nested_not_null, source: [CollectionTableSource(business_data, nested, nested_array)]]]) + +- LogicalProject(bd_name=[$0]) + +- LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.nested.data)], rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0)]) ]]> </Resource> </TestCase> diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/UnnestTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/UnnestTest.xml index c33bc89966d..b925b5ce5c7 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/UnnestTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/UnnestTest.xml @@ -123,6 +123,144 @@ Calc(select=[a, f0 AS s]) +- Exchange(distribution=[hash[a]]) +- Calc(select=[a, b], where=[(a < 5)]) +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c]) +]]> + </Resource> + </TestCase> + <TestCase name="testNullMismatchCrossJoin"> + <Resource name="sql"> + <![CDATA[SELECT bd_name FROM nested_not_null CROSS JOIN UNNEST(nested_not_null.business_data) AS exploded_bd(bd_name)]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(bd_name=[$3]) ++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0}]) + :- LogicalTableScan(table=[[default_catalog, default_database, nested_not_null, source: 
[CollectionTableSource(business_data, nested, nested_array)]]]) + +- LogicalProject(bd_name=[$0]) + +- Uncollect + +- LogicalProject(business_data=[$cor0.business_data]) + +- LogicalValues(tuples=[[{ 0 }]]) +]]> + </Resource> + <Resource name="optimized rel plan"> + <![CDATA[ +Calc(select=[f0 AS bd_name]) ++- Correlate(invocation=[$UNNEST_ROWS$1($cor0.business_data)], correlate=[table($UNNEST_ROWS$1($cor0.business_data))], select=[business_data,nested,nested_array,f0], rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data, RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested, RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array, VARCHAR(2147483647) f0)], joinType=[INNER]) + +- LegacyTableSourceScan(table=[[default_catalog, default_database, nested_not_null, source: [CollectionTableSource(business_data, nested, nested_array)]]], fields=[business_data, nested, nested_array]) +]]> + </Resource> + </TestCase> + <TestCase name="testNullMismatchLeftJoin"> + <Resource name="sql"> + <![CDATA[SELECT bd_name FROM nested_not_null LEFT JOIN UNNEST(nested_not_null.business_data) AS exploded_bd(bd_name) ON TRUE]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(bd_name=[$3]) ++- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{0}]) + :- LogicalTableScan(table=[[default_catalog, default_database, nested_not_null, source: [CollectionTableSource(business_data, nested, nested_array)]]]) + +- LogicalProject(bd_name=[$0]) + +- Uncollect + +- LogicalProject(business_data=[$cor0.business_data]) + +- LogicalValues(tuples=[[{ 0 }]]) +]]> + </Resource> + <Resource name="optimized rel plan"> + <![CDATA[ +Calc(select=[bd_name]) ++- Correlate(invocation=[$UNNEST_ROWS$1($cor0.business_data)], correlate=[table($UNNEST_ROWS$1($cor0.business_data))], select=[business_data,nested,nested_array,bd_name], rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data, RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY 
data) nested, RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array, VARCHAR(2147483647) bd_name)], joinType=[LEFT]) + +- LegacyTableSourceScan(table=[[default_catalog, default_database, nested_not_null, source: [CollectionTableSource(business_data, nested, nested_array)]]], fields=[business_data, nested, nested_array]) +]]> + </Resource> + </TestCase> + <TestCase name="testNullMismatchLeftJoinOnNested"> + <Resource name="sql"> + <![CDATA[SELECT bd_name FROM nested_not_null LEFT JOIN UNNEST(nested_not_null.nested.data) AS exploded_bd(bd_name) ON TRUE]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(bd_name=[$3]) ++- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{1}]) + :- LogicalTableScan(table=[[default_catalog, default_database, nested_not_null, source: [CollectionTableSource(business_data, nested, nested_array)]]]) + +- LogicalProject(bd_name=[$0]) + +- Uncollect + +- LogicalProject(data=[$cor0.nested.data]) + +- LogicalValues(tuples=[[{ 0 }]]) +]]> + </Resource> + <Resource name="optimized rel plan"> + <![CDATA[ +Calc(select=[bd_name]) ++- Correlate(invocation=[$UNNEST_ROWS$1($cor0.nested.data)], correlate=[table($UNNEST_ROWS$1($cor0.nested.data))], select=[business_data,nested,nested_array,bd_name], rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data, RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested, RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array, VARCHAR(2147483647) bd_name)], joinType=[LEFT]) + +- LegacyTableSourceScan(table=[[default_catalog, default_database, nested_not_null, source: [CollectionTableSource(business_data, nested, nested_array)]]], fields=[business_data, nested, nested_array]) +]]> + </Resource> + </TestCase> + <TestCase name="testNullMismatchLeftJoinOnNestedArray"> + <Resource name="sql"> + <![CDATA[SELECT bd_name FROM nested_not_null LEFT JOIN UNNEST(nested_not_null.nested_array[0].data) AS exploded_bd(bd_name) ON 
TRUE]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(bd_name=[$3]) ++- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{2}]) + :- LogicalTableScan(table=[[default_catalog, default_database, nested_not_null, source: [CollectionTableSource(business_data, nested, nested_array)]]]) + +- LogicalProject(bd_name=[$0]) + +- Uncollect + +- LogicalProject(EXPR$0=[ITEM($cor0.nested_array, 0).data]) + +- LogicalValues(tuples=[[{ 0 }]]) +]]> + </Resource> + <Resource name="optimized rel plan"> + <![CDATA[ +Calc(select=[bd_name]) ++- Correlate(invocation=[$UNNEST_ROWS$1(ITEM($cor0.nested_array, 0).data)], correlate=[table($UNNEST_ROWS$1(ITEM($cor0.nested_array, 0).data))], select=[business_data,nested,nested_array,bd_name], rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data, RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested, RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array, VARCHAR(2147483647) bd_name)], joinType=[LEFT]) + +- LegacyTableSourceScan(table=[[default_catalog, default_database, nested_not_null, source: [CollectionTableSource(business_data, nested, nested_array)]]], fields=[business_data, nested, nested_array]) +]]> + </Resource> + </TestCase> + <TestCase name="testNullMismatchNaturalJoin"> + <Resource name="sql"> + <![CDATA[SELECT bd_name FROM nested_not_null NATURAL JOIN UNNEST(nested_not_null.business_data) AS exploded_bd(bd_name)]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(bd_name=[$3]) ++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0}]) + :- LogicalTableScan(table=[[default_catalog, default_database, nested_not_null, source: [CollectionTableSource(business_data, nested, nested_array)]]]) + +- LogicalProject(bd_name=[$0]) + +- Uncollect + +- LogicalProject(business_data=[$cor0.business_data]) + +- LogicalValues(tuples=[[{ 0 }]]) +]]> + </Resource> + <Resource name="optimized rel plan"> + <![CDATA[ +Calc(select=[f0 
AS bd_name]) ++- Correlate(invocation=[$UNNEST_ROWS$1($cor0.business_data)], correlate=[table($UNNEST_ROWS$1($cor0.business_data))], select=[business_data,nested,nested_array,f0], rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data, RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested, RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array, VARCHAR(2147483647) f0)], joinType=[INNER]) + +- LegacyTableSourceScan(table=[[default_catalog, default_database, nested_not_null, source: [CollectionTableSource(business_data, nested, nested_array)]]], fields=[business_data, nested, nested_array]) +]]> + </Resource> + </TestCase> + <TestCase name="testNullMismatchNaturalJoinOnNested"> + <Resource name="sql"> + <![CDATA[SELECT bd_name FROM nested_not_null NATURAL JOIN UNNEST(nested_not_null.nested.data) AS exploded_bd(bd_name)]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(bd_name=[$3]) ++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{1}]) + :- LogicalTableScan(table=[[default_catalog, default_database, nested_not_null, source: [CollectionTableSource(business_data, nested, nested_array)]]]) + +- LogicalProject(bd_name=[$0]) + +- Uncollect + +- LogicalProject(data=[$cor0.nested.data]) + +- LogicalValues(tuples=[[{ 0 }]]) +]]> + </Resource> + <Resource name="optimized rel plan"> + <![CDATA[ +Calc(select=[f0 AS bd_name]) ++- Correlate(invocation=[$UNNEST_ROWS$1($cor0.nested.data)], correlate=[table($UNNEST_ROWS$1($cor0.nested.data))], select=[business_data,nested,nested_array,f0], rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data, RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested, RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array, VARCHAR(2147483647) f0)], joinType=[INNER]) + +- LegacyTableSourceScan(table=[[default_catalog, default_database, nested_not_null, source: [CollectionTableSource(business_data, nested, nested_array)]]], 
fields=[business_data, nested, nested_array]) ]]> </Resource> </TestCase> diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/UnnestTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/UnnestTestBase.scala index f0dabc8eeac..d1ee04db109 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/UnnestTestBase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/UnnestTestBase.scala @@ -21,7 +21,7 @@ import org.apache.flink.table.api._ import org.apache.flink.table.planner.utils.{TableTestBase, TableTestUtil} import org.apache.flink.table.types.AbstractDataType -import org.junit.jupiter.api.Test +import org.junit.jupiter.api.{BeforeEach, Test} import java.sql.Timestamp @@ -32,6 +32,20 @@ abstract class UnnestTestBase(withExecPlan: Boolean) extends TableTestBase { protected def getTableTestUtil: TableTestUtil + @BeforeEach + def setupBeforeEach(): Unit = { + util.addTable(""" + |CREATE TABLE nested_not_null ( + | business_data ARRAY<STRING NOT NULL>, + | nested ROW<`data` ARRAY<STRING NOT NULL>>, + | nested_array ARRAY<ROW<`data` ARRAY<STRING NOT NULL>> NOT NULL> + |) WITH ( + | 'connector' = 'COLLECTION', + | 'is-bounded' = 'true' + |) + """.stripMargin) + } + @Test def testUnnestPrimitiveArrayFromTable(): Unit = { util.addTableSource[(Int, Array[Int], Array[Array[Int]])]("MyTable", 'a, 'b, 'c) @@ -232,6 +246,42 @@ abstract class UnnestTestBase(withExecPlan: Boolean) extends TableTestBase { |""".stripMargin) } + @Test + def testNullMismatchLeftJoin(): Unit = { + util.verifyRelPlan( + "SELECT bd_name FROM nested_not_null LEFT JOIN UNNEST(nested_not_null.business_data) AS exploded_bd(bd_name) ON TRUE") + } + + @Test + def testNullMismatchCrossJoin(): Unit = { + util.verifyRelPlan( + "SELECT bd_name FROM nested_not_null CROSS JOIN UNNEST(nested_not_null.business_data) AS 
exploded_bd(bd_name)") + } + + @Test + def testNullMismatchNaturalJoin(): Unit = { + util.verifyRelPlan( + "SELECT bd_name FROM nested_not_null NATURAL JOIN UNNEST(nested_not_null.business_data) AS exploded_bd(bd_name)") + } + + @Test + def testNullMismatchNaturalJoinOnNested(): Unit = { + util.verifyRelPlan( + "SELECT bd_name FROM nested_not_null NATURAL JOIN UNNEST(nested_not_null.nested.data) AS exploded_bd(bd_name)") + } + + @Test + def testNullMismatchLeftJoinOnNested(): Unit = { + util.verifyRelPlan( + "SELECT bd_name FROM nested_not_null LEFT JOIN UNNEST(nested_not_null.nested.data) AS exploded_bd(bd_name) ON TRUE") + } + + @Test + def testNullMismatchLeftJoinOnNestedArray(): Unit = { + util.verifyRelPlan( + "SELECT bd_name FROM nested_not_null LEFT JOIN UNNEST(nested_not_null.nested_array[0].data) AS exploded_bd(bd_name) ON TRUE") + } + def verifyPlan(sql: String): Unit = { if (withExecPlan) { util.verifyExecPlan(sql)
