This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6161043a2afbd458f2587f3f63df60f620c755b7
Author: Timo Walther <twal...@apache.org>
AuthorDate: Fri Jun 28 13:34:29 2019 +0200

    [FLINK-13028][table-api-java] Extract legacy type inference logic
---
 .../table/delegation/PlannerTypeInferenceUtil.java |  75 +++++++++++
 .../expressions/PlannerTypeInferenceUtilImpl.java  | 140 +++++++++++++++++++++
 .../rules/ResolveCallByArgumentsRule.java          | 127 ++++---------------
 3 files changed, 237 insertions(+), 105 deletions(-)

diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/PlannerTypeInferenceUtil.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/PlannerTypeInferenceUtil.java
new file mode 100644
index 0000000..4e003ff
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/PlannerTypeInferenceUtil.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.delegation;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.UnresolvedCallExpression;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.TypeInference;
+import org.apache.flink.table.types.inference.TypeInferenceUtil;
+
+import java.lang.reflect.Constructor;
+import java.util.List;
+
+/**
+ * Temporary utility for validation and output type inference until all {@code 
PlannerExpression}s are
+ * upgraded to work with {@link TypeInferenceUtil}.
+ */
+@Internal
+public interface PlannerTypeInferenceUtil {
+
+       static PlannerTypeInferenceUtil create() {
+               return 
SingletonPlannerTypeInferenceUtil.getPlannerTypeInferenceUtil();
+       }
+
+       /**
+        * Same behavior as {@link 
TypeInferenceUtil#runTypeInference(TypeInference, CallContext)}.
+        */
+       TypeInferenceUtil.Result runTypeInference(
+               UnresolvedCallExpression unresolvedCall,
+               List<ResolvedExpression> resolvedArgs);
+
+       /**
+        * A singleton holder that avoids creating multiple {@link 
PlannerTypeInferenceUtil} instances.
+        */
+       class SingletonPlannerTypeInferenceUtil {
+
+               private static PlannerTypeInferenceUtil 
plannerTypeInferenceUtil;
+
+               public static PlannerTypeInferenceUtil 
getPlannerTypeInferenceUtil() {
+                       if (plannerTypeInferenceUtil == null) {
+                               try {
+                                       final Class<?> clazz =
+                                               
Class.forName("org.apache.flink.table.expressions.PlannerTypeInferenceUtilImpl");
+                                       final Constructor<?> con = 
clazz.getConstructor();
+                                       plannerTypeInferenceUtil = 
(PlannerTypeInferenceUtil) con.newInstance();
+                               } catch (Throwable t) {
+                                       throw new TableException("Instantiation 
of PlannerTypeInferenceUtil failed.", t);
+                               }
+                       }
+                       return plannerTypeInferenceUtil;
+               }
+
+               private SingletonPlannerTypeInferenceUtil() {
+                       // no instantiation
+               }
+       }
+}
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/PlannerTypeInferenceUtilImpl.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/PlannerTypeInferenceUtilImpl.java
new file mode 100644
index 0000000..30c3421
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/PlannerTypeInferenceUtilImpl.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.delegation.PlannerTypeInferenceUtil;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.TypeInferenceUtil;
+import org.apache.flink.table.typeutils.TypeCoercion;
+import org.apache.flink.table.validate.ValidationFailure;
+import org.apache.flink.table.validate.ValidationResult;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static 
org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType;
+import static org.apache.flink.table.util.JavaScalaConversionUtil.toJava;
+
+/**
+ * Implementation of {@link PlannerTypeInferenceUtil}.
+ */
+@Internal
+public final class PlannerTypeInferenceUtilImpl implements 
PlannerTypeInferenceUtil {
+
+       private static final PlannerExpressionConverter CONVERTER = 
PlannerExpressionConverter.INSTANCE();
+
+       @Override
+       public TypeInferenceUtil.Result runTypeInference(
+                       UnresolvedCallExpression unresolvedCall,
+                       List<ResolvedExpression> resolvedArgs) {
+               final PlannerExpression plannerCall = 
unresolvedCall.accept(CONVERTER);
+
+               if (plannerCall instanceof InputTypeSpec) {
+                       return resolveWithCastedAssignment(
+                               unresolvedCall,
+                               resolvedArgs,
+                               toJava(((InputTypeSpec) 
plannerCall).expectedTypes()),
+                               plannerCall.resultType());
+               } else {
+                       validateArguments(plannerCall);
+
+                       final List<DataType> expectedArgumentTypes = 
resolvedArgs.stream()
+                               .map(ResolvedExpression::getOutputDataType)
+                               .collect(Collectors.toList());
+
+                       return new TypeInferenceUtil.Result(
+                               expectedArgumentTypes,
+                               null,
+                               
fromLegacyInfoToDataType(plannerCall.resultType()));
+               }
+       }
+
+       private TypeInferenceUtil.Result resolveWithCastedAssignment(
+                       UnresolvedCallExpression unresolvedCall,
+                       List<ResolvedExpression> args,
+                       List<TypeInformation<?>> expectedTypes,
+                       TypeInformation<?> resultType) {
+
+               final List<PlannerExpression> plannerArgs = 
unresolvedCall.getChildren()
+                       .stream()
+                       .map(e -> e.accept(CONVERTER))
+                       .collect(Collectors.toList());
+
+               final List<DataType> castedArgs = IntStream.range(0, 
plannerArgs.size())
+                       .mapToObj(idx -> castIfNeeded(
+                               args.get(idx),
+                               plannerArgs.get(idx),
+                               expectedTypes.get(idx)))
+                       .collect(Collectors.toList());
+
+               return new TypeInferenceUtil.Result(
+                       castedArgs,
+                       null,
+                       fromLegacyInfoToDataType(resultType));
+       }
+
+       private void validateArguments(PlannerExpression plannerCall) {
+               if (!plannerCall.valid()) {
+                       throw new ValidationException(
+                               getValidationErrorMessage(plannerCall)
+                                       .orElse("Unexpected behavior, 
validation failed but can't get error messages!"));
+               }
+       }
+
+       /**
+        * Return the validation error message of this {@link 
PlannerExpression} or return the
+        * validation error message of its children if it passes the 
validation. Return empty if
+        * all validation succeeded.
+        */
+       private Optional<String> getValidationErrorMessage(PlannerExpression 
plannerCall) {
+               ValidationResult validationResult = plannerCall.validateInput();
+               if (validationResult instanceof ValidationFailure) {
+                       return Optional.of(((ValidationFailure) 
validationResult).message());
+               } else {
+                       for (Expression plannerExpression: 
plannerCall.getChildren()) {
+                               Optional<String> errorMessage = 
getValidationErrorMessage((PlannerExpression) plannerExpression);
+                               if (errorMessage.isPresent()) {
+                                       return errorMessage;
+                               }
+                       }
+               }
+               return Optional.empty();
+       }
+
+       private DataType castIfNeeded(
+                       ResolvedExpression child,
+                       PlannerExpression plannerChild,
+                       TypeInformation<?> expectedType) {
+               TypeInformation<?> actualType = plannerChild.resultType();
+               if (actualType.equals(expectedType)) {
+                       return child.getOutputDataType();
+               } else if (TypeCoercion.canSafelyCast(actualType, 
expectedType)) {
+                       return fromLegacyInfoToDataType(expectedType);
+               } else {
+                       throw new 
ValidationException(String.format("Incompatible type of argument: %s Expected: 
%s",
+                               child,
+                               expectedType));
+               }
+       }
+}
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolveCallByArgumentsRule.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolveCallByArgumentsRule.java
index d308891..d2ad89c 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolveCallByArgumentsRule.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolveCallByArgumentsRule.java
@@ -19,13 +19,11 @@
 package org.apache.flink.table.expressions.rules;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.table.api.TableException;
-import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.delegation.PlannerTypeInferenceUtil;
+import org.apache.flink.table.expressions.CallExpression;
 import org.apache.flink.table.expressions.Expression;
-import org.apache.flink.table.expressions.InputTypeSpec;
-import org.apache.flink.table.expressions.PlannerExpression;
 import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.expressions.UnresolvedCallExpression;
 import org.apache.flink.table.expressions.ValueLiteralExpression;
@@ -37,9 +35,6 @@ import org.apache.flink.table.types.inference.CallContext;
 import org.apache.flink.table.types.inference.TypeInference;
 import org.apache.flink.table.types.inference.TypeInferenceUtil;
 import org.apache.flink.table.types.inference.TypeStrategies;
-import org.apache.flink.table.typeutils.TypeCoercion;
-import org.apache.flink.table.validate.ValidationFailure;
-import org.apache.flink.table.validate.ValidationResult;
 import org.apache.flink.util.Preconditions;
 
 import java.util.List;
@@ -47,12 +42,11 @@ import java.util.Optional;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
-import static 
org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType;
-import static org.apache.flink.table.util.JavaScalaConversionUtil.toJava;
-
 /**
- * It checks if a {@link UnresolvedCallExpression} can work with given 
arguments.
- * If the call expects different types of arguments, but the given arguments
+ * This rule checks if a {@link UnresolvedCallExpression} can work with the 
given arguments and infers
+ * the output data type. All function calls are resolved into {@link 
CallExpression}s after applying this rule.
+ *
+ * <p>If the call expects different types of arguments, but the given arguments
  * have types that can be casted, a {@link BuiltInFunctionDefinitions#CAST}
  * expression is inserted.
  */
@@ -88,6 +82,7 @@ final class ResolveCallByArgumentsRule implements 
ResolverRule {
                        if (unresolvedCall.getFunctionDefinition() instanceof 
BuiltInFunctionDefinition) {
                                final BuiltInFunctionDefinition definition =
                                        (BuiltInFunctionDefinition) 
unresolvedCall.getFunctionDefinition();
+
                                if 
(definition.getTypeInference().getOutputTypeStrategy() != 
TypeStrategies.MISSING) {
                                        return runTypeInference(
                                                unresolvedCall,
@@ -116,6 +111,21 @@ final class ResolveCallByArgumentsRule implements 
ResolverRule {
                        return unresolvedCall.resolve(adaptedArguments, 
inferenceResult.getOutputDataType());
                }
 
+               private ResolvedExpression runLegacyTypeInference(
+                               UnresolvedCallExpression unresolvedCall,
+                               List<ResolvedExpression> resolvedArgs) {
+
+                       final PlannerTypeInferenceUtil util = 
PlannerTypeInferenceUtil.create();
+
+                       final TypeInferenceUtil.Result inferenceResult = 
util.runTypeInference(
+                               unresolvedCall,
+                               resolvedArgs);
+
+                       final List<ResolvedExpression> adaptedArguments = 
adaptArguments(inferenceResult, resolvedArgs);
+
+                       return unresolvedCall.resolve(adaptedArguments, 
inferenceResult.getOutputDataType());
+               }
+
                /**
                 * Adapts the arguments according to the properties of the 
{@link TypeInferenceUtil.Result}.
                 */
@@ -142,99 +152,6 @@ final class ResolveCallByArgumentsRule implements 
ResolverRule {
                protected Expression defaultMethod(Expression expression) {
                        return expression;
                }
-
-               // 
----------------------------------------------------------------------------------------
-               // legacy code
-               // 
----------------------------------------------------------------------------------------
-
-               private ResolvedExpression runLegacyTypeInference(
-                               UnresolvedCallExpression unresolvedCall,
-                               List<ResolvedExpression> resolvedArgs) {
-                       final PlannerExpression plannerCall = 
resolutionContext.bridge(unresolvedCall);
-
-                       if (plannerCall instanceof InputTypeSpec) {
-                               return resolveWithCastedAssignment(
-                                       unresolvedCall,
-                                       resolvedArgs,
-                                       toJava(((InputTypeSpec) 
plannerCall).expectedTypes()),
-                                       plannerCall.resultType());
-                       } else {
-                               validateArguments(plannerCall);
-
-                               return unresolvedCall.resolve(
-                                       resolvedArgs,
-                                       
fromLegacyInfoToDataType(plannerCall.resultType()));
-                       }
-               }
-
-               private ResolvedExpression resolveWithCastedAssignment(
-                               UnresolvedCallExpression unresolvedCall,
-                               List<ResolvedExpression> args,
-                               List<TypeInformation<?>> expectedTypes,
-                               TypeInformation<?> resultType) {
-
-                       final List<PlannerExpression> plannerArgs = 
unresolvedCall.getChildren()
-                               .stream()
-                               .map(resolutionContext::bridge)
-                               .collect(Collectors.toList());
-
-                       final List<ResolvedExpression> castedArgs = 
IntStream.range(0, plannerArgs.size())
-                               .mapToObj(idx -> castIfNeeded(
-                                       args.get(idx),
-                                       plannerArgs.get(idx),
-                                       expectedTypes.get(idx)))
-                               .collect(Collectors.toList());
-
-                       return unresolvedCall.resolve(
-                               castedArgs,
-                               fromLegacyInfoToDataType(resultType));
-               }
-
-               private void validateArguments(PlannerExpression plannerCall) {
-                       if (!plannerCall.valid()) {
-                               throw new ValidationException(
-                                       getValidationErrorMessage(plannerCall)
-                                               .orElse("Unexpected behavior, 
validation failed but can't get error messages!"));
-                       }
-               }
-
-               /**
-                * Return the validation error message of this {@link 
PlannerExpression} or return the
-                * validation error message of it's children if it passes the 
validation. Return empty if
-                * all validation succeeded.
-                */
-               private Optional<String> 
getValidationErrorMessage(PlannerExpression plannerCall) {
-                       ValidationResult validationResult = 
plannerCall.validateInput();
-                       if (validationResult instanceof ValidationFailure) {
-                               return Optional.of(((ValidationFailure) 
validationResult).message());
-                       } else {
-                               for (Expression plannerExpression: 
plannerCall.getChildren()) {
-                                       Optional<String> errorMessage = 
getValidationErrorMessage((PlannerExpression) plannerExpression);
-                                       if (errorMessage.isPresent()) {
-                                               return errorMessage;
-                                       }
-                               }
-                       }
-                       return Optional.empty();
-               }
-
-               private ResolvedExpression castIfNeeded(
-                               ResolvedExpression child,
-                               PlannerExpression plannerChild,
-                               TypeInformation<?> expectedType) {
-                       TypeInformation<?> actualType = 
plannerChild.resultType();
-                       if (actualType.equals(expectedType)) {
-                               return child;
-                       } else if (TypeCoercion.canSafelyCast(actualType, 
expectedType)) {
-                               return resolutionContext
-                                       .postResolutionFactory()
-                                       .cast(child, 
fromLegacyInfoToDataType(expectedType));
-                       } else {
-                               throw new 
ValidationException(String.format("Incompatible type of argument: %s Expected: 
%s",
-                                       child,
-                                       expectedType));
-                       }
-               }
        }
 
        // 
--------------------------------------------------------------------------------------------

Reply via email to