This is an automated email from the ASF dual-hosted git repository. shengkai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 7bce4854fac [FLINK-28360][sql-client] Support stop job statement in SQL client (#20159) 7bce4854fac is described below commit 7bce4854faccc0914951832d1dda101afce0105a Author: Paul Lin <paullin3...@gmail.com> AuthorDate: Mon Oct 10 19:21:09 2022 +0800 [FLINK-28360][sql-client] Support stop job statement in SQL client (#20159) --- .../apache/flink/table/client/cli/CliClient.java | 21 ++++ .../apache/flink/table/client/cli/CliStrings.java | 5 + .../flink/table/client/gateway/Executor.java | 6 ++ .../client/gateway/context/ExecutionContext.java | 11 ++ .../table/client/gateway/local/LocalExecutor.java | 112 +++++++++++++++++++ .../flink/table/client/cli/CliClientTest.java | 76 +++++++++++++ .../flink/table/client/cli/CliResultViewTest.java | 8 ++ .../flink/table/client/cli/TestingExecutor.java | 8 ++ .../client/gateway/local/LocalExecutorITCase.java | 118 ++++++++++++++++----- .../src/main/codegen/data/Parser.tdd | 8 ++ .../src/main/codegen/includes/parserImpls.ftl | 45 ++++++++ .../apache/flink/sql/parser/ddl/SqlStopJob.java | 95 +++++++++++++++++ .../flink/sql/parser/utils/ParserResource.java | 3 + .../flink/sql/parser/FlinkSqlParserImplTest.java | 12 +++ .../table/operations/command/StopJobOperation.java | 62 +++++++++++ .../operations/SqlToOperationConverter.java | 9 ++ 16 files changed, 573 insertions(+), 26 deletions(-) diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java index f6ade74ae70..fb4b3ffdafd 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java @@ -48,6 +48,7 @@ import org.apache.flink.table.operations.command.QuitOperation; import org.apache.flink.table.operations.command.RemoveJarOperation; import org.apache.flink.table.operations.command.ResetOperation; import org.apache.flink.table.operations.command.SetOperation; +import org.apache.flink.table.operations.command.StopJobOperation; import org.apache.flink.table.operations.ddl.AlterOperation; import org.apache.flink.table.operations.ddl.CreateOperation; import org.apache.flink.table.operations.ddl.DropOperation; @@ -468,6 +469,9 @@ public class CliClient implements AutoCloseable { } else if (operation instanceof CreateTableASOperation) { // CTAS callInsert((CreateTableASOperation) operation); + } else if (operation instanceof StopJobOperation) { + // STOP JOB + callStopJob((StopJobOperation) operation); } else { // fallback to default implementation executeOperation(operation); @@ -635,6 +639,23 @@ public class CliClient implements AutoCloseable { } } + private void callStopJob(StopJobOperation stopJobOperation) { + Optional<String> savepoint = + executor.stopJob( + sessionId, + stopJobOperation.getJobId(), + stopJobOperation.isWithSavepoint(), + stopJobOperation.isWithDrain()); + if (stopJobOperation.isWithSavepoint()) { + Preconditions.checkState(savepoint.isPresent()); + printInfo( + String.format( + CliStrings.MESSAGE_STOP_JOB_WITH_SAVEPOINT_STATEMENT, savepoint.get())); + } else { + printInfo(CliStrings.MESSAGE_STOP_JOB_STATEMENT); + } + } + private void executeOperation(Operation operation) { TableResultInternal result = executor.executeOperation(sessionId, operation); if (TABLE_RESULT_OK == result) { diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java index a8160711aac..43a8782e69f 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java @@ -246,6 +246,11 @@ public final class CliStrings { public static final String MESSAGE_REMOVE_JAR_STATEMENT = "The specified jar is removed from session classloader."; + public static final String MESSAGE_STOP_JOB_WITH_SAVEPOINT_STATEMENT = + "The specified job is stopped with savepoint %s."; + + public static final String MESSAGE_STOP_JOB_STATEMENT = "The specified job is stopped."; + // -------------------------------------------------------------------------------------------- public static final String RESULT_TITLE = "SQL Query Result"; diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java index c4a022e1d0b..0dbe999fd45 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java @@ -30,6 +30,7 @@ import javax.annotation.Nullable; import java.util.List; import java.util.Map; +import java.util.Optional; /** A gateway for communicating with Flink and other external systems. */ public interface Executor { @@ -143,4 +144,9 @@ public interface Executor { /** Remove the JAR resource from the classloader with specified session. */ void removeJar(String sessionId, String jarPath); + + /** Stops a job in the specified session. */ + Optional<String> stopJob( + String sessionId, String jobId, boolean isWithSavepoint, boolean isWithDrain) + throws SqlExecutionException; } diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/ExecutionContext.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/ExecutionContext.java index c2bf0b1d725..d172de81f2a 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/ExecutionContext.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/ExecutionContext.java @@ -36,8 +36,10 @@ import org.apache.flink.table.factories.PlannerFactoryUtil; import org.apache.flink.table.module.ModuleManager; import org.apache.flink.table.resource.ResourceManager; import org.apache.flink.util.MutableURLClassLoader; +import org.apache.flink.util.TemporaryClassLoaderContext; import java.lang.reflect.Method; +import java.util.function.Supplier; import static org.apache.flink.table.client.gateway.context.SessionContext.SessionState; @@ -167,4 +169,13 @@ public class ExecutionContext { e); } } + + /** + * Executes the given supplier using the execution context's classloader as thread classloader. + */ + public <R> R wrapClassLoader(Supplier<R> supplier) { + try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(classLoader)) { + return supplier.get(); + } + } } diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java index f5e1c630863..125d12667aa 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java @@ -19,7 +19,16 @@ package org.apache.flink.table.client.gateway.local; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.client.cli.ClientOptions; +import org.apache.flink.client.deployment.ClusterClientFactory; +import org.apache.flink.client.deployment.ClusterClientServiceLoader; +import org.apache.flink.client.deployment.ClusterDescriptor; +import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.core.execution.SavepointFormatType; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.internal.TableEnvironmentInternal; import org.apache.flink.table.api.internal.TableResultInternal; @@ -38,17 +47,22 @@ import org.apache.flink.table.delegation.Parser; import org.apache.flink.table.operations.ModifyOperation; import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.QueryOperation; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nullable; +import java.time.Duration; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; import static org.apache.flink.table.client.cli.CliStrings.MESSAGE_SQL_EXECUTION_ERROR; import static org.apache.flink.util.Preconditions.checkArgument; @@ -69,11 +83,14 @@ public class LocalExecutor implements Executor { private final ResultStore resultStore; private final DefaultContext defaultContext; + private final ClusterClientServiceLoader clusterClientServiceLoader; + /** Creates a local executor for submitting table programs and retrieving results. */ public LocalExecutor(DefaultContext defaultContext) { this.contextMap = new ConcurrentHashMap<>(); this.resultStore = new ResultStore(); this.defaultContext = defaultContext; + this.clusterClientServiceLoader = new DefaultClusterClientServiceLoader(); } @Override @@ -307,4 +324,99 @@ public class LocalExecutor implements Executor { final SessionContext context = getSessionContext(sessionId); context.removeJar(jarUrl); } + + @Override + public Optional<String> stopJob( + String sessionId, String jobId, boolean isWithSavepoint, boolean isWithDrain) + throws SqlExecutionException { + Duration clientTimeout = getSessionConfig(sessionId).get(ClientOptions.CLIENT_TIMEOUT); + try { + return runClusterAction( + sessionId, + clusterClient -> { + if (isWithSavepoint) { + // blocking get savepoint path + try { + String savepoint = + clusterClient + .stopWithSavepoint( + JobID.fromHexString(jobId), + isWithDrain, + null, + SavepointFormatType.DEFAULT) + .get( + clientTimeout.toMillis(), + TimeUnit.MILLISECONDS); + return Optional.of(savepoint); + } catch (Exception e) { + throw new FlinkException( + "Could not stop job " + + jobId + + " in session " + + sessionId + + ".", + e); + } + } else { + clusterClient.cancel(JobID.fromHexString(jobId)); + return Optional.empty(); + } + }); + } catch (Exception e) { + throw new SqlExecutionException( + "Could not stop job " + jobId + " in session " + sessionId + ".", e); + } + } + + /** + * Retrieves the {@link ClusterClient} from the session and runs the given {@link ClusterAction} + * against it. + * + * @param sessionId the specified session ID + * @param clusterAction the cluster action to run against the retrieved {@link ClusterClient}. + * @param <ClusterID> type of the cluster id + * @param <Result>> type of the result + * @throws FlinkException if something goes wrong + */ + private <ClusterID, Result> Result runClusterAction( + String sessionId, ClusterAction<ClusterID, Result> clusterAction) + throws FlinkException { + final SessionContext context = getSessionContext(sessionId); + final Configuration configuration = (Configuration) context.getReadableConfig(); + final ClusterClientFactory<ClusterID> clusterClientFactory = + context.getExecutionContext() + .wrapClassLoader( + () -> + clusterClientServiceLoader.getClusterClientFactory( + configuration)); + + final ClusterID clusterId = clusterClientFactory.getClusterId(configuration); + Preconditions.checkNotNull(clusterId, "No cluster ID found for session " + sessionId); + + try (final ClusterDescriptor<ClusterID> clusterDescriptor = + clusterClientFactory.createClusterDescriptor(configuration); + final ClusterClient<ClusterID> clusterClient = + clusterDescriptor.retrieve(clusterId).getClusterClient()) { + return clusterAction.runAction(clusterClient); + } + } + + /** + * Internal interface to encapsulate cluster actions which are executed via the {@link + * ClusterClient}. + * + * @param <ClusterID> type of the cluster id + * @param <Result>> type of the result + */ + @FunctionalInterface + private interface ClusterAction<ClusterID, Result> { + + /** + * Run the cluster action with the given {@link ClusterClient}. + * + * @param clusterClient to run the cluster action against + * @throws FlinkException if something goes wrong + */ + Result runAction(ClusterClient<ClusterID> clusterClient) throws FlinkException; + } } diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java index a1b6e0069f6..b4c4e595fe8 100644 --- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java +++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java @@ -70,6 +70,9 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_DML_SYNC; import static org.apache.flink.table.client.cli.CliClient.DEFAULT_TERMINAL_FACTORY; @@ -356,6 +359,58 @@ public class CliClientTest extends TestLogger { } } + @Test(timeout = 10000) + public void testStopJob() throws Exception { + final MockExecutor mockExecutor = new MockExecutor(); + mockExecutor.isSync = false; + + String sessionId = mockExecutor.openSession("test-session"); + OutputStream outputStream = new ByteArrayOutputStream(256); + try (CliClient client = + new CliClient( + () -> TerminalUtils.createDumbTerminal(outputStream), + sessionId, + mockExecutor, + historyTempFile(), + null)) { + client.executeInNonInteractiveMode(INSERT_INTO_STATEMENT); + String dmlResult = outputStream.toString(); + String jobId = extractJobId(dmlResult); + client.executeInNonInteractiveMode("STOP JOB '" + jobId + "'"); + String stopResult = outputStream.toString(); + assertThat(stopResult).contains(CliStrings.MESSAGE_STOP_JOB_STATEMENT); + } + } + + @Test(timeout = 10000) + public void testStopJobWithSavepoint() throws Exception { + final MockExecutor mockExecutor = new MockExecutor(); + mockExecutor.isSync = false; + final String mockSavepoint = "/my/savepoint/path"; + mockExecutor.savepoint = mockSavepoint; + + String sessionId = mockExecutor.openSession("test-session"); + OutputStream outputStream = new ByteArrayOutputStream(256); + try (CliClient client = + new CliClient( + () -> TerminalUtils.createDumbTerminal(outputStream), + sessionId, + mockExecutor, + historyTempFile(), + null)) { + client.executeInNonInteractiveMode(INSERT_INTO_STATEMENT); + String dmlResult = outputStream.toString(); + String jobId = extractJobId(dmlResult); + client.executeInNonInteractiveMode("STOP JOB '" + jobId + "' WITH SAVEPOINT"); + String stopResult = outputStream.toString(); + assertThat(stopResult) + .contains( + String.format( + CliStrings.MESSAGE_STOP_JOB_WITH_SAVEPOINT_STATEMENT, + mockSavepoint)); + } + } + // -------------------------------------------------------------------------------------------- private void verifyUpdateSubmission( @@ -419,11 +474,21 @@ public class CliClientTest extends TestLogger { return outputStream.toString(); } + private String extractJobId(String result) { + Pattern pattern = Pattern.compile("[\\s\\S]*Job ID: (.*)[\\s\\S]*"); + Matcher matcher = pattern.matcher(result); + if (!matcher.matches()) { + throw new IllegalStateException("No job ID found in string: " + result); + } + return matcher.group(1); + } + // -------------------------------------------------------------------------------------------- private static class MockExecutor implements Executor { public boolean failExecution; + public String savepoint; public volatile boolean isSync = false; public volatile boolean isAwait = false; @@ -595,5 +660,16 @@ public class CliClientTest extends TestLogger { public void removeJar(String sessionId, String jarUrl) { throw new UnsupportedOperationException("Not implemented."); } + + @Override + public Optional<String> stopJob( + String sessionId, String jobId, boolean isWithSavepoint, boolean isWithDrain) + throws SqlExecutionException { + if (isWithSavepoint) { + return Optional.of(savepoint); + } else { + return Optional.empty(); + } + } } } diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliResultViewTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliResultViewTest.java index e2b7bb8e25e..bba5f9cb5c2 100644 --- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliResultViewTest.java +++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliResultViewTest.java @@ -49,6 +49,7 @@ import java.nio.file.Path; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -245,6 +246,13 @@ public class CliResultViewTest { public void removeJar(String sessionId, String jarUrl) { throw new UnsupportedOperationException("Not implemented."); } + + @Override + public Optional<String> stopJob( + String sessionId, String jobId, boolean isWithSavepoint, boolean isWithDrain) + throws SqlExecutionException { + throw new UnsupportedOperationException("Not implemented."); + } } private static final class TestingCliResultView implements Runnable { diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/TestingExecutor.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/TestingExecutor.java index b4c68f6951c..55c6413a976 100644 --- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/TestingExecutor.java +++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/TestingExecutor.java @@ -35,6 +35,7 @@ import javax.annotation.Nullable; import java.util.List; import java.util.Map; +import java.util.Optional; /** A customizable {@link Executor} for testing purposes. */ class TestingExecutor implements Executor { @@ -71,6 +72,13 @@ class TestingExecutor implements Executor { throw new UnsupportedOperationException("Not implemented."); } + @Override + public Optional<String> stopJob( + String sessionId, String jobId, boolean isWithSavepoint, boolean isWithDrain) + throws SqlExecutionException { + throw new UnsupportedOperationException("Not implemented."); + } + @Override public TypedResult<List<RowData>> retrieveResultChanges(String sessionId, String resultId) throws SqlExecutionException { diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java index 7c7a5062136..73b03112fcf 100644 --- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java +++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java @@ -19,16 +19,22 @@ package org.apache.flink.table.client.gateway.local; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.client.cli.DefaultCLI; import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.MemorySize; +import org.apache.flink.configuration.StateBackendOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.configuration.WebOptions; +import org.apache.flink.core.execution.JobClient; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.table.api.TableResult; +import org.apache.flink.table.api.config.TableConfigOptions; import org.apache.flink.table.client.config.ResultMode; import org.apache.flink.table.client.gateway.Executor; import org.apache.flink.table.client.gateway.ResultDescriptor; @@ -41,16 +47,19 @@ import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.QueryOperation; import org.apache.flink.table.utils.UserDefinedFunctions; import org.apache.flink.table.utils.print.RowDataToStringConverter; -import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.apache.flink.test.junit5.InjectClusterClient; +import org.apache.flink.test.junit5.MiniClusterExtension; import org.apache.flink.test.util.TestBaseUtils; import org.apache.flink.util.StringUtils; import org.apache.flink.util.TestLogger; import org.apache.flink.util.UserClassLoaderJarTestUtils; -import org.junit.BeforeClass; -import org.junit.ClassRule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.api.io.TempDir; import java.io.File; import java.io.IOException; @@ -63,6 +72,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; @@ -81,28 +91,33 @@ public class LocalExecutorITCase extends TestLogger { private static final int NUM_TMS = 2; private static final int NUM_SLOTS_PER_TM = 2; - @ClassRule public static TemporaryFolder tempFolder = new TemporaryFolder(); - - @ClassRule - public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = - new MiniClusterWithClientResource( - new MiniClusterResourceConfiguration.Builder() - .setConfiguration(getConfig()) - .setNumberTaskManagers(NUM_TMS) - .setNumberSlotsPerTaskManager(NUM_SLOTS_PER_TM) - .build()); + @TempDir + @Order(1) + public static File tempFolder; + + @RegisterExtension + @Order(2) + public static final MiniClusterExtension MINI_CLUSTER_RESOURCE = + new MiniClusterExtension( + () -> + new MiniClusterResourceConfiguration.Builder() + .setConfiguration(getConfig()) + .setNumberTaskManagers(NUM_TMS) + .setNumberSlotsPerTaskManager(NUM_SLOTS_PER_TM) + .build()); private static ClusterClient<?> clusterClient; // a generated UDF jar used for testing classloading of dependencies private static URL udfDependency; - @BeforeClass - public static void setup() throws IOException { - clusterClient = MINI_CLUSTER_RESOURCE.getClusterClient(); + @BeforeAll + public static void setup(@InjectClusterClient ClusterClient<?> injectedClusterClient) + throws Exception { + clusterClient = injectedClusterClient; File udfJar = UserClassLoaderJarTestUtils.createJarFile( - tempFolder.newFolder("test-jar"), + tempFolder, "test-classloader-udf.jar", GENERATED_LOWER_UDF_CLASS, String.format(GENERATED_LOWER_UDF_CODE, GENERATED_LOWER_UDF_CLASS)); @@ -115,6 +130,10 @@ public class LocalExecutorITCase extends TestLogger { config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS); config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, NUM_SLOTS_PER_TM); config.setBoolean(WebOptions.SUBMIT_ENABLE, false); + config.set(StateBackendOptions.STATE_BACKEND, "hashmap"); + config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem"); + config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, tempFolder.toURI().toString()); + config.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, tempFolder.toURI().toString()); return config; } @@ -144,7 +163,8 @@ public class LocalExecutorITCase extends TestLogger { executor.closeSession(sessionId); } - @Test(timeout = 90_000L) + @Test + @Timeout(value = 90) public void testStreamQueryExecutionChangelog() throws Exception { final URL url = getClass().getClassLoader().getResource("test-data.csv"); Objects.requireNonNull(url); @@ -191,7 +211,8 @@ public class LocalExecutorITCase extends TestLogger { } } - @Test(timeout = 90_000L) + @Test + @Timeout(value = 90) public void testStreamQueryExecutionChangelogMultipleTimes() throws Exception { final URL url = getClass().getClassLoader().getResource("test-data.csv"); Objects.requireNonNull(url); @@ -240,7 +261,8 @@ public class LocalExecutorITCase extends TestLogger { } } - @Test(timeout = 90_000L) + @Test + @Timeout(value = 90) public void testStreamQueryExecutionTable() throws Exception { final URL url = getClass().getClassLoader().getResource("test-data.csv"); Objects.requireNonNull(url); @@ -265,7 +287,8 @@ public class LocalExecutorITCase extends TestLogger { executeStreamQueryTable(replaceVars, configMap, query, expectedResults); } - @Test(timeout = 90_000L) + @Test + @Timeout(value = 90) public void testStreamQueryExecutionTableMultipleTimes() throws Exception { final URL url = getClass().getClassLoader().getResource("test-data.csv"); Objects.requireNonNull(url); @@ -291,7 +314,8 @@ public class LocalExecutorITCase extends TestLogger { } } - @Test(timeout = 90_000L) + @Test + @Timeout(value = 90) public void testStreamQueryExecutionLimitedTable() throws Exception { final URL url = getClass().getClassLoader().getResource("test-data.csv"); Objects.requireNonNull(url); @@ -312,7 +336,8 @@ public class LocalExecutorITCase extends TestLogger { executeStreamQueryTable(replaceVars, configMap, query, expectedResults); } - @Test(timeout = 90_000L) + @Test + @Timeout(value = 90) public void testBatchQueryExecution() throws Exception { final URL url = getClass().getClassLoader().getResource("test-data.csv"); Objects.requireNonNull(url); @@ -358,7 +383,8 @@ public class LocalExecutorITCase extends TestLogger { } } - @Test(timeout = 90_000L) + @Test + @Timeout(value = 90) public void testBatchQueryExecutionMultipleTimes() throws Exception { final URL url = getClass().getClassLoader().getResource("test-data.csv"); Objects.requireNonNull(url); @@ -406,6 +432,46 @@ public class LocalExecutorITCase extends TestLogger { } } + @Test + @Timeout(value = 90) + public void testStopJob() throws Exception { + final Map<String, String> configMap = new HashMap<>(); + configMap.put(EXECUTION_RESULT_MODE.key(), ResultMode.TABLE.name()); + configMap.put(RUNTIME_MODE.key(), RuntimeExecutionMode.STREAMING.name()); + configMap.put(TableConfigOptions.TABLE_DML_SYNC.key(), "false"); + + final LocalExecutor executor = + createLocalExecutor( + Collections.singletonList(udfDependency), Configuration.fromMap(configMap)); + String sessionId = executor.openSession("test-session"); + + final String srcDdl = "CREATE TABLE src (a STRING) WITH ('connector' = 'datagen')"; + final String snkDdl = "CREATE TABLE snk (a STRING) WITH ('connector' = 'blackhole')"; + final String insert = "INSERT INTO snk SELECT a FROM src;"; + + try { + executor.executeOperation(sessionId, executor.parseStatement(sessionId, srcDdl)); + executor.executeOperation(sessionId, executor.parseStatement(sessionId, snkDdl)); + TableResult result = + executor.executeOperation( + sessionId, executor.parseStatement(sessionId, insert)); + JobClient jobClient = result.getJobClient().get(); + JobID jobId = jobClient.getJobID(); + + // wait till the job turns into running status or the test times out + JobStatus jobStatus; + do { + Thread.sleep(2_000L); + jobStatus = jobClient.getJobStatus().get(); + } while (jobStatus != JobStatus.RUNNING); + + Optional<String> savepoint = executor.stopJob(sessionId, jobId.toString(), true, true); + assertThat(savepoint.isPresent()).isTrue(); + } finally { + executor.closeSession(sessionId); + } + } + // -------------------------------------------------------------------------------------------- // Helper method // -------------------------------------------------------------------------------------------- diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd index 8c87f645d02..a775b699480 100644 --- a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd +++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd @@ -102,6 +102,7 @@ "org.apache.flink.sql.parser.dql.SqlRichDescribeTable" "org.apache.flink.sql.parser.dql.SqlUnloadModule" "org.apache.flink.sql.parser.expr.SqlUnresolvedTryCastFunction" + "org.apache.flink.sql.parser.ddl.SqlStopJob" "org.apache.flink.sql.parser.type.ExtendedSqlCollectionTypeNameSpec" "org.apache.flink.sql.parser.type.ExtendedSqlRowTypeNameSpec" "org.apache.flink.sql.parser.type.SqlMapTypeNameSpec" @@ -132,6 +133,7 @@ "COMPILE" "COLUMNS" "DATABASES" + "DRAIN" "ENFORCED" "ESTIMATED_COST" "EXTENDED" @@ -140,6 +142,7 @@ "JSON_EXECUTION_PLAN" "JAR" "JARS" + "JOB" "LOAD" "METADATA" "MODIFY" @@ -153,6 +156,7 @@ "REMOVE" "RENAME" "SCALA" + "STOP" "STRING" "TABLES" "UNLOAD" @@ -244,6 +248,7 @@ "DOMAIN" "DOW" "DOY" + "DRAIN" "DYNAMIC_FUNCTION" "DYNAMIC_FUNCTION_CODE" "ENCODING" @@ -285,6 +290,7 @@ "ISOYEAR" "JAR" "JARS" + "JOB" "JAVA" "JSON" "K" @@ -444,6 +450,7 @@ "SQL_VARBINARY" "SQL_VARCHAR" "STATE" + "STOP" "STRUCTURE" "STYLE" "SUBCLASS_ORIGIN" @@ -557,6 +564,7 @@ "SqlSet()" "SqlReset()" "SqlAnalyzeTable()" + "SqlStopJob()" ] # List of methods for parsing custom literals. diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl index 19677305cd2..797248abf79 100644 --- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl +++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl @@ -2325,3 +2325,48 @@ SqlNode SqlAnalyzeTable(): return new SqlAnalyzeTable(s.end(this), tableName, partitionSpec, columns, allColumns); } } + +/** +* Parses a STOP JOB statement: +* STOP JOB <JOB_ID> [<WITH SAVEPOINT>] [<WITH DRAIN>]; +*/ +SqlStopJob SqlStopJob() : +{ + SqlCharStringLiteral jobId; + boolean isWithSavepoint = false; + boolean isWithDrain = false; + final Span span; +} +{ + <STOP> <JOB> <QUOTED_STRING> + { + String id = SqlParserUtil.parseString(token.image); + jobId = SqlLiteral.createCharString(id, getPos()); + } + [ + LOOKAHEAD(2) + <WITH> <SAVEPOINT> + { + isWithSavepoint = true; + } + ] + [ + LOOKAHEAD(2) + <WITH> + { + span = span(); + } + <DRAIN> + { + span.end(this); + if (!isWithSavepoint) { + throw SqlUtil.newContextException(span.pos(), + ParserResource.RESOURCE.withDrainOnlyUsedWithSavepoint()); + } + isWithDrain = true; + } + ] + { + return new SqlStopJob(getPos(), jobId, isWithSavepoint, isWithDrain); + } +} diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlStopJob.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlStopJob.java new file mode 100644 index 00000000000..a50a03d8154 --- /dev/null +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlStopJob.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.parser.ddl; + +import org.apache.calcite.sql.SqlCall; +import org.apache.calcite.sql.SqlCharStringLiteral; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.sql.SqlSpecialOperator; +import org.apache.calcite.sql.SqlWriter; +import org.apache.calcite.sql.parser.SqlParserPos; +import org.apache.calcite.util.NlsString; + +import javax.annotation.Nonnull; + +import java.util.Collections; +import java.util.List; + +/** The command to stop a flink job. */ +public class SqlStopJob extends SqlCall { + + public static final SqlOperator OPERATOR = + new SqlSpecialOperator("STOP JOB", SqlKind.OTHER_DDL); + + private final SqlCharStringLiteral jobId; + + private final boolean isWithDrain; + + private final boolean isWithSavepoint; + + public SqlStopJob( + SqlParserPos pos, + SqlCharStringLiteral jobId, + boolean isWithSavepoint, + boolean isWithDrain) { + super(pos); + this.jobId = jobId; + this.isWithSavepoint = isWithSavepoint; + this.isWithDrain = isWithDrain; + } + + @Override + public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { + writer.keyword("STOP"); + writer.keyword("JOB"); + jobId.unparse(writer, leftPrec, rightPrec); + if (isWithSavepoint) { + writer.keyword("WITH SAVEPOINT"); + } + if (isWithDrain) { + writer.keyword("WITH DRAIN"); + } + } + + @Nonnull + @Override + public SqlOperator getOperator() { + return OPERATOR; + } + + @Nonnull + @Override + public List<SqlNode> getOperandList() { + return Collections.singletonList(jobId); + } + + public String getId() { + return jobId.getValueAs(NlsString.class).getValue(); + } + + public boolean isWithSavepoint() { + return isWithSavepoint; + } + + public boolean isWithDrain() { + return isWithDrain; + } +} diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java index 8fa96b90ae2..8a63777f543 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java @@ -43,4 +43,7 @@ public interface ParserResource { @Resources.BaseMessage("CREATE FUNCTION USING JAR syntax is not applicable to {0} language.") Resources.ExInst<ParseException> createFunctionUsingJar(String language); + + @Resources.BaseMessage("WITH DRAIN could only be used after WITH SAVEPOINT.") + Resources.ExInst<ParseException> withDrainOnlyUsedWithSavepoint(); } diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java index b6dd3fdf74e..27d57acc88f 100644 --- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java +++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java @@ -1950,6 +1950,18 @@ class FlinkSqlParserImplTest extends SqlParserTest { "CREATE TABLE AS SELECT syntax does not support to create partitioned table yet.")); } + @Test + void testStopJob() { + sql("STOP JOB 'myjob'").ok("STOP JOB 'myjob'"); + sql("STOP JOB 'myjob' WITH SAVEPOINT").ok("STOP JOB 'myjob' WITH SAVEPOINT"); + sql("STOP JOB 'myjob' WITH SAVEPOINT WITH DRAIN") + .ok("STOP JOB 'myjob' WITH SAVEPOINT WITH DRAIN"); + sql("STOP JOB 'myjob' ^WITH DRAIN^") + .fails("WITH DRAIN could only be used after WITH SAVEPOINT."); + sql("STOP JOB 'myjob' ^WITH DRAIN^ WITH SAVEPOINT") + .fails("WITH DRAIN could only be used after WITH SAVEPOINT."); + } + public static BaseMatcher<SqlNode> validated(String validatedSql) { return new TypeSafeDiagnosingMatcher<SqlNode>() { @Override diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/command/StopJobOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/command/StopJobOperation.java new file mode 100644 index 00000000000..dcb56c20ab6 --- /dev/null +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/command/StopJobOperation.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations.command; + +import org.apache.flink.table.operations.Operation; + +/** Operation to stop a running job. */ +public class StopJobOperation implements Operation { + + private final String jobId; + + private final boolean isWithSavepoint; + + private final boolean isWithDrain; + + public StopJobOperation(String jobId, boolean isWithSavepoint, boolean isWithDrain) { + this.jobId = jobId; + this.isWithSavepoint = isWithSavepoint; + this.isWithDrain = isWithDrain; + } + + public String getJobId() { + return jobId; + } + + public boolean isWithSavepoint() { + return isWithSavepoint; + } + + public boolean isWithDrain() { + return isWithDrain; + } + + @Override + public String asSummaryString() { + StringBuilder summary = new StringBuilder("STOP JOB "); + summary.append("'").append(jobId).append("'"); + if (isWithSavepoint) { + summary.append(" WITH SAVEPOINT"); + } + if (isWithDrain) { + summary.append(" WITH DRAIN"); + } + return summary.toString(); + } +} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java index 2e137b3ebeb..c1860e5e9c1 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java @@ -52,6 +52,7 @@ import org.apache.flink.sql.parser.ddl.SqlDropView; import org.apache.flink.sql.parser.ddl.SqlRemoveJar; import org.apache.flink.sql.parser.ddl.SqlReset; import org.apache.flink.sql.parser.ddl.SqlSet; +import org.apache.flink.sql.parser.ddl.SqlStopJob; import org.apache.flink.sql.parser.ddl.SqlTableOption; import org.apache.flink.sql.parser.ddl.SqlUseCatalog; import org.apache.flink.sql.parser.ddl.SqlUseDatabase; @@ -154,6 +155,7 @@ import org.apache.flink.table.operations.command.RemoveJarOperation; import org.apache.flink.table.operations.command.ResetOperation; import org.apache.flink.table.operations.command.SetOperation; import org.apache.flink.table.operations.command.ShowJarsOperation; +import org.apache.flink.table.operations.command.StopJobOperation; import org.apache.flink.table.operations.ddl.AddPartitionsOperation; import org.apache.flink.table.operations.ddl.AlterCatalogFunctionOperation; import org.apache.flink.table.operations.ddl.AlterDatabaseOperation; @@ -374,6 +376,8 @@ public class SqlToOperationConverter { return Optional.of(converter.convertSqlQuery(validated)); } else if (validated instanceof SqlAnalyzeTable) { return Optional.of(converter.convertAnalyzeTable((SqlAnalyzeTable) validated)); + } else if (validated instanceof SqlStopJob) { + return Optional.of(converter.convertStopJob((SqlStopJob) validated)); } else { return Optional.empty(); } @@ -1467,6 +1471,11 @@ public class SqlToOperationConverter { return new ValueLiteralExpression(value, dataType.notNull()); } + private Operation convertStopJob(SqlStopJob sqlStopJob) { + return new StopJobOperation( + sqlStopJob.getId(), sqlStopJob.isWithSavepoint(), sqlStopJob.isWithDrain()); + } + private void validateTableConstraint(SqlTableConstraint constraint) { if (constraint.isUnique()) { throw new UnsupportedOperationException("UNIQUE constraint is not supported yet");