This is an automated email from the ASF dual-hosted git repository. kurt pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push: new c727999 [FLINK-16375][table] Reintroduce registerTable[Source/Sink] methods for table env c727999 is described below commit c72799933a0d5ac02dfb9437bafa1b5ef38619f9 Author: Kurt Young <k...@apache.org> AuthorDate: Thu Jun 11 23:23:09 2020 +0800 [FLINK-16375][table] Reintroduce registerTable[Source/Sink] methods for table env Since we don't have a good alternative solution for easily registering user-defined source/sink/factory, we need these methods back, for now. This closes #12603 --- .../apache/flink/table/api/TableEnvironment.java | 50 ++++++++++++++++++++++ .../table/api/internal/TableEnvironmentImpl.java | 27 ++++++++++++ .../planner/plan/stream/sql/LegacySinkTest.scala | 18 ++++---- .../plan/stream/table/LegacyTableSourceTest.scala | 24 +++++------ .../flink/table/api/internal/TableEnvImpl.scala | 38 ++++++++++++++++ .../table/api/stream/table/TableSourceTest.scala | 24 +++++------ .../runtime/batch/sql/TableEnvironmentITCase.scala | 22 +++++----- .../flink/table/utils/MockTableEnvironment.scala | 12 +++++- 8 files changed, 170 insertions(+), 45 deletions(-) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java index d5f5ce0..614b6e7 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java @@ -21,6 +21,7 @@ package org.apache.flink.table.api; import org.apache.flink.annotation.Experimental; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.table.api.internal.TableEnvironmentImpl; import org.apache.flink.table.catalog.Catalog; import 
org.apache.flink.table.descriptors.ConnectTableDescriptor; @@ -546,6 +547,55 @@ public interface TableEnvironment { void createTemporaryView(String path, Table view); /** + * Registers an external {@link TableSource} in this {@link TableEnvironment}'s catalog. + * Registered tables can be referenced in SQL queries. + * + * <p>Temporary objects can shadow permanent ones. If a permanent object in a given path exists, it will + * be inaccessible in the current session. To make the permanent object available again one can drop the + * corresponding temporary object. + * + * @param name The name under which the {@link TableSource} is registered. + * @param tableSource The {@link TableSource} to register. + * @deprecated Use {@link #connect(ConnectorDescriptor)} instead. + */ + @Deprecated + void registerTableSource(String name, TableSource<?> tableSource); + + /** + * Registers an external {@link TableSink} with given field names and types in this + * {@link TableEnvironment}'s catalog. + * Registered sink tables can be referenced in SQL DML statements. + * + * <p>Temporary objects can shadow permanent ones. If a permanent object in a given path exists, it will + * be inaccessible in the current session. To make the permanent object available again one can drop the + * corresponding temporary object. + * + * @param name The name under which the {@link TableSink} is registered. + * @param fieldNames The field names to register with the {@link TableSink}. + * @param fieldTypes The field types to register with the {@link TableSink}. + * @param tableSink The {@link TableSink} to register. + * @deprecated Use {@link #connect(ConnectorDescriptor)} instead. + */ + @Deprecated + void registerTableSink(String name, String[] fieldNames, TypeInformation<?>[] fieldTypes, TableSink<?> tableSink); + + /** + * Registers an external {@link TableSink} with already configured field names and field types in + * this {@link TableEnvironment}'s catalog. 
+ * Registered sink tables can be referenced in SQL DML statements. + * + * <p>Temporary objects can shadow permanent ones. If a permanent object in a given path exists, it will + * be inaccessible in the current session. To make the permanent object available again one can drop the + * corresponding temporary object. + * + * @param name The name under which the {@link TableSink} is registered. + * @param configuredSink The configured {@link TableSink} to register. + * @deprecated Use {@link #connect(ConnectorDescriptor)} instead. + */ + @Deprecated + void registerTableSink(String name, TableSink<?> configuredSink); + + /** * Scans a registered table and returns the resulting {@link Table}. * * <p>A table to scan must be registered in the {@link TableEnvironment}. It can be either directly diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java index 6edce4a..cd47f35 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java @@ -21,6 +21,7 @@ package org.apache.flink.table.api.internal; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.dag.Pipeline; import org.apache.flink.api.dag.Transformation; import org.apache.flink.core.execution.JobClient; @@ -444,6 +445,32 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal { } @Override + public void registerTableSource(String name, TableSource<?> tableSource) { + // only accept StreamTableSource and LookupableTableSource here + // TODO should add a validation, 
while StreamTableSource is in flink-table-api-java-bridge module now + registerTableSourceInternal(name, tableSource); + } + + @Override + public void registerTableSink( + String name, + String[] fieldNames, + TypeInformation<?>[] fieldTypes, + TableSink<?> tableSink) { + registerTableSink(name, tableSink.configure(fieldNames, fieldTypes)); + } + + @Override + public void registerTableSink(String name, TableSink<?> configuredSink) { + // validate + if (configuredSink.getTableSchema().getFieldCount() == 0) { + throw new TableException("Table schema cannot be empty."); + } + + registerTableSinkInternal(name, configuredSink); + } + + @Override public Table scan(String... tablePath) { UnresolvedIdentifier unresolvedIdentifier = UnresolvedIdentifier.of(tablePath); return scanInternal(unresolvedIdentifier) diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/LegacySinkTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/LegacySinkTest.scala index 599e4e9..50f83a9 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/LegacySinkTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/LegacySinkTest.scala @@ -53,14 +53,14 @@ class LegacySinkTest extends TableTestBase { util.tableEnv.createTemporaryView("TempTable", table) val retractSink = util.createRetractTableSink(Array("cnt"), Array(LONG)) - util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal( + util.tableEnv.registerTableSink( "retractSink1", retractSink) stmtSet.addInsert("retractSink1", table) val table2 = util.tableEnv.sqlQuery( "SELECT cnt, SUM(cnt) OVER (ORDER BY PROCTIME()) FROM TempTable") val retractSink2 = util.createRetractTableSink(Array("cnt", "total"), Array(LONG, LONG)) - 
util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal( + util.tableEnv.registerTableSink( "retractSink2", retractSink2) stmtSet.addInsert("retractSink2", table2) @@ -143,13 +143,13 @@ class LegacySinkTest extends TableTestBase { val table1 = util.tableEnv.sqlQuery("SELECT b, cnt FROM TempTable WHERE b < 4") val retractSink = util.createRetractTableSink(Array("b", "cnt"), Array(LONG, LONG)) - util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal( + util.tableEnv.registerTableSink( "retractSink", retractSink) stmtSet.addInsert("retractSink", table1) val table2 = util.tableEnv.sqlQuery("SELECT b, cnt FROM TempTable WHERE b >= 4 AND b < 6") val upsertSink = util.createUpsertTableSink(Array(), Array("b", "cnt"), Array(LONG, LONG)) - util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal( + util.tableEnv.registerTableSink( "upsertSink", upsertSink) stmtSet.addInsert("upsertSink", table2) @@ -165,13 +165,13 @@ class LegacySinkTest extends TableTestBase { val table1 = util.tableEnv.sqlQuery( "SELECT cnt, COUNT(b) AS frequency FROM TempTable WHERE b < 4 GROUP BY cnt") val upsertSink1 = util.createUpsertTableSink(Array(0), Array("b", "cnt"), Array(LONG, LONG)) - util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal( + util.tableEnv.registerTableSink( "upsertSink1", upsertSink1) stmtSet.addInsert("upsertSink1", table1) val table2 = util.tableEnv.sqlQuery("SELECT b, cnt FROM TempTable WHERE b >= 4 AND b < 6") val upsertSink2 = util.createUpsertTableSink(Array(), Array("b", "cnt"), Array(LONG, LONG)) - util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal( + util.tableEnv.registerTableSink( "upsertSink2", upsertSink2) stmtSet.addInsert("upsertSink2", table2) @@ -189,7 +189,7 @@ class LegacySinkTest extends TableTestBase { util.tableEnv.registerTable("TempTable", table) val appendSink = util.createAppendTableSink(Array("a", "b"), Array(INT, LONG)) - 
util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal( + util.tableEnv.registerTableSink( "appendSink", appendSink) stmtSet.addInsert("appendSink", table) @@ -199,13 +199,13 @@ class LegacySinkTest extends TableTestBase { val table2 = util.tableEnv.sqlQuery("SELECT SUM(a) AS total_sum FROM TempTable1") val retractSink = util.createRetractTableSink(Array("total_sum"), Array(INT)) - util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal( + util.tableEnv.registerTableSink( "retractSink", retractSink) stmtSet.addInsert("retractSink", table2) val table3 = util.tableEnv.sqlQuery("SELECT MIN(a) AS total_min FROM TempTable1") val upsertSink = util.createUpsertTableSink(Array(), Array("total_min"), Array(INT)) - util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal( + util.tableEnv.registerTableSink( "upsertSink", upsertSink) stmtSet.addInsert("upsertSink", table3) diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/LegacyTableSourceTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/LegacyTableSourceTest.scala index 72d2c11..fa35afe 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/LegacyTableSourceTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/LegacyTableSourceTest.scala @@ -41,7 +41,7 @@ class LegacyTableSourceTest extends TableTestBase { Array("id", "rowtime", "val", "name")) val util = streamTestUtil() - util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSourceInternal( + util.tableEnv.registerTableSource( "rowTimeT", new TestTableSourceWithTime[Row]( false, tableSchema, returnType, Seq(), rowtime = "rowtime")) @@ -62,7 +62,7 @@ class LegacyTableSourceTest extends TableTestBase { Array("id", "rowtime", "val", "name")) val 
util = streamTestUtil() - util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSourceInternal( + util.tableEnv.registerTableSource( "rowTimeT", new TestTableSourceWithTime[Row]( false, tableSchema, returnType, Seq(), rowtime = "rowtime")) @@ -83,7 +83,7 @@ class LegacyTableSourceTest extends TableTestBase { Array("id", "rowtime", "val", "name")) val util = streamTestUtil() - util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSourceInternal( + util.tableEnv.registerTableSource( "rowTimeT", new TestTableSourceWithTime[Row]( false, tableSchema, returnType, Seq(), rowtime = "rowtime")) @@ -107,7 +107,7 @@ class LegacyTableSourceTest extends TableTestBase { Array("id", "val", "name")) val util = streamTestUtil() - util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSourceInternal( + util.tableEnv.registerTableSource( "procTimeT", new TestTableSourceWithTime[Row]( false, tableSchema, returnType, Seq(), proctime = "proctime")) @@ -127,7 +127,7 @@ class LegacyTableSourceTest extends TableTestBase { Array("id", "val", "name")) val util = streamTestUtil() - util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSourceInternal( + util.tableEnv.registerTableSource( "procTimeT", new TestTableSourceWithTime[Row]( false, tableSchema, returnType, Seq(), proctime = "proctime")) @@ -150,7 +150,7 @@ class LegacyTableSourceTest extends TableTestBase { Array("id", "name", "val", "rtime")) val util = streamTestUtil() - util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSourceInternal( + util.tableEnv.registerTableSource( "T", new TestLegacyProjectableTableSource( false, tableSchema, returnType, Seq(), "rtime", "ptime")) @@ -170,7 +170,7 @@ class LegacyTableSourceTest extends TableTestBase { Array("id", "name", "val", "rtime")) val util = streamTestUtil() - util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSourceInternal( + util.tableEnv.registerTableSource( "T", new TestLegacyProjectableTableSource( 
false, tableSchema, returnType, Seq(), "rtime", "ptime")) @@ -190,7 +190,7 @@ class LegacyTableSourceTest extends TableTestBase { Array("id", "rtime", "val", "name")) val util = streamTestUtil() - util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSourceInternal( + util.tableEnv.registerTableSource( "T", new TestLegacyProjectableTableSource( false, tableSchema, returnType, Seq(), "rtime", "ptime")) @@ -210,7 +210,7 @@ class LegacyTableSourceTest extends TableTestBase { Array("id", "rtime", "val", "name")) val util = streamTestUtil() - util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSourceInternal( + util.tableEnv.registerTableSource( "T", new TestLegacyProjectableTableSource( false, tableSchema, returnType, Seq(), "rtime", "ptime")) @@ -230,7 +230,7 @@ class LegacyTableSourceTest extends TableTestBase { Array("id", "rtime", "val", "name")) val util = streamTestUtil() - util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSourceInternal( + util.tableEnv.registerTableSource( "T", new TestLegacyProjectableTableSource( false, tableSchema, returnType, Seq(), "rtime", "ptime")) @@ -251,7 +251,7 @@ class LegacyTableSourceTest extends TableTestBase { val mapping = Map("rtime" -> "p-rtime", "id" -> "p-id", "val" -> "p-val", "name" -> "p-name") val util = streamTestUtil() - util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSourceInternal( + util.tableEnv.registerTableSource( "T", new TestLegacyProjectableTableSource( false, tableSchema, returnType, Seq(), "rtime", "ptime", mapping)) @@ -287,7 +287,7 @@ class LegacyTableSourceTest extends TableTestBase { Array("id", "deepNested", "nested", "name")) val util = streamTestUtil() - util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSourceInternal( + util.tableEnv.registerTableSource( "T", new TestNestedProjectableTableSource( false, tableSchema, returnType, Seq())) diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala index 905a031..16f04e1 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala @@ -322,6 +322,44 @@ abstract class TableEnvImpl( false) } + override def registerTableSource(name: String, tableSource: TableSource[_]): Unit = { + validateTableSource(tableSource) + registerTableSourceInternal(name, tableSource) + } + + override def registerTableSink( + name: String, + fieldNames: Array[String], + fieldTypes: Array[TypeInformation[_]], + tableSink: TableSink[_]): Unit = { + + if (fieldNames == null) { + throw new TableException("fieldNames must not be null.") + } + if (fieldTypes == null) { + throw new TableException("fieldTypes must not be null.") + } + if (fieldNames.length == 0) { + throw new TableException("fieldNames must not be empty.") + } + if (fieldNames.length != fieldTypes.length) { + throw new TableException("Same number of field names and types required.") + } + + val configuredSink = tableSink.configure(fieldNames, fieldTypes) + registerTableSinkInternal(name, configuredSink) + } + + override def registerTableSink(name: String, configuredSink: TableSink[_]): Unit = { + // validate + if (configuredSink.getTableSchema.getFieldNames.length == 0) { + throw new TableException("Field names must not be empty.") + } + + validateTableSink(configuredSink) + registerTableSinkInternal(name, configuredSink) + } + override def fromTableSource(source: TableSource[_]): Table = { createTable(new TableSourceQueryOperation(source, isBatchTable)) } diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/TableSourceTest.scala 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/TableSourceTest.scala index 8c2d426..75367d5 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/TableSourceTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/TableSourceTest.scala @@ -42,7 +42,7 @@ class TableSourceTest extends TableTestBase { Array("id", "rowtime", "val", "name")) val util = streamTestUtil() - util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSourceInternal( + util.tableEnv.registerTableSource( "rowTimeT", new TestTableSourceWithTime[Row](tableSchema, returnType, Seq(), rowtime = "rowtime")) @@ -66,7 +66,7 @@ class TableSourceTest extends TableTestBase { Array("id", "rowtime", "val", "name")) val util = streamTestUtil() - util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSourceInternal( + util.tableEnv.registerTableSource( "rowTimeT", new TestTableSourceWithTime[Row](tableSchema, returnType, Seq(), rowtime = "rowtime")) @@ -90,7 +90,7 @@ class TableSourceTest extends TableTestBase { Array("id", "rowtime", "val", "name")) val util = streamTestUtil() - util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSourceInternal( + util.tableEnv.registerTableSource( "rowTimeT", new TestTableSourceWithTime[Row](tableSchema, returnType, Seq(), rowtime = "rowtime")) @@ -133,7 +133,7 @@ class TableSourceTest extends TableTestBase { Array("id", "val", "name")) val util = streamTestUtil() - util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSourceInternal( + util.tableEnv.registerTableSource( "procTimeT", new TestTableSourceWithTime[Row](tableSchema, returnType, Seq(), proctime = "proctime")) @@ -161,7 +161,7 @@ class TableSourceTest extends TableTestBase { Array("id", "val", "name")) val util = streamTestUtil() - util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSourceInternal( + 
util.tableEnv.registerTableSource( "procTimeT", new TestTableSourceWithTime[Row](tableSchema, returnType, Seq(), proctime = "proctime")) @@ -200,7 +200,7 @@ class TableSourceTest extends TableTestBase { Array("id", "name", "val", "rtime")) val util = streamTestUtil() - util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSourceInternal( + util.tableEnv.registerTableSource( "T", new TestProjectableTableSource(tableSchema, returnType, Seq(), "rtime", "ptime")) @@ -223,7 +223,7 @@ class TableSourceTest extends TableTestBase { Array("id", "name", "val", "rtime")) val util = streamTestUtil() - util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSourceInternal( + util.tableEnv.registerTableSource( "T", new TestProjectableTableSource(tableSchema, returnType, Seq(), "rtime", "ptime")) @@ -249,7 +249,7 @@ class TableSourceTest extends TableTestBase { Array("id", "rtime", "val", "name")) val util = streamTestUtil() - util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSourceInternal( + util.tableEnv.registerTableSource( "T", new TestProjectableTableSource(tableSchema, returnType, Seq(), "rtime", "ptime")) @@ -271,7 +271,7 @@ class TableSourceTest extends TableTestBase { Array("id", "rtime", "val", "name")) val util = streamTestUtil() - util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSourceInternal( + util.tableEnv.registerTableSource( "T", new TestProjectableTableSource(tableSchema, returnType, Seq(), "rtime", "ptime")) @@ -293,7 +293,7 @@ class TableSourceTest extends TableTestBase { Array("id", "rtime", "val", "name")) val util = streamTestUtil() - util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSourceInternal( + util.tableEnv.registerTableSource( "T", new TestProjectableTableSource(tableSchema, returnType, Seq(), "rtime", "ptime")) @@ -317,7 +317,7 @@ class TableSourceTest extends TableTestBase { val mapping = Map("rtime" -> "p-rtime", "id" -> "p-id", "val" -> "p-val", "name" -> "p-name") val 
util = streamTestUtil() - util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSourceInternal( + util.tableEnv.registerTableSource( "T", new TestProjectableTableSource(tableSchema, returnType, Seq(), "rtime", "ptime", mapping)) @@ -356,7 +356,7 @@ class TableSourceTest extends TableTestBase { Array("id", "deepNested", "nested", "name")) val util = streamTestUtil() - util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSourceInternal( + util.tableEnv.registerTableSource( "T", new TestNestedProjectableTableSource(tableSchema, returnType, Seq())) diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableEnvironmentITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableEnvironmentITCase.scala index c68a7c7..30db7ed 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableEnvironmentITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableEnvironmentITCase.scala @@ -150,7 +150,7 @@ class TableEnvironmentITCase( val fieldNames = Array("d", "e", "f") val fieldTypes = tEnv.scan("sourceTable").getSchema.getFieldTypes val sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink - tEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal( + tEnv.registerTableSink( "targetTable", sink.configure(fieldNames, fieldTypes)) val sql = "INSERT INTO targetTable SELECT a, b, c FROM sourceTable" @@ -173,7 +173,7 @@ class TableEnvironmentITCase( val fieldNames = Array("d", "e", "f") val fieldTypes = tEnv.scan("sourceTable").getSchema.getFieldTypes val sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink - tEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal( + tEnv.registerTableSink( "targetTable", sink.configure(fieldNames, fieldTypes)) val sql = "INSERT INTO targetTable SELECT a, b, c FROM sourceTable" @@ 
-220,7 +220,7 @@ class TableEnvironmentITCase( val fieldNames = Array("d", "e", "f", "g") val fieldTypes = tEnv.scan("sourceTable").getSchema.getFieldTypes val sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink - tEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal( + tEnv.registerTableSink( "targetTable", sink.configure(fieldNames, fieldTypes)) val sql = "INSERT INTO targetTable SELECT * FROM sourceTable where id > 7" @@ -259,7 +259,7 @@ class TableEnvironmentITCase( val fieldNames = Array("d", "e", "f") val fieldTypes = tEnv.scan("sourceTable").getSchema.getFieldTypes val sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink - tEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal( + tEnv.registerTableSink( "targetTable", sink.configure(fieldNames, fieldTypes)) val result = tEnv.sqlQuery("SELECT c, b, a FROM sourceTable").select('a.avg, 'b.sum, 'c.count) @@ -294,7 +294,7 @@ class TableEnvironmentITCase( val fieldNames = Array("d", "e", "f") val fieldTypes = tEnv.scan("sourceTable").getSchema.getFieldTypes val sink1 = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink - tEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal( + tEnv.registerTableSink( "targetTable", sink1.configure(fieldNames, fieldTypes)) val tableResult = tEnv.executeSql("INSERT INTO targetTable SELECT a, b, c FROM sourceTable") @@ -319,7 +319,7 @@ class TableEnvironmentITCase( val sinkPath = resultFile.getAbsolutePath val configuredSink = new TestingOverwritableTableSink(sinkPath) .configure(Array("d"), Array(STRING)) - tEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal("MySink", configuredSink) + tEnv.registerTableSink("MySink", configuredSink) val tableResult1 = tEnv.executeSql("INSERT overwrite MySink SELECT c FROM sourceTable") checkInsertTableResult(tableResult1, "default_catalog.default_database.MySink") @@ -354,7 +354,7 @@ class TableEnvironmentITCase( val fieldNames = Array("d", "e", "f") 
val fieldTypes = tEnv.scan("sourceTable").getSchema.getFieldTypes val sink1 = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink - tEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal( + tEnv.registerTableSink( "targetTable", sink1.configure(fieldNames, fieldTypes)) val sink1Path = registerCsvTableSink(tEnv, fieldNames, fieldTypes, "MySink1") @@ -392,7 +392,7 @@ class TableEnvironmentITCase( val fieldNames = Array("d", "e", "f") val fieldTypes = tEnv.scan("sourceTable").getSchema.getFieldTypes val sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink - tEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal( + tEnv.registerTableSink( "targetTable", sink.configure(fieldNames, fieldTypes)) val result = tEnv.sqlQuery("SELECT c, b, a FROM sourceTable").select('a.avg, 'b.sum, 'c.count) @@ -431,7 +431,7 @@ class TableEnvironmentITCase( val fieldNames = Array("d", "e", "f") val fieldTypes = tEnv.scan("sourceTable").getSchema.getFieldTypes val sink1 = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink - tEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal( + tEnv.registerTableSink( "targetTable", sink1.configure(fieldNames, fieldTypes)) val table = tEnv.sqlQuery("SELECT a, b, c FROM sourceTable") @@ -457,7 +457,7 @@ class TableEnvironmentITCase( val sinkPath = resultFile.getAbsolutePath val configuredSink = new TestingOverwritableTableSink(sinkPath) .configure(Array("d"), Array(STRING)) - tEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal("MySink", configuredSink) + tEnv.registerTableSink("MySink", configuredSink) val tableResult1 = tEnv.sqlQuery("SELECT c FROM sourceTable").executeInsert("MySink", true) checkInsertTableResult(tableResult1, "default_catalog.default_database.MySink") @@ -543,7 +543,7 @@ class TableEnvironmentITCase( val sinkPath = _tempFolder.newFile().getAbsolutePath val configuredSink = new TestingOverwritableTableSink(sinkPath) .configure(Array("d", "e", "f"), 
Array(INT, LONG, STRING)) - tEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal("MySink", configuredSink) + tEnv.registerTableSink("MySink", configuredSink) assertTrue(FileUtils.readFileUtf8(new File(sinkPath)).isEmpty) val stmtSet = tEnv.createStatementSet() diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala index 0f69ff3..d608765 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala @@ -20,14 +20,15 @@ package org.apache.flink.table.utils import java.lang.{Iterable => JIterable} import java.util.Optional - import org.apache.flink.api.common.JobExecutionResult +import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.table.api.{ExplainDetail, StatementSet, Table, TableConfig, TableEnvironment, TableResult} import org.apache.flink.table.catalog.Catalog import org.apache.flink.table.descriptors.{ConnectTableDescriptor, ConnectorDescriptor} import org.apache.flink.table.expressions.Expression import org.apache.flink.table.functions.{ScalarFunction, UserDefinedFunction} import org.apache.flink.table.module.Module +import org.apache.flink.table.sinks.TableSink import org.apache.flink.table.sources.TableSource import org.apache.flink.table.types.AbstractDataType @@ -39,6 +40,15 @@ class MockTableEnvironment extends TableEnvironment { override def registerTable(name: String, table: Table): Unit = ??? + override def registerTableSource(name: String, tableSource: TableSource[_]): Unit = ??? + + override def registerTableSink( + name: String, + fieldNames: Array[String], + fieldTypes: Array[TypeInformation[_]], tableSink: TableSink[_]): Unit = ??? 
+ + override def registerTableSink(name: String, configuredSink: TableSink[_]): Unit = ??? + override def scan(tablePath: String*): Table = ??? override def connect(connectorDescriptor: ConnectorDescriptor): ConnectTableDescriptor = ???