This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 86e0a0b3842 [FLINK-31182][table] Fix CompiledPlan containing UNNEST 86e0a0b3842 is described below commit 86e0a0b384291f9d8bacc3bbef3c58fcfb79bb04 Author: Timo Walther <twal...@apache.org> AuthorDate: Mon Feb 27 14:37:30 2023 +0100 [FLINK-31182][table] Fix CompiledPlan containing UNNEST This closes #22040. --- .../functions/BuiltInFunctionDefinitions.java | 2 +- .../strategies/SpecificTypeStrategies.java | 3 ++ .../inference/strategies/UnusedTypeStrategy.java | 55 ++++++++++++++++++++++ .../stream/jsonplan/CorrelateJsonPlanITCase.java | 30 ++++++++---- 4 files changed, 79 insertions(+), 11 deletions(-) diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java index 9257de2a569..7b110e87685 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java @@ -192,7 +192,7 @@ public final class BuiltInFunctionDefinitions { BuiltInFunctionDefinition.newBuilder() .name("$UNNEST_ROWS$1") .kind(TABLE) - .outputTypeStrategy(TypeStrategies.MISSING) + .outputTypeStrategy(SpecificTypeStrategies.UNUSED) .runtimeClass( "org.apache.flink.table.runtime.functions.table.UnnestRowsFunction") .internal() diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificTypeStrategies.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificTypeStrategies.java index 79c7cd47026..c5796b008f3 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificTypeStrategies.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificTypeStrategies.java @@ -31,6 +31,9 @@ import org.apache.flink.table.types.inference.TypeStrategy; @Internal public final class SpecificTypeStrategies { + /** See {@link UnusedTypeStrategy}. */ + public static final TypeStrategy UNUSED = new UnusedTypeStrategy(); + /** See {@link RowTypeStrategy}. */ public static final TypeStrategy ROW = new RowTypeStrategy(); diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/UnusedTypeStrategy.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/UnusedTypeStrategy.java new file mode 100644 index 00000000000..4e292db4275 --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/UnusedTypeStrategy.java @@ -0,0 +1,55 @@ +package org.apache.flink.table.types.inference.strategies; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.inference.CallContext; +import org.apache.flink.table.types.inference.TypeStrategies; +import org.apache.flink.table.types.inference.TypeStrategy; + +import java.util.Optional; + +/** + * Placeholder for an unused type strategy. + * + * <p>For some internal helper functions the planner does not use a type inference. For example, + * {@link BuiltInFunctionDefinitions#INTERNAL_UNNEST_ROWS}). Those functions differ from {@link + * TypeStrategies#MISSING} in the sense that there is neither a Flink stack type inference nor a + * Calcite stack type inference being used. Instead, the types are created by a planner rule. + */ +@Internal +final class UnusedTypeStrategy implements TypeStrategy { + + @Override + public Optional<DataType> inferType(CallContext callContext) { + return Optional.empty(); + } + + @Override + public boolean equals(Object o) { + return o instanceof UnusedTypeStrategy; + } + + @Override + public int hashCode() { + return UnusedTypeStrategy.class.hashCode(); + } +} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/CorrelateJsonPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/CorrelateJsonPlanITCase.java index a12a14f402a..1874771514f 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/CorrelateJsonPlanITCase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/CorrelateJsonPlanITCase.java @@ -26,7 +26,6 @@ import org.apache.flink.types.Row; import org.junit.Before; import org.junit.Test; -import java.io.IOException; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -42,8 +41,7 @@ public class CorrelateJsonPlanITCase extends JsonPlanTestBase { } @Test - public void testSystemFuncByObject() - throws ExecutionException, InterruptedException, IOException { + public void testSystemFuncByObject() throws ExecutionException, InterruptedException { tableEnv.createTemporarySystemFunction( "STRING_SPLIT", new JavaUserDefinedTableFunctions.StringSplit()); createTestValuesSinkTable("MySink", "a STRING", "b STRING"); @@ -55,8 +53,7 @@ public class CorrelateJsonPlanITCase extends JsonPlanTestBase { } @Test - public void testSystemFuncByClass() - throws ExecutionException, InterruptedException, IOException { + public void testSystemFuncByClass() throws ExecutionException, InterruptedException { tableEnv.createTemporarySystemFunction( "STRING_SPLIT", JavaUserDefinedTableFunctions.StringSplit.class); createTestValuesSinkTable("MySink", "a STRING", "b STRING"); @@ -68,8 +65,7 @@ public class CorrelateJsonPlanITCase extends JsonPlanTestBase { } @Test - public void testTemporaryFuncByObject() - throws ExecutionException, InterruptedException, IOException { + public void testTemporaryFuncByObject() throws ExecutionException, InterruptedException { tableEnv.createTemporaryFunction( "STRING_SPLIT", new JavaUserDefinedTableFunctions.StringSplit()); createTestValuesSinkTable("MySink", "a STRING", "b STRING"); @@ -81,8 +77,7 @@ public class CorrelateJsonPlanITCase extends JsonPlanTestBase { } @Test - public void testTemporaryFuncByClass() - throws ExecutionException, InterruptedException, IOException { + public void testTemporaryFuncByClass() throws ExecutionException, InterruptedException { tableEnv.createTemporaryFunction( "STRING_SPLIT", JavaUserDefinedTableFunctions.StringSplit.class); createTestValuesSinkTable("MySink", "a STRING", "b STRING"); @@ -94,7 +89,7 @@ public class CorrelateJsonPlanITCase extends JsonPlanTestBase { } @Test - public void testFilter() throws ExecutionException, InterruptedException, IOException { + public void testFilter() throws ExecutionException, InterruptedException { tableEnv.createTemporarySystemFunction( "STRING_SPLIT", new JavaUserDefinedTableFunctions.StringSplit()); createTestValuesSinkTable("MySink", "a STRING", "b STRING"); @@ -106,4 +101,19 @@ public class CorrelateJsonPlanITCase extends JsonPlanTestBase { List<String> expected = Arrays.asList("+I[1,1,hi, 1]", "+I[1,1,hi, 1]"); assertResult(expected, TestValuesTableFactory.getResults("MySink")); } + + @Test + public void testUnnest() throws ExecutionException, InterruptedException { + List<Row> data = + Collections.singletonList( + Row.of("Bob", new Row[] {Row.of("1"), Row.of("2"), Row.of("3")})); + createTestValuesSourceTable( + "MyNestedTable", data, "name STRING", "arr ARRAY<ROW<nested STRING>>"); + createTestValuesSinkTable("MySink", "name STRING", "nested STRING"); + String query = + "INSERT INTO MySink SELECT name, nested FROM MyNestedTable CROSS JOIN UNNEST(arr) AS t (nested)"; + compileSqlAndExecutePlan(query).await(); + List<String> expected = Arrays.asList("+I[Bob, 1]", "+I[Bob, 2]", "+I[Bob, 3]"); + assertResult(expected, TestValuesTableFactory.getResults("MySink")); + } }