This is an automated email from the ASF dual-hosted git repository. ron pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 44528e0ee9f [FLINK-35194][table] Support describe job statement for SqlGateway 44528e0ee9f is described below commit 44528e0ee9fbed11b5417253534078d60fed3a12 Author: xuyang <xyzhong...@163.com> AuthorDate: Fri Apr 26 20:29:56 2024 +0800 [FLINK-35194][table] Support describe job statement for SqlGateway This closes #24728 --- .../service/operation/OperationExecutor.java | 53 +++++++++++++++++ .../gateway/service/SqlGatewayServiceITCase.java | 51 +++++++++++++++++ .../src/main/codegen/data/Parser.tdd | 5 +- .../src/main/codegen/includes/parserImpls.ftl | 18 ++++++ .../flink/sql/parser/dql/SqlDescribeJob.java | 66 ++++++++++++++++++++++ .../flink/sql/parser/FlinkSqlParserImplTest.java | 6 ++ .../operations/command/DescribeJobOperation.java | 52 +++++++++++++++++ .../converters/SqlDescribeJobConverter.java | 32 +++++++++++ .../operations/converters/SqlNodeConverters.java | 1 + .../table/planner/calcite/FlinkPlannerImpl.scala | 3 +- 10 files changed, 285 insertions(+), 2 deletions(-) diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java index 945265089c3..c50ba8c2bbf 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java @@ -83,6 +83,7 @@ import org.apache.flink.table.operations.StatementSetOperation; import org.apache.flink.table.operations.UnloadModuleOperation; import org.apache.flink.table.operations.UseOperation; import org.apache.flink.table.operations.command.AddJarOperation; +import org.apache.flink.table.operations.command.DescribeJobOperation; import org.apache.flink.table.operations.command.ExecutePlanOperation; import org.apache.flink.table.operations.command.RemoveJarOperation; import org.apache.flink.table.operations.command.ResetOperation; @@ -481,6 +482,8 @@ public class OperationExecutor { return callStopJobOperation(tableEnv, handle, (StopJobOperation) op); } else if (op instanceof ShowJobsOperation) { return callShowJobsOperation(tableEnv, handle, (ShowJobsOperation) op); + } else if (op instanceof DescribeJobOperation) { + return callDescribeJobOperation(tableEnv, handle, (DescribeJobOperation) op); } else if (op instanceof RemoveJarOperation) { return callRemoveJar(handle, ((RemoveJarOperation) op).getPath()); } else if (op instanceof AddJarOperation @@ -774,6 +777,56 @@ public class OperationExecutor { resultRows); } + public ResultFetcher callDescribeJobOperation( + TableEnvironmentInternal tableEnv, + OperationHandle operationHandle, + DescribeJobOperation describeJobOperation) + throws SqlExecutionException { + Configuration configuration = tableEnv.getConfig().getConfiguration(); + Duration clientTimeout = configuration.get(ClientOptions.CLIENT_TIMEOUT); + String jobId = describeJobOperation.getJobId(); + Optional<JobStatusMessage> jobStatusOp = + runClusterAction( + configuration, + operationHandle, + clusterClient -> { + try { + JobID expectedJobId = JobID.fromHexString(jobId); + return clusterClient.listJobs() + .get(clientTimeout.toMillis(), TimeUnit.MILLISECONDS) + .stream() + .filter(job -> expectedJobId.equals(job.getJobId())) + .findFirst(); + } catch (Exception e) { + throw new SqlExecutionException( + String.format( + "Failed to get job %s in the cluster.", jobId), + e); + } + }); + + if (!jobStatusOp.isPresent()) { + throw new SqlExecutionException( + String.format("Described job %s does not exist in the cluster.", jobId)); + } + JobStatusMessage job = jobStatusOp.get(); + + RowData resultRow = + GenericRowData.of( + StringData.fromString(jobId), + StringData.fromString(job.getJobName()), + StringData.fromString(job.getJobState().toString()), + DateTimeUtils.toTimestampData(job.getStartTime(), 3)); + return ResultFetcher.fromResults( + operationHandle, + ResolvedSchema.of( + Column.physical(JOB_ID, DataTypes.STRING()), + Column.physical(JOB_NAME, DataTypes.STRING()), + Column.physical(STATUS, DataTypes.STRING()), + Column.physical(START_TIME, DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())), + Collections.singletonList(resultRow)); + } + /** * Retrieves the {@link ClusterClient} from the session and runs the given {@link ClusterAction} * against it. diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java index 55aa16cb25a..012c4aed166 100644 --- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java +++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java @@ -511,6 +511,57 @@ public class SqlGatewayServiceITCase { .isBetween(timeOpStart, timeOpSucceed); } + @Test + void testDescribeJobOperation(@InjectClusterClient RestClusterClient<?> restClusterClient) + throws Exception { + SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment); + Configuration configuration = new Configuration(MINI_CLUSTER.getClientConfiguration()); + + String pipelineName = "test-describe-job"; + configuration.set(PipelineOptions.NAME, pipelineName); + + // running jobs + String sourceDdl = "CREATE TABLE source (a STRING) WITH ('connector'='datagen');"; + String sinkDdl = "CREATE TABLE sink (a STRING) WITH ('connector'='blackhole');"; + String insertSql = "INSERT INTO sink SELECT * FROM source;"; + + service.executeStatement(sessionHandle, sourceDdl, -1, configuration); + service.executeStatement(sessionHandle, sinkDdl, -1, configuration); + + long timeOpStart = System.currentTimeMillis(); + OperationHandle insertsOperationHandle = + service.executeStatement(sessionHandle, insertSql, -1, configuration); + String jobId = + fetchAllResults(sessionHandle, insertsOperationHandle) + .get(0) + .getString(0) + .toString(); + + TestUtils.waitUntilAllTasksAreRunning(restClusterClient, JobID.fromHexString(jobId)); + long timeOpSucceed = System.currentTimeMillis(); + + OperationHandle describeJobOperationHandle = + service.executeStatement( + sessionHandle, + String.format("DESCRIBE JOB '%s'", jobId), + -1, + configuration); + + List<RowData> result = fetchAllResults(sessionHandle, describeJobOperationHandle); + RowData jobRow = + result.stream() + .filter(row -> jobId.equals(row.getString(0).toString())) + .findFirst() + .orElseThrow( + () -> + new IllegalStateException( + "Test job " + jobId + " not found.")); + assertThat(jobRow.getString(1)).hasToString(pipelineName); + assertThat(jobRow.getString(2)).hasToString("RUNNING"); + assertThat(jobRow.getTimestamp(3, 3).getMillisecond()) + .isBetween(timeOpStart, timeOpSucceed); + } + // -------------------------------------------------------------------------------------------- // Catalog API tests // -------------------------------------------------------------------------------------------- diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd index 100e9edd2fb..0984496de8a 100644 --- a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd +++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd @@ -128,6 +128,7 @@ "org.apache.flink.sql.parser.expr.SqlUnresolvedTryCastFunction" "org.apache.flink.sql.parser.ddl.SqlStopJob" "org.apache.flink.sql.parser.dql.SqlShowJobs" + "org.apache.flink.sql.parser.dql.SqlDescribeJob" "org.apache.flink.sql.parser.type.ExtendedSqlCollectionTypeNameSpec" "org.apache.flink.sql.parser.type.ExtendedSqlRowTypeNameSpec" "org.apache.flink.sql.parser.type.SqlMapTypeNameSpec" @@ -572,6 +573,7 @@ # List of methods for parsing custom SQL statements. # Return type of method implementation should be 'SqlNode'. # Example: SqlShowDatabases(), SqlShowTables(). + # Note: move SqlRichDescribeTable at last, otherwise all DESCRIBE syntax will fall into this method statementParserMethods: [ "RichSqlInsert()" "SqlBeginStatementSet()" @@ -591,7 +593,6 @@ "SqlShowColumns()" "SqlShowCreate()" "SqlReplaceTable()" - "SqlRichDescribeTable()" "SqlAlterMaterializedTable()" "SqlAlterTable()" "SqlAlterView()" @@ -615,6 +616,8 @@ "SqlStopJob()" "SqlShowJobs()" "SqlTruncateTable()" + "SqlDescribeJob()" + "SqlRichDescribeTable()" ] # List of methods for parsing custom literals. diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl index 95509e7b8da..a9e299ef38c 100644 --- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl +++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl @@ -2960,6 +2960,24 @@ SqlShowJobs SqlShowJobs() : } } +/** +* Parse a "DESCRIBE JOB" statement: +* DESCRIBE | DESC JOB <JOB_ID> +*/ +SqlDescribeJob SqlDescribeJob() : +{ + SqlCharStringLiteral jobId; + SqlParserPos pos; +} +{ + ( <DESCRIBE> | <DESC> ) <JOB> <QUOTED_STRING> + { + String id = SqlParserUtil.parseString(token.image); + jobId = SqlLiteral.createCharString(id, getPos()); + return new SqlDescribeJob(getPos(), jobId); + } +} + /** * Parses a STOP JOB statement: * STOP JOB <JOB_ID> [<WITH SAVEPOINT>] [<WITH DRAIN>]; diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlDescribeJob.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlDescribeJob.java new file mode 100644 index 00000000000..af316fc7306 --- /dev/null +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlDescribeJob.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.parser.dql; + +import org.apache.calcite.sql.SqlCall; +import org.apache.calcite.sql.SqlCharStringLiteral; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.sql.SqlSpecialOperator; +import org.apache.calcite.sql.SqlWriter; +import org.apache.calcite.sql.parser.SqlParserPos; +import org.apache.calcite.util.NlsString; + +import java.util.Collections; +import java.util.List; + +/** DESCRIBE | DESC <JOB_ID> sql call. */ +public class SqlDescribeJob extends SqlCall { + + public static final SqlOperator OPERATOR = + new SqlSpecialOperator("DESCRIBE JOB", SqlKind.OTHER); + + private final SqlCharStringLiteral jobId; + + public SqlDescribeJob(SqlParserPos pos, SqlCharStringLiteral jobId) { + super(pos); + this.jobId = jobId; + } + + public String getJobId() { + return jobId.getValueAs(NlsString.class).getValue(); + } + + @Override + public SqlOperator getOperator() { + return OPERATOR; + } + + @Override + public List<SqlNode> getOperandList() { + return Collections.singletonList(jobId); + } + + @Override + public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { + writer.keyword("DESCRIBE JOB"); + jobId.unparse(writer, leftPrec, rightPrec); + } +} diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java index 49093ad3fab..429ac1d7183 100644 --- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java +++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java @@ -2955,6 +2955,12 @@ class FlinkSqlParserImplTest extends SqlParserTest { .fails("WITH DRAIN could only be used after WITH SAVEPOINT."); } + @Test + void testDescribeJob() { + sql("DESCRIBE JOB 'myjob'").ok("DESCRIBE JOB 'myjob'"); + sql("DESC JOB 'myjob'").ok("DESCRIBE JOB 'myjob'"); + } + @Test void testTruncateTable() { sql("truncate table t1").ok("TRUNCATE TABLE `T1`"); diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/command/DescribeJobOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/command/DescribeJobOperation.java new file mode 100644 index 00000000000..a5214027431 --- /dev/null +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/command/DescribeJobOperation.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations.command; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.internal.TableResultInternal; +import org.apache.flink.table.operations.ExecutableOperation; +import org.apache.flink.table.operations.Operation; + +/** Operation to describe a DESCRIBE JOB statement. */ +@Internal +public class DescribeJobOperation implements Operation, ExecutableOperation { + + private final String jobId; + + public DescribeJobOperation(String jobId) { + this.jobId = jobId; + } + + public String getJobId() { + return jobId; + } + + @Override + public String asSummaryString() { + return String.format("DESCRIBE JOB '%s'", jobId); + } + + @Override + public TableResultInternal execute(Context ctx) { + // TODO: We may need to migrate the execution for ShowJobsOperation from SQL Gateway + // OperationExecutor to here. + throw new UnsupportedOperationException( + "DescribeJobOperation does not support ExecutableOperation yet."); + } +} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlDescribeJobConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlDescribeJobConverter.java new file mode 100644 index 00000000000..9b62570fe33 --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlDescribeJobConverter.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.operations.converters; + +import org.apache.flink.sql.parser.dql.SqlDescribeJob; +import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.command.DescribeJobOperation; + +/** A converter for {@link SqlDescribeJob}. */ +public class SqlDescribeJobConverter implements SqlNodeConverter<SqlDescribeJob> { + + @Override + public Operation convertSqlNode(SqlDescribeJob node, ConvertContext context) { + return new DescribeJobOperation(node.getJobId()); + } +} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java index ab9da76c1ce..caaafc9a331 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java @@ -55,6 +55,7 @@ public class SqlNodeConverters { register(new SqlShowDatabasesConverter()); register(new SqlShowCreateCatalogConverter()); register(new SqlDescribeCatalogConverter()); + register(new SqlDescribeJobConverter()); } /** diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala index cda97a4610e..7ad24920c3e 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala @@ -19,7 +19,7 @@ package org.apache.flink.table.planner.calcite import org.apache.flink.sql.parser.ExtendedSqlNode import org.apache.flink.sql.parser.ddl.{SqlCompilePlan, SqlReset, SqlSet, SqlUseModules} -import org.apache.flink.sql.parser.dml.{RichSqlInsert, SqlBeginStatementSet, SqlCompileAndExecutePlan, SqlEndStatementSet, SqlExecute, SqlExecutePlan, SqlStatementSet, SqlTruncateTable} +import org.apache.flink.sql.parser.dml._ import org.apache.flink.sql.parser.dql._ import org.apache.flink.table.api.{TableException, ValidationException} import org.apache.flink.table.planner.hint.FlinkHints @@ -146,6 +146,7 @@ class FlinkPlannerImpl( || sqlNode.isInstanceOf[SqlShowPartitions] || sqlNode.isInstanceOf[SqlShowProcedures] || sqlNode.isInstanceOf[SqlShowJobs] + || sqlNode.isInstanceOf[SqlDescribeJob] || sqlNode.isInstanceOf[SqlRichDescribeTable] || sqlNode.isInstanceOf[SqlUnloadModule] || sqlNode.isInstanceOf[SqlUseModules]