This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 0fe634c  [FLINK-13264][table] Let the planner supply its type inference util
0fe634c is described below

commit 0fe634ccbc9e6caf82b40af7ae900a94221b9a8f
Author: Timo Walther <twal...@apache.org>
AuthorDate: Mon Jul 15 13:13:50 2019 +0200

    [FLINK-13264][table] Let the planner supply its type inference util
    
    This closes #9116.
---
 .../flink/table/catalog/FunctionCatalog.java       | 23 +++++++++++++++-
 .../apache/flink/table/catalog/FunctionLookup.java |  6 ++++
 .../table/delegation/PlannerTypeInferenceUtil.java | 32 ----------------------
 .../resolver/rules/ResolveCallByArgumentsRule.java |  2 +-
 .../expressions/PlannerTypeInferenceUtilImpl.java  |  2 ++
 .../apache/flink/table/planner/PlannerBase.scala   |  6 ++--
 .../expressions/PlannerTypeInferenceUtilImpl.java  |  2 ++
 .../flink/table/api/internal/TableEnvImpl.scala    |  2 ++
 .../apache/flink/table/planner/StreamPlanner.scala |  9 +++---
 9 files changed, 44 insertions(+), 40 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
index be37a9f..0d18ffe 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.catalog;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.delegation.PlannerTypeInferenceUtil;
 import org.apache.flink.table.functions.AggregateFunction;
 import org.apache.flink.table.functions.AggregateFunctionDefinition;
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
@@ -33,6 +34,7 @@ import org.apache.flink.table.functions.TableFunction;
 import org.apache.flink.table.functions.TableFunctionDefinition;
 import org.apache.flink.table.functions.UserDefinedAggregateFunction;
 import org.apache.flink.table.functions.UserFunctionsTypeHelper;
+import org.apache.flink.util.Preconditions;
 
 import java.util.LinkedHashMap;
 import java.util.Map;
@@ -51,11 +53,22 @@ public class FunctionCatalog implements FunctionLookup {
 
        private final Map<String, FunctionDefinition> userFunctions = new 
LinkedHashMap<>();
 
-       public FunctionCatalog(String defaultCatalogName, String 
defaultDatabaseName) {
+       /**
+        * Temporary utility until the new type inference is fully functional. 
It needs to be set by the planner.
+        */
+       private PlannerTypeInferenceUtil plannerTypeInferenceUtil;
+
+       public FunctionCatalog(
+                       String defaultCatalogName,
+                       String defaultDatabaseName) {
                this.defaultCatalogName = defaultCatalogName;
                this.defaultDatabaseName = defaultDatabaseName;
        }
 
+       public void setPlannerTypeInferenceUtil(PlannerTypeInferenceUtil 
plannerTypeInferenceUtil) {
+               this.plannerTypeInferenceUtil = plannerTypeInferenceUtil;
+       }
+
        public void registerScalarFunction(String name, ScalarFunction 
function) {
                
UserFunctionsTypeHelper.validateInstantiation(function.getClass());
                registerFunction(
@@ -147,6 +160,14 @@ public class FunctionCatalog implements FunctionLookup {
                );
        }
 
+       @Override
+       public PlannerTypeInferenceUtil getPlannerTypeInferenceUtil() {
+               Preconditions.checkNotNull(
+                       plannerTypeInferenceUtil,
+                       "A planner should have set the type inference 
utility.");
+               return plannerTypeInferenceUtil;
+       }
+
        private void registerFunction(String name, FunctionDefinition 
functionDefinition) {
                userFunctions.put(normalizeName(name), functionDefinition);
        }
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionLookup.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionLookup.java
index 20510a3..347f105 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionLookup.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionLookup.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.catalog;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.delegation.PlannerTypeInferenceUtil;
 import org.apache.flink.table.functions.BuiltInFunctionDefinition;
 import org.apache.flink.table.functions.FunctionDefinition;
 
@@ -50,6 +51,11 @@ public interface FunctionLookup {
        }
 
        /**
+        * Temporary utility until the new type inference is fully functional.
+        */
+       PlannerTypeInferenceUtil getPlannerTypeInferenceUtil();
+
+       /**
         * Result of a function lookup.
         */
        class Result {
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/PlannerTypeInferenceUtil.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/PlannerTypeInferenceUtil.java
index 4e003ff..23e4638 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/PlannerTypeInferenceUtil.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/PlannerTypeInferenceUtil.java
@@ -19,14 +19,12 @@
 package org.apache.flink.table.delegation;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.expressions.UnresolvedCallExpression;
 import org.apache.flink.table.types.inference.CallContext;
 import org.apache.flink.table.types.inference.TypeInference;
 import org.apache.flink.table.types.inference.TypeInferenceUtil;
 
-import java.lang.reflect.Constructor;
 import java.util.List;
 
 /**
@@ -36,40 +34,10 @@ import java.util.List;
 @Internal
 public interface PlannerTypeInferenceUtil {
 
-       static PlannerTypeInferenceUtil create() {
-               return 
SingletonPlannerTypeInferenceUtil.getPlannerTypeInferenceUtil();
-       }
-
        /**
         * Same behavior as {@link 
TypeInferenceUtil#runTypeInference(TypeInference, CallContext)}.
         */
        TypeInferenceUtil.Result runTypeInference(
                UnresolvedCallExpression unresolvedCall,
                List<ResolvedExpression> resolvedArgs);
-
-       /**
-        * A singleton pattern utility for avoiding creating many {@link 
PlannerTypeInferenceUtil}.
-        */
-       class SingletonPlannerTypeInferenceUtil {
-
-               private static PlannerTypeInferenceUtil 
plannerTypeInferenceUtil;
-
-               public static PlannerTypeInferenceUtil 
getPlannerTypeInferenceUtil() {
-                       if (plannerTypeInferenceUtil == null) {
-                               try {
-                                       final Class<?> clazz =
-                                               
Class.forName("org.apache.flink.table.expressions.PlannerTypeInferenceUtilImpl");
-                                       final Constructor<?> con = 
clazz.getConstructor();
-                                       plannerTypeInferenceUtil = 
(PlannerTypeInferenceUtil) con.newInstance();
-                               } catch (Throwable t) {
-                                       throw new TableException("Instantiation 
of PlannerTypeInferenceUtil failed.", t);
-                               }
-                       }
-                       return plannerTypeInferenceUtil;
-               }
-
-               private SingletonPlannerTypeInferenceUtil() {
-                       // no instantiation
-               }
-       }
 }
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java
index 1f184c8..d8deba7 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java
@@ -162,7 +162,7 @@ final class ResolveCallByArgumentsRule implements 
ResolverRule {
                                UnresolvedCallExpression unresolvedCall,
                                List<ResolvedExpression> resolvedArgs) {
 
-                       final PlannerTypeInferenceUtil util = 
PlannerTypeInferenceUtil.create();
+                       final PlannerTypeInferenceUtil util = 
resolutionContext.functionLookup().getPlannerTypeInferenceUtil();
 
                        final TypeInferenceUtil.Result inferenceResult = 
util.runTypeInference(
                                unresolvedCall,
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/PlannerTypeInferenceUtilImpl.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/PlannerTypeInferenceUtilImpl.java
index 816e783..34dc3bb 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/PlannerTypeInferenceUtilImpl.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/PlannerTypeInferenceUtilImpl.java
@@ -43,6 +43,8 @@ import static 
org.apache.flink.table.util.JavaScalaConversionUtil.toJava;
 @Internal
 public final class PlannerTypeInferenceUtilImpl implements 
PlannerTypeInferenceUtil {
 
+       public static final PlannerTypeInferenceUtil INSTANCE = new 
PlannerTypeInferenceUtilImpl();
+
        private static final PlannerExpressionConverter CONVERTER = 
PlannerExpressionConverter.INSTANCE();
 
        @Override
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/PlannerBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/PlannerBase.scala
index 00af797..be81a0b 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/PlannerBase.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/PlannerBase.scala
@@ -27,6 +27,7 @@ import org.apache.flink.table.calcite.{FlinkPlannerImpl, 
FlinkRelBuilder, FlinkT
 import org.apache.flink.table.catalog.{CatalogManager, 
CatalogManagerCalciteSchema, CatalogTable, ConnectorCatalogTable, 
FunctionCatalog}
 import org.apache.flink.table.delegation.{Executor, Planner}
 import org.apache.flink.table.executor.ExecutorBase
+import org.apache.flink.table.expressions.PlannerTypeInferenceUtilImpl
 import org.apache.flink.table.factories.{TableFactoryService, 
TableFactoryUtil, TableSinkFactory}
 import 
org.apache.flink.table.operations.OutputConversionModifyOperation.UpdateMode
 import org.apache.flink.table.operations.{CatalogSinkModifyOperation, 
ModifyOperation, Operation, OutputConversionModifyOperation, 
PlannerQueryOperation, UnregisteredSinkModifyOperation}
@@ -40,13 +41,11 @@ import org.apache.flink.table.sinks.{DataStreamTableSink, 
TableSink, TableSinkUt
 import org.apache.flink.table.sqlexec.SqlToOperationConverter
 import org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter
 import org.apache.flink.table.util.JavaScalaConversionUtil
-
 import org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema
 import org.apache.calcite.plan.{RelTrait, RelTraitDef}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.sql.{SqlIdentifier, SqlInsert, SqlKind}
 import org.apache.calcite.tools.FrameworkConfig
-
 import _root_.java.util.{List => JList}
 import java.util
 
@@ -71,6 +70,9 @@ abstract class PlannerBase(
     catalogManager: CatalogManager)
   extends Planner {
 
+  // temporary utility until we don't use planner expressions anymore
+  
functionCatalog.setPlannerTypeInferenceUtil(PlannerTypeInferenceUtilImpl.INSTANCE)
+
   executor.asInstanceOf[ExecutorBase].setTableConfig(config)
 
   private val plannerContext: PlannerContext =
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/PlannerTypeInferenceUtilImpl.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/PlannerTypeInferenceUtilImpl.java
index 30c3421..21a8533 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/PlannerTypeInferenceUtilImpl.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/PlannerTypeInferenceUtilImpl.java
@@ -42,6 +42,8 @@ import static 
org.apache.flink.table.util.JavaScalaConversionUtil.toJava;
 @Internal
 public final class PlannerTypeInferenceUtilImpl implements 
PlannerTypeInferenceUtil {
 
+       public static final PlannerTypeInferenceUtil INSTANCE = new 
PlannerTypeInferenceUtilImpl();
+
        private static final PlannerExpressionConverter CONVERTER = 
PlannerExpressionConverter.INSTANCE();
 
        @Override
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
index 3bdfa56..abb94c9 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
@@ -67,6 +67,8 @@ abstract class TableEnvImpl(
   private[flink] val functionCatalog: FunctionCatalog = new FunctionCatalog(
     builtinCatalogName,
     builtinDatabaseName)
+  // temporary utility until we don't use planner expressions anymore
+  
functionCatalog.setPlannerTypeInferenceUtil(PlannerTypeInferenceUtilImpl.INSTANCE)
 
   // temporary bridge between API and planner
   private[flink] val expressionBridge: ExpressionBridge[PlannerExpression] =
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
index 502ea36..aa41b0c 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
@@ -29,7 +29,7 @@ import org.apache.flink.table.catalog.{CatalogManager, 
CatalogManagerCalciteSche
 import org.apache.flink.table.delegation.{Executor, Planner}
 import org.apache.flink.table.executor.StreamExecutor
 import org.apache.flink.table.explain.PlanJsonParser
-import org.apache.flink.table.expressions.{ExpressionBridge, 
PlannerExpression, PlannerExpressionConverter}
+import org.apache.flink.table.expressions.{ExpressionBridge, 
PlannerExpression, PlannerExpressionConverter, PlannerTypeInferenceUtilImpl}
 import org.apache.flink.table.factories.{TableFactoryService, 
TableFactoryUtil, TableSinkFactory}
 import 
org.apache.flink.table.operations.OutputConversionModifyOperation.UpdateMode
 import org.apache.flink.table.operations._
@@ -41,13 +41,11 @@ import org.apache.flink.table.sinks._
 import org.apache.flink.table.sqlexec.SqlToOperationConverter
 import org.apache.flink.table.types.utils.TypeConversions
 import org.apache.flink.table.util.JavaScalaConversionUtil
-
 import org.apache.calcite.jdbc.CalciteSchema
 import org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema
 import org.apache.calcite.plan.RelOptUtil
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.sql.{SqlIdentifier, SqlInsert, SqlKind}
-
 import _root_.java.lang.{Boolean => JBool}
 import _root_.java.util
 import _root_.java.util.{Objects, List => JList}
@@ -71,7 +69,10 @@ class StreamPlanner(
     config: TableConfig,
     functionCatalog: FunctionCatalog,
     catalogManager: CatalogManager)
-  extends Planner{
+  extends Planner {
+
+  // temporary utility until we don't use planner expressions anymore
+  
functionCatalog.setPlannerTypeInferenceUtil(PlannerTypeInferenceUtilImpl.INSTANCE)
 
   private val internalSchema: CalciteSchema =
     asRootSchema(new CatalogManagerCalciteSchema(catalogManager, false))

Reply via email to