This is an automated email from the ASF dual-hosted git repository. guoyangze pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
     new dcb30f934a7 [FLINK-25015][Table SQL] Use SQL string as jobName for DQL jobs submitted by sql-gateway
dcb30f934a7 is described below

commit dcb30f934a7953e33d35460414c6ee57cb982ab3
Author: Xiangyu Feng <xiangyu...@gmail.com>
AuthorDate: Sun Feb 18 14:56:26 2024 +0800

    [FLINK-25015][Table SQL] Use SQL string as jobName for DQL jobs submitted by sql-gateway
---
 .../table/api/internal/TableEnvironmentImpl.java | 24 +++++++++----
 .../api/QueryOperationSqlSerializationTest.java  | 39 ++++++++++++++++++++++
 2 files changed, 57 insertions(+), 6 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 6548c03c4b9..c4a58d9d351 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -1065,14 +1065,9 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
             QueryOperation operation,
             CollectModifyOperation sinkOperation,
             List<Transformation<?>> transformations) {
-        final String defaultJobName = "collect";
-
         resourceManager.addJarConfiguration(tableConfig);
 
-        // We pass only the configuration to avoid reconfiguration with the rootConfiguration
-        Pipeline pipeline =
-                execEnv.createPipeline(
-                        transformations, tableConfig.getConfiguration(), defaultJobName);
+        Pipeline pipeline = generatePipelineFromQueryOperation(operation, transformations);
         try {
             JobClient jobClient = execEnv.executeAsync(pipeline);
             ResultProvider resultProvider = sinkOperation.getSelectResultProvider();
@@ -1185,6 +1180,23 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
         }
     }
 
+    /** generate execution {@link Pipeline} from {@link QueryOperation}. */
+    @VisibleForTesting
+    public Pipeline generatePipelineFromQueryOperation(
+            QueryOperation operation, List<Transformation<?>> transformations) {
+        String defaultJobName = "collect";
+
+        try {
+            defaultJobName = operation.asSerializableString();
+        } catch (Throwable e) {
+            // ignore error for unsupported operations and use 'collect' as default job name
+        }
+
+        // We pass only the configuration to avoid reconfiguration with the rootConfiguration
+        return execEnv.createPipeline(
+                transformations, tableConfig.getConfiguration(), defaultJobName);
+    }
+
     /**
      * extract sink identifier names from {@link ModifyOperation}s and deduplicate them with {@link
      * #deduplicateSinkIdentifierNames(List)}.
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationSqlSerializationTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationSqlSerializationTest.java
index acd38d6adc8..1c9251608a3 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationSqlSerializationTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationSqlSerializationTest.java
@@ -18,6 +18,11 @@
 
 package org.apache.flink.table.api;
 
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.operations.CollectModifyOperation;
+import org.apache.flink.table.operations.QueryOperation;
 import org.apache.flink.table.test.program.SqlTestStep;
 import org.apache.flink.table.test.program.TableApiTestStep;
 import org.apache.flink.table.test.program.TableTestProgram;
@@ -29,6 +34,7 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.MethodSource;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
@@ -78,6 +84,39 @@ public class QueryOperationSqlSerializationTest implements TableTestProgramRunne
         assertThat(table.getQueryOperation().asSerializableString()).isEqualTo(sqlStep.sql);
     }
 
+    @ParameterizedTest
+    @MethodSource("supportedPrograms")
+    void testSqlAsJobNameForQueryOperation(TableTestProgram program) {
+        final TableEnvironmentImpl env = (TableEnvironmentImpl) setupEnv(program);
+
+        final TableApiTestStep tableApiStep =
+                (TableApiTestStep)
+                        program.runSteps.stream()
+                                .filter(s -> s instanceof TableApiTestStep)
+                                .findFirst()
+                                .get();
+
+        final SqlTestStep sqlStep =
+                (SqlTestStep)
+                        program.runSteps.stream()
+                                .filter(s -> s instanceof SqlTestStep)
+                                .findFirst()
+                                .get();
+
+        final Table table = tableApiStep.toTable(env);
+
+        QueryOperation queryOperation = table.getQueryOperation();
+        CollectModifyOperation sinkOperation = new CollectModifyOperation(queryOperation);
+        List<Transformation<?>> transformations =
+                env.getPlanner().translate(Collections.singletonList(sinkOperation));
+
+        StreamGraph streamGraph =
+                (StreamGraph)
+                        env.generatePipelineFromQueryOperation(queryOperation, transformations);
+
+        assertThat(sqlStep.sql).isEqualTo(streamGraph.getJobName());
+    }
+
     private static TableEnvironment setupEnv(TableTestProgram program) {
         final TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
         final Map<String, String> connectorOptions = new HashMap<>();