This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 6161043a2afbd458f2587f3f63df60f620c755b7 Author: Timo Walther <twal...@apache.org> AuthorDate: Fri Jun 28 13:34:29 2019 +0200 [FLINK-13028][table-api-java] Extract legacy type inference logic --- .../table/delegation/PlannerTypeInferenceUtil.java | 75 +++++++++++ .../expressions/PlannerTypeInferenceUtilImpl.java | 140 +++++++++++++++++++++ .../rules/ResolveCallByArgumentsRule.java | 127 ++++--------------- 3 files changed, 237 insertions(+), 105 deletions(-) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/PlannerTypeInferenceUtil.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/PlannerTypeInferenceUtil.java new file mode 100644 index 0000000..4e003ff --- /dev/null +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/PlannerTypeInferenceUtil.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.delegation; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.expressions.ResolvedExpression; +import org.apache.flink.table.expressions.UnresolvedCallExpression; +import org.apache.flink.table.types.inference.CallContext; +import org.apache.flink.table.types.inference.TypeInference; +import org.apache.flink.table.types.inference.TypeInferenceUtil; + +import java.lang.reflect.Constructor; +import java.util.List; + +/** + * Temporary utility for validation and output type inference until all {@code PlannerExpression} are + * upgraded to work with {@link TypeInferenceUtil}. + */ +@Internal +public interface PlannerTypeInferenceUtil { + + static PlannerTypeInferenceUtil create() { + return SingletonPlannerTypeInferenceUtil.getPlannerTypeInferenceUtil(); + } + + /** + * Same behavior as {@link TypeInferenceUtil#runTypeInference(TypeInference, CallContext)}. + */ + TypeInferenceUtil.Result runTypeInference( + UnresolvedCallExpression unresolvedCall, + List<ResolvedExpression> resolvedArgs); + + /** + * A singleton pattern utility for avoiding creating many {@link PlannerTypeInferenceUtil}. + */ + class SingletonPlannerTypeInferenceUtil { + + private static PlannerTypeInferenceUtil plannerTypeInferenceUtil; + + public static PlannerTypeInferenceUtil getPlannerTypeInferenceUtil() { + if (plannerTypeInferenceUtil == null) { + try { + final Class<?> clazz = + Class.forName("org.apache.flink.table.expressions.PlannerTypeInferenceUtilImpl"); + final Constructor<?> con = clazz.getConstructor(); + plannerTypeInferenceUtil = (PlannerTypeInferenceUtil) con.newInstance(); + } catch (Throwable t) { + throw new TableException("Instantiation of PlannerTypeInferenceUtil failed.", t); + } + } + return plannerTypeInferenceUtil; + } + + private SingletonPlannerTypeInferenceUtil() { + // no instantiation + } + } +} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/PlannerTypeInferenceUtilImpl.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/PlannerTypeInferenceUtilImpl.java new file mode 100644 index 0000000..30c3421 --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/PlannerTypeInferenceUtilImpl.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.expressions; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.delegation.PlannerTypeInferenceUtil; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.inference.TypeInferenceUtil; +import org.apache.flink.table.typeutils.TypeCoercion; +import org.apache.flink.table.validate.ValidationFailure; +import org.apache.flink.table.validate.ValidationResult; + +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType; +import static org.apache.flink.table.util.JavaScalaConversionUtil.toJava; + +/** + * Implementation of {@link PlannerTypeInferenceUtil}. + */ +@Internal +public final class PlannerTypeInferenceUtilImpl implements PlannerTypeInferenceUtil { + + private static final PlannerExpressionConverter CONVERTER = PlannerExpressionConverter.INSTANCE(); + + @Override + public TypeInferenceUtil.Result runTypeInference( + UnresolvedCallExpression unresolvedCall, + List<ResolvedExpression> resolvedArgs) { + final PlannerExpression plannerCall = unresolvedCall.accept(CONVERTER); + + if (plannerCall instanceof InputTypeSpec) { + return resolveWithCastedAssignment( + unresolvedCall, + resolvedArgs, + toJava(((InputTypeSpec) plannerCall).expectedTypes()), + plannerCall.resultType()); + } else { + validateArguments(plannerCall); + + final List<DataType> expectedArgumentTypes = resolvedArgs.stream() + .map(ResolvedExpression::getOutputDataType) + .collect(Collectors.toList()); + + return new TypeInferenceUtil.Result( + expectedArgumentTypes, + null, + fromLegacyInfoToDataType(plannerCall.resultType())); + } + } + + private TypeInferenceUtil.Result resolveWithCastedAssignment( + UnresolvedCallExpression unresolvedCall, + List<ResolvedExpression> args, + List<TypeInformation<?>> expectedTypes, + TypeInformation<?> resultType) { + + final List<PlannerExpression> plannerArgs = unresolvedCall.getChildren() + .stream() + .map(e -> e.accept(CONVERTER)) + .collect(Collectors.toList()); + + final List<DataType> castedArgs = IntStream.range(0, plannerArgs.size()) + .mapToObj(idx -> castIfNeeded( + args.get(idx), + plannerArgs.get(idx), + expectedTypes.get(idx))) + .collect(Collectors.toList()); + + return new TypeInferenceUtil.Result( + castedArgs, + null, + fromLegacyInfoToDataType(resultType)); + } + + private void validateArguments(PlannerExpression plannerCall) { + if (!plannerCall.valid()) { + throw new ValidationException( + getValidationErrorMessage(plannerCall) + .orElse("Unexpected behavior, validation failed but can't get error messages!")); + } + } + + /** + * Return the validation error message of this {@link PlannerExpression} or return the + * validation error message of it's children if it passes the validation. Return empty if + * all validation succeeded. + */ + private Optional<String> getValidationErrorMessage(PlannerExpression plannerCall) { + ValidationResult validationResult = plannerCall.validateInput(); + if (validationResult instanceof ValidationFailure) { + return Optional.of(((ValidationFailure) validationResult).message()); + } else { + for (Expression plannerExpression: plannerCall.getChildren()) { + Optional<String> errorMessage = getValidationErrorMessage((PlannerExpression) plannerExpression); + if (errorMessage.isPresent()) { + return errorMessage; + } + } + } + return Optional.empty(); + } + + private DataType castIfNeeded( + ResolvedExpression child, + PlannerExpression plannerChild, + TypeInformation<?> expectedType) { + TypeInformation<?> actualType = plannerChild.resultType(); + if (actualType.equals(expectedType)) { + return child.getOutputDataType(); + } else if (TypeCoercion.canSafelyCast(actualType, expectedType)) { + return fromLegacyInfoToDataType(expectedType); + } else { + throw new ValidationException(String.format("Incompatible type of argument: %s Expected: %s", + child, + expectedType)); + } + } +} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolveCallByArgumentsRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolveCallByArgumentsRule.java index d308891..d2ad89c 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolveCallByArgumentsRule.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolveCallByArgumentsRule.java @@ -19,13 +19,11 @@ package org.apache.flink.table.expressions.rules; import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.table.api.TableException; -import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.delegation.PlannerTypeInferenceUtil; +import org.apache.flink.table.expressions.CallExpression; import org.apache.flink.table.expressions.Expression; -import org.apache.flink.table.expressions.InputTypeSpec; -import org.apache.flink.table.expressions.PlannerExpression; import org.apache.flink.table.expressions.ResolvedExpression; import org.apache.flink.table.expressions.UnresolvedCallExpression; import org.apache.flink.table.expressions.ValueLiteralExpression; @@ -37,9 +35,6 @@ import org.apache.flink.table.types.inference.CallContext; import org.apache.flink.table.types.inference.TypeInference; import org.apache.flink.table.types.inference.TypeInferenceUtil; import org.apache.flink.table.types.inference.TypeStrategies; -import org.apache.flink.table.typeutils.TypeCoercion; -import org.apache.flink.table.validate.ValidationFailure; -import org.apache.flink.table.validate.ValidationResult; import org.apache.flink.util.Preconditions; import java.util.List; @@ -47,12 +42,11 @@ import java.util.Optional; import java.util.stream.Collectors; import java.util.stream.IntStream; -import static org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType; -import static org.apache.flink.table.util.JavaScalaConversionUtil.toJava; - /** - * It checks if a {@link UnresolvedCallExpression} can work with given arguments. - * If the call expects different types of arguments, but the given arguments + * This rule checks if a {@link UnresolvedCallExpression} can work with the given arguments and infers + * the output data type. All function calls are resolved {@link CallExpression} after applying this rule. + * + * <p>If the call expects different types of arguments, but the given arguments * have types that can be casted, a {@link BuiltInFunctionDefinitions#CAST} * expression is inserted. */ @@ -88,6 +82,7 @@ final class ResolveCallByArgumentsRule implements ResolverRule { if (unresolvedCall.getFunctionDefinition() instanceof BuiltInFunctionDefinition) { final BuiltInFunctionDefinition definition = (BuiltInFunctionDefinition) unresolvedCall.getFunctionDefinition(); + if (definition.getTypeInference().getOutputTypeStrategy() != TypeStrategies.MISSING) { return runTypeInference( unresolvedCall, @@ -116,6 +111,21 @@ final class ResolveCallByArgumentsRule implements ResolverRule { return unresolvedCall.resolve(adaptedArguments, inferenceResult.getOutputDataType()); } + private ResolvedExpression runLegacyTypeInference( + UnresolvedCallExpression unresolvedCall, + List<ResolvedExpression> resolvedArgs) { + + final PlannerTypeInferenceUtil util = PlannerTypeInferenceUtil.create(); + + final TypeInferenceUtil.Result inferenceResult = util.runTypeInference( + unresolvedCall, + resolvedArgs); + + final List<ResolvedExpression> adaptedArguments = adaptArguments(inferenceResult, resolvedArgs); + + return unresolvedCall.resolve(adaptedArguments, inferenceResult.getOutputDataType()); + } + /** * Adapts the arguments according to the properties of the {@link TypeInferenceUtil.Result}. */ @@ -142,99 +152,6 @@ final class ResolveCallByArgumentsRule implements ResolverRule { protected Expression defaultMethod(Expression expression) { return expression; } - - // ---------------------------------------------------------------------------------------- - // legacy code - // ---------------------------------------------------------------------------------------- - - private ResolvedExpression runLegacyTypeInference( - UnresolvedCallExpression unresolvedCall, - List<ResolvedExpression> resolvedArgs) { - final PlannerExpression plannerCall = resolutionContext.bridge(unresolvedCall); - - if (plannerCall instanceof InputTypeSpec) { - return resolveWithCastedAssignment( - unresolvedCall, - resolvedArgs, - toJava(((InputTypeSpec) plannerCall).expectedTypes()), - plannerCall.resultType()); - } else { - validateArguments(plannerCall); - - return unresolvedCall.resolve( - resolvedArgs, - fromLegacyInfoToDataType(plannerCall.resultType())); - } - } - - private ResolvedExpression resolveWithCastedAssignment( - UnresolvedCallExpression unresolvedCall, - List<ResolvedExpression> args, - List<TypeInformation<?>> expectedTypes, - TypeInformation<?> resultType) { - - final List<PlannerExpression> plannerArgs = unresolvedCall.getChildren() - .stream() - .map(resolutionContext::bridge) - .collect(Collectors.toList()); - - final List<ResolvedExpression> castedArgs = IntStream.range(0, plannerArgs.size()) - .mapToObj(idx -> castIfNeeded( - args.get(idx), - plannerArgs.get(idx), - expectedTypes.get(idx))) - .collect(Collectors.toList()); - - return unresolvedCall.resolve( - castedArgs, - fromLegacyInfoToDataType(resultType)); - } - - private void validateArguments(PlannerExpression plannerCall) { - if (!plannerCall.valid()) { - throw new ValidationException( - getValidationErrorMessage(plannerCall) - .orElse("Unexpected behavior, validation failed but can't get error messages!")); - } - } - - /** - * Return the validation error message of this {@link PlannerExpression} or return the - * validation error message of it's children if it passes the validation. Return empty if - * all validation succeeded. - */ - private Optional<String> getValidationErrorMessage(PlannerExpression plannerCall) { - ValidationResult validationResult = plannerCall.validateInput(); - if (validationResult instanceof ValidationFailure) { - return Optional.of(((ValidationFailure) validationResult).message()); - } else { - for (Expression plannerExpression: plannerCall.getChildren()) { - Optional<String> errorMessage = getValidationErrorMessage((PlannerExpression) plannerExpression); - if (errorMessage.isPresent()) { - return errorMessage; - } - } - } - return Optional.empty(); - } - - private ResolvedExpression castIfNeeded( - ResolvedExpression child, - PlannerExpression plannerChild, - TypeInformation<?> expectedType) { - TypeInformation<?> actualType = plannerChild.resultType(); - if (actualType.equals(expectedType)) { - return child; - } else if (TypeCoercion.canSafelyCast(actualType, expectedType)) { - return resolutionContext - .postResolutionFactory() - .cast(child, fromLegacyInfoToDataType(expectedType)); - } else { - throw new ValidationException(String.format("Incompatible type of argument: %s Expected: %s", - child, - expectedType)); - } - } } // --------------------------------------------------------------------------------------------