This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 774ecea  [FLINK-12795][table-planner-blink] Extracted creation & configuration of FrameworkConfig & RelBuilder to separate class in blink planner
774ecea is described below

commit 774ecea757861fa5e1088fb091c304956145870b
Author: godfrey he <godfre...@163.com>
AuthorDate: Tue Jun 11 17:21:46 2019 +0800

    [FLINK-12795][table-planner-blink] Extracted creation & configuration of FrameworkConfig & RelBuilder to separate class in blink planner
    
    This closes #8677
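
    A minimal usage sketch of the new class (Scala, mirroring how the
    TableEnvironment changes below wire it up; the empty trait-def list here
    is only an illustrative choice, TableEnvironment passes getTraitDefs.toList):

        import java.util.Collections

        import org.apache.calcite.jdbc.CalciteSchema
        import org.apache.calcite.plan.RelTraitDef
        import org.apache.flink.table.api.TableConfig
        import org.apache.flink.table.planner.PlannerContext
        import org.apache.flink.table.validate.FunctionCatalog

        // One PlannerContext holds the static parts (root schema, cost factory,
        // type system, operator table); each create* call returns a freshly
        // configured object for a single planning session.
        val plannerContext = new PlannerContext(
          new TableConfig,
          new FunctionCatalog,
          // caching disabled to prevent side effects, as in TableEnvironment
          CalciteSchema.createRootSchema(false, false),
          Collections.emptyList[RelTraitDef]())

        val relBuilder = plannerContext.createRelBuilder()      // FlinkRelBuilder
        val flinkPlanner = plannerContext.createFlinkPlanner()  // FlinkPlannerImpl
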
---
 .../apache/flink/table/planner/PlannerContext.java | 223 +++++++++++++++++++++
 .../apache/flink/table/api/TableEnvironment.scala  | 126 +++---------
 .../flink/table/calcite/FlinkPlannerImpl.scala     |  50 +----
 .../flink/table/calcite/FlinkRelBuilder.scala      |  42 +---
 .../flink/table/util/JavaScalaConversionUtil.scala |  67 +++++++
 .../flink/table/validate/FunctionCatalog.scala     |   9 +-
 .../flink/table/codegen/agg/AggTestBase.scala      |   9 +-
 .../expressions/utils/ExpressionTestBase.scala     |  10 +-
 .../plan/metadata/FlinkRelMdHandlerTestBase.scala  |  33 ++-
 9 files changed, 364 insertions(+), 205 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/PlannerContext.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/PlannerContext.java
new file mode 100644
index 0000000..205f0e7
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/PlannerContext.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.calcite.CalciteConfig;
+import org.apache.flink.table.calcite.CalciteConfig$;
+import org.apache.flink.table.calcite.FlinkCalciteCatalogReader;
+import org.apache.flink.table.calcite.FlinkContextImpl;
+import org.apache.flink.table.calcite.FlinkPlannerImpl;
+import org.apache.flink.table.calcite.FlinkRelBuilder;
+import org.apache.flink.table.calcite.FlinkRelOptClusterFactory;
+import org.apache.flink.table.calcite.FlinkTypeFactory;
+import org.apache.flink.table.calcite.FlinkTypeSystem;
+import org.apache.flink.table.codegen.ExpressionReducer;
+import org.apache.flink.table.functions.sql.FlinkSqlOperatorTable;
+import org.apache.flink.table.plan.cost.FlinkCostFactory;
+import org.apache.flink.table.util.JavaScalaConversionUtil;
+import org.apache.flink.table.validate.FunctionCatalog;
+
+import org.apache.calcite.config.Lex;
+import org.apache.calcite.jdbc.CalciteSchema;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitDef;
+import org.apache.calcite.plan.volcano.VolcanoPlanner;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.sql.SqlOperatorTable;
+import org.apache.calcite.sql.parser.SqlParser;
+import org.apache.calcite.sql.util.ChainedSqlOperatorTable;
+import org.apache.calcite.sql.util.ListSqlOperatorTable;
+import org.apache.calcite.sql2rel.SqlToRelConverter;
+import org.apache.calcite.tools.FrameworkConfig;
+import org.apache.calcite.tools.Frameworks;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Utility class to create {@link org.apache.calcite.tools.RelBuilder} or {@link FrameworkConfig} used to create
+ * a corresponding {@link org.apache.calcite.tools.Planner}. It tries to separate the static elements of a
+ * {@link org.apache.flink.table.api.TableEnvironment}, such as the root schema, cost factory, and type system,
+ * from dynamic properties, e.g. the default path to look for objects in the schema.
+ */
+@Internal
+public class PlannerContext {
+       private final RelDataTypeSystem typeSystem = new FlinkTypeSystem();
+       private final FlinkTypeFactory typeFactory = new FlinkTypeFactory(typeSystem);
+       private final TableConfig tableConfig;
+       private final FunctionCatalog functionCatalog;
+       private final FrameworkConfig frameworkConfig;
+       private final RelOptCluster cluster;
+
+       public PlannerContext(
+                       TableConfig tableConfig,
+                       FunctionCatalog functionCatalog,
+                       CalciteSchema rootSchema,
+                       List<RelTraitDef> traitDefs) {
+               this.tableConfig = tableConfig;
+               this.functionCatalog = functionCatalog;
+               this.frameworkConfig = createFrameworkConfig(rootSchema, traitDefs);
+
+               RelOptPlanner planner = new VolcanoPlanner(frameworkConfig.getCostFactory(), frameworkConfig.getContext());
+               planner.setExecutor(frameworkConfig.getExecutor());
+               for (RelTraitDef traitDef : frameworkConfig.getTraitDefs()) {
+                       planner.addRelTraitDef(traitDef);
+               }
+               this.cluster = FlinkRelOptClusterFactory.create(planner, new RexBuilder(typeFactory));
+       }
+
+       private FrameworkConfig createFrameworkConfig(CalciteSchema rootSchema, List<RelTraitDef> traitDefs) {
+               return Frameworks.newConfigBuilder()
+                               .defaultSchema(rootSchema.plus())
+                               .parserConfig(getSqlParserConfig())
+                               .costFactory(new FlinkCostFactory())
+                               .typeSystem(typeSystem)
+                               .sqlToRelConverterConfig(getSqlToRelConverterConfig(getCalciteConfig(tableConfig)))
+                               .operatorTable(getSqlOperatorTable(getCalciteConfig(tableConfig), functionCatalog))
+                               // set the executor to evaluate constant expressions
+                               .executor(new ExpressionReducer(tableConfig, false))
+                               .context(new FlinkContextImpl(tableConfig))
+                               .traitDefs(traitDefs)
+                               .build();
+       }
+
+       /** Returns the {@link FlinkTypeFactory} that will be used. */
+       public FlinkTypeFactory getTypeFactory() {
+               return typeFactory;
+       }
+
+       public SchemaPlus getRootSchema() {
+               return frameworkConfig.getDefaultSchema();
+       }
+
+       /**
+        * Creates a configured {@link FlinkRelBuilder} for a planning session.
+        *
+        * @return configured rel builder
+        */
+       public FlinkRelBuilder createRelBuilder() {
+               FlinkCalciteCatalogReader relOptSchema = createCatalogReader(false);
+               return new FlinkRelBuilder(frameworkConfig.getContext(), cluster, relOptSchema);
+       }
+
+       /**
+        * Creates a configured {@link FlinkPlannerImpl} for a planning session.
+        *
+        * @return configured flink planner
+        */
+       public FlinkPlannerImpl createFlinkPlanner() {
+               return new FlinkPlannerImpl(
+                               frameworkConfig,
+                               this::createCatalogReader,
+                               typeFactory,
+                               cluster);
+       }
+
+       private FlinkCalciteCatalogReader createCatalogReader(boolean lenientCaseSensitivity) {
+               SqlParser.Config sqlParserConfig = frameworkConfig.getParserConfig();
+               final boolean caseSensitive;
+               if (lenientCaseSensitivity) {
+                       caseSensitive = false;
+               } else {
+                       caseSensitive = sqlParserConfig.caseSensitive();
+               }
+
+               SqlParser.Config newSqlParserConfig = SqlParser.configBuilder(sqlParserConfig)
+                               .setCaseSensitive(caseSensitive)
+                               .build();
+
+               SchemaPlus rootSchema = getRootSchema(frameworkConfig.getDefaultSchema());
+               return new FlinkCalciteCatalogReader(
+                               CalciteSchema.from(rootSchema),
+                               Collections.emptyList(),
+                               typeFactory,
+                               CalciteConfig$.MODULE$.connectionConfig(newSqlParserConfig));
+       }
+
+       private SchemaPlus getRootSchema(SchemaPlus schema) {
+               if (schema.getParentSchema() == null) {
+                       return schema;
+               } else {
+                       return getRootSchema(schema.getParentSchema());
+               }
+       }
+
+       private CalciteConfig getCalciteConfig(TableConfig tableConfig) {
+               return tableConfig.getCalciteConfig();
+       }
+
+       /**
+        * Returns the SQL parser config for this environment including a custom Calcite configuration.
+        */
+       private SqlParser.Config getSqlParserConfig() {
+               return JavaScalaConversionUtil.toJava(getCalciteConfig(tableConfig).getSqlParserConfig()).orElseGet(
+                               // we use Java lex because back ticks are easier than double quotes in programming
+                               // and cases are preserved
+                               () -> SqlParser
+                                               .configBuilder()
+                                               .setLex(Lex.JAVA)
+                                               .setIdentifierMaxLength(256)
+                                               .build());
+       }
+
+       /**
+        * Returns the {@link SqlToRelConverter} config.
+        *
+        * <p>`expand` is set to false, and each sub-query becomes a {@link org.apache.calcite.rex.RexSubQuery}.
+        */
+       private SqlToRelConverter.Config getSqlToRelConverterConfig(CalciteConfig calciteConfig) {
+               return JavaScalaConversionUtil.toJava(calciteConfig.getSqlToRelConverterConfig()).orElseGet(
+                               () -> SqlToRelConverter.configBuilder()
+                                               .withTrimUnusedFields(false)
+                                               .withConvertTableAccess(false)
+                                               .withInSubQueryThreshold(Integer.MAX_VALUE)
+                                               .withExpand(false)
+                                               .build()
+               );
+       }
+
+       /**
+        * Returns the operator table for this environment including a custom Calcite configuration.
+        */
+       private SqlOperatorTable getSqlOperatorTable(CalciteConfig calciteConfig, FunctionCatalog functionCatalog) {
+               return JavaScalaConversionUtil.toJava(calciteConfig.getSqlOperatorTable()).map(operatorTable -> {
+                                       if (calciteConfig.replacesSqlOperatorTable()) {
+                                               return operatorTable;
+                                       } else {
+                                               return ChainedSqlOperatorTable.of(getBuiltinSqlOperatorTable(functionCatalog), operatorTable);
+                                       }
+                               }
+               ).orElseGet(() -> getBuiltinSqlOperatorTable(functionCatalog));
+       }
+
+       /**
+        * Returns the built-in operator table for this environment.
+        */
+       private SqlOperatorTable getBuiltinSqlOperatorTable(FunctionCatalog functionCatalog) {
+               return ChainedSqlOperatorTable.of(
+                               new ListSqlOperatorTable(functionCatalog.sqlFunctions()),
+                               FlinkSqlOperatorTable.instance());
+       }
+
+}
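
    One behavioral detail of getSqlOperatorTable(...) above worth calling out:
    an operator table supplied via CalciteConfig either replaces the built-in
    table or is chained behind it. A hedged sketch of just that decision (the
    method name resolveOperatorTable is made up for illustration):

        import org.apache.calcite.sql.SqlOperatorTable
        import org.apache.calcite.sql.util.ChainedSqlOperatorTable

        def resolveOperatorTable(
            userTable: SqlOperatorTable,   // calciteConfig.getSqlOperatorTable
            replacesBuiltins: Boolean,     // calciteConfig.replacesSqlOperatorTable()
            builtins: SqlOperatorTable     // getBuiltinSqlOperatorTable(...)
          ): SqlOperatorTable = {
          // built-ins are looked up first when chaining
          if (replacesBuiltins) userTable
          else ChainedSqlOperatorTable.of(builtins, userTable)
        }
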
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index 506f724..82d1655 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -29,12 +29,9 @@ import org.apache.flink.streaming.api.transformations.StreamTransformation
 import org.apache.flink.table.api.java.{BatchTableEnvironment => JavaBatchTableEnvironment, StreamTableEnvironment => JavaStreamTableEnv}
 import org.apache.flink.table.api.scala.{BatchTableEnvironment => ScalaBatchTableEnvironment, StreamTableEnvironment => ScalaStreamTableEnv}
 import org.apache.flink.table.calcite._
-import org.apache.flink.table.codegen.ExpressionReducer
 import org.apache.flink.table.dataformat.BaseRow
-import org.apache.flink.table.functions.sql.FlinkSqlOperatorTable
 import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils.{checkForInstantiation, checkNotSingleton, extractResultTypeFromTableFunction, getAccumulatorTypeOfAggregateFunction, getResultTypeOfAggregateFunction}
 import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, TableFunction}
-import org.apache.flink.table.plan.cost.FlinkCostFactory
 import org.apache.flink.table.plan.nodes.calcite.{LogicalSink, Sink}
 import org.apache.flink.table.plan.nodes.exec.ExecNode
 import org.apache.flink.table.plan.nodes.physical.FlinkPhysicalRel
@@ -43,6 +40,7 @@ import org.apache.flink.table.plan.reuse.SubplanReuser
 import org.apache.flink.table.plan.schema.RelTable
 import org.apache.flink.table.plan.stats.FlinkStatistic
 import org.apache.flink.table.plan.util.SameRelObjectShuttle
+import org.apache.flink.table.planner.PlannerContext
 import org.apache.flink.table.sinks.TableSink
 import org.apache.flink.table.sources.TableSource
 import org.apache.flink.table.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType
@@ -52,16 +50,12 @@ import org.apache.flink.table.types.{ClassLogicalTypeConverter, DataType}
 import org.apache.flink.table.validate.FunctionCatalog
 import org.apache.flink.types.Row
 
-import org.apache.calcite.config.Lex
 import org.apache.calcite.jdbc.CalciteSchema
-import org.apache.calcite.plan.{RelOptPlanner, RelTrait, RelTraitDef}
+import org.apache.calcite.plan.{RelTrait, RelTraitDef}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.schema.SchemaPlus
 import org.apache.calcite.schema.impl.AbstractTable
 import org.apache.calcite.sql._
-import org.apache.calcite.sql.parser.SqlParser
-import org.apache.calcite.sql.util.{ChainedSqlOperatorTable, ListSqlOperatorTable}
-import org.apache.calcite.sql2rel.SqlToRelConverter
 import org.apache.calcite.tools._
 
 import _root_.java.lang.reflect.Modifier
@@ -82,36 +76,29 @@ abstract class TableEnvironment(val config: TableConfig) {
 
   protected val DEFAULT_JOB_NAME = "Flink Exec Table Job"
 
-  // the catalog to hold all registered and translated tables
-  // we disable caching here to prevent side effects
-  private val internalSchema: CalciteSchema = CalciteSchema.createRootSchema(false, false)
-  private val rootSchema: SchemaPlus = internalSchema.plus()
   private val functionCatalog = new FunctionCatalog
 
-  // the configuration to create a Calcite planner
-  protected lazy val frameworkConfig: FrameworkConfig = Frameworks
-    .newConfigBuilder
-    .defaultSchema(rootSchema)
-    .parserConfig(getSqlParserConfig)
-    .costFactory(new FlinkCostFactory)
-    .typeSystem(new FlinkTypeSystem)
-    .sqlToRelConverterConfig(getSqlToRelConverterConfig)
-    .operatorTable(ChainedSqlOperatorTable.of(
-      new ListSqlOperatorTable(functionCatalog.sqlFunctions),
-      FlinkSqlOperatorTable.instance()))
-    // set the executor to evaluate constant expressions
-    .executor(new ExpressionReducer(config))
-    .context(new FlinkContextImpl(config))
-    .traitDefs(getTraitDefs: _*)
-    .build
-
-  // the builder for Calcite RelNodes, Calcite's representation of a relational expression tree.
-  protected lazy val relBuilder: FlinkRelBuilder = FlinkRelBuilder.create(frameworkConfig)
-
-  // the planner instance used to optimize queries of this TableEnvironment
-  private lazy val planner: RelOptPlanner = relBuilder.getPlanner
-
-  private lazy val typeFactory: FlinkTypeFactory = relBuilder.getTypeFactory
+  private val plannerContext: PlannerContext =
+    new PlannerContext(
+      config,
+      functionCatalog,
+      // the catalog to hold all registered and translated tables
+      // we disable caching here to prevent side effects
+      CalciteSchema.createRootSchema(false, false),
+      getTraitDefs.toList
+    )
+
+  private lazy val rootSchema: SchemaPlus = plannerContext.getRootSchema
+
+  /** Returns the [[FlinkRelBuilder]] of this TableEnvironment. */
+  private[flink] def getRelBuilder: FlinkRelBuilder = plannerContext.createRelBuilder()
+
+  /** Returns the [[FlinkPlannerImpl]] of this TableEnvironment. */
+  @VisibleForTesting
+  private[flink] def getFlinkPlanner: FlinkPlannerImpl = plannerContext.createFlinkPlanner()
+
+  /** Returns the [[FlinkTypeFactory]] of this TableEnvironment. */
+  private[flink] def getTypeFactory: FlinkTypeFactory = plannerContext.getTypeFactory
 
   // a counter for unique attribute names
   private[flink] val attrNameCntr: AtomicInteger = new AtomicInteger(0)
@@ -130,53 +117,6 @@ abstract class TableEnvironment(val config: TableConfig) {
   /** Returns the table config to define the runtime behavior of the Table API. */
   def getConfig: TableConfig = config
 
-  /** Returns the [[FlinkRelBuilder]] of this TableEnvironment. */
-  private[flink] def getRelBuilder: FlinkRelBuilder = relBuilder
-
-  /** Returns the Calcite [[org.apache.calcite.plan.RelOptPlanner]] of this TableEnvironment. */
-  private[flink] def getPlanner: RelOptPlanner = planner
-
-  /** Returns the [[FlinkTypeFactory]] of this TableEnvironment. */
-  private[flink] def getTypeFactory: FlinkTypeFactory = typeFactory
-
-  /** Returns the Calcite [[FrameworkConfig]] of this TableEnvironment. */
-  private[flink] def getFrameworkConfig: FrameworkConfig = frameworkConfig
-
-  /**
-    * Returns the SqlToRelConverter config.
-    *
-    * `expand` is set as false, and each sub-query becomes a [[org.apache.calcite.rex.RexSubQuery]].
-    */
-  protected def getSqlToRelConverterConfig: SqlToRelConverter.Config = {
-    SqlToRelConverter.configBuilder()
-    .withTrimUnusedFields(false)
-    .withConvertTableAccess(false)
-    .withInSubQueryThreshold(Integer.MAX_VALUE)
-    .withExpand(false)
-    .build()
-  }
-
-  /**
-    * Returns the SQL parser config for this environment including a custom Calcite configuration.
-    */
-  protected def getSqlParserConfig: SqlParser.Config = {
-    val calciteConfig = config.getCalciteConfig
-    calciteConfig.getSqlParserConfig match {
-
-      case None =>
-        // we use Java lex because back ticks are easier than double quotes in programming
-        // and cases are preserved
-        SqlParser
-          .configBuilder()
-          .setLex(Lex.JAVA)
-          .setIdentifierMaxLength(256)
-          .build()
-
-      case Some(sqlParserConfig) =>
-        sqlParserConfig
-    }
-  }
-
   /** Returns the [[QueryConfig]] depends on the concrete type of this TableEnvironment. */
   private[flink] def queryConfig: QueryConfig
 
@@ -392,7 +332,7 @@ abstract class TableEnvironment(val config: TableConfig) {
       val tableName = tablePath(tablePath.length - 1)
       val table = schema.getTable(tableName)
       if (table != null) {
-        val scan = relBuilder.scan(JArrays.asList(tablePath: _*)).build()
+        val scan = getRelBuilder.scan(JArrays.asList(tablePath: _*)).build()
         return Some(new TableImpl(this, scan))
       }
     }
@@ -428,11 +368,7 @@ abstract class TableEnvironment(val config: TableConfig) {
     * @return completion hints that fit at the current cursor position
     */
   def getCompletionHints(statement: String, position: Int): Array[String] = {
-    val planner = new FlinkPlannerImpl(
-      getFrameworkConfig,
-      getPlanner,
-      getTypeFactory,
-      relBuilder.getCluster)
+    val planner = getFlinkPlanner
     planner.getCompletionHints(statement, position)
   }
 
@@ -485,11 +421,7 @@ abstract class TableEnvironment(val config: TableConfig) {
     * @return The result of the query as Table
     */
   def sqlQuery(query: String): Table = {
-    val planner = new FlinkPlannerImpl(
-      getFrameworkConfig,
-      getPlanner,
-      getTypeFactory,
-      relBuilder.getCluster)
+    val planner = getFlinkPlanner
     // parse the sql query
     val parsed = planner.parse(query)
     if (null != parsed && parsed.getKind.belongsTo(SqlKind.QUERY)) {
@@ -595,7 +527,7 @@ abstract class TableEnvironment(val config: TableConfig) {
     functionCatalog.registerScalarFunction(
       name,
       function,
-      typeFactory)
+      getTypeFactory)
   }
 
   /**
@@ -626,7 +558,7 @@ abstract class TableEnvironment(val config: TableConfig) {
       name,
       function,
       fromLegacyInfoToDataType(implicitly[TypeInformation[T]]),
-      typeFactory)
+      getTypeFactory)
   }
 
   /**
@@ -676,7 +608,7 @@ abstract class TableEnvironment(val config: TableConfig) {
       function,
       resultTypeInfo,
       accTypeInfo,
-      typeFactory)
+      getTypeFactory)
   }
 
   /** Returns a unique temporary attribute name. */
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
index a31f8c3..c3490f9 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
@@ -22,13 +22,11 @@ import org.apache.flink.table.api.{SqlParserException, TableException, Validatio
 
 import com.google.common.collect.ImmutableList
 import org.apache.calcite.config.NullCollation
-import org.apache.calcite.jdbc.CalciteSchema
 import org.apache.calcite.plan.RelOptTable.ViewExpander
 import org.apache.calcite.plan._
 import org.apache.calcite.prepare.CalciteCatalogReader
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.{RelFieldCollation, RelRoot}
-import org.apache.calcite.schema.SchemaPlus
 import org.apache.calcite.sql.advise.{SqlAdvisor, SqlAdvisorValidator}
 import org.apache.calcite.sql.parser.{SqlParser, SqlParseException => CSqlParseException}
 import org.apache.calcite.sql.validate.SqlValidator
@@ -36,7 +34,9 @@ import org.apache.calcite.sql.{SqlNode, SqlOperatorTable}
 import org.apache.calcite.sql2rel.{RelDecorrelator, SqlRexConvertletTable, SqlToRelConverter}
 import org.apache.calcite.tools.{FrameworkConfig, RelConversionException}
 
+import java.lang.{Boolean => JBoolean}
 import java.util
+import java.util.function.{Function => JFunction}
 
 import scala.collection.JavaConversions._
 
@@ -48,7 +48,7 @@ import scala.collection.JavaConversions._
   */
 class FlinkPlannerImpl(
     config: FrameworkConfig,
-    planner: RelOptPlanner,
+    catalogReaderSupplier: JFunction[JBoolean, CalciteCatalogReader],
     typeFactory: FlinkTypeFactory,
     cluster: RelOptCluster) {
 
@@ -57,7 +57,6 @@ class FlinkPlannerImpl(
   val traitDefs: ImmutableList[RelTraitDef[_ <: RelTrait]] = config.getTraitDefs
   val parserConfig: SqlParser.Config = config.getParserConfig
   val convertletTable: SqlRexConvertletTable = config.getConvertletTable
-  val defaultSchema: SchemaPlus = config.getDefaultSchema
   val sqlToRelConverterConfig: SqlToRelConverter.Config = config.getSqlToRelConverterConfig
 
   var validator: FlinkCalciteSqlValidator = _
@@ -65,9 +64,9 @@ class FlinkPlannerImpl(
 
   private def ready() {
     if (this.traitDefs != null) {
-      planner.clearRelTraitDefs()
+      cluster.getPlanner.clearRelTraitDefs()
       for (traitDef <- this.traitDefs) {
-        planner.addRelTraitDef(traitDef)
+        cluster.getPlanner.addRelTraitDef(traitDef)
       }
     }
   }
@@ -75,7 +74,7 @@ class FlinkPlannerImpl(
   def getCompletionHints(sql: String, cursor: Int): Array[String] = {
     val advisorValidator = new SqlAdvisorValidator(
       operatorTable,
-      createCatalogReader(true), // ignore cases for lenient completion
+      catalogReaderSupplier.apply(true), // ignore cases for lenient completion
       typeFactory,
       config.getParserConfig.conformance())
     val advisor = new SqlAdvisor(advisorValidator)
@@ -98,7 +97,9 @@ class FlinkPlannerImpl(
   }
 
   def validate(sqlNode: SqlNode): SqlNode = {
-    validator = new FlinkCalciteSqlValidator(operatorTable, createCatalogReader(false), typeFactory)
+    validator = new FlinkCalciteSqlValidator(
+      operatorTable,
+      catalogReaderSupplier(false), typeFactory)
     validator.setIdentifierExpansion(true)
     validator.setDefaultNullCollation(FlinkPlannerImpl.defaultNullCollation)
 
@@ -117,7 +118,7 @@ class FlinkPlannerImpl(
       val sqlToRelConverter: SqlToRelConverter = new SqlToRelConverter(
         new ViewExpanderImpl,
         validator,
-        createCatalogReader(false),
+        catalogReaderSupplier.apply(false),
         cluster,
         convertletTable,
         sqlToRelConverterConfig)
@@ -155,7 +156,7 @@ class FlinkPlannerImpl(
         case e: CSqlParseException =>
           throw new SqlParserException(s"SQL parse failed. ${e.getMessage}", e)
       }
-      val catalogReader: CalciteCatalogReader = createCatalogReader(false)
+      val catalogReader: CalciteCatalogReader = catalogReaderSupplier.apply(false)
         .withSchemaPath(schemaPath)
       val validator: SqlValidator =
         new FlinkCalciteSqlValidator(operatorTable, catalogReader, typeFactory)
@@ -175,38 +176,9 @@ class FlinkPlannerImpl(
     }
   }
 
-  private def createCatalogReader(lenientCaseSensitivity: Boolean): CalciteCatalogReader = {
-    val rootSchema: SchemaPlus = FlinkPlannerImpl.rootSchema(defaultSchema)
-
-    val caseSensitive = if (lenientCaseSensitivity) {
-      false
-    } else {
-      this.parserConfig.caseSensitive()
-    }
-
-    val parserConfig = SqlParser.configBuilder(this.parserConfig)
-      .setCaseSensitive(caseSensitive)
-      .build()
-
-    new FlinkCalciteCatalogReader(
-      CalciteSchema.from(rootSchema),
-      CalciteSchema.from(defaultSchema).path(null),
-      typeFactory,
-      CalciteConfig.connectionConfig(parserConfig)
-    )
-  }
-
 }
 
 object FlinkPlannerImpl {
-  private def rootSchema(schema: SchemaPlus): SchemaPlus = {
-    if (schema.getParentSchema == null) {
-      schema
-    }
-    else {
-      rootSchema(schema.getParentSchema)
-    }
-  }
 
   /**
    * the null default direction if not specified. Consistent with HIVE/SPARK/MYSQL/FLINK-RUNTIME.
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkRelBuilder.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkRelBuilder.scala
index b8e2c1a..11de839 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkRelBuilder.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkRelBuilder.scala
@@ -23,20 +23,14 @@ import org.apache.flink.table.expressions.WindowProperty
 import org.apache.flink.table.runtime.rank.{RankRange, RankType}
 import org.apache.flink.table.sinks.TableSink
 
-import org.apache.calcite.config.{CalciteConnectionConfigImpl, CalciteConnectionProperty}
-import org.apache.calcite.jdbc.CalciteSchema
 import org.apache.calcite.plan._
-import org.apache.calcite.plan.volcano.VolcanoPlanner
 import org.apache.calcite.rel.RelCollation
 import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField}
-import org.apache.calcite.rex.{RexBuilder, RexNode}
-import org.apache.calcite.tools.{FrameworkConfig, RelBuilder, RelBuilderFactory}
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.tools.{RelBuilder, RelBuilderFactory}
 import org.apache.calcite.util.{ImmutableBitSet, Util}
 
 import java.util
-import java.util.{Collections, Properties}
-
-import scala.collection.JavaConversions._
 
 /**
   * Flink specific [[RelBuilder]] that changes the default type factory to a [[FlinkTypeFactory]].
@@ -66,8 +60,6 @@ class FlinkRelBuilder(
 
   def getRelOptSchema: RelOptSchema = relOptSchema
 
-  def getPlanner: RelOptPlanner = cluster.getPlanner
-
   def getCluster: RelOptCluster = relOptCluster
 
   override def getTypeFactory: FlinkTypeFactory =
@@ -104,36 +96,6 @@ class FlinkRelBuilder(
 
 object FlinkRelBuilder {
 
-  def create(config: FrameworkConfig): FlinkRelBuilder = {
-
-    // create Flink type factory
-    val typeSystem = config.getTypeSystem
-    val typeFactory = new FlinkTypeFactory(typeSystem)
-
-    // create context instances with Flink type factory
-    val context = config.getContext
-    val planner = new VolcanoPlanner(config.getCostFactory, context)
-    planner.setExecutor(config.getExecutor)
-    config.getTraitDefs.foreach(planner.addRelTraitDef)
-
-    val cluster = FlinkRelOptClusterFactory.create(planner, new RexBuilder(typeFactory))
-    val calciteSchema = CalciteSchema.from(config.getDefaultSchema)
-
-    val prop = new Properties()
-    prop.setProperty(
-      CalciteConnectionProperty.CASE_SENSITIVE.camelName,
-      String.valueOf(config.getParserConfig.caseSensitive))
-    val connectionConfig = new CalciteConnectionConfigImpl(prop)
-
-    val relOptSchema = new FlinkCalciteCatalogReader(
-      calciteSchema,
-      Collections.emptyList(),
-      typeFactory,
-      connectionConfig)
-
-    new FlinkRelBuilder(context, cluster, relOptSchema)
-  }
-
   /**
     * Information necessary to create a window aggregate.
     *
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/util/JavaScalaConversionUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/util/JavaScalaConversionUtil.scala
new file mode 100644
index 0000000..bee0b0e
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/util/JavaScalaConversionUtil.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.util
+
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+
+import java.util.function.{BiConsumer, Consumer, Function}
+import java.util.{Optional, List => JList}
+
+import scala.collection.JavaConverters._
+
+/**
+  * Utilities for interoperability between Scala and Java classes.
+  */
+object JavaScalaConversionUtil {
+
+  // most of these methods will be unnecessary once we upgrade to Scala 2.12
+
+  def toJava[T](option: Option[T]): Optional[T] = option match {
+    case Some(v) => Optional.of(v)
+    case None => Optional.empty()
+  }
+
+  def toScala[T](option: Optional[T]): Option[T] = Option(option.orElse(null.asInstanceOf[T]))
+
+  def toJava[T](func: (T) => Unit): Consumer[T] = new Consumer[T] {
+    override def accept(t: T): Unit = {
+      func.apply(t)
+    }
+  }
+
+  def toJava[K, V](func: (K, V) => Unit): BiConsumer[K, V] = new BiConsumer[K, V] {
+    override def accept(k: K, v: V): Unit = {
+      func.apply(k, v)
+    }
+  }
+
+  def toJava[I, O](func: (I) => O): Function[I, O] = new Function[I, O] {
+    override def apply(in: I): O = {
+      func.apply(in)
+    }
+  }
+
+  def toJava[T0, T1](tuple: (T0, T1)): JTuple2[T0, T1] = {
+    new JTuple2[T0, T1](tuple._1, tuple._2)
+  }
+
+  def toJava[T](seq: Seq[T]): JList[T] = {
+    seq.asJava
+  }
+}
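
    A short usage sketch of the conversions PlannerContext relies on above
    (types spelled out for clarity):

        import java.util.Optional
        import org.apache.flink.table.util.JavaScalaConversionUtil.{toJava, toScala}

        // Scala Option -> Java Optional, e.g. for
        // toJava(calciteConfig.getSqlParserConfig).orElseGet(...)
        val some: Optional[String] = toJava(Some("flink"))            // Optional.of("flink")
        // Java Optional -> Scala Option
        val none: Option[String] = toScala(Optional.empty[String]())  // None
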
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
index 6de2f4a..c48c4e6 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
@@ -26,11 +26,10 @@ import org.apache.flink.table.types.DataType
 import org.apache.flink.table.types.TypeInfoDataTypeConverter.fromDataTypeToTypeInfo
 
 import org.apache.calcite.sql._
-import org.apache.calcite.sql.util.ListSqlOperatorTable
+
+import java.util
 
 import _root_.scala.collection.JavaConversions._
-import _root_.scala.collection.mutable
-import scala.collection.mutable.ListBuffer
 
 /**
   * A catalog for looking up (user-defined) functions, used during validation phases
@@ -39,7 +38,7 @@ import scala.collection.mutable.ListBuffer
   */
 class FunctionCatalog() {
 
-  val sqlFunctions: ListBuffer[SqlFunction] = mutable.ListBuffer[SqlFunction]()
+  val sqlFunctions: util.List[SqlOperator] = new util.ArrayList[SqlOperator]()
 
   def registerScalarFunction(
       name: String,
@@ -96,6 +95,4 @@ class FunctionCatalog() {
     sqlFunctions.map(_.getName)
   }
 
-  def getSqlOperatorTable: SqlOperatorTable =
-      new ListSqlOperatorTable(sqlFunctions)
 }
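
    With sqlFunctions now a plain java.util.List[SqlOperator], PlannerContext
    can wrap the catalog directly. A sketch of the round trip (MyUpper is a
    hypothetical UDF; typeFactory is assumed to come from
    plannerContext.getTypeFactory):

        import org.apache.calcite.sql.util.ListSqlOperatorTable
        import org.apache.flink.table.functions.ScalarFunction
        import org.apache.flink.table.validate.FunctionCatalog

        // hypothetical scalar UDF, only to show the registration path
        class MyUpper extends ScalarFunction {
          def eval(s: String): String = s.toUpperCase
        }

        val catalog = new FunctionCatalog
        catalog.registerScalarFunction("myUpper", new MyUpper, typeFactory)
        // the same wrapping PlannerContext.getBuiltinSqlOperatorTable(...) performs
        val operatorTable = new ListSqlOperatorTable(catalog.sqlFunctions)
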
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/codegen/agg/AggTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/codegen/agg/AggTestBase.scala
index 9d925c9..cf19da9 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/codegen/agg/AggTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/codegen/agg/AggTestBase.scala
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.functions.RuntimeContext
 import org.apache.flink.streaming.api.environment.LocalStreamEnvironment
 import org.apache.flink.table.api.java.StreamTableEnvironment
 import org.apache.flink.table.api.{DataTypes, TableConfig}
-import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory, FlinkTypeSystem}
+import org.apache.flink.table.calcite.{FlinkTypeFactory, FlinkTypeSystem}
 import org.apache.flink.table.codegen.CodeGeneratorContext
 import org.apache.flink.table.dataview.DataViewSpec
 import org.apache.flink.table.functions.aggfunctions.AvgAggFunction.{DoubleAvgAggFunction, IntegralAvgAggFunction}
@@ -32,7 +32,7 @@ import org.apache.flink.table.types.logical.{BigIntType, DoubleType, LogicalType
 import org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType
 
 import org.apache.calcite.rel.core.AggregateCall
-import org.apache.calcite.tools.{FrameworkConfig, RelBuilder}
+import org.apache.calcite.tools.RelBuilder
 import org.powermock.api.mockito.PowerMockito.{mock, when}
 
 /**
@@ -44,14 +44,13 @@ abstract class AggTestBase {
   val env = new LocalStreamEnvironment
   val conf = new TableConfig
   val tEnv = new StreamTableEnvironment(env, conf)
-  val frameworkConfig: FrameworkConfig = tEnv.getFrameworkConfig
   val inputNames = Array("f0", "f1", "f2", "f3", "f4")
   val inputTypes: Array[LogicalType] = Array(
    new VarCharType(VarCharType.MAX_LENGTH), new BigIntType(), new DoubleType(), new BigIntType(),
     new VarCharType(VarCharType.MAX_LENGTH))
-  val inputType = RowType.of(inputTypes, inputNames)
+  val inputType: RowType = RowType.of(inputTypes, inputNames)
 
-  val relBuilder: RelBuilder = FlinkRelBuilder.create(frameworkConfig).values(
+  val relBuilder: RelBuilder = tEnv.getRelBuilder.values(
     typeFactory.buildRelNodeRowType(inputNames, inputTypes))
   val aggInfo1: AggregateInfo = {
     val aggInfo = mock(classOf[AggregateInfo])
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala
index 867b380..1c345b1 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala
@@ -26,7 +26,6 @@ import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.table.api.TableConfig
 import org.apache.flink.table.api.java.StreamTableEnvironment
-import org.apache.flink.table.calcite.FlinkPlannerImpl
 import org.apache.flink.table.codegen.{CodeGeneratorContext, ExprCodeGenerator, FunctionCodeGenerator}
 import org.apache.flink.table.dataformat.{BaseRow, BinaryRow, DataFormatConverters}
 import org.apache.flink.table.types.DataType
@@ -58,11 +57,7 @@ abstract class ExpressionTestBase {
   private val env = StreamExecutionEnvironment.createLocalEnvironment(4)
   private val tEnv = StreamTableEnvironment.create(env, config)
   private val relBuilder = tEnv.getRelBuilder
-  private val planner = new FlinkPlannerImpl(
-    tEnv.getFrameworkConfig,
-    tEnv.getPlanner,
-    tEnv.getTypeFactory,
-    relBuilder.getCluster)
+  private val planner = tEnv.getFlinkPlanner
 
   // setup test utils
   private val tableName = "testTable"
@@ -196,8 +191,7 @@ abstract class ExpressionTestBase {
 
   def testSqlApi(
       sqlExpr: String,
-      expected: String)
-    : Unit = {
+      expected: String): Unit = {
     addSqlTestExpr(sqlExpr, expected)
   }
 
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdHandlerTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdHandlerTestBase.scala
index ee69520..249da20 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdHandlerTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdHandlerTestBase.scala
@@ -20,14 +20,14 @@ package org.apache.flink.table.plan.metadata
 
 import org.apache.flink.table.api.{TableConfig, TableException}
 import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
-import org.apache.flink.table.calcite.{FlinkCalciteCatalogReader, FlinkRelBuilder, FlinkTypeFactory}
+import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory}
 import org.apache.flink.table.expressions.ApiExpressionUtils.intervalOfMillis
 import org.apache.flink.table.expressions._
 import org.apache.flink.table.functions.aggfunctions.SumAggFunction.DoubleSumAggFunction
 import org.apache.flink.table.functions.aggfunctions.{DenseRankAggFunction, RankAggFunction, RowNumberAggFunction}
 import org.apache.flink.table.functions.sql.FlinkSqlOperatorTable
 import org.apache.flink.table.plan.PartialFinalType
-import org.apache.flink.table.plan.`trait`.FlinkRelDistribution
+import org.apache.flink.table.plan.`trait`.{FlinkRelDistribution, FlinkRelDistributionTraitDef}
 import org.apache.flink.table.plan.logical.{LogicalWindow, TumblingGroupWindow}
 import org.apache.flink.table.plan.nodes.FlinkConventions
 import org.apache.flink.table.plan.nodes.calcite.{LogicalExpand, LogicalRank, LogicalWindowAggregate}
@@ -37,13 +37,17 @@ import org.apache.flink.table.plan.nodes.physical.stream._
 import org.apache.flink.table.plan.schema.FlinkRelOptTable
 import org.apache.flink.table.plan.util.AggregateUtil.transformToStreamAggregateInfoList
 import org.apache.flink.table.plan.util._
+import org.apache.flink.table.planner.PlannerContext
 import org.apache.flink.table.runtime.rank.{ConstantRankRange, RankType, VariableRankRange}
 import org.apache.flink.table.types.AtomicDataType
 import org.apache.flink.table.types.logical.{BigIntType, DoubleType, IntType, LogicalType, TimestampKind, TimestampType, VarCharType}
 import org.apache.flink.table.util.CountAggFunction
+import org.apache.flink.table.validate.FunctionCatalog
 
 import com.google.common.collect.{ImmutableList, Lists}
+import org.apache.calcite.jdbc.CalciteSchema
 import org.apache.calcite.plan._
+import org.apache.calcite.prepare.CalciteCatalogReader
 import org.apache.calcite.rel._
 import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFieldImpl}
 import org.apache.calcite.rel.core._
@@ -57,7 +61,6 @@ import org.apache.calcite.sql.`type`.SqlTypeName._
 import org.apache.calcite.sql.fun.SqlStdOperatorTable._
 import org.apache.calcite.sql.fun.{SqlCountAggFunction, SqlStdOperatorTable}
 import org.apache.calcite.sql.parser.SqlParserPos
-import org.apache.calcite.tools.FrameworkConfig
 import org.apache.calcite.util.{DateString, ImmutableBitSet, TimeString, TimestampString}
 import org.junit.{Before, BeforeClass}
 
@@ -70,11 +73,20 @@ class FlinkRelMdHandlerTestBase {
 
   val tableConfig = new TableConfig()
   val rootSchema: SchemaPlus = MetadataTestUtil.initRootSchema()
-  val frameworkConfig: FrameworkConfig =
-    MetadataTestUtil.createFrameworkConfig(rootSchema, tableConfig)
-  val typeFactory: FlinkTypeFactory = new FlinkTypeFactory(frameworkConfig.getTypeSystem)
-  val catalogReader: FlinkCalciteCatalogReader =
-    MetadataTestUtil.createCatalogReader(rootSchema, typeFactory)
+  // TODO batch RelNode and stream RelNode should have different PlanningConfigurationBuilder
+  //  and RelOptCluster because they have different trait definitions.
+  val plannerContext: PlannerContext =
+    new PlannerContext(
+      tableConfig,
+      new FunctionCatalog,
+      CalciteSchema.from(rootSchema),
+      util.Arrays.asList(
+        ConventionTraitDef.INSTANCE,
+        FlinkRelDistributionTraitDef.INSTANCE,
+        RelCollationTraitDef.INSTANCE
+      )
+    )
+  val typeFactory: FlinkTypeFactory = plannerContext.getTypeFactory
   val mq: FlinkRelMetadataQuery = FlinkRelMetadataQuery.instance()
 
   var relBuilder: FlinkRelBuilder = _
@@ -88,7 +100,7 @@ class FlinkRelMdHandlerTestBase {
 
   @Before
   def setUp(): Unit = {
-    relBuilder = FlinkRelBuilder.create(frameworkConfig)
+    relBuilder = plannerContext.createRelBuilder()
 
     rexBuilder = relBuilder.getRexBuilder
     cluster = relBuilder.getCluster
@@ -2033,7 +2045,8 @@ class FlinkRelMdHandlerTestBase {
 
   protected def createDataStreamScan[T](
       tableNames: util.List[String], traitSet: RelTraitSet): T = {
-    val table = catalogReader.getTable(tableNames).asInstanceOf[FlinkRelOptTable]
+    val table = relBuilder.getRelOptSchema.asInstanceOf[CalciteCatalogReader].getTable(tableNames)
+      .asInstanceOf[FlinkRelOptTable]
     val conventionTrait = traitSet.getTrait(ConventionTraitDef.INSTANCE)
     val scan = conventionTrait match {
       case Convention.NONE =>
