danny0405 commented on a change in pull request #8844: 
[FLINK-12951][table-planner] Add logic to bridge DDL to table source(…
URL: https://github.com/apache/flink/pull/8844#discussion_r300222616
 
 

 ##########
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
 ##########
 @@ -245,6 +247,57 @@ public String explain(Table table) {
                return planner.getCompletionHints(statement, position);
        }
 
+       @Override
+       public void sql(String statement) {
+               List<Operation> operations = planner.parse(statement);
+               operations.forEach(operation -> {
+                       if (operation instanceof CreateTableOperation) {
+                               CreateTableOperation operation1 = 
(CreateTableOperation) operation;
+                               registerTable(
+                                       operation1.getTablePath(),
+                                       operation1.getCatalogTable(),
+                                       operation1.isIgnoreIfExists());
+                       } else if (operation instanceof ModifyOperation) {
+                               queryConfigProvider.setConfig(new 
StreamQueryConfig());
+                               List<Transformation<?>> transformations =
+                                       
planner.translate(Collections.singletonList((ModifyOperation) operation));
+
+                               execEnv.apply(transformations);
+                       } else {
+                               throw new ValidationException(
+                                       "Unsupported SQL statement: sql() only 
accepts DDLs or Inserts.");
+                       }
+               });
+       }
+
+       /**
+        * Registers a {@link CatalogBaseTable} under a given object path. The 
{@code path} could be
+        * 3 formats:
+        * <ol>
+        *   <li>`catalog.db.table`: A full table path including the catalog 
name,
+        *   the database name and table name.</li>
+        *   <li>`db.table`: database name following table name, with the 
current catalog name.</li>
+        *   <li>`table`: Only the table name, with the current catalog name 
and database  name.</li>
+        * </ol>
+        * The registered tables then can be referenced in Sql queries.
+        *
+        * @param path           The path under which the table will be 
registered
+        * @param catalogTable   The table to register
+        * @param ignoreIfExists If true, do nothing if there is already same 
table name under
+        *                       the {@code path}. If false, a 
TableAlreadyExistException throws.
+        */
+       private void registerTable(String[] path, CatalogBaseTable 
catalogTable, boolean ignoreIfExists) {
 
 Review comment:
   merged into `registerTableInternal`

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to