This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 0fe634c [FLINK-13264][table] Let the planner supply its type inference util 0fe634c is described below commit 0fe634ccbc9e6caf82b40af7ae900a94221b9a8f Author: Timo Walther <twal...@apache.org> AuthorDate: Mon Jul 15 13:13:50 2019 +0200 [FLINK-13264][table] Let the planner supply its type inference util This closes #9116. --- .../flink/table/catalog/FunctionCatalog.java | 23 +++++++++++++++- .../apache/flink/table/catalog/FunctionLookup.java | 6 ++++ .../table/delegation/PlannerTypeInferenceUtil.java | 32 ---------------------- .../resolver/rules/ResolveCallByArgumentsRule.java | 2 +- .../expressions/PlannerTypeInferenceUtilImpl.java | 2 ++ .../apache/flink/table/planner/PlannerBase.scala | 6 ++-- .../expressions/PlannerTypeInferenceUtilImpl.java | 2 ++ .../flink/table/api/internal/TableEnvImpl.scala | 2 ++ .../apache/flink/table/planner/StreamPlanner.scala | 9 +++--- 9 files changed, 44 insertions(+), 40 deletions(-) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java index be37a9f..0d18ffe 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java @@ -21,6 +21,7 @@ package org.apache.flink.table.catalog; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.table.api.TableException; +import org.apache.flink.table.delegation.PlannerTypeInferenceUtil; import org.apache.flink.table.functions.AggregateFunction; import org.apache.flink.table.functions.AggregateFunctionDefinition; import org.apache.flink.table.functions.BuiltInFunctionDefinitions; @@ -33,6 +34,7 @@ import org.apache.flink.table.functions.TableFunction; import org.apache.flink.table.functions.TableFunctionDefinition; import org.apache.flink.table.functions.UserDefinedAggregateFunction; import org.apache.flink.table.functions.UserFunctionsTypeHelper; +import org.apache.flink.util.Preconditions; import java.util.LinkedHashMap; import java.util.Map; @@ -51,11 +53,22 @@ public class FunctionCatalog implements FunctionLookup { private final Map<String, FunctionDefinition> userFunctions = new LinkedHashMap<>(); - public FunctionCatalog(String defaultCatalogName, String defaultDatabaseName) { + /** + * Temporary utility until the new type inference is fully functional. It needs to be set by the planner. + */ + private PlannerTypeInferenceUtil plannerTypeInferenceUtil; + + public FunctionCatalog( + String defaultCatalogName, + String defaultDatabaseName) { this.defaultCatalogName = defaultCatalogName; this.defaultDatabaseName = defaultDatabaseName; } + public void setPlannerTypeInferenceUtil(PlannerTypeInferenceUtil plannerTypeInferenceUtil) { + this.plannerTypeInferenceUtil = plannerTypeInferenceUtil; + } + public void registerScalarFunction(String name, ScalarFunction function) { UserFunctionsTypeHelper.validateInstantiation(function.getClass()); registerFunction( @@ -147,6 +160,14 @@ public class FunctionCatalog implements FunctionLookup { ); } + @Override + public PlannerTypeInferenceUtil getPlannerTypeInferenceUtil() { + Preconditions.checkNotNull( + plannerTypeInferenceUtil, + "A planner should have set the type inference utility."); + return plannerTypeInferenceUtil; + } + private void registerFunction(String name, FunctionDefinition functionDefinition) { userFunctions.put(normalizeName(name), functionDefinition); } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionLookup.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionLookup.java index 20510a3..347f105 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionLookup.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionLookup.java @@ -20,6 +20,7 @@ package org.apache.flink.table.catalog; import org.apache.flink.annotation.Internal; import org.apache.flink.table.api.TableException; +import org.apache.flink.table.delegation.PlannerTypeInferenceUtil; import org.apache.flink.table.functions.BuiltInFunctionDefinition; import org.apache.flink.table.functions.FunctionDefinition; @@ -50,6 +51,11 @@ public interface FunctionLookup { } /** + * Temporary utility until the new type inference is fully functional. + */ + PlannerTypeInferenceUtil getPlannerTypeInferenceUtil(); + + /** * Result of a function lookup. */ class Result { diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/PlannerTypeInferenceUtil.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/PlannerTypeInferenceUtil.java index 4e003ff..23e4638 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/PlannerTypeInferenceUtil.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/PlannerTypeInferenceUtil.java @@ -19,14 +19,12 @@ package org.apache.flink.table.delegation; import org.apache.flink.annotation.Internal; -import org.apache.flink.table.api.TableException; import org.apache.flink.table.expressions.ResolvedExpression; import org.apache.flink.table.expressions.UnresolvedCallExpression; import org.apache.flink.table.types.inference.CallContext; import org.apache.flink.table.types.inference.TypeInference; import org.apache.flink.table.types.inference.TypeInferenceUtil; -import java.lang.reflect.Constructor; import java.util.List; /** @@ -36,40 +34,10 @@ import java.util.List; @Internal public interface PlannerTypeInferenceUtil { - static PlannerTypeInferenceUtil create() { - return SingletonPlannerTypeInferenceUtil.getPlannerTypeInferenceUtil(); - } - /** * Same behavior as {@link TypeInferenceUtil#runTypeInference(TypeInference, CallContext)}. */ TypeInferenceUtil.Result runTypeInference( UnresolvedCallExpression unresolvedCall, List<ResolvedExpression> resolvedArgs); - - /** - * A singleton pattern utility for avoiding creating many {@link PlannerTypeInferenceUtil}. - */ - class SingletonPlannerTypeInferenceUtil { - - private static PlannerTypeInferenceUtil plannerTypeInferenceUtil; - - public static PlannerTypeInferenceUtil getPlannerTypeInferenceUtil() { - if (plannerTypeInferenceUtil == null) { - try { - final Class<?> clazz = - Class.forName("org.apache.flink.table.expressions.PlannerTypeInferenceUtilImpl"); - final Constructor<?> con = clazz.getConstructor(); - plannerTypeInferenceUtil = (PlannerTypeInferenceUtil) con.newInstance(); - } catch (Throwable t) { - throw new TableException("Instantiation of PlannerTypeInferenceUtil failed.", t); - } - } - return plannerTypeInferenceUtil; - } - - private SingletonPlannerTypeInferenceUtil() { - // no instantiation - } - } } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java index 1f184c8..d8deba7 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java @@ -162,7 +162,7 @@ final class ResolveCallByArgumentsRule implements ResolverRule { UnresolvedCallExpression unresolvedCall, List<ResolvedExpression> resolvedArgs) { - final PlannerTypeInferenceUtil util = PlannerTypeInferenceUtil.create(); + final PlannerTypeInferenceUtil util = resolutionContext.functionLookup().getPlannerTypeInferenceUtil(); final TypeInferenceUtil.Result inferenceResult = util.runTypeInference( unresolvedCall, diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/PlannerTypeInferenceUtilImpl.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/PlannerTypeInferenceUtilImpl.java index 816e783..34dc3bb 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/PlannerTypeInferenceUtilImpl.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/PlannerTypeInferenceUtilImpl.java @@ -43,6 +43,8 @@ import static org.apache.flink.table.util.JavaScalaConversionUtil.toJava; @Internal public final class PlannerTypeInferenceUtilImpl implements PlannerTypeInferenceUtil { + public static final PlannerTypeInferenceUtil INSTANCE = new PlannerTypeInferenceUtilImpl(); + private static final PlannerExpressionConverter CONVERTER = PlannerExpressionConverter.INSTANCE(); @Override diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/PlannerBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/PlannerBase.scala index 00af797..be81a0b 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/PlannerBase.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/PlannerBase.scala @@ -27,6 +27,7 @@ import org.apache.flink.table.calcite.{FlinkPlannerImpl, FlinkRelBuilder, FlinkT import org.apache.flink.table.catalog.{CatalogManager, CatalogManagerCalciteSchema, CatalogTable, ConnectorCatalogTable, FunctionCatalog} import org.apache.flink.table.delegation.{Executor, Planner} import org.apache.flink.table.executor.ExecutorBase +import org.apache.flink.table.expressions.PlannerTypeInferenceUtilImpl import org.apache.flink.table.factories.{TableFactoryService, TableFactoryUtil, TableSinkFactory} import org.apache.flink.table.operations.OutputConversionModifyOperation.UpdateMode import org.apache.flink.table.operations.{CatalogSinkModifyOperation, ModifyOperation, Operation, OutputConversionModifyOperation, PlannerQueryOperation, UnregisteredSinkModifyOperation} @@ -40,13 +41,11 @@ import org.apache.flink.table.sinks.{DataStreamTableSink, TableSink, TableSinkUt import org.apache.flink.table.sqlexec.SqlToOperationConverter import org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter import org.apache.flink.table.util.JavaScalaConversionUtil - import org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema import org.apache.calcite.plan.{RelTrait, RelTraitDef} import org.apache.calcite.rel.RelNode import org.apache.calcite.sql.{SqlIdentifier, SqlInsert, SqlKind} import org.apache.calcite.tools.FrameworkConfig - import _root_.java.util.{List => JList} import java.util @@ -71,6 +70,9 @@ abstract class PlannerBase( catalogManager: CatalogManager) extends Planner { + // temporary utility until we don't use planner expressions anymore + functionCatalog.setPlannerTypeInferenceUtil(PlannerTypeInferenceUtilImpl.INSTANCE) + executor.asInstanceOf[ExecutorBase].setTableConfig(config) private val plannerContext: PlannerContext = diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/PlannerTypeInferenceUtilImpl.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/PlannerTypeInferenceUtilImpl.java index 30c3421..21a8533 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/PlannerTypeInferenceUtilImpl.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/PlannerTypeInferenceUtilImpl.java @@ -42,6 +42,8 @@ import static org.apache.flink.table.util.JavaScalaConversionUtil.toJava; @Internal public final class PlannerTypeInferenceUtilImpl implements PlannerTypeInferenceUtil { + public static final PlannerTypeInferenceUtil INSTANCE = new PlannerTypeInferenceUtilImpl(); + private static final PlannerExpressionConverter CONVERTER = PlannerExpressionConverter.INSTANCE(); @Override diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala index 3bdfa56..abb94c9 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala @@ -67,6 +67,8 @@ abstract class TableEnvImpl( private[flink] val functionCatalog: FunctionCatalog = new FunctionCatalog( builtinCatalogName, builtinDatabaseName) + // temporary utility until we don't use planner expressions anymore + functionCatalog.setPlannerTypeInferenceUtil(PlannerTypeInferenceUtilImpl.INSTANCE) // temporary bridge between API and planner private[flink] val expressionBridge: ExpressionBridge[PlannerExpression] = diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala index 502ea36..aa41b0c 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala @@ -29,7 +29,7 @@ import org.apache.flink.table.catalog.{CatalogManager, CatalogManagerCalciteSche import org.apache.flink.table.delegation.{Executor, Planner} import org.apache.flink.table.executor.StreamExecutor import org.apache.flink.table.explain.PlanJsonParser -import org.apache.flink.table.expressions.{ExpressionBridge, PlannerExpression, PlannerExpressionConverter} +import org.apache.flink.table.expressions.{ExpressionBridge, PlannerExpression, PlannerExpressionConverter, PlannerTypeInferenceUtilImpl} import org.apache.flink.table.factories.{TableFactoryService, TableFactoryUtil, TableSinkFactory} import org.apache.flink.table.operations.OutputConversionModifyOperation.UpdateMode import org.apache.flink.table.operations._ @@ -41,13 +41,11 @@ import org.apache.flink.table.sinks._ import org.apache.flink.table.sqlexec.SqlToOperationConverter import org.apache.flink.table.types.utils.TypeConversions import org.apache.flink.table.util.JavaScalaConversionUtil - import org.apache.calcite.jdbc.CalciteSchema import org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema import org.apache.calcite.plan.RelOptUtil import org.apache.calcite.rel.RelNode import org.apache.calcite.sql.{SqlIdentifier, SqlInsert, SqlKind} - import _root_.java.lang.{Boolean => JBool} import _root_.java.util import _root_.java.util.{Objects, List => JList} @@ -71,7 +69,10 @@ class StreamPlanner( config: TableConfig, functionCatalog: FunctionCatalog, catalogManager: CatalogManager) - extends Planner{ + extends Planner { + + // temporary utility until we don't use planner expressions anymore + functionCatalog.setPlannerTypeInferenceUtil(PlannerTypeInferenceUtilImpl.INSTANCE) private val internalSchema: CalciteSchema = asRootSchema(new CatalogManagerCalciteSchema(catalogManager, false))