This is an automated email from the ASF dual-hosted git repository. godfrey pushed a commit to branch release-1.12 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 1aa1b8c6e6d4628b01ff5503f3a680152680caeb Author: godfreyhe <[email protected]> AuthorDate: Tue Nov 17 23:32:07 2020 +0800 [FLINK-18545][table] Specify job name by `pipeline.name` for sql job (cherry picked from commit 88d0ee002c38f8a17895d4485703787a04f67c50) --- .../apache/flink/table/api/internal/TableEnvironmentImpl.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java index 8ab6974..a6a3c51 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java @@ -23,6 +23,7 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.dag.Pipeline; import org.apache.flink.api.dag.Transformation; +import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.core.execution.JobClient; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.EnvironmentSettings; @@ -673,7 +674,7 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal { public TableResult executeInternal(List<ModifyOperation> operations) { List<Transformation<?>> transformations = translate(operations); List<String> sinkIdentifierNames = extractSinkIdentifierNames(operations); - String jobName = "insert-into_" + String.join(",", sinkIdentifierNames); + String jobName = getJobName("insert-into_" + String.join(",", sinkIdentifierNames)); Pipeline pipeline = execEnv.createPipeline(transformations, tableConfig, jobName); try { JobClient jobClient = execEnv.executeAsync(pipeline); @@ -700,7 +701,8 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal { public TableResult executeInternal(QueryOperation operation) { SelectSinkOperation sinkOperation = new SelectSinkOperation(operation); List<Transformation<?>> transformations = translate(Collections.singletonList(sinkOperation)); - Pipeline pipeline = execEnv.createPipeline(transformations, tableConfig, "collect"); + String jobName = getJobName("collect"); + Pipeline pipeline = execEnv.createPipeline(transformations, tableConfig, jobName); try { JobClient jobClient = execEnv.executeAsync(pipeline); SelectResultProvider resultProvider = sinkOperation.getSelectResultProvider(); @@ -1170,6 +1172,10 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal { ).collect(Collectors.toList()); } + private String getJobName(String defaultJobName) { + return tableConfig.getConfiguration().getString(PipelineOptions.NAME, defaultJobName); + } + /** Get catalog from catalogName or throw a ValidationException if the catalog not exists. */ private Catalog getCatalogOrThrowException(String catalogName) { return getCatalog(catalogName)
