This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 6eb6c66 [FLINK-23070][table-api-java] Introduce TableEnvironment#createTable 6eb6c66 is described below commit 6eb6c66b324f1026f3ad388e158aae6b22373824 Author: Ingo Bürk <ingo.bu...@tngtech.com> AuthorDate: Fri Jun 25 07:22:39 2021 +0200 [FLINK-23070][table-api-java] Introduce TableEnvironment#createTable This behaves much like the previously introduced #createTemporaryTable, but actually permanently stores the table in a catalog. This closes #16285. --- docs/content.zh/docs/dev/table/common.md | 11 +++-- docs/content/docs/dev/table/common.md | 11 +++-- .../table/tests/test_environment_completeness.py | 1 + .../apache/flink/table/api/TableEnvironment.java | 48 +++++++++++++++--- .../table/api/internal/TableEnvironmentImpl.java | 18 +++++++ .../flink/table/api/TableEnvironmentTest.java | 57 +++++++++++++++++++++- 6 files changed, 130 insertions(+), 16 deletions(-) diff --git a/docs/content.zh/docs/dev/table/common.md b/docs/content.zh/docs/dev/table/common.md index 6e4af0a..ee8c4b8 100644 --- a/docs/content.zh/docs/dev/table/common.md +++ b/docs/content.zh/docs/dev/table/common.md @@ -324,12 +324,15 @@ Such tables can either be created using the Table API directly, or by switching ```java // Using table descriptors -tableEnv.createTemporaryTable("Source", TableDescriptor.forConnector("datagen") +final TableDescriptor sourceDescriptor = TableDescriptor.forConnector("datagen") .schema(Schema.newBuilder() - .column("f0", DataTypes.STRING()) - .build()) - .option(DataGenOptions.ROWS_PER_SECOND, 100) + .column("f0", DataTypes.STRING()) .build()) + .option(DataGenOptions.ROWS_PER_SECOND, 100) + .build(); + +tableEnv.createTable("SourceTableA", sourceDescriptor); +tableEnv.createTemporaryTable("SourceTableB", sourceDescriptor); // Using SQL DDL tableEnv.executeSql("CREATE [TEMPORARY] TABLE MyTable (...) WITH (...)") diff --git a/docs/content/docs/dev/table/common.md b/docs/content/docs/dev/table/common.md index 2158670..0667f9e 100644 --- a/docs/content/docs/dev/table/common.md +++ b/docs/content/docs/dev/table/common.md @@ -333,12 +333,15 @@ Such tables can either be created using the Table API directly, or by switching ```java // Using table descriptors -tableEnv.createTemporaryTable("Source", TableDescriptor.forConnector("datagen") +final TableDescriptor sourceDescriptor = TableDescriptor.forConnector("datagen") .schema(Schema.newBuilder() - .column("f0", DataTypes.STRING()) - .build()) - .option(DataGenOptions.ROWS_PER_SECOND, 100) + .column("f0", DataTypes.STRING()) .build()) + .option(DataGenOptions.ROWS_PER_SECOND, 100) + .build(); + +tableEnv.createTable("SourceTableA", sourceDescriptor); +tableEnv.createTemporaryTable("SourceTableB", sourceDescriptor); // Using SQL DDL tableEnv.executeSql("CREATE [TEMPORARY] TABLE MyTable (...) WITH (...)") diff --git a/flink-python/pyflink/table/tests/test_environment_completeness.py b/flink-python/pyflink/table/tests/test_environment_completeness.py index 955880c..f07ca26 100644 --- a/flink-python/pyflink/table/tests/test_environment_completeness.py +++ b/flink-python/pyflink/table/tests/test_environment_completeness.py @@ -42,6 +42,7 @@ class EnvironmentAPICompletenessTests(PythonAPICompletenessTestCase, PyFlinkTest 'fromValues', 'create', 'createTemporaryTable', + 'createTable', 'createTemporarySystemFunction', 'dropTemporarySystemFunction', 'createFunction', diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java index 09b44c4..ce600e1 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.configuration.Configuration; import org.apache.flink.table.api.internal.TableEnvironmentImpl; import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.descriptors.ConnectTableDescriptor; import org.apache.flink.table.descriptors.ConnectorDescriptor; import org.apache.flink.table.expressions.Expression; @@ -54,12 +55,13 @@ import java.util.Optional; * <li>Offering further configuration options. * </ul> * - * <p>The syntax for path in methods such as {@link #createTemporaryView(String, Table)}is following - * [[catalog-name.]database-name.]object-name, where the catalog name and database are optional. For - * path resolution see {@link #useCatalog(String)} and {@link #useDatabase(String)}. + * <p>The syntax for path in methods such as {@link #createTemporaryView(String, Table)} is + * following {@code [[catalog-name.]database-name.]object-name}, where the catalog name and database + * are optional. For path resolution see {@link #useCatalog(String)} and {@link + * #useDatabase(String)}. * - * <p>Example: `cat.1`.`db`.`Table` resolves to an object named 'Table' in a catalog named 'cat.1' - * and database named 'db'. + * <p>Example: {@code `cat.1`.`db`.`Table`} resolves to an object named 'Table' in a catalog named + * 'cat.1' and database named 'db'. * * <p>Note: This environment is meant for pure table programs. If you would like to convert from or * to other Flink APIs, it might be necessary to use one of the available language-specific table @@ -585,7 +587,10 @@ public interface TableEnvironment { boolean dropTemporaryFunction(String path); /** - * Registers the {@link TableDescriptor} as a temporary table. + * Registers the given {@link TableDescriptor} as a temporary catalog table. + * + * <p>The {@link TableDescriptor descriptor} is converted into a {@link CatalogTable} and stored + * in the catalog. * * <p>Temporary objects can shadow permanent ones. If a permanent object in a given path exists, * it will be inaccessible in the current session. To make the permanent object available again @@ -602,10 +607,41 @@ public interface TableEnvironment { * .option("fields.f0.kind", "random") * .build()); * }</pre> + * + * @param path The path under which the table will be registered. See also the {@link + * TableEnvironment} class description for the format of the path. + * @param descriptor Template for creating a {@link CatalogTable} instance. */ void createTemporaryTable(String path, TableDescriptor descriptor); /** + * Registers the given {@link TableDescriptor} as a catalog table. + * + * <p>The {@link TableDescriptor descriptor} is converted into a {@link CatalogTable} and stored + * in the catalog. + * + * <p>If the table should not be permanently stored in a catalog, use {@link + * #createTemporaryTable(String, TableDescriptor)} instead. + * + * <p>Examples: + * + * <pre>{@code + * tEnv.createTable("MyTable", TableDescriptor.forConnector("datagen") + * .schema(Schema.newBuilder() + * .column("f0", DataTypes.STRING()) + * .build()) + * .option(DataGenOptions.ROWS_PER_SECOND, 10) + * .option("fields.f0.kind", "random") + * .build()); + * }</pre> + * + * @param path The path under which the table will be registered. See also the {@link + * TableEnvironment} class description for the format of the path. + * @param descriptor Template for creating a {@link CatalogTable} instance. + */ + void createTable(String path, TableDescriptor descriptor); + + /** * Registers a {@link Table} under a unique name in the TableEnvironment's catalog. Registered * tables can be referenced in SQL queries. * diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java index 2f6a412..61367d4 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java @@ -500,6 +500,24 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal { } @Override + public void createTable(String path, TableDescriptor descriptor) { + Preconditions.checkNotNull(path, "Path must not be null."); + Preconditions.checkNotNull(descriptor, "Table descriptor must not be null."); + + final ObjectIdentifier tableIdentifier = + catalogManager.qualifyIdentifier(getParser().parseIdentifier(path)); + + final CatalogTable catalogTable = + CatalogTable.of( + descriptor.getSchema(), + descriptor.getComment().orElse(null), + descriptor.getPartitionKeys(), + descriptor.getOptions()); + + catalogManager.createTable(catalogTable, tableIdentifier, false); + } + + @Override public void registerTable(String name, Table table) { UnresolvedIdentifier identifier = UnresolvedIdentifier.of(name); createTemporaryView(identifier, table); diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/TableEnvironmentTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/TableEnvironmentTest.java index 2c92f3f..4f09642 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/TableEnvironmentTest.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/TableEnvironmentTest.java @@ -23,7 +23,7 @@ import org.apache.flink.table.catalog.CatalogManager; import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.CatalogTableImpl; import org.apache.flink.table.catalog.ObjectIdentifier; -import org.apache.flink.table.descriptors.Schema; +import org.apache.flink.table.catalog.ObjectPath; import org.apache.flink.table.utils.ConnectorDescriptorMock; import org.apache.flink.table.utils.FormatDescriptorMock; import org.apache.flink.table.utils.TableEnvironmentMock; @@ -34,8 +34,11 @@ import org.junit.Test; import java.util.Arrays; import java.util.HashMap; import java.util.Map; +import java.util.Optional; import static org.hamcrest.CoreMatchers.equalTo; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; @@ -51,7 +54,7 @@ public class TableEnvironmentTest { TableSourceFactoryMock.CONNECTOR_TYPE_VALUE, 1, true)) .withFormat(new FormatDescriptorMock("my_format", 1)) .withSchema( - new Schema() + new org.apache.flink.table.descriptors.Schema() .field("my_field_0", "INT") .field("my_field_1", "BOOLEAN") .field("my_part_1", "BIGINT") @@ -78,6 +81,56 @@ public class TableEnvironmentTest { assertCatalogTable(CatalogTableImpl.fromProperties(table.toProperties())); } + @Test + public void testCreateTemporaryTableFromDescriptor() { + final TableEnvironmentMock tEnv = TableEnvironmentMock.getStreamingInstance(); + final String catalog = tEnv.getCurrentCatalog(); + final String database = tEnv.getCurrentDatabase(); + + final Schema schema = Schema.newBuilder().column("f0", DataTypes.INT()).build(); + tEnv.createTemporaryTable( + "T", + TableDescriptor.forConnector("fake").schema(schema).option("a", "Test").build()); + + assertFalse( + tEnv.getCatalog(catalog) + .orElseThrow(AssertionError::new) + .tableExists(new ObjectPath(database, "T"))); + + final Optional<CatalogManager.TableLookupResult> lookupResult = + tEnv.getCatalogManager().getTable(ObjectIdentifier.of(catalog, database, "T")); + assertTrue(lookupResult.isPresent()); + + final CatalogBaseTable catalogTable = lookupResult.get().getTable(); + assertTrue(catalogTable instanceof CatalogTable); + assertEquals(schema, catalogTable.getUnresolvedSchema()); + assertEquals("fake", catalogTable.getOptions().get("connector")); + assertEquals("Test", catalogTable.getOptions().get("a")); + } + + @Test + public void testCreateTableFromDescriptor() throws Exception { + final TableEnvironmentMock tEnv = TableEnvironmentMock.getStreamingInstance(); + final String catalog = tEnv.getCurrentCatalog(); + final String database = tEnv.getCurrentDatabase(); + + final Schema schema = Schema.newBuilder().column("f0", DataTypes.INT()).build(); + tEnv.createTable( + "T", + TableDescriptor.forConnector("fake").schema(schema).option("a", "Test").build()); + + final ObjectPath objectPath = new ObjectPath(database, "T"); + assertTrue( + tEnv.getCatalog(catalog).orElseThrow(AssertionError::new).tableExists(objectPath)); + + final CatalogBaseTable catalogTable = + tEnv.getCatalog(catalog).orElseThrow(AssertionError::new).getTable(objectPath); + assertTrue(catalogTable instanceof CatalogTable); + assertEquals(schema, catalogTable.getUnresolvedSchema()); + assertEquals("fake", catalogTable.getOptions().get("connector")); + assertEquals("Test", catalogTable.getOptions().get("a")); + } + private static void assertCatalogTable(CatalogTable table) { assertThat( table.getSchema(),