This is an automated email from the ASF dual-hosted git repository.

guoyangze pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new dcb30f934a7 [FLINK-25015][Table SQL] Use SQL string as jobName for DQL jobs submitted by sql-gateway
dcb30f934a7 is described below

commit dcb30f934a7953e33d35460414c6ee57cb982ab3
Author: Xiangyu Feng <xiangyu...@gmail.com>
AuthorDate: Sun Feb 18 14:56:26 2024 +0800

    [FLINK-25015][Table SQL] Use SQL string as jobName for DQL jobs submitted by sql-gateway
---
 .../table/api/internal/TableEnvironmentImpl.java   | 24 +++++++++----
 .../api/QueryOperationSqlSerializationTest.java    | 39 ++++++++++++++++++++++
 2 files changed, 57 insertions(+), 6 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 6548c03c4b9..c4a58d9d351 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -1065,14 +1065,9 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
             QueryOperation operation,
             CollectModifyOperation sinkOperation,
             List<Transformation<?>> transformations) {
-        final String defaultJobName = "collect";
-
         resourceManager.addJarConfiguration(tableConfig);
 
-        // We pass only the configuration to avoid reconfiguration with the rootConfiguration
-        Pipeline pipeline =
-                execEnv.createPipeline(
-                        transformations, tableConfig.getConfiguration(), defaultJobName);
+        Pipeline pipeline = generatePipelineFromQueryOperation(operation, transformations);
         try {
             JobClient jobClient = execEnv.executeAsync(pipeline);
             ResultProvider resultProvider = sinkOperation.getSelectResultProvider();
@@ -1185,6 +1180,23 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
         }
     }
 
+    /** generate execution {@link Pipeline} from {@link QueryOperation}. */
+    @VisibleForTesting
+    public Pipeline generatePipelineFromQueryOperation(
+            QueryOperation operation, List<Transformation<?>> transformations) {
+        String defaultJobName = "collect";
+
+        try {
+            defaultJobName = operation.asSerializableString();
+        } catch (Throwable e) {
+            // ignore error for unsupported operations and use 'collect' as default job name
+        }
+
+        // We pass only the configuration to avoid reconfiguration with the rootConfiguration
+        return execEnv.createPipeline(
+                transformations, tableConfig.getConfiguration(), defaultJobName);
+    }
+
     /**
      * extract sink identifier names from {@link ModifyOperation}s and deduplicate them with {@link
      * #deduplicateSinkIdentifierNames(List)}.
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationSqlSerializationTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationSqlSerializationTest.java
index acd38d6adc8..1c9251608a3 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationSqlSerializationTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationSqlSerializationTest.java
@@ -18,6 +18,11 @@
 
 package org.apache.flink.table.api;
 
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.operations.CollectModifyOperation;
+import org.apache.flink.table.operations.QueryOperation;
 import org.apache.flink.table.test.program.SqlTestStep;
 import org.apache.flink.table.test.program.TableApiTestStep;
 import org.apache.flink.table.test.program.TableTestProgram;
@@ -29,6 +34,7 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.MethodSource;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
@@ -78,6 +84,39 @@ public class QueryOperationSqlSerializationTest implements TableTestProgramRunne
         assertThat(table.getQueryOperation().asSerializableString()).isEqualTo(sqlStep.sql);
     }
 
+    @ParameterizedTest
+    @MethodSource("supportedPrograms")
+    void testSqlAsJobNameForQueryOperation(TableTestProgram program) {
+        final TableEnvironmentImpl env = (TableEnvironmentImpl) setupEnv(program);
+
+        final TableApiTestStep tableApiStep =
+                (TableApiTestStep)
+                        program.runSteps.stream()
+                                .filter(s -> s instanceof TableApiTestStep)
+                                .findFirst()
+                                .get();
+
+        final SqlTestStep sqlStep =
+                (SqlTestStep)
+                        program.runSteps.stream()
+                                .filter(s -> s instanceof SqlTestStep)
+                                .findFirst()
+                                .get();
+
+        final Table table = tableApiStep.toTable(env);
+
+        QueryOperation queryOperation = table.getQueryOperation();
+        CollectModifyOperation sinkOperation = new CollectModifyOperation(queryOperation);
+        List<Transformation<?>> transformations =
+                env.getPlanner().translate(Collections.singletonList(sinkOperation));
+
+        StreamGraph streamGraph =
+                (StreamGraph)
+                        env.generatePipelineFromQueryOperation(queryOperation, transformations);
+
+        assertThat(sqlStep.sql).isEqualTo(streamGraph.getJobName());
+    }
+
     private static TableEnvironment setupEnv(TableTestProgram program) {
         final TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
         final Map<String, String> connectorOptions = new HashMap<>();

Reply via email to