godfreyhe commented on a change in pull request #11862:
URL: https://github.com/apache/flink/pull/11862#discussion_r415500318
##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
##########
@@ -1113,4 +1113,61 @@
 	 * </pre>
 	 */
 	FlatAggregateTable flatAggregate(Expression tableAggregateFunction);
+
+	/**
+	 * Writes the {@link Table} to a {@link TableSink} that was registered under the specified path,
+	 * and then execute the insert operation.
+	 *
+	 * <p>See the documentation of {@link TableEnvironment#useDatabase(String)} or
+	 * {@link TableEnvironment#useCatalog(String)} for the rules on the path resolution.
+	 *
+	 * <p>A batch {@link Table} can only be written to a
+	 * {@code org.apache.flink.table.sinks.BatchTableSink}, a streaming {@link Table} requires a
+	 * {@code org.apache.flink.table.sinks.AppendStreamTableSink}, a
+	 * {@code org.apache.flink.table.sinks.RetractStreamTableSink}, or an
+	 * {@code org.apache.flink.table.sinks.UpsertStreamTableSink}.
+	 *
+	 * <p>Example:
+	 *
+	 * <pre>
+	 * {@code
+	 *   Table table = tableEnv.fromQuery("select * from MyTable");
+	 *   TableResult tableResult = table.executeInsert("MySink");
+	 *   tableResult...
+	 * }
+	 * </pre>
+	 *
+	 * @param tablePath The path of the registered TableSink to which the Table is written.
+	 * @return The insert operation execution result.
+	 */
+	TableResult executeInsert(String tablePath);
+
+	/**
+	 * Writes the {@link Table} to a {@link TableSink} that was registered under the specified path,
+	 * and then execute the insert operation.
+	 *
+	 * <p>See the documentation of {@link TableEnvironment#useDatabase(String)} or
+	 * {@link TableEnvironment#useCatalog(String)} for the rules on the path resolution.
+	 *
+	 * <p>A batch {@link Table} can only be written to a
+	 * {@code org.apache.flink.table.sinks.BatchTableSink}, a streaming {@link Table} requires a
+	 * {@code org.apache.flink.table.sinks.AppendStreamTableSink}, a
+	 * {@code org.apache.flink.table.sinks.RetractStreamTableSink}, or an
+	 * {@code org.apache.flink.table.sinks.UpsertStreamTableSink}.
+	 *
+	 * <p>Example:
+	 *
+	 * <pre>
+	 * {@code
+	 *   Table table = tableEnv.fromQuery("select * from MyTable");
+	 *   TableResult tableResult = table.executeInsert("MySink", true);
+	 *   tableResult...
+	 * }
+	 * </pre>
+	 *
+	 * @param tablePath The path of the registered TableSink to which the Table is written.
+	 * @param overwrite The flag that indicates whether the insert should overwrite existing data or not.
+	 * @return The insert operation execution result.
+	 */
+	TableResult executeInsert(String tablePath, boolean overwrite);

Review comment:
> IIRC not all the table sinks support overwrite. The logic of dealing with `overwrite=true` seems not complete.

Both planners already check whether a sink is an `OverwritableTableSink`. I will add some tests for overwrite mode.
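
For reference, the kind of `OverwritableTableSink` check mentioned in the comment can be sketched roughly as below. This is only an illustration under stated assumptions, not the planners' actual code: `TableSink` and `OverwritableTableSink` are redeclared here as minimal stand-ins so the snippet compiles on its own, while `FileLikeSink`, `AppendOnlySink`, `applyOverwrite`, and the use of `IllegalArgumentException` are hypothetical names and choices made for the example.

```java
public class OverwriteCheckSketch {

    /** Minimal stand-in for Flink's TableSink (assumption: no methods needed here). */
    interface TableSink {}

    /** Minimal stand-in for the overwrite capability a sink may expose. */
    interface OverwritableTableSink {
        void setOverwrite(boolean overwrite);
    }

    /** Hypothetical sink that supports overwrite. */
    static class FileLikeSink implements TableSink, OverwritableTableSink {
        boolean overwrite;

        @Override
        public void setOverwrite(boolean overwrite) {
            this.overwrite = overwrite;
        }
    }

    /** Hypothetical sink that does not support overwrite. */
    static class AppendOnlySink implements TableSink {}

    /**
     * Enables overwrite on the sink when requested, and rejects the request
     * if the sink does not implement the overwrite capability.
     */
    static void applyOverwrite(TableSink sink, String tablePath, boolean overwrite) {
        if (!overwrite) {
            return; // nothing to validate for a plain insert
        }
        if (sink instanceof OverwritableTableSink) {
            ((OverwritableTableSink) sink).setOverwrite(true);
        } else {
            // Stand-in exception type; the real planners may report this differently.
            throw new IllegalArgumentException(
                "Overwrite requested, but the sink registered under '" + tablePath
                    + "' is not overwritable.");
        }
    }

    public static void main(String[] args) {
        FileLikeSink ok = new FileLikeSink();
        applyOverwrite(ok, "MySink", true);
        System.out.println("overwrite enabled: " + ok.overwrite);

        try {
            applyOverwrite(new AppendOnlySink(), "MyAppendSink", true);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Tests for overwrite mode would presumably cover both branches: a sink that accepts `overwrite=true`, and a sink for which the same call fails fast.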