This is an automated email from the ASF dual-hosted git repository.

ron pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 44528e0ee9f [FLINK-35194][table] Support describe job statement for 
SqlGateway
44528e0ee9f is described below

commit 44528e0ee9fbed11b5417253534078d60fed3a12
Author: xuyang <xyzhong...@163.com>
AuthorDate: Fri Apr 26 20:29:56 2024 +0800

    [FLINK-35194][table] Support describe job statement for SqlGateway
    
    This closes #24728
---
 .../service/operation/OperationExecutor.java       | 53 +++++++++++++++++
 .../gateway/service/SqlGatewayServiceITCase.java   | 51 +++++++++++++++++
 .../src/main/codegen/data/Parser.tdd               |  5 +-
 .../src/main/codegen/includes/parserImpls.ftl      | 18 ++++++
 .../flink/sql/parser/dql/SqlDescribeJob.java       | 66 ++++++++++++++++++++++
 .../flink/sql/parser/FlinkSqlParserImplTest.java   |  6 ++
 .../operations/command/DescribeJobOperation.java   | 52 +++++++++++++++++
 .../converters/SqlDescribeJobConverter.java        | 32 +++++++++++
 .../operations/converters/SqlNodeConverters.java   |  1 +
 .../table/planner/calcite/FlinkPlannerImpl.scala   |  3 +-
 10 files changed, 285 insertions(+), 2 deletions(-)

diff --git 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
index 945265089c3..c50ba8c2bbf 100644
--- 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
+++ 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
@@ -83,6 +83,7 @@ import 
org.apache.flink.table.operations.StatementSetOperation;
 import org.apache.flink.table.operations.UnloadModuleOperation;
 import org.apache.flink.table.operations.UseOperation;
 import org.apache.flink.table.operations.command.AddJarOperation;
+import org.apache.flink.table.operations.command.DescribeJobOperation;
 import org.apache.flink.table.operations.command.ExecutePlanOperation;
 import org.apache.flink.table.operations.command.RemoveJarOperation;
 import org.apache.flink.table.operations.command.ResetOperation;
@@ -481,6 +482,8 @@ public class OperationExecutor {
             return callStopJobOperation(tableEnv, handle, (StopJobOperation) 
op);
         } else if (op instanceof ShowJobsOperation) {
             return callShowJobsOperation(tableEnv, handle, (ShowJobsOperation) 
op);
+        } else if (op instanceof DescribeJobOperation) {
+            return callDescribeJobOperation(tableEnv, handle, 
(DescribeJobOperation) op);
         } else if (op instanceof RemoveJarOperation) {
             return callRemoveJar(handle, ((RemoveJarOperation) op).getPath());
         } else if (op instanceof AddJarOperation
@@ -774,6 +777,56 @@ public class OperationExecutor {
                 resultRows);
     }
 
+    public ResultFetcher callDescribeJobOperation(
+            TableEnvironmentInternal tableEnv,
+            OperationHandle operationHandle,
+            DescribeJobOperation describeJobOperation)
+            throws SqlExecutionException {
+        Configuration configuration = tableEnv.getConfig().getConfiguration();
+        Duration clientTimeout = 
configuration.get(ClientOptions.CLIENT_TIMEOUT);
+        String jobId = describeJobOperation.getJobId();
+        Optional<JobStatusMessage> jobStatusOp =
+                runClusterAction(
+                        configuration,
+                        operationHandle,
+                        clusterClient -> {
+                            try {
+                                JobID expectedJobId = 
JobID.fromHexString(jobId);
+                                return clusterClient.listJobs()
+                                        .get(clientTimeout.toMillis(), 
TimeUnit.MILLISECONDS)
+                                        .stream()
+                                        .filter(job -> 
expectedJobId.equals(job.getJobId()))
+                                        .findFirst();
+                            } catch (Exception e) {
+                                throw new SqlExecutionException(
+                                        String.format(
+                                                "Failed to get job %s in the 
cluster.", jobId),
+                                        e);
+                            }
+                        });
+
+        if (!jobStatusOp.isPresent()) {
+            throw new SqlExecutionException(
+                    String.format("Described job %s does not exist in the 
cluster.", jobId));
+        }
+        JobStatusMessage job = jobStatusOp.get();
+
+        RowData resultRow =
+                GenericRowData.of(
+                        StringData.fromString(jobId),
+                        StringData.fromString(job.getJobName()),
+                        StringData.fromString(job.getJobState().toString()),
+                        DateTimeUtils.toTimestampData(job.getStartTime(), 3));
+        return ResultFetcher.fromResults(
+                operationHandle,
+                ResolvedSchema.of(
+                        Column.physical(JOB_ID, DataTypes.STRING()),
+                        Column.physical(JOB_NAME, DataTypes.STRING()),
+                        Column.physical(STATUS, DataTypes.STRING()),
+                        Column.physical(START_TIME, 
DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())),
+                Collections.singletonList(resultRow));
+    }
+
     /**
      * Retrieves the {@link ClusterClient} from the session and runs the given 
{@link ClusterAction}
      * against it.
diff --git 
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
 
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
index 55aa16cb25a..012c4aed166 100644
--- 
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
+++ 
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
@@ -511,6 +511,57 @@ public class SqlGatewayServiceITCase {
                 .isBetween(timeOpStart, timeOpSucceed);
     }
 
+    @Test
+    void testDescribeJobOperation(@InjectClusterClient RestClusterClient<?> 
restClusterClient)
+            throws Exception {
+        SessionHandle sessionHandle = 
service.openSession(defaultSessionEnvironment);
+        Configuration configuration = new 
Configuration(MINI_CLUSTER.getClientConfiguration());
+
+        String pipelineName = "test-describe-job";
+        configuration.set(PipelineOptions.NAME, pipelineName);
+
+        // running jobs
+        String sourceDdl = "CREATE TABLE source (a STRING) WITH 
('connector'='datagen');";
+        String sinkDdl = "CREATE TABLE sink (a STRING) WITH 
('connector'='blackhole');";
+        String insertSql = "INSERT INTO sink SELECT * FROM source;";
+
+        service.executeStatement(sessionHandle, sourceDdl, -1, configuration);
+        service.executeStatement(sessionHandle, sinkDdl, -1, configuration);
+
+        long timeOpStart = System.currentTimeMillis();
+        OperationHandle insertsOperationHandle =
+                service.executeStatement(sessionHandle, insertSql, -1, 
configuration);
+        String jobId =
+                fetchAllResults(sessionHandle, insertsOperationHandle)
+                        .get(0)
+                        .getString(0)
+                        .toString();
+
+        TestUtils.waitUntilAllTasksAreRunning(restClusterClient, 
JobID.fromHexString(jobId));
+        long timeOpSucceed = System.currentTimeMillis();
+
+        OperationHandle describeJobOperationHandle =
+                service.executeStatement(
+                        sessionHandle,
+                        String.format("DESCRIBE JOB '%s'", jobId),
+                        -1,
+                        configuration);
+
+        List<RowData> result = fetchAllResults(sessionHandle, 
describeJobOperationHandle);
+        RowData jobRow =
+                result.stream()
+                        .filter(row -> 
jobId.equals(row.getString(0).toString()))
+                        .findFirst()
+                        .orElseThrow(
+                                () ->
+                                        new IllegalStateException(
+                                                "Test job " + jobId + " not 
found."));
+        assertThat(jobRow.getString(1)).hasToString(pipelineName);
+        assertThat(jobRow.getString(2)).hasToString("RUNNING");
+        assertThat(jobRow.getTimestamp(3, 3).getMillisecond())
+                .isBetween(timeOpStart, timeOpSucceed);
+    }
+
     // 
--------------------------------------------------------------------------------------------
     // Catalog API tests
     // 
--------------------------------------------------------------------------------------------
diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd 
b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
index 100e9edd2fb..0984496de8a 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
+++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
@@ -128,6 +128,7 @@
     "org.apache.flink.sql.parser.expr.SqlUnresolvedTryCastFunction"
     "org.apache.flink.sql.parser.ddl.SqlStopJob"
     "org.apache.flink.sql.parser.dql.SqlShowJobs"
+    "org.apache.flink.sql.parser.dql.SqlDescribeJob"
     "org.apache.flink.sql.parser.type.ExtendedSqlCollectionTypeNameSpec"
     "org.apache.flink.sql.parser.type.ExtendedSqlRowTypeNameSpec"
     "org.apache.flink.sql.parser.type.SqlMapTypeNameSpec"
@@ -572,6 +573,7 @@
   # List of methods for parsing custom SQL statements.
   # Return type of method implementation should be 'SqlNode'.
   # Example: SqlShowDatabases(), SqlShowTables().
+  # Note: move SqlRichDescribeTable at last, otherwise all DESCRIBE syntax 
will fall into this method
   statementParserMethods: [
     "RichSqlInsert()"
     "SqlBeginStatementSet()"
@@ -591,7 +593,6 @@
     "SqlShowColumns()"
     "SqlShowCreate()"
     "SqlReplaceTable()"
-    "SqlRichDescribeTable()"
     "SqlAlterMaterializedTable()"
     "SqlAlterTable()"
     "SqlAlterView()"
@@ -615,6 +616,8 @@
     "SqlStopJob()"
     "SqlShowJobs()"
     "SqlTruncateTable()"
+    "SqlDescribeJob()"
+    "SqlRichDescribeTable()"
   ]
 
   # List of methods for parsing custom literals.
diff --git 
a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl 
b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
index 95509e7b8da..a9e299ef38c 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
@@ -2960,6 +2960,24 @@ SqlShowJobs SqlShowJobs() :
     }
 }
 
+/**
+* Parse a "DESCRIBE JOB" statement:
+* DESCRIBE | DESC JOB <JOB_ID>
+*/
+SqlDescribeJob SqlDescribeJob() :
+{
+    SqlCharStringLiteral jobId;
+    SqlParserPos pos;
+}
+{
+    ( <DESCRIBE> | <DESC> ) <JOB>  <QUOTED_STRING>
+    {
+        String id = SqlParserUtil.parseString(token.image);
+        jobId = SqlLiteral.createCharString(id, getPos());
+        return new SqlDescribeJob(getPos(), jobId);
+    }
+}
+
 /**
 * Parses a STOP JOB statement:
 * STOP JOB <JOB_ID> [<WITH SAVEPOINT>] [<WITH DRAIN>];
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlDescribeJob.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlDescribeJob.java
new file mode 100644
index 00000000000..af316fc7306
--- /dev/null
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlDescribeJob.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.dql;
+
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlCharStringLiteral;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.NlsString;
+
+import java.util.Collections;
+import java.util.List;
+
+/** DESCRIBE | DESC &lt;JOB_ID&gt; sql call. */
+public class SqlDescribeJob extends SqlCall {
+
+    public static final SqlOperator OPERATOR =
+            new SqlSpecialOperator("DESCRIBE JOB", SqlKind.OTHER);
+
+    private final SqlCharStringLiteral jobId;
+
+    public SqlDescribeJob(SqlParserPos pos, SqlCharStringLiteral jobId) {
+        super(pos);
+        this.jobId = jobId;
+    }
+
+    public String getJobId() {
+        return jobId.getValueAs(NlsString.class).getValue();
+    }
+
+    @Override
+    public SqlOperator getOperator() {
+        return OPERATOR;
+    }
+
+    @Override
+    public List<SqlNode> getOperandList() {
+        return Collections.singletonList(jobId);
+    }
+
+    @Override
+    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+        writer.keyword("DESCRIBE JOB");
+        jobId.unparse(writer, leftPrec, rightPrec);
+    }
+}
diff --git 
a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
 
b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
index 49093ad3fab..429ac1d7183 100644
--- 
a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
+++ 
b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
@@ -2955,6 +2955,12 @@ class FlinkSqlParserImplTest extends SqlParserTest {
                 .fails("WITH DRAIN could only be used after WITH SAVEPOINT.");
     }
 
+    @Test
+    void testDescribeJob() {
+        sql("DESCRIBE JOB 'myjob'").ok("DESCRIBE JOB 'myjob'");
+        sql("DESC JOB 'myjob'").ok("DESCRIBE JOB 'myjob'");
+    }
+
     @Test
     void testTruncateTable() {
         sql("truncate table t1").ok("TRUNCATE TABLE `T1`");
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/command/DescribeJobOperation.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/command/DescribeJobOperation.java
new file mode 100644
index 00000000000..a5214027431
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/command/DescribeJobOperation.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.command;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.internal.TableResultInternal;
+import org.apache.flink.table.operations.ExecutableOperation;
+import org.apache.flink.table.operations.Operation;
+
+/** Operation to describe a DESCRIBE JOB statement. */
+@Internal
+public class DescribeJobOperation implements Operation, ExecutableOperation {
+
+    private final String jobId;
+
+    public DescribeJobOperation(String jobId) {
+        this.jobId = jobId;
+    }
+
+    public String getJobId() {
+        return jobId;
+    }
+
+    @Override
+    public String asSummaryString() {
+        return String.format("DESCRIBE JOB '%s'", jobId);
+    }
+
+    @Override
+    public TableResultInternal execute(Context ctx) {
+        // TODO: We may need to migrate the execution for ShowJobsOperation 
from SQL Gateway
+        //  OperationExecutor to here.
+        throw new UnsupportedOperationException(
+                "DescribeJobOperation does not support ExecutableOperation 
yet.");
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlDescribeJobConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlDescribeJobConverter.java
new file mode 100644
index 00000000000..9b62570fe33
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlDescribeJobConverter.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations.converters;
+
+import org.apache.flink.sql.parser.dql.SqlDescribeJob;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.command.DescribeJobOperation;
+
+/** A converter for {@link SqlDescribeJob}. */
+public class SqlDescribeJobConverter implements 
SqlNodeConverter<SqlDescribeJob> {
+
+    @Override
+    public Operation convertSqlNode(SqlDescribeJob node, ConvertContext 
context) {
+        return new DescribeJobOperation(node.getJobId());
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java
index ab9da76c1ce..caaafc9a331 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java
@@ -55,6 +55,7 @@ public class SqlNodeConverters {
         register(new SqlShowDatabasesConverter());
         register(new SqlShowCreateCatalogConverter());
         register(new SqlDescribeCatalogConverter());
+        register(new SqlDescribeJobConverter());
     }
 
     /**
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
index cda97a4610e..7ad24920c3e 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
@@ -19,7 +19,7 @@ package org.apache.flink.table.planner.calcite
 
 import org.apache.flink.sql.parser.ExtendedSqlNode
 import org.apache.flink.sql.parser.ddl.{SqlCompilePlan, SqlReset, SqlSet, 
SqlUseModules}
-import org.apache.flink.sql.parser.dml.{RichSqlInsert, SqlBeginStatementSet, 
SqlCompileAndExecutePlan, SqlEndStatementSet, SqlExecute, SqlExecutePlan, 
SqlStatementSet, SqlTruncateTable}
+import org.apache.flink.sql.parser.dml._
 import org.apache.flink.sql.parser.dql._
 import org.apache.flink.table.api.{TableException, ValidationException}
 import org.apache.flink.table.planner.hint.FlinkHints
@@ -146,6 +146,7 @@ class FlinkPlannerImpl(
         || sqlNode.isInstanceOf[SqlShowPartitions]
         || sqlNode.isInstanceOf[SqlShowProcedures]
         || sqlNode.isInstanceOf[SqlShowJobs]
+        || sqlNode.isInstanceOf[SqlDescribeJob]
         || sqlNode.isInstanceOf[SqlRichDescribeTable]
         || sqlNode.isInstanceOf[SqlUnloadModule]
         || sqlNode.isInstanceOf[SqlUseModules]

Reply via email to