This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 7bce4854fac [FLINK-28360][sql-client] Support stop job statement in SQL client (#20159)
7bce4854fac is described below

commit 7bce4854faccc0914951832d1dda101afce0105a
Author: Paul Lin <paullin3...@gmail.com>
AuthorDate: Mon Oct 10 19:21:09 2022 +0800

    [FLINK-28360][sql-client] Support stop job statement in SQL client (#20159)
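
    For reference, the statement forms this change introduces, as exercised by
    the parser tests in this patch (the job ID shown is a placeholder):

        STOP JOB '<job id>'
        STOP JOB '<job id>' WITH SAVEPOINT
        STOP JOB '<job id>' WITH SAVEPOINT WITH DRAIN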
---
 .../apache/flink/table/client/cli/CliClient.java   |  21 ++++
 .../apache/flink/table/client/cli/CliStrings.java  |   5 +
 .../flink/table/client/gateway/Executor.java       |   6 ++
 .../client/gateway/context/ExecutionContext.java   |  11 ++
 .../table/client/gateway/local/LocalExecutor.java  | 112 +++++++++++++++++++
 .../flink/table/client/cli/CliClientTest.java      |  76 +++++++++++++
 .../flink/table/client/cli/CliResultViewTest.java  |   8 ++
 .../flink/table/client/cli/TestingExecutor.java    |   8 ++
 .../client/gateway/local/LocalExecutorITCase.java  | 118 ++++++++++++++++-----
 .../src/main/codegen/data/Parser.tdd               |   8 ++
 .../src/main/codegen/includes/parserImpls.ftl      |  45 ++++++++
 .../apache/flink/sql/parser/ddl/SqlStopJob.java    |  95 +++++++++++++++++
 .../flink/sql/parser/utils/ParserResource.java     |   3 +
 .../flink/sql/parser/FlinkSqlParserImplTest.java   |  12 +++
 .../table/operations/command/StopJobOperation.java |  62 +++++++++++
 .../operations/SqlToOperationConverter.java        |   9 ++
 16 files changed, 573 insertions(+), 26 deletions(-)

diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
index f6ade74ae70..fb4b3ffdafd 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
@@ -48,6 +48,7 @@ import org.apache.flink.table.operations.command.QuitOperation;
 import org.apache.flink.table.operations.command.RemoveJarOperation;
 import org.apache.flink.table.operations.command.ResetOperation;
 import org.apache.flink.table.operations.command.SetOperation;
+import org.apache.flink.table.operations.command.StopJobOperation;
 import org.apache.flink.table.operations.ddl.AlterOperation;
 import org.apache.flink.table.operations.ddl.CreateOperation;
 import org.apache.flink.table.operations.ddl.DropOperation;
@@ -468,6 +469,9 @@ public class CliClient implements AutoCloseable {
         } else if (operation instanceof CreateTableASOperation) {
             // CTAS
             callInsert((CreateTableASOperation) operation);
+        } else if (operation instanceof StopJobOperation) {
+            // STOP JOB
+            callStopJob((StopJobOperation) operation);
         } else {
             // fallback to default implementation
             executeOperation(operation);
@@ -635,6 +639,23 @@ public class CliClient implements AutoCloseable {
         }
     }
 
+    private void callStopJob(StopJobOperation stopJobOperation) {
+        Optional<String> savepoint =
+                executor.stopJob(
+                        sessionId,
+                        stopJobOperation.getJobId(),
+                        stopJobOperation.isWithSavepoint(),
+                        stopJobOperation.isWithDrain());
+        if (stopJobOperation.isWithSavepoint()) {
+            Preconditions.checkState(savepoint.isPresent());
+            printInfo(
+                    String.format(
+                            CliStrings.MESSAGE_STOP_JOB_WITH_SAVEPOINT_STATEMENT, savepoint.get()));
+        } else {
+            printInfo(CliStrings.MESSAGE_STOP_JOB_STATEMENT);
+        }
+    }
+
     private void executeOperation(Operation operation) {
         TableResultInternal result = executor.executeOperation(sessionId, operation);
         if (TABLE_RESULT_OK == result) {
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java
index a8160711aac..43a8782e69f 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java
@@ -246,6 +246,11 @@ public final class CliStrings {
     public static final String MESSAGE_REMOVE_JAR_STATEMENT =
             "The specified jar is removed from session classloader.";
 
+    public static final String MESSAGE_STOP_JOB_WITH_SAVEPOINT_STATEMENT =
+            "The specified job is stopped with savepoint %s.";
+
+    public static final String MESSAGE_STOP_JOB_STATEMENT = "The specified job is stopped.";
+
     // --------------------------------------------------------------------------------------------
 
     public static final String RESULT_TITLE = "SQL Query Result";
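
A sketch of how the two messages above surface in the SQL client (illustrative
only; the prompt/prefix format, job ID, and savepoint path are placeholders):

    Flink SQL> STOP JOB '<job id>' WITH SAVEPOINT;
    [INFO] The specified job is stopped with savepoint <savepoint path>.

    Flink SQL> STOP JOB '<job id>';
    [INFO] The specified job is stopped.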
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
index c4a022e1d0b..0dbe999fd45 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
@@ -30,6 +30,7 @@ import javax.annotation.Nullable;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 /** A gateway for communicating with Flink and other external systems. */
 public interface Executor {
@@ -143,4 +144,9 @@ public interface Executor {
 
     /** Remove the JAR resource from the classloader with specified session. */
     void removeJar(String sessionId, String jarPath);
+
+    /** Stops a job in the specified session. */
+    Optional<String> stopJob(
+            String sessionId, String jobId, boolean isWithSavepoint, boolean isWithDrain)
+            throws SqlExecutionException;
 }
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/ExecutionContext.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/ExecutionContext.java
index c2bf0b1d725..d172de81f2a 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/ExecutionContext.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/ExecutionContext.java
@@ -36,8 +36,10 @@ import org.apache.flink.table.factories.PlannerFactoryUtil;
 import org.apache.flink.table.module.ModuleManager;
 import org.apache.flink.table.resource.ResourceManager;
 import org.apache.flink.util.MutableURLClassLoader;
+import org.apache.flink.util.TemporaryClassLoaderContext;
 
 import java.lang.reflect.Method;
+import java.util.function.Supplier;
 
 import static org.apache.flink.table.client.gateway.context.SessionContext.SessionState;
 
@@ -167,4 +169,13 @@ public class ExecutionContext {
                     e);
         }
     }
+
+    /**
+     * Executes the given supplier using the execution context's classloader as thread classloader.
+     */
+    public <R> R wrapClassLoader(Supplier<R> supplier) {
+        try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(classLoader)) {
+            return supplier.get();
+        }
+    }
 }
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
index f5e1c630863..125d12667aa 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
@@ -19,7 +19,16 @@
 package org.apache.flink.table.client.gateway.local;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.cli.ClientOptions;
+import org.apache.flink.client.deployment.ClusterClientFactory;
+import org.apache.flink.client.deployment.ClusterClientServiceLoader;
+import org.apache.flink.client.deployment.ClusterDescriptor;
+import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.internal.TableEnvironmentInternal;
 import org.apache.flink.table.api.internal.TableResultInternal;
@@ -38,17 +47,22 @@ import org.apache.flink.table.delegation.Parser;
 import org.apache.flink.table.operations.ModifyOperation;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.QueryOperation;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.table.client.cli.CliStrings.MESSAGE_SQL_EXECUTION_ERROR;
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -69,11 +83,14 @@ public class LocalExecutor implements Executor {
     private final ResultStore resultStore;
     private final DefaultContext defaultContext;
 
+    private final ClusterClientServiceLoader clusterClientServiceLoader;
+
     /** Creates a local executor for submitting table programs and retrieving results. */
     public LocalExecutor(DefaultContext defaultContext) {
         this.contextMap = new ConcurrentHashMap<>();
         this.resultStore = new ResultStore();
         this.defaultContext = defaultContext;
+        this.clusterClientServiceLoader = new DefaultClusterClientServiceLoader();
     }
 
     @Override
@@ -307,4 +324,99 @@ public class LocalExecutor implements Executor {
         final SessionContext context = getSessionContext(sessionId);
         context.removeJar(jarUrl);
     }
+
+    @Override
+    public Optional<String> stopJob(
+            String sessionId, String jobId, boolean isWithSavepoint, boolean isWithDrain)
+            throws SqlExecutionException {
+        Duration clientTimeout = getSessionConfig(sessionId).get(ClientOptions.CLIENT_TIMEOUT);
+        try {
+            return runClusterAction(
+                    sessionId,
+                    clusterClient -> {
+                        if (isWithSavepoint) {
+                            // blocking get savepoint path
+                            try {
+                                String savepoint =
+                                        clusterClient
+                                                .stopWithSavepoint(
+                                                        JobID.fromHexString(jobId),
+                                                        isWithDrain,
+                                                        null,
+                                                        SavepointFormatType.DEFAULT)
+                                                .get(
+                                                        clientTimeout.toMillis(),
+                                                        TimeUnit.MILLISECONDS);
+                                return Optional.of(savepoint);
+                            } catch (Exception e) {
+                                throw new FlinkException(
+                                        "Could not stop job "
+                                                + jobId
+                                                + " in session "
+                                                + sessionId
+                                                + ".",
+                                        e);
+                            }
+                        } else {
+                            clusterClient.cancel(JobID.fromHexString(jobId));
+                            return Optional.empty();
+                        }
+                    });
+        } catch (Exception e) {
+            throw new SqlExecutionException(
+                    "Could not stop job " + jobId + " in session " + sessionId + ".", e);
+        }
+    }
+
+    /**
+     * Retrieves the {@link ClusterClient} from the session and runs the given {@link ClusterAction}
+     * against it.
+     *
+     * @param sessionId the specified session ID
+     * @param clusterAction the cluster action to run against the retrieved {@link ClusterClient}.
+     * @param <ClusterID> type of the cluster id
+     * @param <Result> type of the result
+     * @throws FlinkException if something goes wrong
+     */
+    private <ClusterID, Result> Result runClusterAction(
+            String sessionId, ClusterAction<ClusterID, Result> clusterAction)
+            throws FlinkException {
+        final SessionContext context = getSessionContext(sessionId);
+        final Configuration configuration = (Configuration) context.getReadableConfig();
+        final ClusterClientFactory<ClusterID> clusterClientFactory =
+                context.getExecutionContext()
+                        .wrapClassLoader(
+                                () ->
+                                        clusterClientServiceLoader.getClusterClientFactory(
+                                                configuration));
+
+        final ClusterID clusterId = clusterClientFactory.getClusterId(configuration);
+        Preconditions.checkNotNull(clusterId, "No cluster ID found for session " + sessionId);
+
+        try (final ClusterDescriptor<ClusterID> clusterDescriptor =
+                        clusterClientFactory.createClusterDescriptor(configuration);
+                final ClusterClient<ClusterID> clusterClient =
+                        clusterDescriptor.retrieve(clusterId).getClusterClient()) {
+            return clusterAction.runAction(clusterClient);
+        }
+    }
+
+    /**
+     * Internal interface to encapsulate cluster actions which are executed via the {@link
+     * ClusterClient}.
+     *
+     * @param <ClusterID> type of the cluster id
+     * @param <Result> type of the result
+     */
+    @FunctionalInterface
+    private interface ClusterAction<ClusterID, Result> {
+
+        /**
+         * Run the cluster action with the given {@link ClusterClient}.
+         *
+         * @param clusterClient to run the cluster action against
+         * @throws FlinkException if something goes wrong
+         */
+        Result runAction(ClusterClient<ClusterID> clusterClient) throws FlinkException;
+    }
 }
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java
index a1b6e0069f6..b4c4e595fe8 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java
@@ -70,6 +70,9 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_DML_SYNC;
 import static org.apache.flink.table.client.cli.CliClient.DEFAULT_TERMINAL_FACTORY;
@@ -356,6 +359,58 @@ public class CliClientTest extends TestLogger {
         }
     }
 
+    @Test(timeout = 10000)
+    public void testStopJob() throws Exception {
+        final MockExecutor mockExecutor = new MockExecutor();
+        mockExecutor.isSync = false;
+
+        String sessionId = mockExecutor.openSession("test-session");
+        OutputStream outputStream = new ByteArrayOutputStream(256);
+        try (CliClient client =
+                new CliClient(
+                        () -> TerminalUtils.createDumbTerminal(outputStream),
+                        sessionId,
+                        mockExecutor,
+                        historyTempFile(),
+                        null)) {
+            client.executeInNonInteractiveMode(INSERT_INTO_STATEMENT);
+            String dmlResult = outputStream.toString();
+            String jobId = extractJobId(dmlResult);
+            client.executeInNonInteractiveMode("STOP JOB '" + jobId + "'");
+            String stopResult = outputStream.toString();
+            assertThat(stopResult).contains(CliStrings.MESSAGE_STOP_JOB_STATEMENT);
+        }
+    }
+
+    @Test(timeout = 10000)
+    public void testStopJobWithSavepoint() throws Exception {
+        final MockExecutor mockExecutor = new MockExecutor();
+        mockExecutor.isSync = false;
+        final String mockSavepoint = "/my/savepoint/path";
+        mockExecutor.savepoint = mockSavepoint;
+
+        String sessionId = mockExecutor.openSession("test-session");
+        OutputStream outputStream = new ByteArrayOutputStream(256);
+        try (CliClient client =
+                new CliClient(
+                        () -> TerminalUtils.createDumbTerminal(outputStream),
+                        sessionId,
+                        mockExecutor,
+                        historyTempFile(),
+                        null)) {
+            client.executeInNonInteractiveMode(INSERT_INTO_STATEMENT);
+            String dmlResult = outputStream.toString();
+            String jobId = extractJobId(dmlResult);
+            client.executeInNonInteractiveMode("STOP JOB '" + jobId + "' WITH SAVEPOINT");
+            String stopResult = outputStream.toString();
+            assertThat(stopResult)
+                    .contains(
+                            String.format(
+                                    CliStrings.MESSAGE_STOP_JOB_WITH_SAVEPOINT_STATEMENT,
+                                    mockSavepoint));
+        }
+    }
+
     // --------------------------------------------------------------------------------------------
 
     private void verifyUpdateSubmission(
@@ -419,11 +474,21 @@ public class CliClientTest extends TestLogger {
         return outputStream.toString();
     }
 
+    private String extractJobId(String result) {
+        Pattern pattern = Pattern.compile("[\\s\\S]*Job ID: (.*)[\\s\\S]*");
+        Matcher matcher = pattern.matcher(result);
+        if (!matcher.matches()) {
+            throw new IllegalStateException("No job ID found in string: " + result);
+        }
+        return matcher.group(1);
+    }
+
     // --------------------------------------------------------------------------------------------
 
     private static class MockExecutor implements Executor {
 
         public boolean failExecution;
+        public String savepoint;
 
         public volatile boolean isSync = false;
         public volatile boolean isAwait = false;
@@ -595,5 +660,16 @@ public class CliClientTest extends TestLogger {
         public void removeJar(String sessionId, String jarUrl) {
             throw new UnsupportedOperationException("Not implemented.");
         }
+
+        @Override
+        public Optional<String> stopJob(
+                String sessionId, String jobId, boolean isWithSavepoint, boolean isWithDrain)
+                throws SqlExecutionException {
+            if (isWithSavepoint) {
+                return Optional.of(savepoint);
+            } else {
+                return Optional.empty();
+            }
+        }
     }
 }
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliResultViewTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliResultViewTest.java
index e2b7bb8e25e..bba5f9cb5c2 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliResultViewTest.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliResultViewTest.java
@@ -49,6 +49,7 @@ import java.nio.file.Path;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -245,6 +246,13 @@ public class CliResultViewTest {
         public void removeJar(String sessionId, String jarUrl) {
             throw new UnsupportedOperationException("Not implemented.");
         }
+
+        @Override
+        public Optional<String> stopJob(
+                String sessionId, String jobId, boolean isWithSavepoint, boolean isWithDrain)
+                throws SqlExecutionException {
+            throw new UnsupportedOperationException("Not implemented.");
+        }
     }
 
     private static final class TestingCliResultView implements Runnable {
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/TestingExecutor.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/TestingExecutor.java
index b4c68f6951c..55c6413a976 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/TestingExecutor.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/TestingExecutor.java
@@ -35,6 +35,7 @@ import javax.annotation.Nullable;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 /** A customizable {@link Executor} for testing purposes. */
 class TestingExecutor implements Executor {
@@ -71,6 +72,13 @@ class TestingExecutor implements Executor {
         throw new UnsupportedOperationException("Not implemented.");
     }
 
+    @Override
+    public Optional<String> stopJob(
+            String sessionId, String jobId, boolean isWithSavepoint, boolean isWithDrain)
+            throws SqlExecutionException {
+        throw new UnsupportedOperationException("Not implemented.");
+    }
+
     @Override
     public TypedResult<List<RowData>> retrieveResultChanges(String sessionId, String resultId)
             throws SqlExecutionException {
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
index 7c7a5062136..73b03112fcf 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
@@ -19,16 +19,22 @@
 
 package org.apache.flink.table.client.gateway.local;
 
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.client.cli.DefaultCLI;
 import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.StateBackendOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.config.TableConfigOptions;
 import org.apache.flink.table.client.config.ResultMode;
 import org.apache.flink.table.client.gateway.Executor;
 import org.apache.flink.table.client.gateway.ResultDescriptor;
@@ -41,16 +47,19 @@ import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.QueryOperation;
 import org.apache.flink.table.utils.UserDefinedFunctions;
 import org.apache.flink.table.utils.print.RowDataToStringConverter;
-import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.test.junit5.InjectClusterClient;
+import org.apache.flink.test.junit5.MiniClusterExtension;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.util.StringUtils;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.UserClassLoaderJarTestUtils;
 
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
 
 import java.io.File;
 import java.io.IOException;
@@ -63,6 +72,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
@@ -81,28 +91,33 @@ public class LocalExecutorITCase extends TestLogger {
     private static final int NUM_TMS = 2;
     private static final int NUM_SLOTS_PER_TM = 2;
 
-    @ClassRule public static TemporaryFolder tempFolder = new TemporaryFolder();
-
-    @ClassRule
-    public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
-            new MiniClusterWithClientResource(
-                    new MiniClusterResourceConfiguration.Builder()
-                            .setConfiguration(getConfig())
-                            .setNumberTaskManagers(NUM_TMS)
-                            .setNumberSlotsPerTaskManager(NUM_SLOTS_PER_TM)
-                            .build());
+    @TempDir
+    @Order(1)
+    public static File tempFolder;
+
+    @RegisterExtension
+    @Order(2)
+    public static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
+            new MiniClusterExtension(
+                    () ->
+                            new MiniClusterResourceConfiguration.Builder()
+                                    .setConfiguration(getConfig())
+                                    .setNumberTaskManagers(NUM_TMS)
+                                    .setNumberSlotsPerTaskManager(NUM_SLOTS_PER_TM)
+                                    .build());
 
     private static ClusterClient<?> clusterClient;
 
     // a generated UDF jar used for testing classloading of dependencies
     private static URL udfDependency;
 
-    @BeforeClass
-    public static void setup() throws IOException {
-        clusterClient = MINI_CLUSTER_RESOURCE.getClusterClient();
+    @BeforeAll
+    public static void setup(@InjectClusterClient ClusterClient<?> injectedClusterClient)
+            throws Exception {
+        clusterClient = injectedClusterClient;
         File udfJar =
                 UserClassLoaderJarTestUtils.createJarFile(
-                        tempFolder.newFolder("test-jar"),
+                        tempFolder,
                         "test-classloader-udf.jar",
                         GENERATED_LOWER_UDF_CLASS,
                         String.format(GENERATED_LOWER_UDF_CODE, GENERATED_LOWER_UDF_CLASS));
@@ -115,6 +130,10 @@ public class LocalExecutorITCase extends TestLogger {
         config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
         config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, NUM_SLOTS_PER_TM);
         config.setBoolean(WebOptions.SUBMIT_ENABLE, false);
+        config.set(StateBackendOptions.STATE_BACKEND, "hashmap");
+        config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
+        config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, tempFolder.toURI().toString());
+        config.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, tempFolder.toURI().toString());
         return config;
     }
 
@@ -144,7 +163,8 @@ public class LocalExecutorITCase extends TestLogger {
         executor.closeSession(sessionId);
     }
 
-    @Test(timeout = 90_000L)
+    @Test
+    @Timeout(value = 90)
     public void testStreamQueryExecutionChangelog() throws Exception {
         final URL url = getClass().getClassLoader().getResource("test-data.csv");
         Objects.requireNonNull(url);
@@ -191,7 +211,8 @@ public class LocalExecutorITCase extends TestLogger {
         }
     }
 
-    @Test(timeout = 90_000L)
+    @Test
+    @Timeout(value = 90)
     public void testStreamQueryExecutionChangelogMultipleTimes() throws Exception {
         final URL url = getClass().getClassLoader().getResource("test-data.csv");
         Objects.requireNonNull(url);
@@ -240,7 +261,8 @@ public class LocalExecutorITCase extends TestLogger {
         }
     }
 
-    @Test(timeout = 90_000L)
+    @Test
+    @Timeout(value = 90)
     public void testStreamQueryExecutionTable() throws Exception {
         final URL url = getClass().getClassLoader().getResource("test-data.csv");
         Objects.requireNonNull(url);
@@ -265,7 +287,8 @@ public class LocalExecutorITCase extends TestLogger {
         executeStreamQueryTable(replaceVars, configMap, query, expectedResults);
     }
 
-    @Test(timeout = 90_000L)
+    @Test
+    @Timeout(value = 90)
     public void testStreamQueryExecutionTableMultipleTimes() throws Exception {
         final URL url = getClass().getClassLoader().getResource("test-data.csv");
         Objects.requireNonNull(url);
@@ -291,7 +314,8 @@ public class LocalExecutorITCase extends TestLogger {
         }
     }
 
-    @Test(timeout = 90_000L)
+    @Test
+    @Timeout(value = 90)
     public void testStreamQueryExecutionLimitedTable() throws Exception {
         final URL url = getClass().getClassLoader().getResource("test-data.csv");
         Objects.requireNonNull(url);
@@ -312,7 +336,8 @@ public class LocalExecutorITCase extends TestLogger {
         executeStreamQueryTable(replaceVars, configMap, query, expectedResults);
     }
 
-    @Test(timeout = 90_000L)
+    @Test
+    @Timeout(value = 90)
     public void testBatchQueryExecution() throws Exception {
         final URL url = getClass().getClassLoader().getResource("test-data.csv");
         Objects.requireNonNull(url);
@@ -358,7 +383,8 @@ public class LocalExecutorITCase extends TestLogger {
         }
     }
 
-    @Test(timeout = 90_000L)
+    @Test
+    @Timeout(value = 90)
     public void testBatchQueryExecutionMultipleTimes() throws Exception {
         final URL url = getClass().getClassLoader().getResource("test-data.csv");
         Objects.requireNonNull(url);
@@ -406,6 +432,46 @@ public class LocalExecutorITCase extends TestLogger {
         }
     }
 
+    @Test
+    @Timeout(value = 90)
+    public void testStopJob() throws Exception {
+        final Map<String, String> configMap = new HashMap<>();
+        configMap.put(EXECUTION_RESULT_MODE.key(), ResultMode.TABLE.name());
+        configMap.put(RUNTIME_MODE.key(), RuntimeExecutionMode.STREAMING.name());
+        configMap.put(TableConfigOptions.TABLE_DML_SYNC.key(), "false");
+
+        final LocalExecutor executor =
+                createLocalExecutor(
+                        Collections.singletonList(udfDependency), Configuration.fromMap(configMap));
+        String sessionId = executor.openSession("test-session");
+
+        final String srcDdl = "CREATE TABLE src (a STRING) WITH ('connector' = 'datagen')";
+        final String snkDdl = "CREATE TABLE snk (a STRING) WITH ('connector' = 'blackhole')";
+        final String insert = "INSERT INTO snk SELECT a FROM src;";
+
+        try {
+            executor.executeOperation(sessionId, executor.parseStatement(sessionId, srcDdl));
+            executor.executeOperation(sessionId, executor.parseStatement(sessionId, snkDdl));
+            TableResult result =
+                    executor.executeOperation(
+                            sessionId, executor.parseStatement(sessionId, insert));
+            JobClient jobClient = result.getJobClient().get();
+            JobID jobId = jobClient.getJobID();
+
+            // wait until the job is in RUNNING state or the test times out
+            JobStatus jobStatus;
+            do {
+                Thread.sleep(2_000L);
+                jobStatus = jobClient.getJobStatus().get();
+            } while (jobStatus != JobStatus.RUNNING);
+
+            Optional<String> savepoint = executor.stopJob(sessionId, jobId.toString(), true, true);
+            assertThat(savepoint.isPresent()).isTrue();
+        } finally {
+            executor.closeSession(sessionId);
+        }
+    }
+
     // --------------------------------------------------------------------------------------------
     // Helper method
     // --------------------------------------------------------------------------------------------
diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
index 8c87f645d02..a775b699480 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
+++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
@@ -102,6 +102,7 @@
     "org.apache.flink.sql.parser.dql.SqlRichDescribeTable"
     "org.apache.flink.sql.parser.dql.SqlUnloadModule"
     "org.apache.flink.sql.parser.expr.SqlUnresolvedTryCastFunction"
+    "org.apache.flink.sql.parser.ddl.SqlStopJob"
     "org.apache.flink.sql.parser.type.ExtendedSqlCollectionTypeNameSpec"
     "org.apache.flink.sql.parser.type.ExtendedSqlRowTypeNameSpec"
     "org.apache.flink.sql.parser.type.SqlMapTypeNameSpec"
@@ -132,6 +133,7 @@
     "COMPILE"
     "COLUMNS"
     "DATABASES"
+    "DRAIN"
     "ENFORCED"
     "ESTIMATED_COST"
     "EXTENDED"
@@ -140,6 +142,7 @@
     "JSON_EXECUTION_PLAN"
     "JAR"
     "JARS"
+    "JOB"
     "LOAD"
     "METADATA"
     "MODIFY"
@@ -153,6 +156,7 @@
     "REMOVE"
     "RENAME"
     "SCALA"
+    "STOP"
     "STRING"
     "TABLES"
     "UNLOAD"
@@ -244,6 +248,7 @@
     "DOMAIN"
     "DOW"
     "DOY"
+    "DRAIN"
     "DYNAMIC_FUNCTION"
     "DYNAMIC_FUNCTION_CODE"
     "ENCODING"
@@ -285,6 +290,7 @@
     "ISOYEAR"
     "JAR"
     "JARS"
+    "JOB"
     "JAVA"
     "JSON"
     "K"
@@ -444,6 +450,7 @@
     "SQL_VARBINARY"
     "SQL_VARCHAR"
     "STATE"
+    "STOP"
     "STRUCTURE"
     "STYLE"
     "SUBCLASS_ORIGIN"
@@ -557,6 +564,7 @@
     "SqlSet()"
     "SqlReset()"
     "SqlAnalyzeTable()"
+    "SqlStopJob()"
   ]
 
   # List of methods for parsing custom literals.
diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
index 19677305cd2..797248abf79 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
@@ -2325,3 +2325,48 @@ SqlNode SqlAnalyzeTable():
         return new SqlAnalyzeTable(s.end(this), tableName, partitionSpec, columns, allColumns);
     }
 }
+
+/**
+* Parses a STOP JOB statement:
+* STOP JOB <JOB_ID> [<WITH SAVEPOINT>] [<WITH DRAIN>];
+*/
+SqlStopJob SqlStopJob() :
+{
+    SqlCharStringLiteral jobId;
+    boolean isWithSavepoint = false;
+    boolean isWithDrain = false;
+    final Span span;
+}
+{
+    <STOP> <JOB> <QUOTED_STRING>
+    {
+        String id = SqlParserUtil.parseString(token.image);
+        jobId = SqlLiteral.createCharString(id, getPos());
+    }
+    [
+        LOOKAHEAD(2)
+        <WITH> <SAVEPOINT>
+        {
+            isWithSavepoint = true;
+        }
+    ]
+    [
+        LOOKAHEAD(2)
+        <WITH>
+        {
+            span = span();
+        }
+        <DRAIN>
+        {
+            span.end(this);
+            if (!isWithSavepoint) {
+                throw SqlUtil.newContextException(span.pos(),
+                    ParserResource.RESOURCE.withDrainOnlyUsedWithSavepoint());
+            }
+            isWithDrain = true;
+        }
+    ]
+    {
+        return new SqlStopJob(getPos(), jobId, isWithSavepoint, isWithDrain);
+    }
+}
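
For illustration, how the grammar above treats the optional clauses ('myjob' is
a placeholder; the expected outcomes match FlinkSqlParserImplTest further down):

    STOP JOB 'myjob';                           -- accepted
    STOP JOB 'myjob' WITH SAVEPOINT;            -- accepted
    STOP JOB 'myjob' WITH SAVEPOINT WITH DRAIN; -- accepted
    STOP JOB 'myjob' WITH DRAIN;                -- rejected: WITH DRAIN could
                                                -- only be used after WITH SAVEPOINT.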
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlStopJob.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlStopJob.java
new file mode 100644
index 00000000000..a50a03d8154
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlStopJob.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlCharStringLiteral;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.NlsString;
+
+import javax.annotation.Nonnull;
+
+import java.util.Collections;
+import java.util.List;
+
+/** The command to stop a Flink job. */
+public class SqlStopJob extends SqlCall {
+
+    public static final SqlOperator OPERATOR =
+            new SqlSpecialOperator("STOP JOB", SqlKind.OTHER_DDL);
+
+    private final SqlCharStringLiteral jobId;
+
+    private final boolean isWithDrain;
+
+    private final boolean isWithSavepoint;
+
+    public SqlStopJob(
+            SqlParserPos pos,
+            SqlCharStringLiteral jobId,
+            boolean isWithSavepoint,
+            boolean isWithDrain) {
+        super(pos);
+        this.jobId = jobId;
+        this.isWithSavepoint = isWithSavepoint;
+        this.isWithDrain = isWithDrain;
+    }
+
+    @Override
+    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+        writer.keyword("STOP");
+        writer.keyword("JOB");
+        jobId.unparse(writer, leftPrec, rightPrec);
+        if (isWithSavepoint) {
+            writer.keyword("WITH SAVEPOINT");
+        }
+        if (isWithDrain) {
+            writer.keyword("WITH DRAIN");
+        }
+    }
+
+    @Nonnull
+    @Override
+    public SqlOperator getOperator() {
+        return OPERATOR;
+    }
+
+    @Nonnull
+    @Override
+    public List<SqlNode> getOperandList() {
+        return Collections.singletonList(jobId);
+    }
+
+    public String getId() {
+        return jobId.getValueAs(NlsString.class).getValue();
+    }
+
+    public boolean isWithSavepoint() {
+        return isWithSavepoint;
+    }
+
+    public boolean isWithDrain() {
+        return isWithDrain;
+    }
+}
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java
index 8fa96b90ae2..8a63777f543 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java
@@ -43,4 +43,7 @@ public interface ParserResource {
 
     @Resources.BaseMessage("CREATE FUNCTION USING JAR syntax is not applicable 
to {0} language.")
     Resources.ExInst<ParseException> createFunctionUsingJar(String language);
+
+    @Resources.BaseMessage("WITH DRAIN could only be used after WITH 
SAVEPOINT.")
+    Resources.ExInst<ParseException> withDrainOnlyUsedWithSavepoint();
 }
diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
index b6dd3fdf74e..27d57acc88f 100644
--- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
+++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
@@ -1950,6 +1950,18 @@ class FlinkSqlParserImplTest extends SqlParserTest {
                                         "CREATE TABLE AS SELECT syntax does 
not support to create partitioned table yet."));
     }
 
+    @Test
+    void testStopJob() {
+        sql("STOP JOB 'myjob'").ok("STOP JOB 'myjob'");
+        sql("STOP JOB 'myjob' WITH SAVEPOINT").ok("STOP JOB 'myjob' WITH 
SAVEPOINT");
+        sql("STOP JOB 'myjob' WITH SAVEPOINT WITH DRAIN")
+                .ok("STOP JOB 'myjob' WITH SAVEPOINT WITH DRAIN");
+        sql("STOP JOB 'myjob' ^WITH DRAIN^")
+                .fails("WITH DRAIN could only be used after WITH SAVEPOINT.");
+        sql("STOP JOB 'myjob' ^WITH DRAIN^ WITH SAVEPOINT")
+                .fails("WITH DRAIN could only be used after WITH SAVEPOINT.");
+    }
+
     public static BaseMatcher<SqlNode> validated(String validatedSql) {
         return new TypeSafeDiagnosingMatcher<SqlNode>() {
             @Override
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/command/StopJobOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/command/StopJobOperation.java
new file mode 100644
index 00000000000..dcb56c20ab6
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/command/StopJobOperation.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.command;
+
+import org.apache.flink.table.operations.Operation;
+
+/** Operation to stop a running job. */
+public class StopJobOperation implements Operation {
+
+    private final String jobId;
+
+    private final boolean isWithSavepoint;
+
+    private final boolean isWithDrain;
+
+    public StopJobOperation(String jobId, boolean isWithSavepoint, boolean isWithDrain) {
+        this.jobId = jobId;
+        this.isWithSavepoint = isWithSavepoint;
+        this.isWithDrain = isWithDrain;
+    }
+
+    public String getJobId() {
+        return jobId;
+    }
+
+    public boolean isWithSavepoint() {
+        return isWithSavepoint;
+    }
+
+    public boolean isWithDrain() {
+        return isWithDrain;
+    }
+
+    @Override
+    public String asSummaryString() {
+        StringBuilder summary = new StringBuilder("STOP JOB ");
+        summary.append("'").append(jobId).append("'");
+        if (isWithSavepoint) {
+            summary.append(" WITH SAVEPOINT");
+        }
+        if (isWithDrain) {
+            summary.append(" WITH DRAIN");
+        }
+        return summary.toString();
+    }
+}
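
For reference, asSummaryString() above renders the operation back to its
statement form; e.g. with both flags set it yields:

    STOP JOB 'myjob' WITH SAVEPOINT WITH DRAIN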
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
index 2e137b3ebeb..c1860e5e9c1 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
@@ -52,6 +52,7 @@ import org.apache.flink.sql.parser.ddl.SqlDropView;
 import org.apache.flink.sql.parser.ddl.SqlRemoveJar;
 import org.apache.flink.sql.parser.ddl.SqlReset;
 import org.apache.flink.sql.parser.ddl.SqlSet;
+import org.apache.flink.sql.parser.ddl.SqlStopJob;
 import org.apache.flink.sql.parser.ddl.SqlTableOption;
 import org.apache.flink.sql.parser.ddl.SqlUseCatalog;
 import org.apache.flink.sql.parser.ddl.SqlUseDatabase;
@@ -154,6 +155,7 @@ import org.apache.flink.table.operations.command.RemoveJarOperation;
 import org.apache.flink.table.operations.command.ResetOperation;
 import org.apache.flink.table.operations.command.SetOperation;
 import org.apache.flink.table.operations.command.ShowJarsOperation;
+import org.apache.flink.table.operations.command.StopJobOperation;
 import org.apache.flink.table.operations.ddl.AddPartitionsOperation;
 import org.apache.flink.table.operations.ddl.AlterCatalogFunctionOperation;
 import org.apache.flink.table.operations.ddl.AlterDatabaseOperation;
@@ -374,6 +376,8 @@ public class SqlToOperationConverter {
             return Optional.of(converter.convertSqlQuery(validated));
         } else if (validated instanceof SqlAnalyzeTable) {
             return Optional.of(converter.convertAnalyzeTable((SqlAnalyzeTable) validated));
+        } else if (validated instanceof SqlStopJob) {
+            return Optional.of(converter.convertStopJob((SqlStopJob) validated));
         } else {
             return Optional.empty();
         }
@@ -1467,6 +1471,11 @@ public class SqlToOperationConverter {
         return new ValueLiteralExpression(value, dataType.notNull());
     }
 
+    private Operation convertStopJob(SqlStopJob sqlStopJob) {
+        return new StopJobOperation(
+                sqlStopJob.getId(), sqlStopJob.isWithSavepoint(), sqlStopJob.isWithDrain());
+    }
+
     private void validateTableConstraint(SqlTableConstraint constraint) {
         if (constraint.isUnique()) {
             throw new UnsupportedOperationException("UNIQUE constraint is not 
supported yet");

