[FLINK-5570] [table] Register ExternalCatalogs in TableEnvironment.

This closes #3409.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/135a57c4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/135a57c4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/135a57c4

Branch: refs/heads/table-retraction
Commit: 135a57c4bb37eaa9cb85faaff1cc694f9448fabd
Parents: 976e03c
Author: jingzhang <beyond1...@126.com>
Authored: Thu Mar 16 11:24:09 2017 +0800
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Fri Mar 24 20:19:17 2017 +0100

----------------------------------------------------------------------
 docs/dev/table_api.md                           |  37 +++++
 .../flink/table/api/TableEnvironment.scala      | 104 ++++++++++--
 .../org/apache/flink/table/api/exceptions.scala |  62 +++++--
 .../table/catalog/ExternalCatalogSchema.scala   |  14 +-
 .../flink/table/plan/logical/operators.scala    |   4 +-
 .../flink/table/ExternalCatalogTest.scala       | 161 +++++++++++++++++++
 .../catalog/ExternalCatalogSchemaTest.scala     |   5 +-
 7 files changed, 342 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/135a57c4/docs/dev/table_api.md
----------------------------------------------------------------------
diff --git a/docs/dev/table_api.md b/docs/dev/table_api.md
index 03b916c..117f32f 100644
--- a/docs/dev/table_api.md
+++ b/docs/dev/table_api.md
@@ -344,6 +344,43 @@ tableEnvironment.unregisterTable("Customers")
 </div>
 </div>
 
+Registering External Catalogs
+--------------------------------
+
+An external catalog is defined by the `ExternalCatalog` interface and provides information about databases and tables such as their name, schema, statistics, and access information. An `ExternalCatalog` is registered in a `TableEnvironment` as follows:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+// works for StreamExecutionEnvironment identically
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+
+ExternalCatalog customerCatalog = new InMemoryExternalCatalog();
+
+// register the ExternalCatalog customerCatalog
+tableEnv.registerExternalCatalog("Customers", customerCatalog);
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+// works for StreamExecutionEnvironment identically
+val env = ExecutionEnvironment.getExecutionEnvironment
+val tableEnv = TableEnvironment.getTableEnvironment(env)
+
+val customerCatalog: ExternalCatalog = new InMemoryExternalCatalog
+
+// register the ExternalCatalog customerCatalog
+tableEnv.registerExternalCatalog("Customers", customerCatalog)
+
+{% endhighlight %}
+</div>
+</div>
+
+Once registered in a `TableEnvironment`, all tables defined in an `ExternalCatalog` can be accessed from Table API or SQL queries by specifying their full path (`catalog`.`database`.`table`).
+
+Currently, Flink provides an `InMemoryExternalCatalog` for demo and testing purposes. However, the `ExternalCatalog` interface can also be used to connect catalogs like HCatalog or Metastore to the Table API.
 
 Table API
 ----------

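For illustration, a minimal sketch of the full-path access described above, assuming the registered "Customers" catalog already contains a database "db1" with a table "tb1" (hypothetical names that only mirror the test fixtures further down):

    // hedged sketch: catalog, database, and table names are assumptions
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)
    tableEnv.registerExternalCatalog("Customers", new InMemoryExternalCatalog)

    // Table API: the path elements are passed to the new varargs scan()
    val table = tableEnv.scan("Customers", "db1", "tb1")

    // SQL: the table is referenced by its full catalog.database.table path
    val result = tableEnv.sql("SELECT * FROM Customers.db1.tb1")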
http://git-wip-us.apache.org/repos/asf/flink/blob/135a57c4/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index 1dda3a8..bb4c3ac 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -46,6 +46,7 @@ import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment => Scala
 import org.apache.flink.table.api.java.{BatchTableEnvironment => JavaBatchTableEnv, StreamTableEnvironment => JavaStreamTableEnv}
 import org.apache.flink.table.api.scala.{BatchTableEnvironment => ScalaBatchTableEnv, StreamTableEnvironment => ScalaStreamTableEnv}
 import org.apache.flink.table.calcite.{FlinkPlannerImpl, FlinkRelBuilder, FlinkTypeFactory, FlinkTypeSystem}
+import org.apache.flink.table.catalog.{ExternalCatalog, ExternalCatalogSchema}
 import org.apache.flink.table.codegen.{CodeGenerator, ExpressionReducer}
 import org.apache.flink.table.expressions.{Alias, Expression, UnresolvedFieldReference}
 import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils.{checkForInstantiation, checkNotSingleton, createScalarSqlFunction, createTableSqlFunctions}
@@ -60,6 +61,8 @@ import org.apache.flink.table.validate.FunctionCatalog
 import org.apache.flink.types.Row
 
 import _root_.scala.collection.JavaConverters._
+import _root_.scala.collection.mutable.HashMap
+import _root_.scala.annotation.varargs
 
 /**
   * The abstract base class for batch and stream TableEnvironments.
@@ -71,7 +74,7 @@ abstract class TableEnvironment(val config: TableConfig) {
   // the catalog to hold all registered and translated tables
   // we disable caching here to prevent side effects
  private val internalSchema: CalciteSchema = CalciteSchema.createRootSchema(true, false)
-  private val tables: SchemaPlus = internalSchema.plus()
+  private val rootSchema: SchemaPlus = internalSchema.plus()
 
   // Table API/SQL function catalog
   private val functionCatalog: FunctionCatalog = FunctionCatalog.withBuiltIns
@@ -79,7 +82,7 @@ abstract class TableEnvironment(val config: TableConfig) {
   // the configuration to create a Calcite planner
   private lazy val frameworkConfig: FrameworkConfig = Frameworks
     .newConfigBuilder
-    .defaultSchema(tables)
+    .defaultSchema(rootSchema)
     .parserConfig(getSqlParserConfig)
     .costFactory(new DataSetCostFactory)
     .typeSystem(new FlinkTypeSystem)
@@ -99,6 +102,9 @@ abstract class TableEnvironment(val config: TableConfig) {
   // a counter for unique attribute names
   private[flink] val attrNameCntr: AtomicInteger = new AtomicInteger(0)
 
+  // registered external catalog names -> catalog
+  private val externalCatalogs = new HashMap[String, ExternalCatalog]
+
  /** Returns the table config to define the runtime behavior of the Table API. */
   def getConfig = config
 
@@ -246,6 +252,35 @@ abstract class TableEnvironment(val config: TableConfig) {
   }
 
   /**
+    * Registers an [[ExternalCatalog]] under a unique name in the TableEnvironment's schema.
+    * All tables registered in the [[ExternalCatalog]] can be accessed.
+    *
+    * @param name            The name under which the external catalog will be registered
+    * @param externalCatalog The external catalog to register
+    */
+  def registerExternalCatalog(name: String, externalCatalog: ExternalCatalog): Unit = {
+    if (rootSchema.getSubSchema(name) != null) {
+      throw new ExternalCatalogAlreadyExistException(name)
+    }
+    this.externalCatalogs.put(name, externalCatalog)
+    // create an external catalog Calcite schema and register it on the root schema
+    ExternalCatalogSchema.registerCatalog(rootSchema, name, externalCatalog)
+  }
+
+  /**
+    * Gets a registered [[ExternalCatalog]] by name.
+    *
+    * @param name The name to look up the [[ExternalCatalog]]
+    * @return The [[ExternalCatalog]]
+    */
+  def getRegisteredExternalCatalog(name: String): ExternalCatalog = {
+    this.externalCatalogs.get(name) match {
+      case Some(catalog) => catalog
+      case None => throw new ExternalCatalogNotExistException(name)
+    }
+  }
+
+  /**
     * Registers a [[ScalarFunction]] under a unique name. Replaces already existing
     * user-defined functions under this name.
     */
@@ -254,6 +289,7 @@ abstract class TableEnvironment(val config: TableConfig) {
     checkForInstantiation(function.getClass)
 
     // register in Table API
+
     functionCatalog.registerFunction(name, function.getClass)
 
     // register in SQL API
@@ -341,7 +377,7 @@ abstract class TableEnvironment(val config: TableConfig) {
  protected def replaceRegisteredTable(name: String, table: AbstractTable): Unit = {
 
     if (isRegistered(name)) {
-      tables.add(name, table)
+      rootSchema.add(name, table)
     } else {
       throw new TableException(s"Table \'$name\' is not registered.")
     }
@@ -350,19 +386,55 @@ abstract class TableEnvironment(val config: TableConfig) {
   /**
     * Scans a registered table and returns the resulting [[Table]].
     *
-    * The table to scan must be registered in the [[TableEnvironment]]'s catalog.
+    * A table to scan must be registered in the TableEnvironment. It can either be registered
+    * directly as a DataStream, DataSet, or Table, or be a member of an [[ExternalCatalog]].
+    *
+    * Examples:
     *
-    * @param tableName The name of the table to scan.
-    * @throws ValidationException if no table is registered under the given name.
-    * @return The scanned table.
+    * - Scanning a directly registered table
+    * {{{
+    *   val tab: Table = tableEnv.scan("tableName")
+    * }}}
+    *
+    * - Scanning a table from a registered catalog
+    * {{{
+    *   val tab: Table = tableEnv.scan("catalogName", "dbName", "tableName")
+    * }}}
+    *
+    * @param tablePath The path of the table to scan.
+    * @throws TableException if no table is found using the given table path.
+    * @return The resulting [[Table]].
     */
-  @throws[ValidationException]
-  def scan(tableName: String): Table = {
-    if (isRegistered(tableName)) {
-      new Table(this, CatalogNode(tableName, getRowType(tableName)))
-    } else {
-      throw new TableException(s"Table \'$tableName\' was not found in the registry.")
+  @throws[TableException]
+  @varargs
+  def scan(tablePath: String*): Table = {
+    scanInternal(tablePath.toArray)
+  }
+
+  @throws[TableException]
+  private def scanInternal(tablePath: Array[String]): Table = {
+    require(tablePath != null && !tablePath.isEmpty, "tablePath must not be null or empty.")
+    val schemaPaths = tablePath.slice(0, tablePath.length - 1)
+    val schema = getSchema(schemaPaths)
+    if (schema != null) {
+      val tableName = tablePath(tablePath.length - 1)
+      val table = schema.getTable(tableName)
+      if (table != null) {
+        return new Table(this, CatalogNode(tablePath, table.getRowType(typeFactory)))
+      }
+    }
+    throw new TableException(s"Table \'${tablePath.mkString(".")}\' was not found.")
+  }
+
+  private def getSchema(schemaPath: Array[String]): SchemaPlus = {
+    var schema = rootSchema
+    for (schemaName <- schemaPath) {
+      schema = schema.getSubSchema(schemaName)
+      if (schema == null) {
+        return schema
+      }
     }
+    schema
   }
 
   /**
@@ -416,7 +488,7 @@ abstract class TableEnvironment(val config: TableConfig) {
       throw new TableException(s"Table \'$name\' already exists. " +
         s"Please, choose a different name.")
     } else {
-      tables.add(name, table)
+      rootSchema.add(name, table)
     }
   }
 
@@ -434,11 +506,11 @@ abstract class TableEnvironment(val config: TableConfig) {
     * @return true, if a table is registered under the name, false otherwise.
     */
   protected def isRegistered(name: String): Boolean = {
-    tables.getTableNames.contains(name)
+    rootSchema.getTableNames.contains(name)
   }
 
   protected def getRowType(name: String): RelDataType = {
-    tables.getTable(name).getRowType(typeFactory)
+    rootSchema.getTable(name).getRowType(typeFactory)
   }
 
   /** Returns a unique temporary attribute name. */

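A short sketch of the registration contract introduced above; the catalog instance comes from the CommonTestData helper that the new tests use, everything else follows the methods as defined in this diff:

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)
    val catalog = CommonTestData.getInMemoryTestCatalog

    tEnv.registerExternalCatalog("test", catalog)

    // getRegisteredExternalCatalog returns the registered instance
    assert(tEnv.getRegisteredExternalCatalog("test") eq catalog)

    // registering a second catalog under the same name fails, because
    // "test" is already a sub-schema of the root schema:
    // tEnv.registerExternalCatalog("test", catalog)
    //   => throws ExternalCatalogAlreadyExistException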
http://git-wip-us.apache.org/repos/asf/flink/blob/135a57c4/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
index 8632436..760cf75 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
@@ -75,34 +75,34 @@ object ValidationException {
 case class UnresolvedException(msg: String) extends RuntimeException(msg)
 
 /**
-  * Exception for operation on a nonexistent table
+  * Exception for an operation on a nonexistent table
   *
   * @param db    database name
   * @param table table name
-  * @param cause
+  * @param cause the cause
   */
 case class TableNotExistException(
     db: String,
     table: String,
     cause: Throwable)
-    extends RuntimeException(s"table $db.$table does not exist!", cause) {
+    extends RuntimeException(s"Table $db.$table does not exist.", cause) {
 
   def this(db: String, table: String) = this(db, table, null)
 
 }
 
 /**
-  * Exception for adding an already existed table
+  * Exception for adding an already existing table
   *
   * @param db    database name
   * @param table table name
-  * @param cause
+  * @param cause the cause
   */
 case class TableAlreadyExistException(
     db: String,
     table: String,
     cause: Throwable)
-    extends RuntimeException(s"table $db.$table already exists!", cause) {
+    extends RuntimeException(s"Table $db.$table already exists.", cause) {
 
   def this(db: String, table: String) = this(db, table, null)
 
@@ -112,56 +112,84 @@ case class TableAlreadyExistException(
   * Exception for operation on a nonexistent database
   *
   * @param db database name
-  * @param cause
+  * @param cause the cause
   */
 case class DatabaseNotExistException(
     db: String,
     cause: Throwable)
-    extends RuntimeException(s"database $db does not exist!", cause) {
+    extends RuntimeException(s"Database $db does not exist.", cause) {
 
   def this(db: String) = this(db, null)
 }
 
 /**
-  * Exception for adding an already existed database
+  * Exception for adding an already existing database
   *
   * @param db database name
-  * @param cause
+  * @param cause the cause
   */
 case class DatabaseAlreadyExistException(
     db: String,
     cause: Throwable)
-    extends RuntimeException(s"database $db already exists!", cause) {
+    extends RuntimeException(s"Database $db already exists.", cause) {
 
   def this(db: String) = this(db, null)
 }
 
 /**
-  * Exception for does not find any matched [[TableSourceConverter]] for a specified table type
+  * Exception for not finding a [[TableSourceConverter]] for a given table type.
   *
   * @param tableType table type
-  * @param cause
+  * @param cause the cause
   */
 case class NoMatchedTableSourceConverterException(
     tableType: String,
     cause: Throwable)
-    extends RuntimeException(s"find no table source converter matched table 
type $tableType!",
+    extends RuntimeException(s"Could not find a TableSourceConverter for table 
type $tableType.",
       cause) {
 
   def this(tableType: String) = this(tableType, null)
 }
 
 /**
-  * Exception for find more than one matched [[TableSourceConverter]] for a specified table type
+  * Exception for finding more than one [[TableSourceConverter]] for a given table type.
   *
   * @param tableType table type
-  * @param cause
+  * @param cause the cause
   */
 case class AmbiguousTableSourceConverterException(
     tableType: String,
     cause: Throwable)
-    extends RuntimeException(s"more than one table source converter matched 
table type $tableType!",
+    extends RuntimeException(s"More than one TableSourceConverter for table 
type $tableType.",
       cause) {
 
   def this(tableType: String) = this(tableType, null)
 }
+
+/**
+  * Exception for an operation on a nonexistent external catalog
+  *
+  * @param catalogName external catalog name
+  * @param cause the cause
+  */
+case class ExternalCatalogNotExistException(
+    catalogName: String,
+    cause: Throwable)
+    extends RuntimeException(s"External catalog $catalogName does not exist.", 
cause) {
+
+  def this(catalogName: String) = this(catalogName, null)
+}
+
+/**
+  * Exception for adding an already existing external catalog
+  *
+  * @param catalogName external catalog name
+  * @param cause the cause
+  */
+case class ExternalCatalogAlreadyExistException(
+    catalogName: String,
+    cause: Throwable)
+    extends RuntimeException(s"External catalog $catalogName already exists.", 
cause) {
+
+  def this(catalogName: String) = this(catalogName, null)
+}

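The two new exceptions follow the pattern of the existing ones: a case class taking a cause plus an auxiliary constructor that passes null. A hedged usage sketch, where "missing" is a hypothetical, never-registered catalog name:

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)
    try {
      tEnv.getRegisteredExternalCatalog("missing")
    } catch {
      case e: ExternalCatalogNotExistException =>
        println(e.getMessage) // "External catalog missing does not exist."
    }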
http://git-wip-us.apache.org/repos/asf/flink/blob/135a57c4/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala
index e3ed96e..8e010fa 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala
@@ -136,20 +136,18 @@ class ExternalCatalogSchema(
 object ExternalCatalogSchema {
 
   /**
-    * Creates a FlinkExternalCatalogSchema.
+    * Registers an external catalog in a Calcite schema.
     *
-    * @param parentSchema              Parent schema
-    * @param externalCatalogIdentifier External catalog identifier
-    * @param externalCatalog           External catalog object
-    * @return Created schema
+    * @param parentSchema              Parent schema into which the catalog is registered
+    * @param externalCatalogIdentifier Identifier of the external catalog
+    * @param externalCatalog           The external catalog to register
     */
-  def create(
+  def registerCatalog(
       parentSchema: SchemaPlus,
       externalCatalogIdentifier: String,
-      externalCatalog: ExternalCatalog): ExternalCatalogSchema = {
+      externalCatalog: ExternalCatalog): Unit = {
     val newSchema = new ExternalCatalogSchema(externalCatalogIdentifier, externalCatalog)
     val schemaPlusOfNewSchema = parentSchema.add(externalCatalogIdentifier, newSchema)
     newSchema.registerSubSchemas(schemaPlusOfNewSchema)
-    newSchema
   }
 }

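Since registerCatalog no longer returns the created schema, callers that still need it can fetch it back from the parent schema, as the updated test below does. A minimal sketch using the CommonTestData catalog:

    val rootSchema: SchemaPlus = CalciteSchema.createRootSchema(true, false).plus()
    val catalog = CommonTestData.getInMemoryTestCatalog

    // adds the catalog as a sub-schema of the root schema and registers its databases
    ExternalCatalogSchema.registerCatalog(rootSchema, "test", catalog)

    // the registered catalog schema is now reachable by name
    val catalogSchema: SchemaPlus = rootSchema.getSubSchema("test")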
http://git-wip-us.apache.org/repos/asf/flink/blob/135a57c4/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
index 1b5eafb..559bd75 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
@@ -511,7 +511,7 @@ case class Join(
 }
 
 case class CatalogNode(
-    tableName: String,
+    tablePath: Array[String],
     rowType: RelDataType) extends LeafNode {
 
   val output: Seq[Attribute] = rowType.getFieldList.asScala.map { field =>
@@ -519,7 +519,7 @@ case class CatalogNode(
   }
 
   override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
-    relBuilder.scan(tableName)
+    relBuilder.scan(tablePath.toIterable.asJava)
   }
 
   override def validate(tableEnv: TableEnvironment): LogicalNode = this

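CatalogNode now carries the full table path rather than a single name, and construct() hands the path to Calcite's RelBuilder.scan(Iterable) overload, so catalog-qualified tables resolve through the registered sub-schemas. A purely illustrative sketch of building such nodes by hand (the row type is assembled with the Flink type factory; field name and type are assumptions):

    import org.apache.calcite.sql.`type`.SqlTypeName
    import org.apache.flink.table.calcite.{FlinkTypeFactory, FlinkTypeSystem}
    import org.apache.flink.table.plan.logical.CatalogNode

    val typeFactory = new FlinkTypeFactory(new FlinkTypeSystem())
    val rowType = typeFactory.builder().add("a", SqlTypeName.INTEGER).build()

    // single-element path: a table registered directly in the TableEnvironment
    val direct = CatalogNode(Array("tableName"), rowType)

    // multi-element path: catalog.database.table from an external catalog
    val external = CatalogNode(Array("test", "db1", "tb1"), rowType)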
http://git-wip-us.apache.org/repos/asf/flink/blob/135a57c4/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExternalCatalogTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExternalCatalogTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExternalCatalogTest.scala
new file mode 100644
index 0000000..696468d
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExternalCatalogTest.scala
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table
+
+import org.apache.flink.table.utils.{CommonTestData, TableTestBase}
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestUtil._
+import org.junit.Test
+
+/**
+  * Tests for query plans on tables from an external catalog.
+  */
+class ExternalCatalogTest extends TableTestBase {
+  private val table1Path: Array[String] = Array("test", "db1", "tb1")
+  private val table1ProjectedFields: Array[String] = Array("a", "b", "c")
+  private val table2Path: Array[String] = Array("test", "db2", "tb2")
+  private val table2ProjectedFields: Array[String] = Array("d", "e", "g")
+
+  @Test
+  def testBatchTableApi(): Unit = {
+    val util = batchTestUtil()
+    val tEnv = util.tEnv
+
+    tEnv.registerExternalCatalog("test", CommonTestData.getInMemoryTestCatalog)
+
+    val table1 = tEnv.scan("test", "db1", "tb1")
+    val table2 = tEnv.scan("test", "db2", "tb2")
+    val result = table2
+        .select('d * 2, 'e, 'g.upperCase())
+        .unionAll(table1.select('a * 2, 'b, 'c.upperCase()))
+
+    val expected = binaryNode(
+      "DataSetUnion",
+      unaryNode(
+        "DataSetCalc",
+        sourceBatchTableNode(table2Path, table2ProjectedFields),
+        term("select", "*(d, 2) AS _c0", "e", "UPPER(g) AS _c2")
+      ),
+      unaryNode(
+        "DataSetCalc",
+        sourceBatchTableNode(table1Path, table1ProjectedFields),
+        term("select", "*(a, 2) AS _c0", "b", "UPPER(c) AS _c2")
+      ),
+      term("union", "_c0", "e", "_c2")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testBatchSQL(): Unit = {
+    val util = batchTestUtil()
+
+    util.tEnv.registerExternalCatalog("test", CommonTestData.getInMemoryTestCatalog)
+
+    val sqlQuery = "SELECT d * 2, e, g FROM test.db2.tb2 WHERE d < 3 UNION ALL " +
+        "(SELECT a * 2, b, c FROM test.db1.tb1)"
+
+    val expected = binaryNode(
+      "DataSetUnion",
+      unaryNode(
+        "DataSetCalc",
+        sourceBatchTableNode(table2Path, table2ProjectedFields),
+        term("select", "*(d, 2) AS EXPR$0", "e", "g"),
+        term("where", "<(d, 3)")),
+      unaryNode(
+        "DataSetCalc",
+        sourceBatchTableNode(table1Path, table1ProjectedFields),
+        term("select", "*(a, 2) AS EXPR$0", "b", "c")
+      ),
+      term("union", "EXPR$0", "e", "g"))
+
+    util.verifySql(sqlQuery, expected)
+  }
+
+  @Test
+  def testStreamTableApi(): Unit = {
+    val util = streamTestUtil()
+    val tEnv = util.tEnv
+
+    util.tEnv.registerExternalCatalog("test", CommonTestData.getInMemoryTestCatalog)
+
+    val table1 = tEnv.scan("test", "db1", "tb1")
+    val table2 = tEnv.scan("test", "db2", "tb2")
+
+    val result = table2.where("d < 3")
+        .select('d * 2, 'e, 'g.upperCase())
+        .unionAll(table1.select('a * 2, 'b, 'c.upperCase()))
+
+    val expected = binaryNode(
+      "DataStreamUnion",
+      unaryNode(
+        "DataStreamCalc",
+        sourceStreamTableNode(table2Path, table2ProjectedFields),
+        term("select", "*(d, 2) AS _c0", "e", "UPPER(g) AS _c2"),
+        term("where", "<(d, 3)")
+      ),
+      unaryNode(
+        "DataStreamCalc",
+        sourceStreamTableNode(table1Path, table1ProjectedFields),
+        term("select", "*(a, 2) AS _c0", "b", "UPPER(c) AS _c2")
+      ),
+      term("union", "_c0", "e", "_c2")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testStreamSQL(): Unit = {
+    val util = streamTestUtil()
+
+    util.tEnv.registerExternalCatalog("test", CommonTestData.getInMemoryTestCatalog)
+
+    val sqlQuery = "SELECT d * 2, e, g FROM test.db2.tb2 WHERE d < 3 UNION ALL " +
+        "(SELECT a * 2, b, c FROM test.db1.tb1)"
+
+    val expected = binaryNode(
+      "DataStreamUnion",
+      unaryNode(
+        "DataStreamCalc",
+        sourceStreamTableNode(table2Path, table2ProjectedFields),
+        term("select", "*(d, 2) AS EXPR$0", "e", "g"),
+        term("where", "<(d, 3)")),
+      unaryNode(
+        "DataStreamCalc",
+        sourceStreamTableNode(table1Path, table1ProjectedFields),
+        term("select", "*(a, 2) AS EXPR$0", "b", "c")
+      ),
+      term("union", "EXPR$0", "e", "g"))
+
+    util.verifySql(sqlQuery, expected)
+  }
+
+  def sourceBatchTableNode(sourceTablePath: Array[String], fields: Array[String]): String = {
+    s"BatchTableSourceScan(table=[[${sourceTablePath.mkString(", ")}]], " +
+        s"fields=[${fields.mkString(", ")}])"
+  }
+
+  def sourceStreamTableNode(sourceTablePath: Array[String], fields: Array[String]): String = {
+    s"StreamTableSourceScan(table=[[${sourceTablePath.mkString(", ")}]], " +
+        s"fields=[${fields.mkString(", ")}])"
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/135a57c4/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala
index 6ffa8c6..b780a3f 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala
@@ -37,7 +37,7 @@ import scala.collection.JavaConverters._
 class ExternalCatalogSchemaTest {
 
   private val schemaName: String = "test"
-  private var externalCatalogSchema: ExternalCatalogSchema = _
+  private var externalCatalogSchema: SchemaPlus = _
   private var calciteCatalogReader: CalciteCatalogReader = _
   private val db = "db1"
   private val tb = "tb1"
@@ -46,7 +46,8 @@ class ExternalCatalogSchemaTest {
   def setUp(): Unit = {
    val rootSchemaPlus: SchemaPlus = CalciteSchema.createRootSchema(true, false).plus()
     val catalog = CommonTestData.getInMemoryTestCatalog
-    externalCatalogSchema = ExternalCatalogSchema.create(rootSchemaPlus, schemaName, catalog)
+    ExternalCatalogSchema.registerCatalog(rootSchemaPlus, schemaName, catalog)
+    externalCatalogSchema = rootSchemaPlus.getSubSchema(schemaName)
     val typeFactory = new FlinkTypeFactory(new FlinkTypeSystem())
     calciteCatalogReader = new CalciteCatalogReader(
       CalciteSchema.from(rootSchemaPlus),
