This is an automated email from the ASF dual-hosted git repository. kurt pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 774ecea [FLINK-12795][table-planner-blink] Extracted creation & configuration of FrameworkConfig & RelBuilder to separate class in blink planner 774ecea is described below commit 774ecea757861fa5e1088fb091c304956145870b Author: godfrey he <godfre...@163.com> AuthorDate: Tue Jun 11 17:21:46 2019 +0800 [FLINK-12795][table-planner-blink] Extracted creation & configuration of FrameworkConfig & RelBuilder to separate class in blink planner This closes #8677 --- .../apache/flink/table/planner/PlannerContext.java | 223 +++++++++++++++++++++ .../apache/flink/table/api/TableEnvironment.scala | 126 +++--------- .../flink/table/calcite/FlinkPlannerImpl.scala | 50 +---- .../flink/table/calcite/FlinkRelBuilder.scala | 42 +--- .../flink/table/util/JavaScalaConversionUtil.scala | 67 +++++++ .../flink/table/validate/FunctionCatalog.scala | 9 +- .../flink/table/codegen/agg/AggTestBase.scala | 9 +- .../expressions/utils/ExpressionTestBase.scala | 10 +- .../plan/metadata/FlinkRelMdHandlerTestBase.scala | 33 ++- 9 files changed, 364 insertions(+), 205 deletions(-) diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/PlannerContext.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/PlannerContext.java new file mode 100644 index 0000000..205f0e7 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/PlannerContext.java @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.calcite.CalciteConfig; +import org.apache.flink.table.calcite.CalciteConfig$; +import org.apache.flink.table.calcite.FlinkCalciteCatalogReader; +import org.apache.flink.table.calcite.FlinkContextImpl; +import org.apache.flink.table.calcite.FlinkPlannerImpl; +import org.apache.flink.table.calcite.FlinkRelBuilder; +import org.apache.flink.table.calcite.FlinkRelOptClusterFactory; +import org.apache.flink.table.calcite.FlinkTypeFactory; +import org.apache.flink.table.calcite.FlinkTypeSystem; +import org.apache.flink.table.codegen.ExpressionReducer; +import org.apache.flink.table.functions.sql.FlinkSqlOperatorTable; +import org.apache.flink.table.plan.cost.FlinkCostFactory; +import org.apache.flink.table.util.JavaScalaConversionUtil; +import org.apache.flink.table.validate.FunctionCatalog; + +import org.apache.calcite.config.Lex; +import org.apache.calcite.jdbc.CalciteSchema; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelTraitDef; +import org.apache.calcite.plan.volcano.VolcanoPlanner; +import org.apache.calcite.rel.type.RelDataTypeSystem; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.sql.SqlOperatorTable; +import org.apache.calcite.sql.parser.SqlParser; +import org.apache.calcite.sql.util.ChainedSqlOperatorTable; +import 
org.apache.calcite.sql.util.ListSqlOperatorTable; +import org.apache.calcite.sql2rel.SqlToRelConverter; +import org.apache.calcite.tools.FrameworkConfig; +import org.apache.calcite.tools.Frameworks; + +import java.util.Collections; +import java.util.List; + +/** + * Utility class to create {@link org.apache.calcite.tools.RelBuilder} or {@link FrameworkConfig} used to create + * a corresponding {@link org.apache.calcite.tools.Planner}. It tries to separate static elements in a + * {@link org.apache.flink.table.api.TableEnvironment} like: root schema, cost factory, type system etc. + * from dynamic properties, e.g. the default path to look for objects in the schema. + */ +@Internal +public class PlannerContext { + private final RelDataTypeSystem typeSystem = new FlinkTypeSystem(); + private final FlinkTypeFactory typeFactory = new FlinkTypeFactory(typeSystem); + private final TableConfig tableConfig; + private final FunctionCatalog functionCatalog; + private final FrameworkConfig frameworkConfig; + private final RelOptCluster cluster; + + public PlannerContext( + TableConfig tableConfig, + FunctionCatalog functionCatalog, + CalciteSchema rootSchema, + List<RelTraitDef> traitDefs) { + this.tableConfig = tableConfig; + this.functionCatalog = functionCatalog; + this.frameworkConfig = createFrameworkConfig(rootSchema, traitDefs); + + RelOptPlanner planner = new VolcanoPlanner(frameworkConfig.getCostFactory(), frameworkConfig.getContext()); + planner.setExecutor(frameworkConfig.getExecutor()); + for (RelTraitDef traitDef : frameworkConfig.getTraitDefs()) { + planner.addRelTraitDef(traitDef); + } + this.cluster = FlinkRelOptClusterFactory.create(planner, new RexBuilder(typeFactory)); + } + + private FrameworkConfig createFrameworkConfig(CalciteSchema rootSchema, List<RelTraitDef> traitDefs) { + return Frameworks.newConfigBuilder() + .defaultSchema(rootSchema.plus()) + .parserConfig(getSqlParserConfig()) + .costFactory(new FlinkCostFactory()) + .typeSystem(typeSystem) + 
.sqlToRelConverterConfig(getSqlToRelConverterConfig(getCalciteConfig(tableConfig))) + .operatorTable(getSqlOperatorTable(getCalciteConfig(tableConfig), functionCatalog)) + // set the executor to evaluate constant expressions + .executor(new ExpressionReducer(tableConfig, false)) + .context(new FlinkContextImpl(tableConfig)) + .traitDefs(traitDefs) + .build(); + } + + /** Returns the {@link FlinkTypeFactory} that will be used. */ + public FlinkTypeFactory getTypeFactory() { + return typeFactory; + } + + public SchemaPlus getRootSchema() { + return frameworkConfig.getDefaultSchema(); + } + + /** + * Creates a configured {@link FlinkRelBuilder} for a planning session. + * + * @return configured rel builder + */ + public FlinkRelBuilder createRelBuilder() { + FlinkCalciteCatalogReader relOptSchema = createCatalogReader(false); + return new FlinkRelBuilder(frameworkConfig.getContext(), cluster, relOptSchema); + } + + /** + * Creates a configured {@link FlinkPlannerImpl} for a planning session. 
+ * + * @return configured flink planner + */ + public FlinkPlannerImpl createFlinkPlanner() { + return new FlinkPlannerImpl( + frameworkConfig, + this::createCatalogReader, + typeFactory, + cluster); + } + + private FlinkCalciteCatalogReader createCatalogReader(boolean lenientCaseSensitivity) { + SqlParser.Config sqlParserConfig = frameworkConfig.getParserConfig(); + final boolean caseSensitive; + if (lenientCaseSensitivity) { + caseSensitive = false; + } else { + caseSensitive = sqlParserConfig.caseSensitive(); + } + + SqlParser.Config newSqlParserConfig = SqlParser.configBuilder(sqlParserConfig) + .setCaseSensitive(caseSensitive) + .build(); + + SchemaPlus rootSchema = getRootSchema(frameworkConfig.getDefaultSchema()); + return new FlinkCalciteCatalogReader( + CalciteSchema.from(rootSchema), + Collections.emptyList(), + typeFactory, + CalciteConfig$.MODULE$.connectionConfig(newSqlParserConfig)); + } + + private SchemaPlus getRootSchema(SchemaPlus schema) { + if (schema.getParentSchema() == null) { + return schema; + } else { + return getRootSchema(schema.getParentSchema()); + } + } + + private CalciteConfig getCalciteConfig(TableConfig tableConfig) { + return tableConfig.getCalciteConfig(); + } + + /** + * Returns the SQL parser config for this environment including a custom Calcite configuration. + */ + private SqlParser.Config getSqlParserConfig() { + return JavaScalaConversionUtil.toJava(getCalciteConfig(tableConfig).getSqlParserConfig()).orElseGet( + // we use Java lex because back ticks are easier than double quotes in programming + // and cases are preserved + () -> SqlParser + .configBuilder() + .setLex(Lex.JAVA) + .setIdentifierMaxLength(256) + .build()); + } + + /** + * Returns the {@link SqlToRelConverter} config. + * + * <p>{@code expand} is set to false, and each sub-query becomes a {@link org.apache.calcite.rex.RexSubQuery}. 
+ */ + private SqlToRelConverter.Config getSqlToRelConverterConfig(CalciteConfig calciteConfig) { + return JavaScalaConversionUtil.toJava(calciteConfig.getSqlToRelConverterConfig()).orElseGet( + () -> SqlToRelConverter.configBuilder() + .withTrimUnusedFields(false) + .withConvertTableAccess(false) + .withInSubQueryThreshold(Integer.MAX_VALUE) + .withExpand(false) + .build() + ); + } + + /** + * Returns the operator table for this environment including a custom Calcite configuration. + */ + private SqlOperatorTable getSqlOperatorTable(CalciteConfig calciteConfig, FunctionCatalog functionCatalog) { + return JavaScalaConversionUtil.toJava(calciteConfig.getSqlOperatorTable()).map(operatorTable -> { + if (calciteConfig.replacesSqlOperatorTable()) { + return operatorTable; + } else { + return ChainedSqlOperatorTable.of(getBuiltinSqlOperatorTable(functionCatalog), operatorTable); + } + } + ).orElseGet(() -> getBuiltinSqlOperatorTable(functionCatalog)); + } + + /** + * Returns the builtin operator table for this environment. 
+ */ + private SqlOperatorTable getBuiltinSqlOperatorTable(FunctionCatalog functionCatalog) { + return ChainedSqlOperatorTable.of( + new ListSqlOperatorTable(functionCatalog.sqlFunctions()), + FlinkSqlOperatorTable.instance()); + } + +} diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala index 506f724..82d1655 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala @@ -29,12 +29,9 @@ import org.apache.flink.streaming.api.transformations.StreamTransformation import org.apache.flink.table.api.java.{BatchTableEnvironment => JavaBatchTableEnvironment, StreamTableEnvironment => JavaStreamTableEnv} import org.apache.flink.table.api.scala.{BatchTableEnvironment => ScalaBatchTableEnvironment, StreamTableEnvironment => ScalaStreamTableEnv} import org.apache.flink.table.calcite._ -import org.apache.flink.table.codegen.ExpressionReducer import org.apache.flink.table.dataformat.BaseRow -import org.apache.flink.table.functions.sql.FlinkSqlOperatorTable import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils.{checkForInstantiation, checkNotSingleton, extractResultTypeFromTableFunction, getAccumulatorTypeOfAggregateFunction, getResultTypeOfAggregateFunction} import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, TableFunction} -import org.apache.flink.table.plan.cost.FlinkCostFactory import org.apache.flink.table.plan.nodes.calcite.{LogicalSink, Sink} import org.apache.flink.table.plan.nodes.exec.ExecNode import org.apache.flink.table.plan.nodes.physical.FlinkPhysicalRel @@ -43,6 +40,7 @@ import org.apache.flink.table.plan.reuse.SubplanReuser import org.apache.flink.table.plan.schema.RelTable import 
org.apache.flink.table.plan.stats.FlinkStatistic import org.apache.flink.table.plan.util.SameRelObjectShuttle +import org.apache.flink.table.planner.PlannerContext import org.apache.flink.table.sinks.TableSink import org.apache.flink.table.sources.TableSource import org.apache.flink.table.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType @@ -52,16 +50,12 @@ import org.apache.flink.table.types.{ClassLogicalTypeConverter, DataType} import org.apache.flink.table.validate.FunctionCatalog import org.apache.flink.types.Row -import org.apache.calcite.config.Lex import org.apache.calcite.jdbc.CalciteSchema -import org.apache.calcite.plan.{RelOptPlanner, RelTrait, RelTraitDef} +import org.apache.calcite.plan.{RelTrait, RelTraitDef} import org.apache.calcite.rel.RelNode import org.apache.calcite.schema.SchemaPlus import org.apache.calcite.schema.impl.AbstractTable import org.apache.calcite.sql._ -import org.apache.calcite.sql.parser.SqlParser -import org.apache.calcite.sql.util.{ChainedSqlOperatorTable, ListSqlOperatorTable} -import org.apache.calcite.sql2rel.SqlToRelConverter import org.apache.calcite.tools._ import _root_.java.lang.reflect.Modifier @@ -82,36 +76,29 @@ abstract class TableEnvironment(val config: TableConfig) { protected val DEFAULT_JOB_NAME = "Flink Exec Table Job" - // the catalog to hold all registered and translated tables - // we disable caching here to prevent side effects - private val internalSchema: CalciteSchema = CalciteSchema.createRootSchema(false, false) - private val rootSchema: SchemaPlus = internalSchema.plus() private val functionCatalog = new FunctionCatalog - // the configuration to create a Calcite planner - protected lazy val frameworkConfig: FrameworkConfig = Frameworks - .newConfigBuilder - .defaultSchema(rootSchema) - .parserConfig(getSqlParserConfig) - .costFactory(new FlinkCostFactory) - .typeSystem(new FlinkTypeSystem) - .sqlToRelConverterConfig(getSqlToRelConverterConfig) - .operatorTable(ChainedSqlOperatorTable.of( - 
new ListSqlOperatorTable(functionCatalog.sqlFunctions), - FlinkSqlOperatorTable.instance())) - // set the executor to evaluate constant expressions - .executor(new ExpressionReducer(config)) - .context(new FlinkContextImpl(config)) - .traitDefs(getTraitDefs: _*) - .build - - // the builder for Calcite RelNodes, Calcite's representation of a relational expression tree. - protected lazy val relBuilder: FlinkRelBuilder = FlinkRelBuilder.create(frameworkConfig) - - // the planner instance used to optimize queries of this TableEnvironment - private lazy val planner: RelOptPlanner = relBuilder.getPlanner - - private lazy val typeFactory: FlinkTypeFactory = relBuilder.getTypeFactory + private val plannerContext: PlannerContext = + new PlannerContext( + config, + functionCatalog, + // the catalog to hold all registered and translated tables + // we disable caching here to prevent side effects + CalciteSchema.createRootSchema(false, false), + getTraitDefs.toList + ) + + private lazy val rootSchema: SchemaPlus = plannerContext.getRootSchema + + /** Returns the [[FlinkRelBuilder]] of this TableEnvironment. */ + private[flink] def getRelBuilder: FlinkRelBuilder = plannerContext.createRelBuilder() + + /** Returns a [[FlinkPlannerImpl]] created for this TableEnvironment. */ + @VisibleForTesting + private[flink] def getFlinkPlanner: FlinkPlannerImpl = plannerContext.createFlinkPlanner() + + /** Returns the [[FlinkTypeFactory]] of this TableEnvironment. */ + private[flink] def getTypeFactory: FlinkTypeFactory = plannerContext.getTypeFactory // a counter for unique attribute names private[flink] val attrNameCntr: AtomicInteger = new AtomicInteger(0) @@ -130,53 +117,6 @@ abstract class TableEnvironment(val config: TableConfig) { /** Returns the table config to define the runtime behavior of the Table API. */ def getConfig: TableConfig = config - /** Returns the [[FlinkRelBuilder]] of this TableEnvironment. 
*/ - private[flink] def getRelBuilder: FlinkRelBuilder = relBuilder - - /** Returns the Calcite [[org.apache.calcite.plan.RelOptPlanner]] of this TableEnvironment. */ - private[flink] def getPlanner: RelOptPlanner = planner - - /** Returns the [[FlinkTypeFactory]] of this TableEnvironment. */ - private[flink] def getTypeFactory: FlinkTypeFactory = typeFactory - - /** Returns the Calcite [[FrameworkConfig]] of this TableEnvironment. */ - private[flink] def getFrameworkConfig: FrameworkConfig = frameworkConfig - - /** - * Returns the SqlToRelConverter config. - * - * `expand` is set as false, and each sub-query becomes a [[org.apache.calcite.rex.RexSubQuery]]. - */ - protected def getSqlToRelConverterConfig: SqlToRelConverter.Config = { - SqlToRelConverter.configBuilder() - .withTrimUnusedFields(false) - .withConvertTableAccess(false) - .withInSubQueryThreshold(Integer.MAX_VALUE) - .withExpand(false) - .build() - } - - /** - * Returns the SQL parser config for this environment including a custom Calcite configuration. - */ - protected def getSqlParserConfig: SqlParser.Config = { - val calciteConfig = config.getCalciteConfig - calciteConfig.getSqlParserConfig match { - - case None => - // we use Java lex because back ticks are easier than double quotes in programming - // and cases are preserved - SqlParser - .configBuilder() - .setLex(Lex.JAVA) - .setIdentifierMaxLength(256) - .build() - - case Some(sqlParserConfig) => - sqlParserConfig - } - } - /** Returns the [[QueryConfig]] depends on the concrete type of this TableEnvironment. 
*/ private[flink] def queryConfig: QueryConfig @@ -392,7 +332,7 @@ abstract class TableEnvironment(val config: TableConfig) { val tableName = tablePath(tablePath.length - 1) val table = schema.getTable(tableName) if (table != null) { - val scan = relBuilder.scan(JArrays.asList(tablePath: _*)).build() + val scan = getRelBuilder.scan(JArrays.asList(tablePath: _*)).build() return Some(new TableImpl(this, scan)) } } @@ -428,11 +368,7 @@ abstract class TableEnvironment(val config: TableConfig) { * @return completion hints that fit at the current cursor position */ def getCompletionHints(statement: String, position: Int): Array[String] = { - val planner = new FlinkPlannerImpl( - getFrameworkConfig, - getPlanner, - getTypeFactory, - relBuilder.getCluster) + val planner = getFlinkPlanner planner.getCompletionHints(statement, position) } @@ -485,11 +421,7 @@ abstract class TableEnvironment(val config: TableConfig) { * @return The result of the query as Table */ def sqlQuery(query: String): Table = { - val planner = new FlinkPlannerImpl( - getFrameworkConfig, - getPlanner, - getTypeFactory, - relBuilder.getCluster) + val planner = getFlinkPlanner // parse the sql query val parsed = planner.parse(query) if (null != parsed && parsed.getKind.belongsTo(SqlKind.QUERY)) { @@ -595,7 +527,7 @@ abstract class TableEnvironment(val config: TableConfig) { functionCatalog.registerScalarFunction( name, function, - typeFactory) + getTypeFactory) } /** @@ -626,7 +558,7 @@ abstract class TableEnvironment(val config: TableConfig) { name, function, fromLegacyInfoToDataType(implicitly[TypeInformation[T]]), - typeFactory) + getTypeFactory) } /** @@ -676,7 +608,7 @@ abstract class TableEnvironment(val config: TableConfig) { function, resultTypeInfo, accTypeInfo, - typeFactory) + getTypeFactory) } /** Returns a unique temporary attribute name. 
*/ diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala index a31f8c3..c3490f9 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala @@ -22,13 +22,11 @@ import org.apache.flink.table.api.{SqlParserException, TableException, Validatio import com.google.common.collect.ImmutableList import org.apache.calcite.config.NullCollation -import org.apache.calcite.jdbc.CalciteSchema import org.apache.calcite.plan.RelOptTable.ViewExpander import org.apache.calcite.plan._ import org.apache.calcite.prepare.CalciteCatalogReader import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.{RelFieldCollation, RelRoot} -import org.apache.calcite.schema.SchemaPlus import org.apache.calcite.sql.advise.{SqlAdvisor, SqlAdvisorValidator} import org.apache.calcite.sql.parser.{SqlParser, SqlParseException => CSqlParseException} import org.apache.calcite.sql.validate.SqlValidator @@ -36,7 +34,9 @@ import org.apache.calcite.sql.{SqlNode, SqlOperatorTable} import org.apache.calcite.sql2rel.{RelDecorrelator, SqlRexConvertletTable, SqlToRelConverter} import org.apache.calcite.tools.{FrameworkConfig, RelConversionException} +import java.lang.{Boolean => JBoolean} import java.util +import java.util.function.{Function => JFunction} import scala.collection.JavaConversions._ @@ -48,7 +48,7 @@ import scala.collection.JavaConversions._ */ class FlinkPlannerImpl( config: FrameworkConfig, - planner: RelOptPlanner, + catalogReaderSupplier: JFunction[JBoolean, CalciteCatalogReader], typeFactory: FlinkTypeFactory, cluster: RelOptCluster) { @@ -57,7 +57,6 @@ class FlinkPlannerImpl( val traitDefs: ImmutableList[RelTraitDef[_ <: RelTrait]] = 
config.getTraitDefs val parserConfig: SqlParser.Config = config.getParserConfig val convertletTable: SqlRexConvertletTable = config.getConvertletTable - val defaultSchema: SchemaPlus = config.getDefaultSchema val sqlToRelConverterConfig: SqlToRelConverter.Config = config.getSqlToRelConverterConfig var validator: FlinkCalciteSqlValidator = _ @@ -65,9 +64,9 @@ class FlinkPlannerImpl( private def ready() { if (this.traitDefs != null) { - planner.clearRelTraitDefs() + cluster.getPlanner.clearRelTraitDefs() for (traitDef <- this.traitDefs) { - planner.addRelTraitDef(traitDef) + cluster.getPlanner.addRelTraitDef(traitDef) } } } @@ -75,7 +74,7 @@ class FlinkPlannerImpl( def getCompletionHints(sql: String, cursor: Int): Array[String] = { val advisorValidator = new SqlAdvisorValidator( operatorTable, - createCatalogReader(true), // ignore cases for lenient completion + catalogReaderSupplier.apply(true), // ignore cases for lenient completion typeFactory, config.getParserConfig.conformance()) val advisor = new SqlAdvisor(advisorValidator) @@ -98,7 +97,9 @@ class FlinkPlannerImpl( } def validate(sqlNode: SqlNode): SqlNode = { - validator = new FlinkCalciteSqlValidator(operatorTable, createCatalogReader(false), typeFactory) + validator = new FlinkCalciteSqlValidator( + operatorTable, + catalogReaderSupplier(false), typeFactory) validator.setIdentifierExpansion(true) validator.setDefaultNullCollation(FlinkPlannerImpl.defaultNullCollation) @@ -117,7 +118,7 @@ class FlinkPlannerImpl( val sqlToRelConverter: SqlToRelConverter = new SqlToRelConverter( new ViewExpanderImpl, validator, - createCatalogReader(false), + catalogReaderSupplier.apply(false), cluster, convertletTable, sqlToRelConverterConfig) @@ -155,7 +156,7 @@ class FlinkPlannerImpl( case e: CSqlParseException => throw new SqlParserException(s"SQL parse failed. 
${e.getMessage}", e) } - val catalogReader: CalciteCatalogReader = createCatalogReader(false) + val catalogReader: CalciteCatalogReader = catalogReaderSupplier.apply(false) .withSchemaPath(schemaPath) val validator: SqlValidator = new FlinkCalciteSqlValidator(operatorTable, catalogReader, typeFactory) @@ -175,38 +176,9 @@ class FlinkPlannerImpl( } } - private def createCatalogReader(lenientCaseSensitivity: Boolean): CalciteCatalogReader = { - val rootSchema: SchemaPlus = FlinkPlannerImpl.rootSchema(defaultSchema) - - val caseSensitive = if (lenientCaseSensitivity) { - false - } else { - this.parserConfig.caseSensitive() - } - - val parserConfig = SqlParser.configBuilder(this.parserConfig) - .setCaseSensitive(caseSensitive) - .build() - - new FlinkCalciteCatalogReader( - CalciteSchema.from(rootSchema), - CalciteSchema.from(defaultSchema).path(null), - typeFactory, - CalciteConfig.connectionConfig(parserConfig) - ) - } - } object FlinkPlannerImpl { - private def rootSchema(schema: SchemaPlus): SchemaPlus = { - if (schema.getParentSchema == null) { - schema - } - else { - rootSchema(schema.getParentSchema) - } - } /** * the null default direction if not specified. Consistent with HIVE/SPARK/MYSQL/FLINK-RUNTIME. 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkRelBuilder.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkRelBuilder.scala index b8e2c1a..11de839 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkRelBuilder.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkRelBuilder.scala @@ -23,20 +23,14 @@ import org.apache.flink.table.expressions.WindowProperty import org.apache.flink.table.runtime.rank.{RankRange, RankType} import org.apache.flink.table.sinks.TableSink -import org.apache.calcite.config.{CalciteConnectionConfigImpl, CalciteConnectionProperty} -import org.apache.calcite.jdbc.CalciteSchema import org.apache.calcite.plan._ -import org.apache.calcite.plan.volcano.VolcanoPlanner import org.apache.calcite.rel.RelCollation import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField} -import org.apache.calcite.rex.{RexBuilder, RexNode} -import org.apache.calcite.tools.{FrameworkConfig, RelBuilder, RelBuilderFactory} +import org.apache.calcite.rex.RexNode +import org.apache.calcite.tools.{RelBuilder, RelBuilderFactory} import org.apache.calcite.util.{ImmutableBitSet, Util} import java.util -import java.util.{Collections, Properties} - -import scala.collection.JavaConversions._ /** * Flink specific [[RelBuilder]] that changes the default type factory to a [[FlinkTypeFactory]]. 
@@ -66,8 +60,6 @@ class FlinkRelBuilder( def getRelOptSchema: RelOptSchema = relOptSchema - def getPlanner: RelOptPlanner = cluster.getPlanner - def getCluster: RelOptCluster = relOptCluster override def getTypeFactory: FlinkTypeFactory = @@ -104,36 +96,6 @@ class FlinkRelBuilder( object FlinkRelBuilder { - def create(config: FrameworkConfig): FlinkRelBuilder = { - - // create Flink type factory - val typeSystem = config.getTypeSystem - val typeFactory = new FlinkTypeFactory(typeSystem) - - // create context instances with Flink type factory - val context = config.getContext - val planner = new VolcanoPlanner(config.getCostFactory, context) - planner.setExecutor(config.getExecutor) - config.getTraitDefs.foreach(planner.addRelTraitDef) - - val cluster = FlinkRelOptClusterFactory.create(planner, new RexBuilder(typeFactory)) - val calciteSchema = CalciteSchema.from(config.getDefaultSchema) - - val prop = new Properties() - prop.setProperty( - CalciteConnectionProperty.CASE_SENSITIVE.camelName, - String.valueOf(config.getParserConfig.caseSensitive)) - val connectionConfig = new CalciteConnectionConfigImpl(prop) - - val relOptSchema = new FlinkCalciteCatalogReader( - calciteSchema, - Collections.emptyList(), - typeFactory, - connectionConfig) - - new FlinkRelBuilder(context, cluster, relOptSchema) - } - /** * Information necessary to create a window aggregate. * diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/util/JavaScalaConversionUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/util/JavaScalaConversionUtil.scala new file mode 100644 index 0000000..bee0b0e --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/util/JavaScalaConversionUtil.scala @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.util + +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} + +import java.util.function.{BiConsumer, Consumer, Function} +import java.util.{Optional, List => JList} + +import scala.collection.JavaConverters._ + +/** + * Utilities for interoperability between Scala and Java classes. 
+ */ +object JavaScalaConversionUtil { + + // most of these methods are not necessary once we upgraded to Scala 2.12 + + def toJava[T](option: Option[T]): Optional[T] = option match { + case Some(v) => Optional.of(v) + case None => Optional.empty() + } + + def toScala[T](option: Optional[T]): Option[T] = Option(option.orElse(null.asInstanceOf[T])) + + def toJava[T](func: (T) => Unit): Consumer[T] = new Consumer[T] { + override def accept(t: T): Unit = { + func.apply(t) + } + } + + def toJava[K, V](func: (K, V) => Unit): BiConsumer[K, V] = new BiConsumer[K, V] { + override def accept(k: K, v: V): Unit = { + func.apply(k ,v) + } + } + + def toJava[I, O](func: (I) => O): Function[I, O] = new Function[I, O] { + override def apply(in: I): O = { + func.apply(in) + } + } + + def toJava[T0, T1](tuple: (T0, T1)): JTuple2[T0, T1] = { + new JTuple2[T0, T1](tuple._1, tuple._2) + } + + def toJava[T](seq: Seq[T]): JList[T] = { + seq.asJava + } +} diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala index 6de2f4a..c48c4e6 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala @@ -26,11 +26,10 @@ import org.apache.flink.table.types.DataType import org.apache.flink.table.types.TypeInfoDataTypeConverter.fromDataTypeToTypeInfo import org.apache.calcite.sql._ -import org.apache.calcite.sql.util.ListSqlOperatorTable + +import java.util import _root_.scala.collection.JavaConversions._ -import _root_.scala.collection.mutable -import scala.collection.mutable.ListBuffer /** * A catalog for looking up (user-defined) functions, used during validation phases @@ -39,7 +38,7 @@ import scala.collection.mutable.ListBuffer */ class FunctionCatalog() 
{ - val sqlFunctions: ListBuffer[SqlFunction] = mutable.ListBuffer[SqlFunction]() + val sqlFunctions: util.List[SqlOperator] = new util.ArrayList[SqlOperator]() def registerScalarFunction( name: String, @@ -96,6 +95,4 @@ class FunctionCatalog() { sqlFunctions.map(_.getName) } - def getSqlOperatorTable: SqlOperatorTable = - new ListSqlOperatorTable(sqlFunctions) } diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/codegen/agg/AggTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/codegen/agg/AggTestBase.scala index 9d925c9..cf19da9 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/codegen/agg/AggTestBase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/codegen/agg/AggTestBase.scala @@ -22,7 +22,7 @@ import org.apache.flink.api.common.functions.RuntimeContext import org.apache.flink.streaming.api.environment.LocalStreamEnvironment import org.apache.flink.table.api.java.StreamTableEnvironment import org.apache.flink.table.api.{DataTypes, TableConfig} -import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory, FlinkTypeSystem} +import org.apache.flink.table.calcite.{FlinkTypeFactory, FlinkTypeSystem} import org.apache.flink.table.codegen.CodeGeneratorContext import org.apache.flink.table.dataview.DataViewSpec import org.apache.flink.table.functions.aggfunctions.AvgAggFunction.{DoubleAvgAggFunction, IntegralAvgAggFunction} @@ -32,7 +32,7 @@ import org.apache.flink.table.types.logical.{BigIntType, DoubleType, LogicalType import org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType import org.apache.calcite.rel.core.AggregateCall -import org.apache.calcite.tools.{FrameworkConfig, RelBuilder} +import org.apache.calcite.tools.RelBuilder import org.powermock.api.mockito.PowerMockito.{mock, when} /** @@ -44,14 +44,13 @@ abstract class AggTestBase { val env = new LocalStreamEnvironment 
val conf = new TableConfig val tEnv = new StreamTableEnvironment(env, conf) - val frameworkConfig: FrameworkConfig = tEnv.getFrameworkConfig val inputNames = Array("f0", "f1", "f2", "f3", "f4") val inputTypes: Array[LogicalType] = Array( new VarCharType(VarCharType.MAX_LENGTH), new BigIntType(), new DoubleType(), new BigIntType(), new VarCharType(VarCharType.MAX_LENGTH)) - val inputType = RowType.of(inputTypes, inputNames) + val inputType: RowType = RowType.of(inputTypes, inputNames) - val relBuilder: RelBuilder = FlinkRelBuilder.create(frameworkConfig).values( + val relBuilder: RelBuilder = tEnv.getRelBuilder.values( typeFactory.buildRelNodeRowType(inputNames, inputTypes)) val aggInfo1: AggregateInfo = { val aggInfo = mock(classOf[AggregateInfo]) diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala index 867b380..1c345b1 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala @@ -26,7 +26,6 @@ import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment import org.apache.flink.table.api.TableConfig import org.apache.flink.table.api.java.StreamTableEnvironment -import org.apache.flink.table.calcite.FlinkPlannerImpl import org.apache.flink.table.codegen.{CodeGeneratorContext, ExprCodeGenerator, FunctionCodeGenerator} import org.apache.flink.table.dataformat.{BaseRow, BinaryRow, DataFormatConverters} import org.apache.flink.table.types.DataType @@ -58,11 +57,7 @@ abstract class ExpressionTestBase { private val env = StreamExecutionEnvironment.createLocalEnvironment(4) private val tEnv = 
StreamTableEnvironment.create(env, config) private val relBuilder = tEnv.getRelBuilder - private val planner = new FlinkPlannerImpl( - tEnv.getFrameworkConfig, - tEnv.getPlanner, - tEnv.getTypeFactory, - relBuilder.getCluster) + private val planner = tEnv.getFlinkPlanner // setup test utils private val tableName = "testTable" @@ -196,8 +191,7 @@ abstract class ExpressionTestBase { def testSqlApi( sqlExpr: String, - expected: String) - : Unit = { + expected: String): Unit = { addSqlTestExpr(sqlExpr, expected) } diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdHandlerTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdHandlerTestBase.scala index ee69520..249da20 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdHandlerTestBase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdHandlerTestBase.scala @@ -20,14 +20,14 @@ package org.apache.flink.table.plan.metadata import org.apache.flink.table.api.{TableConfig, TableException} import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty -import org.apache.flink.table.calcite.{FlinkCalciteCatalogReader, FlinkRelBuilder, FlinkTypeFactory} +import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory} import org.apache.flink.table.expressions.ApiExpressionUtils.intervalOfMillis import org.apache.flink.table.expressions._ import org.apache.flink.table.functions.aggfunctions.SumAggFunction.DoubleSumAggFunction import org.apache.flink.table.functions.aggfunctions.{DenseRankAggFunction, RankAggFunction, RowNumberAggFunction} import org.apache.flink.table.functions.sql.FlinkSqlOperatorTable import org.apache.flink.table.plan.PartialFinalType -import org.apache.flink.table.plan.`trait`.FlinkRelDistribution +import 
org.apache.flink.table.plan.`trait`.{FlinkRelDistribution, FlinkRelDistributionTraitDef} import org.apache.flink.table.plan.logical.{LogicalWindow, TumblingGroupWindow} import org.apache.flink.table.plan.nodes.FlinkConventions import org.apache.flink.table.plan.nodes.calcite.{LogicalExpand, LogicalRank, LogicalWindowAggregate} @@ -37,13 +37,17 @@ import org.apache.flink.table.plan.nodes.physical.stream._ import org.apache.flink.table.plan.schema.FlinkRelOptTable import org.apache.flink.table.plan.util.AggregateUtil.transformToStreamAggregateInfoList import org.apache.flink.table.plan.util._ +import org.apache.flink.table.planner.PlannerContext import org.apache.flink.table.runtime.rank.{ConstantRankRange, RankType, VariableRankRange} import org.apache.flink.table.types.AtomicDataType import org.apache.flink.table.types.logical.{BigIntType, DoubleType, IntType, LogicalType, TimestampKind, TimestampType, VarCharType} import org.apache.flink.table.util.CountAggFunction +import org.apache.flink.table.validate.FunctionCatalog import com.google.common.collect.{ImmutableList, Lists} +import org.apache.calcite.jdbc.CalciteSchema import org.apache.calcite.plan._ +import org.apache.calcite.prepare.CalciteCatalogReader import org.apache.calcite.rel._ import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFieldImpl} import org.apache.calcite.rel.core._ @@ -57,7 +61,6 @@ import org.apache.calcite.sql.`type`.SqlTypeName._ import org.apache.calcite.sql.fun.SqlStdOperatorTable._ import org.apache.calcite.sql.fun.{SqlCountAggFunction, SqlStdOperatorTable} import org.apache.calcite.sql.parser.SqlParserPos -import org.apache.calcite.tools.FrameworkConfig import org.apache.calcite.util.{DateString, ImmutableBitSet, TimeString, TimestampString} import org.junit.{Before, BeforeClass} @@ -70,11 +73,20 @@ class FlinkRelMdHandlerTestBase { val tableConfig = new TableConfig() val rootSchema: SchemaPlus = MetadataTestUtil.initRootSchema() - val frameworkConfig: FrameworkConfig = - 
MetadataTestUtil.createFrameworkConfig(rootSchema, tableConfig) - val typeFactory: FlinkTypeFactory = new FlinkTypeFactory(frameworkConfig.getTypeSystem) - val catalogReader: FlinkCalciteCatalogReader = - MetadataTestUtil.createCatalogReader(rootSchema, typeFactory) + // TODO batch RelNode and stream RelNode should have different PlanningConfigurationBuilder + // and RelOptCluster because they have different trait definitions. + val plannerContext: PlannerContext = + new PlannerContext( + tableConfig, + new FunctionCatalog, + CalciteSchema.from(rootSchema), + util.Arrays.asList( + ConventionTraitDef.INSTANCE, + FlinkRelDistributionTraitDef.INSTANCE, + RelCollationTraitDef.INSTANCE + ) + ) + val typeFactory: FlinkTypeFactory = plannerContext.getTypeFactory val mq: FlinkRelMetadataQuery = FlinkRelMetadataQuery.instance() var relBuilder: FlinkRelBuilder = _ @@ -88,7 +100,7 @@ class FlinkRelMdHandlerTestBase { @Before def setUp(): Unit = { - relBuilder = FlinkRelBuilder.create(frameworkConfig) + relBuilder = plannerContext.createRelBuilder() rexBuilder = relBuilder.getRexBuilder cluster = relBuilder.getCluster @@ -2033,7 +2045,8 @@ class FlinkRelMdHandlerTestBase { protected def createDataStreamScan[T]( tableNames: util.List[String], traitSet: RelTraitSet): T = { - val table = catalogReader.getTable(tableNames).asInstanceOf[FlinkRelOptTable] + val table = relBuilder.getRelOptSchema.asInstanceOf[CalciteCatalogReader].getTable(tableNames) + .asInstanceOf[FlinkRelOptTable] val conventionTrait = traitSet.getTrait(ConventionTraitDef.INSTANCE) val scan = conventionTrait match { case Convention.NONE =>