danny0405 commented on a change in pull request #8844: [FLINK-12951][table-planner] Add logic to bridge DDL to table source(… URL: https://github.com/apache/flink/pull/8844#discussion_r300222616
########## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java ########## @@ -245,6 +247,57 @@ public String explain(Table table) { return planner.getCompletionHints(statement, position); } + @Override + public void sql(String statement) { + List<Operation> operations = planner.parse(statement); + operations.forEach(operation -> { + if (operation instanceof CreateTableOperation) { + CreateTableOperation operation1 = (CreateTableOperation) operation; + registerTable( + operation1.getTablePath(), + operation1.getCatalogTable(), + operation1.isIgnoreIfExists()); + } else if (operation instanceof ModifyOperation) { + queryConfigProvider.setConfig(new StreamQueryConfig()); + List<Transformation<?>> transformations = + planner.translate(Collections.singletonList((ModifyOperation) operation)); + + execEnv.apply(transformations); + } else { + throw new ValidationException( + "Unsupported SQL statement: sql() only accepts DDLs or Inserts."); + } + }); + } + + /** + * Registers a {@link CatalogBaseTable} under a given object path. The {@code path} could be + * 3 formats: + * <ol> + * <li>`catalog.db.table`: A full table path including the catalog name, + * the database name and table name.</li> + * <li>`db.table`: database name following table name, with the current catalog name.</li> + * <li>`table`: Only the table name, with the current catalog name and database name.</li> + * </ol> + * The registered tables then can be referenced in Sql queries. + * + * @param path The path under which the table will be registered + * @param catalogTable The table to register + * @param ignoreIfExists If true, do nothing if there is already same table name under + * the {@code path}. If false, a TableAlreadyExistException throws. + */ + private void registerTable(String[] path, CatalogBaseTable catalogTable, boolean ignoreIfExists) { Review comment: merged into `registerTableInternal` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services