JingsongLi commented on a change in pull request #9971: [FLINK-14490][table] 
Add methods for interacting with temporary objects
URL: https://github.com/apache/flink/pull/9971#discussion_r339393982
 
 

 ##########
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/ConnectTableDescriptor.java
 ##########
 @@ -60,37 +65,98 @@ public ConnectTableDescriptor withSchema(Schema schema) {
         * Searches for the specified table source, configures it accordingly, 
and registers it as
         * a table under the given name.
         *
+        * <p>Temporary objects can shadow permanent ones. If a permanent 
object in a given path exists, it will
+        * be inaccessible in the current session. To make the permanent object 
available again you can drop the
+        * corresponding temporary object.
+        *
         * @param name table name to be registered in the table environment
+        * @deprecated use {@link #createTemporaryTable(String)}
         */
+       @Deprecated
        public void registerTableSource(String name) {
                Preconditions.checkNotNull(name);
                TableSource<?> tableSource = 
TableFactoryUtil.findAndCreateTableSource(this);
-               tableEnv.registerTableSource(name, tableSource);
+               registration.createTableSource(name, tableSource);
        }
 
        /**
         * Searches for the specified table sink, configures it accordingly, 
and registers it as
         * a table under the given name.
         *
+        * <p>Temporary objects can shadow permanent ones. If a permanent 
object in a given path exists, it will
+        * be inaccessible in the current session. To make the permanent object 
available again you can drop the
+        * corresponding temporary object.
+        *
         * @param name table name to be registered in the table environment
+        * @deprecated use {@link #createTemporaryTable(String)}
         */
+       @Deprecated
        public void registerTableSink(String name) {
                Preconditions.checkNotNull(name);
                TableSink<?> tableSink = 
TableFactoryUtil.findAndCreateTableSink(this);
-               tableEnv.registerTableSink(name, tableSink);
+               registration.createTableSink(name, tableSink);
        }
 
        /**
         * Searches for the specified table source and sink, configures them 
accordingly, and registers
         * them as a table under the given name.
         *
+        * <p>Temporary objects can shadow permanent ones. If a permanent 
object in a given path exists, it will
+        * be inaccessible in the current session. To make the permanent object 
available again you can drop the
+        * corresponding temporary object.
+        *
         * @param name table name to be registered in the table environment
+        * @deprecated use {@link #createTemporaryTable(String)}
         */
+       @Deprecated
        public void registerTableSourceAndSink(String name) {
                registerTableSource(name);
                registerTableSink(name);
        }
 
+       /**
+        * Registers the table described by underlying properties in a given 
path.
+        *
+        * <p>There is no distinction between source and sink at the descriptor 
level anymore as this
+        * method does not perform actual class lookup. It only stores the 
underlying properties. The
+        * actual source/sink lookup is performed when the table is used.
+        *
+        * <p>Temporary objects can shadow permanent ones. If a permanent 
object in a given path exists, it will
+        * be inaccessible in the current session. To make the permanent object 
available again you can drop the
+        * corresponding temporary object.
+        *
+        * <p><b>NOTE:</b> The schema must be explicitly defined.
+        *
+        * @param path path where to register the temporary table
+        */
+       public void createTemporaryTable(String path) {
+               if (schemaDescriptor == null) {
+                       throw new TableException(
+                               "Table schema must be explicitly defined. To 
derive schema from the underlying connector" +
+                                       " use 
registerTableSource/registerTableSink/registerTableSourceAndSink.");
+               }
+
+               Map<String, String> schemaProperties = 
schemaDescriptor.toProperties();
+               TableSchema tableSchema = getTableSchema(schemaProperties);
+
+               Map<String, String> properties = new HashMap<>(toProperties());
+               schemaProperties.keySet().forEach(properties::remove);
+
+               CatalogTableImpl catalogTable = new CatalogTableImpl(
 
 Review comment:
   Maybe we can use `CatalogTableBuilder` here?
   Looks like there some bugs in `CatalogTableBuilder`, it not remove 
`schemaProperties` in `properties`?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to