This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 6eb6c66  [FLINK-23070][table-api-java] Introduce TableEnvironment#createTable
6eb6c66 is described below

commit 6eb6c66b324f1026f3ad388e158aae6b22373824
Author: Ingo Bürk <ingo.bu...@tngtech.com>
AuthorDate: Fri Jun 25 07:22:39 2021 +0200

    [FLINK-23070][table-api-java] Introduce TableEnvironment#createTable
    
    This behaves much like the previously introduced #createTemporaryTable,
    but actually permanently stores the table in a catalog.
    
    This closes #16285.
---
 docs/content.zh/docs/dev/table/common.md           | 11 +++--
 docs/content/docs/dev/table/common.md              | 11 +++--
 .../table/tests/test_environment_completeness.py   |  1 +
 .../apache/flink/table/api/TableEnvironment.java   | 48 +++++++++++++++---
 .../table/api/internal/TableEnvironmentImpl.java   | 18 +++++++
 .../flink/table/api/TableEnvironmentTest.java      | 57 +++++++++++++++++++++-
 6 files changed, 130 insertions(+), 16 deletions(-)

diff --git a/docs/content.zh/docs/dev/table/common.md b/docs/content.zh/docs/dev/table/common.md
index 6e4af0a..ee8c4b8 100644
--- a/docs/content.zh/docs/dev/table/common.md
+++ b/docs/content.zh/docs/dev/table/common.md
@@ -324,12 +324,15 @@ Such tables can either be created using the Table API directly, or by switching
 
 ```java
 // Using table descriptors
-tableEnv.createTemporaryTable("Source", TableDescriptor.forConnector("datagen")
+final TableDescriptor sourceDescriptor = TableDescriptor.forConnector("datagen")
     .schema(Schema.newBuilder()
-      .column("f0", DataTypes.STRING())
-      .build())
-    .option(DataGenOptions.ROWS_PER_SECOND, 100)
+    .column("f0", DataTypes.STRING())
     .build())
+    .option(DataGenOptions.ROWS_PER_SECOND, 100)
+    .build();
+
+tableEnv.createTable("SourceTableA", sourceDescriptor);
+tableEnv.createTemporaryTable("SourceTableB", sourceDescriptor);
 
 // Using SQL DDL
 tableEnv.executeSql("CREATE [TEMPORARY] TABLE MyTable (...) WITH (...)")
diff --git a/docs/content/docs/dev/table/common.md b/docs/content/docs/dev/table/common.md
index 2158670..0667f9e 100644
--- a/docs/content/docs/dev/table/common.md
+++ b/docs/content/docs/dev/table/common.md
@@ -333,12 +333,15 @@ Such tables can either be created using the Table API directly, or by switching
 
 ```java
 // Using table descriptors
-tableEnv.createTemporaryTable("Source", TableDescriptor.forConnector("datagen")
+final TableDescriptor sourceDescriptor = TableDescriptor.forConnector("datagen")
     .schema(Schema.newBuilder()
-      .column("f0", DataTypes.STRING())
-      .build())
-    .option(DataGenOptions.ROWS_PER_SECOND, 100)
+    .column("f0", DataTypes.STRING())
     .build())
+    .option(DataGenOptions.ROWS_PER_SECOND, 100)
+    .build();
+
+tableEnv.createTable("SourceTableA", sourceDescriptor);
+tableEnv.createTemporaryTable("SourceTableB", sourceDescriptor);
 
 // Using SQL DDL
 tableEnv.executeSql("CREATE [TEMPORARY] TABLE MyTable (...) WITH (...)")
diff --git a/flink-python/pyflink/table/tests/test_environment_completeness.py b/flink-python/pyflink/table/tests/test_environment_completeness.py
index 955880c..f07ca26 100644
--- a/flink-python/pyflink/table/tests/test_environment_completeness.py
+++ b/flink-python/pyflink/table/tests/test_environment_completeness.py
@@ -42,6 +42,7 @@ class EnvironmentAPICompletenessTests(PythonAPICompletenessTestCase, PyFlinkTest
             'fromValues',
             'create',
             'createTemporaryTable',
+            'createTable',
             'createTemporarySystemFunction',
             'dropTemporarySystemFunction',
             'createFunction',
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
index 09b44c4..ce600e1 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.internal.TableEnvironmentImpl;
 import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.descriptors.ConnectTableDescriptor;
 import org.apache.flink.table.descriptors.ConnectorDescriptor;
 import org.apache.flink.table.expressions.Expression;
@@ -54,12 +55,13 @@ import java.util.Optional;
  *   <li>Offering further configuration options.
  * </ul>
  *
- * <p>The syntax for path in methods such as {@link #createTemporaryView(String, Table)}is following
- * [[catalog-name.]database-name.]object-name, where the catalog name and database are optional. For
- * path resolution see {@link #useCatalog(String)} and {@link #useDatabase(String)}.
+ * <p>The syntax for path in methods such as {@link #createTemporaryView(String, Table)} is
+ * following {@code [[catalog-name.]database-name.]object-name}, where the catalog name and database
+ * are optional. For path resolution see {@link #useCatalog(String)} and {@link
+ * #useDatabase(String)}.
  *
- * <p>Example: `cat.1`.`db`.`Table` resolves to an object named 'Table' in a catalog named 'cat.1'
- * and database named 'db'.
+ * <p>Example: {@code `cat.1`.`db`.`Table`} resolves to an object named 'Table' in a catalog named
+ * 'cat.1' and database named 'db'.
  *
  * <p>Note: This environment is meant for pure table programs. If you would like to convert from or
  * to other Flink APIs, it might be necessary to use one of the available language-specific table
@@ -585,7 +587,10 @@ public interface TableEnvironment {
     boolean dropTemporaryFunction(String path);
 
     /**
-     * Registers the {@link TableDescriptor} as a temporary table.
+     * Registers the given {@link TableDescriptor} as a temporary catalog table.
+     *
+     * <p>The {@link TableDescriptor descriptor} is converted into a {@link CatalogTable} and stored
+     * in the catalog.
      *
      * <p>Temporary objects can shadow permanent ones. If a permanent object in a given path exists,
      * it will be inaccessible in the current session. To make the permanent object available again
@@ -602,10 +607,41 @@ public interface TableEnvironment {
      *   .option("fields.f0.kind", "random")
      *   .build());
      * }</pre>
+     *
+     * @param path The path under which the table will be registered. See also the {@link
+     *     TableEnvironment} class description for the format of the path.
+     * @param descriptor Template for creating a {@link CatalogTable} instance.
      */
     void createTemporaryTable(String path, TableDescriptor descriptor);
 
     /**
+     * Registers the given {@link TableDescriptor} as a catalog table.
+     *
+     * <p>The {@link TableDescriptor descriptor} is converted into a {@link CatalogTable} and stored
+     * in the catalog.
+     *
+     * <p>If the table should not be permanently stored in a catalog, use {@link
+     * #createTemporaryTable(String, TableDescriptor)} instead.
+     *
+     * <p>Examples:
+     *
+     * <pre>{@code
+     * tEnv.createTable("MyTable", TableDescriptor.forConnector("datagen")
+     *   .schema(Schema.newBuilder()
+     *     .column("f0", DataTypes.STRING())
+     *     .build())
+     *   .option(DataGenOptions.ROWS_PER_SECOND, 10)
+     *   .option("fields.f0.kind", "random")
+     *   .build());
+     * }</pre>
+     *
+     * @param path The path under which the table will be registered. See also the {@link
+     *     TableEnvironment} class description for the format of the path.
+     * @param descriptor Template for creating a {@link CatalogTable} instance.
+     */
+    void createTable(String path, TableDescriptor descriptor);
+
+    /**
      * Registers a {@link Table} under a unique name in the TableEnvironment's catalog. Registered
      * tables can be referenced in SQL queries.
      *
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 2f6a412..61367d4 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -500,6 +500,24 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
     }
 
     @Override
+    public void createTable(String path, TableDescriptor descriptor) {
+        Preconditions.checkNotNull(path, "Path must not be null.");
+        Preconditions.checkNotNull(descriptor, "Table descriptor must not be null.");
+
+        final ObjectIdentifier tableIdentifier =
+                catalogManager.qualifyIdentifier(getParser().parseIdentifier(path));
+
+        final CatalogTable catalogTable =
+                CatalogTable.of(
+                        descriptor.getSchema(),
+                        descriptor.getComment().orElse(null),
+                        descriptor.getPartitionKeys(),
+                        descriptor.getOptions());
+
+        catalogManager.createTable(catalogTable, tableIdentifier, false);
+    }
+
+    @Override
     public void registerTable(String name, Table table) {
         UnresolvedIdentifier identifier = UnresolvedIdentifier.of(name);
         createTemporaryView(identifier, table);
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/TableEnvironmentTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/TableEnvironmentTest.java
index 2c92f3f..4f09642 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/TableEnvironmentTest.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/TableEnvironmentTest.java
@@ -23,7 +23,7 @@ import org.apache.flink.table.catalog.CatalogManager;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogTableImpl;
 import org.apache.flink.table.catalog.ObjectIdentifier;
-import org.apache.flink.table.descriptors.Schema;
+import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.utils.ConnectorDescriptorMock;
 import org.apache.flink.table.utils.FormatDescriptorMock;
 import org.apache.flink.table.utils.TableEnvironmentMock;
@@ -34,8 +34,11 @@ import org.junit.Test;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 
 import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
@@ -51,7 +54,7 @@ public class TableEnvironmentTest {
                                 TableSourceFactoryMock.CONNECTOR_TYPE_VALUE, 1, true))
                 .withFormat(new FormatDescriptorMock("my_format", 1))
                 .withSchema(
-                        new Schema()
+                        new org.apache.flink.table.descriptors.Schema()
                                 .field("my_field_0", "INT")
                                 .field("my_field_1", "BOOLEAN")
                                 .field("my_part_1", "BIGINT")
@@ -78,6 +81,56 @@ public class TableEnvironmentTest {
         assertCatalogTable(CatalogTableImpl.fromProperties(table.toProperties()));
     }
 
+    @Test
+    public void testCreateTemporaryTableFromDescriptor() {
+        final TableEnvironmentMock tEnv = TableEnvironmentMock.getStreamingInstance();
+        final String catalog = tEnv.getCurrentCatalog();
+        final String database = tEnv.getCurrentDatabase();
+
+        final Schema schema = Schema.newBuilder().column("f0", DataTypes.INT()).build();
+        tEnv.createTemporaryTable(
+                "T",
+                TableDescriptor.forConnector("fake").schema(schema).option("a", "Test").build());
+
+        assertFalse(
+                tEnv.getCatalog(catalog)
+                        .orElseThrow(AssertionError::new)
+                        .tableExists(new ObjectPath(database, "T")));
+
+        final Optional<CatalogManager.TableLookupResult> lookupResult =
+                tEnv.getCatalogManager().getTable(ObjectIdentifier.of(catalog, database, "T"));
+        assertTrue(lookupResult.isPresent());
+
+        final CatalogBaseTable catalogTable = lookupResult.get().getTable();
+        assertTrue(catalogTable instanceof CatalogTable);
+        assertEquals(schema, catalogTable.getUnresolvedSchema());
+        assertEquals("fake", catalogTable.getOptions().get("connector"));
+        assertEquals("Test", catalogTable.getOptions().get("a"));
+    }
+
+    @Test
+    public void testCreateTableFromDescriptor() throws Exception {
+        final TableEnvironmentMock tEnv = TableEnvironmentMock.getStreamingInstance();
+        final String catalog = tEnv.getCurrentCatalog();
+        final String database = tEnv.getCurrentDatabase();
+
+        final Schema schema = Schema.newBuilder().column("f0", DataTypes.INT()).build();
+        tEnv.createTable(
+                "T",
+                TableDescriptor.forConnector("fake").schema(schema).option("a", "Test").build());
+
+        final ObjectPath objectPath = new ObjectPath(database, "T");
+        assertTrue(
+                tEnv.getCatalog(catalog).orElseThrow(AssertionError::new).tableExists(objectPath));
+
+        final CatalogBaseTable catalogTable =
+                tEnv.getCatalog(catalog).orElseThrow(AssertionError::new).getTable(objectPath);
+        assertTrue(catalogTable instanceof CatalogTable);
+        assertEquals(schema, catalogTable.getUnresolvedSchema());
+        assertEquals("fake", catalogTable.getOptions().get("connector"));
+        assertEquals("Test", catalogTable.getOptions().get("a"));
+    }
+
     private static void assertCatalogTable(CatalogTable table) {
         assertThat(
                 table.getSchema(),

Reply via email to