[FLINK-5570] [table] Register ExternalCatalogs in TableEnvironment. This closes #3409.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/135a57c4 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/135a57c4 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/135a57c4 Branch: refs/heads/table-retraction Commit: 135a57c4bb37eaa9cb85faaff1cc694f9448fabd Parents: 976e03c Author: jingzhang <beyond1...@126.com> Authored: Thu Mar 16 11:24:09 2017 +0800 Committer: Fabian Hueske <fhue...@apache.org> Committed: Fri Mar 24 20:19:17 2017 +0100 ---------------------------------------------------------------------- docs/dev/table_api.md | 37 +++++ .../flink/table/api/TableEnvironment.scala | 104 ++++++++++-- .../org/apache/flink/table/api/exceptions.scala | 62 +++++-- .../table/catalog/ExternalCatalogSchema.scala | 14 +- .../flink/table/plan/logical/operators.scala | 4 +- .../flink/table/ExternalCatalogTest.scala | 161 +++++++++++++++++++ .../catalog/ExternalCatalogSchemaTest.scala | 5 +- 7 files changed, 342 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/135a57c4/docs/dev/table_api.md ---------------------------------------------------------------------- diff --git a/docs/dev/table_api.md b/docs/dev/table_api.md index 03b916c..117f32f 100644 --- a/docs/dev/table_api.md +++ b/docs/dev/table_api.md @@ -344,6 +344,43 @@ tableEnvironment.unregisterTable("Customers") </div> </div> +Registering external Catalogs +-------------------------------- + +An external catalog is defined by the `ExternalCatalog` interface and provides information about databases and tables such as their name, schema, statistics, and access information. 
An `ExternalCatalog` is registered in a `TableEnvironment` as follows: + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +// works for StreamExecutionEnvironment identically +ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); +BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); + +ExternalCatalog customerCatalog = new InMemoryExternalCatalog(); + +// register the ExternalCatalog customerCatalog +tableEnv.registerExternalCatalog("Customers", customerCatalog); +{% endhighlight %} +</div> + +<div data-lang="scala" markdown="1"> +{% highlight scala %} +// works for StreamExecutionEnvironment identically +val env = ExecutionEnvironment.getExecutionEnvironment +val tableEnv = TableEnvironment.getTableEnvironment(env) + +val customerCatalog: ExternalCatalog = new InMemoryExternalCatalog + +// register the ExternalCatalog customerCatalog +tableEnv.registerExternalCatalog("Customers", customerCatalog) + +{% endhighlight %} +</div> +</div> + +Once registered in a `TableEnvironment`, all tables defined in an `ExternalCatalog` can be accessed from Table API or SQL queries by specifying their full path (`catalog`.`database`.`table`). + +Currently, Flink provides an `InMemoryExternalCatalog` for demo and testing purposes. However, the `ExternalCatalog` interface can also be used to connect catalogs like HCatalog or Metastore to the Table API. 
Table API ---------- http://git-wip-us.apache.org/repos/asf/flink/blob/135a57c4/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala index 1dda3a8..bb4c3ac 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala @@ -46,6 +46,7 @@ import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment => Scala import org.apache.flink.table.api.java.{BatchTableEnvironment => JavaBatchTableEnv, StreamTableEnvironment => JavaStreamTableEnv} import org.apache.flink.table.api.scala.{BatchTableEnvironment => ScalaBatchTableEnv, StreamTableEnvironment => ScalaStreamTableEnv} import org.apache.flink.table.calcite.{FlinkPlannerImpl, FlinkRelBuilder, FlinkTypeFactory, FlinkTypeSystem} +import org.apache.flink.table.catalog.{ExternalCatalog, ExternalCatalogSchema} import org.apache.flink.table.codegen.{CodeGenerator, ExpressionReducer} import org.apache.flink.table.expressions.{Alias, Expression, UnresolvedFieldReference} import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils.{checkForInstantiation, checkNotSingleton, createScalarSqlFunction, createTableSqlFunctions} @@ -60,6 +61,8 @@ import org.apache.flink.table.validate.FunctionCatalog import org.apache.flink.types.Row import _root_.scala.collection.JavaConverters._ +import _root_.scala.collection.mutable.HashMap +import _root_.scala.annotation.varargs /** * The abstract base class for batch and stream TableEnvironments. 
@@ -71,7 +74,7 @@ abstract class TableEnvironment(val config: TableConfig) { // the catalog to hold all registered and translated tables // we disable caching here to prevent side effects private val internalSchema: CalciteSchema = CalciteSchema.createRootSchema(true, false) - private val tables: SchemaPlus = internalSchema.plus() + private val rootSchema: SchemaPlus = internalSchema.plus() // Table API/SQL function catalog private val functionCatalog: FunctionCatalog = FunctionCatalog.withBuiltIns @@ -79,7 +82,7 @@ abstract class TableEnvironment(val config: TableConfig) { // the configuration to create a Calcite planner private lazy val frameworkConfig: FrameworkConfig = Frameworks .newConfigBuilder - .defaultSchema(tables) + .defaultSchema(rootSchema) .parserConfig(getSqlParserConfig) .costFactory(new DataSetCostFactory) .typeSystem(new FlinkTypeSystem) @@ -99,6 +102,9 @@ abstract class TableEnvironment(val config: TableConfig) { // a counter for unique attribute names private[flink] val attrNameCntr: AtomicInteger = new AtomicInteger(0) + // registered external catalog names -> catalog + private val externalCatalogs = new HashMap[String, ExternalCatalog] + /** Returns the table config to define the runtime behavior of the Table API. */ def getConfig = config @@ -246,6 +252,35 @@ abstract class TableEnvironment(val config: TableConfig) { } /** + * Registers an [[ExternalCatalog]] under a unique name in the TableEnvironment's schema. + * All tables registered in the [[ExternalCatalog]] can be accessed. 
+ * + * @param name The name under which the externalCatalog will be registered + * @param externalCatalog The externalCatalog to register + */ + def registerExternalCatalog(name: String, externalCatalog: ExternalCatalog): Unit = { + if (rootSchema.getSubSchema(name) != null) { + throw new ExternalCatalogAlreadyExistException(name) + } + this.externalCatalogs.put(name, externalCatalog) + // create an external catalog Calcite schema, register it on the root schema + ExternalCatalogSchema.registerCatalog(rootSchema, name, externalCatalog) + } + + /** + * Gets a registered [[ExternalCatalog]] by name. + * + * @param name The name to look up the [[ExternalCatalog]] + * @return The [[ExternalCatalog]] + */ + def getRegisteredExternalCatalog(name: String): ExternalCatalog = { + this.externalCatalogs.get(name) match { + case Some(catalog) => catalog + case None => throw new ExternalCatalogNotExistException(name) + } + } + + /** + * Registers a [[ScalarFunction]] under a unique name. Replaces already existing + * user-defined functions under this name. */ @@ -254,6 +289,7 @@ abstract class TableEnvironment(val config: TableConfig) { checkForInstantiation(function.getClass) // register in Table API + functionCatalog.registerFunction(name, function.getClass) // register in SQL API @@ -341,7 +377,7 @@ abstract class TableEnvironment(val config: TableConfig) { protected def replaceRegisteredTable(name: String, table: AbstractTable): Unit = { if (isRegistered(name)) { - tables.add(name, table) + rootSchema.add(name, table) } else { throw new TableException(s"Table \'$name\' is not registered.") } @@ -350,19 +386,55 @@ abstract class TableEnvironment(val config: TableConfig) { /** * Scans a registered table and returns the resulting [[Table]]. * - * The table to scan must be registered in the [[TableEnvironment]]'s catalog. + * A table to scan must be registered in the TableEnvironment. 
It can be either directly + * registered as DataStream, DataSet, or Table or as member of an [[ExternalCatalog]]. + * + * Examples: * - * @param tableName The name of the table to scan. - * @throws ValidationException if no table is registered under the given name. - * @return The scanned table. + * - Scanning a directly registered table + * {{{ + * val tab: Table = tableEnv.scan("tableName") + * }}} + * + * - Scanning a table from a registered catalog + * {{{ + * val tab: Table = tableEnv.scan("catalogName", "dbName", "tableName") + * }}} + * + * @param tablePath The path of the table to scan. + * @throws TableException if no table is found using the given table path. + * @return The resulting [[Table]]. */ - @throws[ValidationException] - def scan(tableName: String): Table = { - if (isRegistered(tableName)) { - new Table(this, CatalogNode(tableName, getRowType(tableName))) - } else { - throw new TableException(s"Table \'$tableName\' was not found in the registry.") + @throws[TableException] + @varargs + def scan(tablePath: String*): Table = { + scanInternal(tablePath.toArray) + } + + @throws[TableException] + private def scanInternal(tablePath: Array[String]): Table = { + require(tablePath != null && !tablePath.isEmpty, "tablePath must not be null or empty.") + val schemaPaths = tablePath.slice(0, tablePath.length - 1) + val schema = getSchema(schemaPaths) + if (schema != null) { + val tableName = tablePath(tablePath.length - 1) + val table = schema.getTable(tableName) + if (table != null) { + return new Table(this, CatalogNode(tablePath, table.getRowType(typeFactory))) + } + } + throw new TableException(s"Table \'${tablePath.mkString(".")}\' was not found.") + } + + private def getSchema(schemaPath: Array[String]): SchemaPlus = { + var schema = rootSchema + for (schemaName <- schemaPath) { + schema = schema.getSubSchema(schemaName) + if (schema == null) { + return schema + } } + schema } /** @@ -416,7 +488,7 @@ abstract class TableEnvironment(val config: 
TableConfig) { throw new TableException(s"Table \'$name\' already exists. " + s"Please, choose a different name.") } else { - tables.add(name, table) + rootSchema.add(name, table) } } @@ -434,11 +506,11 @@ abstract class TableEnvironment(val config: TableConfig) { * @return true, if a table is registered under the name, false otherwise. */ protected def isRegistered(name: String): Boolean = { - tables.getTableNames.contains(name) + rootSchema.getTableNames.contains(name) } protected def getRowType(name: String): RelDataType = { - tables.getTable(name).getRowType(typeFactory) + rootSchema.getTable(name).getRowType(typeFactory) } /** Returns a unique temporary attribute name. */ http://git-wip-us.apache.org/repos/asf/flink/blob/135a57c4/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala index 8632436..760cf75 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala @@ -75,34 +75,34 @@ object ValidationException { case class UnresolvedException(msg: String) extends RuntimeException(msg) /** - * Exception for operation on a nonexistent table + * Exception for an operation on a nonexistent table * * @param db database name * @param table table name - * @param cause + * @param cause the cause */ case class TableNotExistException( db: String, table: String, cause: Throwable) - extends RuntimeException(s"table $db.$table does not exist!", cause) { + extends RuntimeException(s"Table $db.$table does not exist.", cause) { def this(db: String, table: String) = this(db, table, null) } /** - * Exception for adding an already existed table + * Exception 
for adding an already existent table * * @param db database name * @param table table name - * @param cause + * @param cause the cause */ case class TableAlreadyExistException( db: String, table: String, cause: Throwable) - extends RuntimeException(s"table $db.$table already exists!", cause) { + extends RuntimeException(s"Table $db.$table already exists.", cause) { def this(db: String, table: String) = this(db, table, null) @@ -112,56 +112,84 @@ case class TableAlreadyExistException( * Exception for operation on a nonexistent database * * @param db database name - * @param cause + * @param cause the cause */ case class DatabaseNotExistException( db: String, cause: Throwable) - extends RuntimeException(s"database $db does not exist!", cause) { + extends RuntimeException(s"Database $db does not exist.", cause) { def this(db: String) = this(db, null) } /** - * Exception for adding an already existed database + * Exception for adding an already existent database * * @param db database name - * @param cause + * @param cause the cause */ case class DatabaseAlreadyExistException( db: String, cause: Throwable) - extends RuntimeException(s"database $db already exists!", cause) { + extends RuntimeException(s"Database $db already exists.", cause) { def this(db: String) = this(db, null) } /** - * Exception for does not find any matched [[TableSourceConverter]] for a specified table type + * Exception for not finding a [[TableSourceConverter]] for a given table type. 
* * @param tableType table type - * @param cause + * @param cause the cause */ case class NoMatchedTableSourceConverterException( tableType: String, cause: Throwable) - extends RuntimeException(s"find no table source converter matched table type $tableType!", + extends RuntimeException(s"Could not find a TableSourceConverter for table type $tableType.", cause) { def this(tableType: String) = this(tableType, null) } /** - * Exception for find more than one matched [[TableSourceConverter]] for a specified table type + * Exception for finding more than one [[TableSourceConverter]] for a given table type. * * @param tableType table type - * @param cause + * @param cause the cause */ case class AmbiguousTableSourceConverterException( tableType: String, cause: Throwable) - extends RuntimeException(s"more than one table source converter matched table type $tableType!", + extends RuntimeException(s"More than one TableSourceConverter for table type $tableType.", cause) { def this(tableType: String) = this(tableType, null) } + +/** + * Exception for operation on a nonexistent external catalog + * + * @param catalogName external catalog name + * @param cause the cause + */ +case class ExternalCatalogNotExistException( + catalogName: String, + cause: Throwable) + extends RuntimeException(s"External catalog $catalogName does not exist.", cause) { + + def this(catalogName: String) = this(catalogName, null) +} + +/** + * Exception for adding an already existent external catalog + * + * @param catalogName external catalog name + * @param cause the cause + */ +case class ExternalCatalogAlreadyExistException( + catalogName: String, + cause: Throwable) + extends RuntimeException(s"External catalog $catalogName already exists.", cause) { + + def this(catalogName: String) = this(catalogName, null) +} http://git-wip-us.apache.org/repos/asf/flink/blob/135a57c4/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala 
---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala index e3ed96e..8e010fa 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala @@ -136,20 +136,18 @@ class ExternalCatalogSchema( object ExternalCatalogSchema { /** - * Creates a FlinkExternalCatalogSchema. + * Registers an external catalog in a Calcite schema. * - * @param parentSchema Parent schema - * @param externalCatalogIdentifier External catalog identifier - * @param externalCatalog External catalog object - * @return Created schema + * @param parentSchema Parent schema into which the catalog is registered + * @param externalCatalogIdentifier Identifier of the external catalog + * @param externalCatalog The external catalog to register */ - def create( + def registerCatalog( parentSchema: SchemaPlus, externalCatalogIdentifier: String, - externalCatalog: ExternalCatalog): ExternalCatalogSchema = { + externalCatalog: ExternalCatalog): Unit = { val newSchema = new ExternalCatalogSchema(externalCatalogIdentifier, externalCatalog) val schemaPlusOfNewSchema = parentSchema.add(externalCatalogIdentifier, newSchema) newSchema.registerSubSchemas(schemaPlusOfNewSchema) - newSchema } } http://git-wip-us.apache.org/repos/asf/flink/blob/135a57c4/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala index 
1b5eafb..559bd75 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala @@ -511,7 +511,7 @@ case class Join( } case class CatalogNode( - tableName: String, + tablePath: Array[String], rowType: RelDataType) extends LeafNode { val output: Seq[Attribute] = rowType.getFieldList.asScala.map { field => @@ -519,7 +519,7 @@ case class CatalogNode( } override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = { - relBuilder.scan(tableName) + relBuilder.scan(tablePath.toIterable.asJava) } override def validate(tableEnv: TableEnvironment): LogicalNode = this http://git-wip-us.apache.org/repos/asf/flink/blob/135a57c4/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExternalCatalogTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExternalCatalogTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExternalCatalogTest.scala new file mode 100644 index 0000000..696468d --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExternalCatalogTest.scala @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table + +import org.apache.flink.table.utils.{CommonTestData, TableTestBase} +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.utils.TableTestUtil._ +import org.junit.Test + +/** + * Test for external catalog query plan. + */ +class ExternalCatalogTest extends TableTestBase { + private val table1Path: Array[String] = Array("test", "db1", "tb1") + private val table1ProjectedFields: Array[String] = Array("a", "b", "c") + private val table2Path: Array[String] = Array("test", "db2", "tb2") + private val table2ProjectedFields: Array[String] = Array("d", "e", "g") + + @Test + def testBatchTableApi(): Unit = { + val util = batchTestUtil() + val tEnv = util.tEnv + + tEnv.registerExternalCatalog("test", CommonTestData.getInMemoryTestCatalog) + + val table1 = tEnv.scan("test", "db1", "tb1") + val table2 = tEnv.scan("test", "db2", "tb2") + val result = table2 + .select('d * 2, 'e, 'g.upperCase()) + .unionAll(table1.select('a * 2, 'b, 'c.upperCase())) + + val expected = binaryNode( + "DataSetUnion", + unaryNode( + "DataSetCalc", + sourceBatchTableNode(table2Path, table2ProjectedFields), + term("select", "*(d, 2) AS _c0", "e", "UPPER(g) AS _c2") + ), + unaryNode( + "DataSetCalc", + sourceBatchTableNode(table1Path, table1ProjectedFields), + term("select", "*(a, 2) AS _c0", "b", "UPPER(c) AS _c2") + ), + term("union", "_c0", "e", "_c2") + ) + + util.verifyTable(result, expected) + } + + @Test + def testBatchSQL(): Unit = { + val util = batchTestUtil() + + util.tEnv.registerExternalCatalog("test", 
CommonTestData.getInMemoryTestCatalog) + + val sqlQuery = "SELECT d * 2, e, g FROM test.db2.tb2 WHERE d < 3 UNION ALL " + + "(SELECT a * 2, b, c FROM test.db1.tb1)" + + val expected = binaryNode( + "DataSetUnion", + unaryNode( + "DataSetCalc", + sourceBatchTableNode(table2Path, table2ProjectedFields), + term("select", "*(d, 2) AS EXPR$0", "e", "g"), + term("where", "<(d, 3)")), + unaryNode( + "DataSetCalc", + sourceBatchTableNode(table1Path, table1ProjectedFields), + term("select", "*(a, 2) AS EXPR$0", "b", "c") + ), + term("union", "EXPR$0", "e", "g")) + + util.verifySql(sqlQuery, expected) + } + + @Test + def testStreamTableApi(): Unit = { + val util = streamTestUtil() + val tEnv = util.tEnv + + util.tEnv.registerExternalCatalog("test", CommonTestData.getInMemoryTestCatalog) + + val table1 = tEnv.scan("test", "db1", "tb1") + val table2 = tEnv.scan("test", "db2", "tb2") + + val result = table2.where("d < 3") + .select('d * 2, 'e, 'g.upperCase()) + .unionAll(table1.select('a * 2, 'b, 'c.upperCase())) + + val expected = binaryNode( + "DataStreamUnion", + unaryNode( + "DataStreamCalc", + sourceStreamTableNode(table2Path, table2ProjectedFields), + term("select", "*(d, 2) AS _c0", "e", "UPPER(g) AS _c2"), + term("where", "<(d, 3)") + ), + unaryNode( + "DataStreamCalc", + sourceStreamTableNode(table1Path, table1ProjectedFields), + term("select", "*(a, 2) AS _c0", "b", "UPPER(c) AS _c2") + ), + term("union", "_c0", "e", "_c2") + ) + + util.verifyTable(result, expected) + } + + @Test + def testStreamSQL(): Unit = { + val util = streamTestUtil() + + util.tEnv.registerExternalCatalog("test", CommonTestData.getInMemoryTestCatalog) + + val sqlQuery = "SELECT d * 2, e, g FROM test.db2.tb2 WHERE d < 3 UNION ALL " + + "(SELECT a * 2, b, c FROM test.db1.tb1)" + + val expected = binaryNode( + "DataStreamUnion", + unaryNode( + "DataStreamCalc", + sourceStreamTableNode(table2Path, table2ProjectedFields), + term("select", "*(d, 2) AS EXPR$0", "e", "g"), + term("where", "<(d, 3)")), + 
unaryNode( + "DataStreamCalc", + sourceStreamTableNode(table1Path, table1ProjectedFields), + term("select", "*(a, 2) AS EXPR$0", "b", "c") + ), + term("union", "EXPR$0", "e", "g")) + + util.verifySql(sqlQuery, expected) + } + + def sourceBatchTableNode(sourceTablePath: Array[String], fields: Array[String]): String = { + s"BatchTableSourceScan(table=[[${sourceTablePath.mkString(", ")}]], " + + s"fields=[${fields.mkString(", ")}])" + } + + def sourceStreamTableNode(sourceTablePath: Array[String], fields: Array[String]): String = { + s"StreamTableSourceScan(table=[[${sourceTablePath.mkString(", ")}]], " + + s"fields=[${fields.mkString(", ")}])" + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/135a57c4/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala index 6ffa8c6..b780a3f 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala @@ -37,7 +37,7 @@ import scala.collection.JavaConverters._ class ExternalCatalogSchemaTest { private val schemaName: String = "test" - private var externalCatalogSchema: ExternalCatalogSchema = _ + private var externalCatalogSchema: SchemaPlus = _ private var calciteCatalogReader: CalciteCatalogReader = _ private val db = "db1" private val tb = "tb1" @@ -46,7 +46,8 @@ class ExternalCatalogSchemaTest { def setUp(): Unit = { val rootSchemaPlus: SchemaPlus = CalciteSchema.createRootSchema(true, false).plus() val catalog = CommonTestData.getInMemoryTestCatalog - externalCatalogSchema = 
ExternalCatalogSchema.create(rootSchemaPlus, schemaName, catalog) + ExternalCatalogSchema.registerCatalog(rootSchemaPlus, schemaName, catalog) + externalCatalogSchema = rootSchemaPlus.getSubSchema(schemaName) val typeFactory = new FlinkTypeFactory(new FlinkTypeSystem()) calciteCatalogReader = new CalciteCatalogReader( CalciteSchema.from(rootSchemaPlus),