[FLINK-2111] Add "stop" signal to cleanly shutdown streaming jobs
- added JobType to JobGraph and ExecutionGraph
- added interface Stoppable, applied to SourceStreamTask
- added STOP signal logic to JobManager, TaskManager, ExecutionGraph
- extended Client to support stop
- extended Cli frontend, JobManager frontend
- updated documentation

Fix JobManagerTest.testStopSignal and testStopSignalFail

The StoppableInvokable could not be instantiated by Task because it was declared as a private class. Adds additional checks to verify that the stop signal behaves correctly.

Auto-detect if job is stoppable

A job is stoppable iff all sources are stoppable

- Replace JobType by stoppable flag
- Add StoppableFunction and StoppableInvokable to support the optional stop operation
- added REST get/delete test (no extra YARN test; probably not required, as both get and delete are tested)
- bug fix: job got canceled instead of stopped in web interface
- Add StoppingException
- Allow to stop jobs when they are not in state RUNNING

Second round of Till's comments

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bdd4024e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bdd4024e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bdd4024e

Branch: refs/heads/master
Commit: bdd4024e20fdfb0accb6121a68780ce3a0c218c0
Parents: 5eae47f
Author: mjsax <mj...@informatik.hu-berlin.de>
Authored: Sat Sep 26 13:14:43 2015 +0200
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Mon Feb 15 16:16:13 2016 +0100

----------------------------------------------------------------------
 docs/apis/cli.md | 14 +
 .../org/apache/flink/client/CliFrontend.java | 86 +-
 .../flink/client/cli/CliFrontendParser.java | 41 +-
 .../apache/flink/client/cli/StopOptions.java | 37 +
 .../org/apache/flink/client/program/Client.java | 59 +-
 .../client/program/ProgramStopException.java | 53 +
 .../flink/client/CliFrontendStopTest.java | 159 ++
 .../org/apache/flink/storm/api/FlinkClient.java | 6 +-
 .../flink/storm/wrappers/SpoutWrapper.java | 13 +-
 .../api/common/functions/StoppableFunction.java | 33 +
 flink-runtime-web/pom.xml | 14 +
 .../runtime/webmonitor/WebRuntimeMonitor.java | 11 +-
 .../handlers/JobCancellationHandler.java | 3 +
 .../webmonitor/handlers/JobDetailsHandler.java | 1 +
 .../webmonitor/handlers/JobStoppingHandler.java | 49 +
 .../webmonitor/testutils/HttpTestClient.java | 19 +
 .../web-dashboard/app/partials/jobs/job.jade | 4 +
 .../app/scripts/modules/jobs/jobs.ctrl.coffee | 5 +
 .../app/scripts/modules/jobs/jobs.svc.coffee | 5 +
 .../web-dashboard/web/css/vendor.css | 1 +
 .../web-dashboard/web/js/vendor.js | 2053 +++++++++++------
 .../web-dashboard/web/partials/jobs/job.html | 1 +
 .../apache/flink/runtime/StoppingException.java | 35 +
 .../flink/runtime/executiongraph/Execution.java | 93 +-
 .../runtime/executiongraph/ExecutionGraph.java | 93 +-
 .../runtime/executiongraph/ExecutionVertex.java | 5 +-
 .../apache/flink/runtime/jobgraph/JobGraph.java | 1 +
 .../flink/runtime/jobgraph/JobVertex.java | 85 +-
 .../runtime/jobgraph/tasks/StoppableTask.java | 25 +
 .../apache/flink/runtime/taskmanager/Task.java | 60 +-
 .../flink/runtime/jobmanager/JobManager.scala | 27 +
 .../runtime/messages/JobManagerMessages.scala | 25 +
 .../runtime/messages/TaskControlMessages.scala | 9 +
 .../flink/runtime/taskmanager/TaskManager.scala | 22 +
 .../ExecutionGraphConstructionTest.java | 97 +-
 .../ExecutionGraphDeploymentTest.java | 27 +-
 .../ExecutionGraphSignalsTest.java | 224 ++
 .../executiongraph/ExecutionGraphTestUtils.java | 12 +-
 .../ExecutionStateProgressTest.java | 173 +-
 .../executiongraph/ExecutionVertexStopTest.java | 132 ++
 .../executiongraph/LocalInputSplitsTest.java | 25 +-
 .../executiongraph/PointwisePatternTest.java | 84 +-
 .../runtime/jobmanager/JobManagerTest.java | 107 +-
 .../runtime/taskmanager/TaskManagerTest.java | 131 +-
 .../flink/runtime/taskmanager/TaskStopTest.java | 114 +
 .../runtime/testutils/StoppableInvokable.java | 32 +
 .../environment/StreamExecutionEnvironment.java | 9 +-
 .../flink/streaming/api/graph/StreamGraph.java | 7 +-
 .../api/operators/StoppableStreamSource.java | 51 +
 .../tasks/StoppableSourceStreamTask.java | 33 +
 .../runtime/tasks/SourceStreamTaskTest.java | 53 +-
 flink-tests/pom.xml | 16 +
 .../flink/test/web/WebFrontendITCase.java | 89 +-
 53 files changed, 3389 insertions(+), 1174 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/docs/apis/cli.md
----------------------------------------------------------------------
diff --git a/docs/apis/cli.md b/docs/apis/cli.md
index c9145bc..421ed94 100644
--- a/docs/apis/cli.md
+++ b/docs/apis/cli.md
@@ -108,6 +108,10 @@ The command line can be used to
 
         ./bin/flink cancel <jobID>
 
+-   Stop a job (streaming jobs only):
+
+        ./bin/flink stop <jobID>
+
 ### Savepoints
 
 [Savepoints]({{site.baseurl}}/apis/streaming/savepoints.html) are controlled via the command line client:
@@ -248,6 +252,16 @@ Action "cancel" cancels a running program.
                                    configuration.
 
 
+Action "stop" stops a running program (streaming jobs only).
+
+  Syntax: stop [OPTIONS] <Job ID>
+  "stop" action options:
+     -m,--jobmanager <host:port>   Address of the JobManager (master) to which
+                                   to connect. Use this flag to connect to a
+                                   different JobManager than the one specified
+                                   in the configuration.
+
+
 Action "savepoint" triggers savepoints for a running job or disposes existing ones.
 
   Syntax: savepoint [OPTIONS] <Job ID>

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index da91bca..98bf056 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -36,6 +36,7 @@ import org.apache.flink.client.cli.ListOptions;
 import org.apache.flink.client.cli.ProgramOptions;
 import org.apache.flink.client.cli.RunOptions;
 import org.apache.flink.client.cli.SavepointOptions;
+import org.apache.flink.client.cli.StopOptions;
 import org.apache.flink.client.program.Client;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.client.program.ProgramInvocationException;
@@ -56,7 +57,10 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
+import org.apache.flink.runtime.messages.JobManagerMessages.CancellationFailure;
 import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus;
+import org.apache.flink.runtime.messages.JobManagerMessages.StopJob;
+import org.apache.flink.runtime.messages.JobManagerMessages.StoppingFailure;
 import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
 import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess;
 import org.apache.flink.runtime.security.SecurityUtils;
@@ -110,6 +114,7 @@ public class CliFrontend {
     public static final String ACTION_INFO = "info";
     private static final String ACTION_LIST = "list";
     private static final String ACTION_CANCEL = "cancel";
+    private static final String ACTION_STOP = "stop";
     private static final String ACTION_SAVEPOINT = "savepoint";
 
     // config dir parameters
@@ -290,9 +295,6 @@ public class CliFrontend {
         catch (FileNotFoundException e) {
             return handleArgException(e);
         }
-        catch (ProgramInvocationException e) {
-            return handleError(e);
-        }
         catch (Throwable t) {
             return handleError(t);
         }
@@ -362,7 +364,7 @@ public class CliFrontend {
     /**
      * Executes the info action.
      *
-     * @param args Command line arguments for the info action.
+     * @param args Command line arguments for the info action.
      */
     protected int info(String[] args) {
         LOG.info("Running 'info' command.");
@@ -568,6 +570,65 @@ public class CliFrontend {
     }
 
     /**
+     * Executes the STOP action.
+     *
+     * @param args Command line arguments for the stop action.
+     */
+    protected int stop(String[] args) {
+        LOG.info("Running 'stop' command.");
+
+        StopOptions options;
+        try {
+            options = CliFrontendParser.parseStopCommand(args);
+        }
+        catch (CliArgsException e) {
+            return handleArgException(e);
+        }
+        catch (Throwable t) {
+            return handleError(t);
+        }
+
+        // evaluate help flag
+        if (options.isPrintHelp()) {
+            CliFrontendParser.printHelpForStop();
+            return 0;
+        }
+
+        String[] stopArgs = options.getArgs();
+        JobID jobId;
+
+        if (stopArgs.length > 0) {
+            String jobIdString = stopArgs[0];
+            try {
+                jobId = new JobID(StringUtils.hexStringToByte(jobIdString));
+            }
+            catch (Exception e) {
+                return handleError(e);
+            }
+        }
+        else {
+            return handleArgException(new CliArgsException("Missing JobID"));
+        }
+
+        try {
+            ActorGateway jobManager = getJobManagerGateway(options);
+            Future<Object> response = jobManager.ask(new StopJob(jobId), clientTimeout);
+
+            final Object rc = Await.result(response, clientTimeout);
+
+            if (rc instanceof StoppingFailure) {
+                throw new Exception("Stopping the job with ID " + jobId + " failed.",
+                        ((StoppingFailure) rc).cause());
+            }
+
+            return 0;
+        }
+        catch (Throwable t) {
+            return handleError(t);
+        }
+    }
+
+    /**
+     * Executes the CANCEL action.
      *
      * @param args Command line arguments for the cancel action.
@@ -616,13 +677,14 @@ public class CliFrontend {
             ActorGateway jobManager = getJobManagerGateway(options);
             Future<Object> response = jobManager.ask(new CancelJob(jobId), clientTimeout);
 
-            try {
-                Await.result(response, clientTimeout);
-                return 0;
-            }
-            catch (Exception e) {
-                throw new Exception("Canceling the job with ID " + jobId + " failed.", e);
+            final Object rc = Await.result(response, clientTimeout);
+
+            if (rc instanceof CancellationFailure) {
+                throw new Exception("Canceling the job with ID " + jobId + " failed.",
+                        ((CancellationFailure) rc).cause());
             }
+
+            return 0;
         }
         catch (Throwable t) {
             return handleError(t);
@@ -1123,6 +1185,8 @@ public class CliFrontend {
                 return info(params);
             case ACTION_CANCEL:
                 return cancel(params);
+            case ACTION_STOP:
+                return stop(params);
             case ACTION_SAVEPOINT:
                 return savepoint(params);
             case "-h":
@@ -1139,7 +1203,7 @@ public class CliFrontend {
             default:
                 System.out.printf("\"%s\" is not a valid action.\n", action);
                 System.out.println();
-                System.out.println("Valid actions are \"run\", \"list\", \"info\", or \"cancel\".");
+                System.out.println("Valid actions are \"run\", \"list\", \"info\", \"stop\", or \"cancel\".");
                 System.out.println();
                 System.out.println("Specify the version option (-v or --version) to print Flink version.");
                 System.out.println();

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
index 07d409e..2ac53d2 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
@@ -39,13 +39,13 @@ public class CliFrontendParser {
 
 
     static final Option HELP_OPTION = new Option("h", "help", false,
-            "Show the help message for the CLI Frontend or the action.");
+            "Show the help message for the CLI Frontend or the action.");
 
     static final Option JAR_OPTION = new Option("j", "jarfile", true, "Flink program JAR file.");
 
     public static final Option CLASS_OPTION = new Option("c", "class", true,
             "Class with the program entry point (\"main\" method or \"getPlan()\" method. Only needed if the " +
-            "JAR file does not specify the class in its manifest.");
+            "JAR file does not specify the class in its manifest.");
 
     static final Option CLASSPATH_OPTION = new Option("C", "classpath", true, "Adds a URL to each user code " +
             "classloader on all nodes in the cluster. The paths must specify a protocol (e.g. file://) and be " +
@@ -55,7 +55,7 @@ public class CliFrontendParser {
 
     static final Option PARALLELISM_OPTION = new Option("p", "parallelism", true,
             "The parallelism with which to run the program. Optional flag to override the default value " +
-            "specified in the configuration.");
+            "specified in the configuration.");
 
     static final Option LOGGING_OPTION = new Option("q", "sysoutLogging", false, "If present, " +
             "supress logging output to standard out.");
@@ -67,9 +67,9 @@ public class CliFrontendParser {
             "Program arguments. Arguments can also be added without -a, simply as trailing parameters.");
 
     static final Option ADDRESS_OPTION = new Option("m", "jobmanager", true,
-            "Address of the JobManager (master) to which to connect. Specify '" + CliFrontend.YARN_DEPLOY_JOBMANAGER
-            + "' as the JobManager to deploy a YARN cluster for the job. Use this flag to connect to a " +
-            "different JobManager than the one specified in the configuration.");
+            "Address of the JobManager (master) to which to connect. Specify '" + CliFrontend.YARN_DEPLOY_JOBMANAGER
+            + "' as the JobManager to deploy a YARN cluster for the job. Use this flag to connect to a " +
+            "different JobManager than the one specified in the configuration.");
 
     static final Option SAVEPOINT_PATH_OPTION = new Option("s", "fromSavepoint", true,
             "Path to a savepoint to reset the job back to (for example file:///flink/savepoint-1537).");
@@ -123,6 +123,7 @@ public class CliFrontendParser {
     private static final Options INFO_OPTIONS = getInfoOptions(buildGeneralOptions(new Options()));
     private static final Options LIST_OPTIONS = getListOptions(buildGeneralOptions(new Options()));
     private static final Options CANCEL_OPTIONS = getCancelOptions(buildGeneralOptions(new Options()));
+    private static final Options STOP_OPTIONS = getStopOptions(buildGeneralOptions(new Options()));
     private static final Options SAVEPOINT_OPTIONS = getSavepointOptions(buildGeneralOptions(new Options()));
 
     private static Options buildGeneralOptions(Options options) {
@@ -197,6 +198,11 @@ public class CliFrontendParser {
         return options;
     }
 
+    private static Options getStopOptions(Options options) {
+        options = getJobManagerAddressOption(options);
+        return options;
+    }
+
     private static Options getSavepointOptions(Options options) {
         options = getJobManagerAddressOption(options);
         options.addOption(SAVEPOINT_DISPOSE_OPTION);
@@ -218,6 +224,7 @@ public class CliFrontendParser {
         printHelpForRun();
         printHelpForInfo();
         printHelpForList();
+        printHelpForStop();
         printHelpForCancel();
         printHelpForSavepoint();
 
@@ -264,6 +271,18 @@ public class CliFrontendParser {
         System.out.println();
     }
 
+    public static void printHelpForStop() {
+        HelpFormatter formatter = new HelpFormatter();
+        formatter.setLeftPadding(5);
+        formatter.setWidth(80);
+
+        System.out.println("\nAction \"stop\" stops a running program (streaming jobs only).");
+        System.out.println("\n Syntax: stop [OPTIONS] <Job ID>");
+        formatter.setSyntaxPrefix(" \"stop\" action options:");
+        formatter.printHelp(" ", getStopOptions(new Options()));
+        System.out.println();
+    }
+
     public static void printHelpForCancel() {
         HelpFormatter formatter = new HelpFormatter();
         formatter.setLeftPadding(5);
@@ -325,6 +344,16 @@ public class CliFrontendParser {
         }
     }
 
+    public static StopOptions parseStopCommand(String[] args) throws CliArgsException {
+        try {
+            PosixParser parser = new PosixParser();
+            CommandLine line = parser.parse(STOP_OPTIONS, args, false);
+            return new StopOptions(line);
+        } catch (ParseException e) {
+            throw new CliArgsException(e.getMessage());
+        }
+    }
+
     public static SavepointOptions parseSavepointCommand(String[] args) throws CliArgsException {
         try {
             PosixParser parser = new PosixParser();

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-clients/src/main/java/org/apache/flink/client/cli/StopOptions.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/StopOptions.java b/flink-clients/src/main/java/org/apache/flink/client/cli/StopOptions.java
new file mode 100644
index 0000000..7f246c8
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/StopOptions.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.client.cli;
+
+import org.apache.commons.cli.CommandLine;
+
+/**
+ * Command line options for the STOP command
+ */
+public class StopOptions extends CommandLineOptions {
+
+    private final String[] args;
+
+    public StopOptions(CommandLine line) {
+        super(line);
+        this.args = line.getArgs();
+    }
+
+    public String[] getArgs() {
+        return args == null ? new String[0] : args;
+    }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
index 452710c..999b461 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
@@ -29,7 +29,6 @@ import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
-import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.optimizer.CompilerException;
@@ -43,16 +42,17 @@ import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.client.JobClient;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsErroneous;
 import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsFound;
 import org.apache.flink.runtime.messages.accumulators.RequestAccumulatorResults;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.apache.flink.util.SerializedValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -413,25 +413,60 @@ public class Client {
      * @throws Exception In case an error occurred.
      */
     public void cancel(JobID jobId) throws Exception {
-        ActorGateway jobManagerGateway = getJobManagerGateway();
+        final ActorGateway jobManagerGateway = getJobManagerGateway();
 
-        Future<Object> response;
+        final Future<Object> response;
         try {
             response = jobManagerGateway.ask(new JobManagerMessages.CancelJob(jobId), timeout);
-        } catch (Exception e) {
+        } catch (final Exception e) {
             throw new ProgramInvocationException("Failed to query the job manager gateway.", e);
         }
 
-        Object result = Await.result(response, timeout);
+        final Object result = Await.result(response, timeout);
 
         if (result instanceof JobManagerMessages.CancellationSuccess) {
-            LOG.debug("Job cancellation with ID " + jobId + " succeeded.");
+            LOG.info("Job cancellation with ID " + jobId + " succeeded.");
         } else if (result instanceof JobManagerMessages.CancellationFailure) {
-            Throwable t = ((JobManagerMessages.CancellationFailure) result).cause();
-            LOG.debug("Job cancellation with ID " + jobId + " failed.", t);
+            final Throwable t = ((JobManagerMessages.CancellationFailure) result).cause();
+            LOG.info("Job cancellation with ID " + jobId + " failed.", t);
             throw new Exception("Failed to cancel the job because of \n" + t.getMessage());
         } else {
-            throw new Exception("Unknown message received while cancelling.");
+            throw new Exception("Unknown message received while cancelling: " + result.getClass().getName());
+        }
+    }
+
+    /**
+     * Stops a program on Flink cluster whose job-manager is configured in this client's configuration.
+     * Stopping works only for streaming programs. Be aware, that the program might continue to run for
+     * a while after sending the stop command, because after sources stopped to emit data all operators
+     * need to finish processing.
+     *
+     * @param jobId
+     *            the job ID of the streaming program to stop
+     * @throws ProgramStopException
+     *             If the job ID is invalid (ie, is unknown or refers to a batch job) or if sending the stop signal
+     *             failed. That might be due to an I/O problem, ie, the job-manager is unreachable.
+     */
+    public void stop(final JobID jobId) throws Exception {
+        final ActorGateway jobManagerGateway = getJobManagerGateway();
+
+        final Future<Object> response;
+        try {
+            response = jobManagerGateway.ask(new JobManagerMessages.StopJob(jobId), timeout);
+        } catch (final Exception e) {
+            throw new ProgramInvocationException("Failed to query the job manager gateway.", e);
+        }
+
+        final Object result = Await.result(response, timeout);
+
+        if (result instanceof JobManagerMessages.StoppingSuccess) {
+            LOG.info("Job stopping with ID " + jobId + " succeeded.");
+        } else if (result instanceof JobManagerMessages.StoppingFailure) {
+            final Throwable t = ((JobManagerMessages.StoppingFailure) result).cause();
+            LOG.info("Job stopping with ID " + jobId + " failed.", t);
+            throw new Exception("Failed to stop the job because of \n" + t.getMessage());
+        } else {
+            throw new Exception("Unknown message received while stopping: " + result.getClass().getName());
         }
     }
 
@@ -482,7 +517,7 @@ public class Client {
     // ------------------------------------------------------------------------
     // Sessions
     // ------------------------------------------------------------------------
-
+
     /**
      * Tells the JobManager to finish the session (job) defined by the given ID.
      *

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-clients/src/main/java/org/apache/flink/client/program/ProgramStopException.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ProgramStopException.java b/flink-clients/src/main/java/org/apache/flink/client/program/ProgramStopException.java
new file mode 100644
index 0000000..a1d8a9b
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ProgramStopException.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.flink.client.program;
+
+/**
+ * Exception used to indicate that there is an error during stopping of a Flink program.
+ */
+public class ProgramStopException extends Exception {
+    private static final long serialVersionUID = -906791331829704450L;
+
+    /**
+     * Creates a <tt>ProgramStopException</tt> with the given message.
+     *
+     * @param message
+     *            The message for the exception.
+     */
+    public ProgramStopException(String message) {
+        super(message);
+    }
+
+    /**
+     * Creates a <tt>ProgramStopException</tt> for the given exception.
+     *
+     * @param cause
+     *            The exception that causes the program invocation to fail.
+     */
+    public ProgramStopException(Throwable cause) {
+        super(cause);
+    }
+
+    /**
+     * Creates a <tt>ProgramStopException</tt> for the given exception with an additional message.
+     *
+     * @param message
+     *            The additional message.
+     * @param cause
+     *            The exception that causes the program invocation to fail.
+     */
+    public ProgramStopException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java
new file mode 100644
index 0000000..7c34c75
--- /dev/null
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client;
+
+import akka.actor.*;
+import akka.testkit.JavaTestKit;
+
+import org.apache.flink.client.cli.CommandLineOptions;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.akka.FlinkUntypedActor;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.util.TestLogger;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.UUID;
+
+import static org.apache.flink.client.CliFrontendTestUtils.pipeSystemOutToNull;
+import static org.apache.flink.client.CliFrontendTestUtils.clearGlobalConfiguration;
+import static org.junit.Assert.*;
+
+public class CliFrontendStopTest extends TestLogger {
+
+    private static ActorSystem actorSystem;
+
+    @BeforeClass
+    public static void setup() {
+        pipeSystemOutToNull();
+        clearGlobalConfiguration();
+        actorSystem = ActorSystem.create("TestingActorSystem");
+    }
+
+    @AfterClass
+    public static void teardown() {
+        JavaTestKit.shutdownActorSystem(actorSystem);
+        actorSystem = null;
+    }
+
+    @Test
+    public void testStop() throws Exception {
+        // test unrecognized option
+        {
+            String[] parameters = { "-v", "-l" };
+            CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
+            int retCode = testFrontend.stop(parameters);
+            assertTrue(retCode != 0);
+        }
+
+        // test missing job id
+        {
+            String[] parameters = {};
+            CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
+            int retCode = testFrontend.stop(parameters);
+            assertTrue(retCode != 0);
+        }
+
+        // test stop properly
+        {
+            JobID jid = new JobID();
+            String jidString = jid.toString();
+
+            final UUID leaderSessionID = UUID.randomUUID();
+            final ActorRef jm = actorSystem.actorOf(Props.create(CliJobManager.class, jid, leaderSessionID));
+
+            final ActorGateway gateway = new AkkaActorGateway(jm, leaderSessionID);
+
+            String[] parameters = { jidString };
+            StopTestCliFrontend testFrontend = new StopTestCliFrontend(gateway);
+
+            int retCode = testFrontend.stop(parameters);
+            assertTrue(retCode == 0);
+        }
+
+        // test unknown job Id
+        {
+            JobID jid1 = new JobID();
+            JobID jid2 = new JobID();
+
+            final UUID leaderSessionID = UUID.randomUUID();
+            final ActorRef jm = actorSystem.actorOf(Props.create(CliJobManager.class, jid1, leaderSessionID));
+
+            final ActorGateway gateway = new AkkaActorGateway(jm, leaderSessionID);
+
+            String[] parameters = { jid2.toString() };
+            StopTestCliFrontend testFrontend = new StopTestCliFrontend(gateway);
+
+            assertTrue(testFrontend.stop(parameters) != 0);
+        }
+    }
+
+    protected static final class StopTestCliFrontend extends CliFrontend {
+
+        private ActorGateway jobManagerGateway;
+
+        public StopTestCliFrontend(ActorGateway jobManagerGateway) throws Exception {
+            super(CliFrontendTestUtils.getConfigDir());
+            this.jobManagerGateway = jobManagerGateway;
+        }
+
+        @Override
+        public ActorGateway getJobManagerGateway(CommandLineOptions options) {
+            return jobManagerGateway;
+        }
+    }
+
+    protected static final class CliJobManager extends FlinkUntypedActor {
+        private final JobID jobID;
+        private final UUID leaderSessionID;
+
+        public CliJobManager(final JobID jobID, final UUID leaderSessionID) {
+            this.jobID = jobID;
+            this.leaderSessionID = leaderSessionID;
+        }
+
+        @Override
+        public void handleMessage(Object message) {
+            if (message instanceof JobManagerMessages.RequestTotalNumberOfSlots$) {
+                getSender().tell(decorateMessage(1), getSelf());
+            } else if (message instanceof JobManagerMessages.StopJob) {
+                JobManagerMessages.StopJob stopJob = (JobManagerMessages.StopJob) message;
+
+                if (jobID != null && jobID.equals(stopJob.jobID())) {
+                    getSender().tell(decorateMessage(new Status.Success(new Object())), getSelf());
+                } else {
+                    getSender()
+                            .tell(decorateMessage(new Status.Failure(new Exception(
+                                    "Wrong or no JobID"))), getSelf());
+                }
+            } else if (message instanceof JobManagerMessages.RequestRunningJobsStatus$) {
+                getSender().tell(decorateMessage(new JobManagerMessages.RunningJobsStatus()),
+                        getSelf());
+            }
+        }
+
+        @Override
+        protected UUID getLeaderSessionID() {
+            return leaderSessionID;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
index 767f1b1..2541345 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
@@ -208,7 +208,7 @@ public class FlinkClient {
         final Client client;
         try {
             client = new Client(configuration);
-        } catch (IOException e) {
+        } catch (final IOException e) {
             throw new RuntimeException("Could not establish a connection to the job manager", e);
         }
 
@@ -253,7 +253,7 @@ public class FlinkClient {
         }
 
         try {
-            client.cancel(jobId);
+            client.stop(jobId);
         } catch (final Exception e) {
             throw new RuntimeException("Cannot stop job.", e);
         }
@@ -282,7 +282,7 @@ public class FlinkClient {
         final Future<Object> response = Patterns.ask(jobManager, JobManagerMessages.getRequestRunningJobsStatus(),
                 new Timeout(askTimeout));
 
-        Object result;
+        final Object result;
         try {
             result = Await.result(response, askTimeout);
         } catch (final Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java
index e077aeb..66b05c6 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java
@@ -25,6 +25,7 @@ import backtype.storm.topology.IRichSpout;
 
 import com.google.common.collect.Sets;
 import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters;
+import org.apache.flink.api.common.functions.StoppableFunction;
 import org.apache.flink.api.java.tuple.Tuple0;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple25;
@@ -52,7 +53,7 @@ import java.util.HashMap;
  * is {@code null}, {@link SpoutWrapper} calls {@link IRichSpout#nextTuple() nextTuple()} method until
  * {@link FiniteSpout#reachedEnd()} returns true.
  */
-public final class SpoutWrapper<OUT> extends RichParallelSourceFunction<OUT> {
+public final class SpoutWrapper<OUT> extends RichParallelSourceFunction<OUT> implements StoppableFunction {
     private static final long serialVersionUID = -218340336648247605L;
 
     /** Number of attributes of the spouts's output tuples per stream. */
@@ -299,6 +300,16 @@ public final class SpoutWrapper<OUT> extends RichParallelSourceFunction<OUT> {
         this.isRunning = false;
     }
 
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Sets the {@link #isRunning} flag to {@code false}.
+ */ + @Override + public void stop() { + this.isRunning = false; + } + @Override public void close() throws Exception { this.spout.close(); http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-core/src/main/java/org/apache/flink/api/common/functions/StoppableFunction.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/StoppableFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/StoppableFunction.java new file mode 100644 index 0000000..a83b73f --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/StoppableFunction.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.flink.api.common.functions; + +/** + * Must be implemented by stoppable functions, e.g., source functions of streaming jobs. The method {@link #stop()} will + * be called when the job receives the STOP signal. On this signal, the source function must stop emitting new data and + * terminate gracefully. + */ +public interface StoppableFunction { + /** + * Stops the source.
In contrast to {@code cancel()}, this is a request to the source function to shut down + * gracefully. Pending data can still be emitted; the source need not stop immediately, but it should stop in the near + * future. The job will keep running until all emitted data is processed completely. + * <p> + * Most streaming sources will have a while loop inside the {@code run()} method. You need to ensure that the source + * will break out of this loop. This can be achieved by having a volatile field "isRunning" that is checked in the + * loop and that is set to false in this method. + * <p> + * <strong>The call to {@code stop()} should neither block nor throw any exception.</strong> + */ + public void stop(); +} http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime-web/pom.xml ---------------------------------------------------------------------- diff --git a/flink-runtime-web/pom.xml b/flink-runtime-web/pom.xml index 1a19fb1..27bbbd5 100644 --- a/flink-runtime-web/pom.xml +++ b/flink-runtime-web/pom.xml @@ -44,6 +44,20 @@ under the License.
</includes> </resource> </resources> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <inherited>true</inherited> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> </build> <dependencies> http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java index 0b5de1f..67c0dab 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java @@ -53,6 +53,7 @@ import org.apache.flink.runtime.webmonitor.handlers.JobDetailsHandler; import org.apache.flink.runtime.webmonitor.handlers.JobExceptionsHandler; import org.apache.flink.runtime.webmonitor.handlers.JobManagerConfigHandler; import org.apache.flink.runtime.webmonitor.handlers.JobPlanHandler; +import org.apache.flink.runtime.webmonitor.handlers.JobStoppingHandler; import org.apache.flink.runtime.webmonitor.handlers.JobVertexAccumulatorsHandler; import org.apache.flink.runtime.webmonitor.handlers.JobVertexBackPressureHandler; import org.apache.flink.runtime.webmonitor.handlers.JobVertexCheckpointsHandler; @@ -245,8 +246,14 @@ public class WebRuntimeMonitor implements WebMonitor { // Cancel a job via GET (for proper integration with YARN this has to be performed via GET) .GET("/jobs/:jobid/yarn-cancel", handler(new JobCancellationHandler())) - // DELETE is the preferred way of cancelling a job (Rest-conform) - .DELETE("/jobs/:jobid", handler(new JobCancellationHandler())); + // DELETE is the 
preferred way of canceling a job (Rest-conform) + .DELETE("/jobs/:jobid/cancel", handler(new JobCancellationHandler())) + + // stop a job via GET (for proper integration with YARN this has to be performed via GET) + .GET("/jobs/:jobid/yarn-stop", handler(new JobStoppingHandler())) + + // DELETE is the preferred way of stopping a job (Rest-conform) + .DELETE("/jobs/:jobid/stop", handler(new JobStoppingHandler())); if (webSubmitAllow) { router http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java index aae8b34..b17acdc 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java @@ -25,6 +25,9 @@ import org.apache.flink.util.StringUtils; import java.util.Map; +/** + * Request handler for the CANCEL request. 
+ */ public class JobCancellationHandler implements RequestHandler { @Override http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java index 4511c17..4f31128 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java @@ -61,6 +61,7 @@ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler { // basic info gen.writeStringField("jid", graph.getJobID().toString()); gen.writeStringField("name", graph.getJobName()); + gen.writeBooleanField("isStoppable", graph.isStoppable()); gen.writeStringField("state", graph.getState().name()); // times and duration http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java new file mode 100644 index 0000000..791790a --- /dev/null +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.webmonitor.handlers; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.messages.JobManagerMessages; +import org.apache.flink.util.StringUtils; + +import java.util.Map; + +/** + * Request handler for the STOP request. + */ +public class JobStoppingHandler implements RequestHandler { + + @Override + public String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception { + try { + JobID jobid = new JobID(StringUtils.hexStringToByte(pathParams.get("jobid"))); + if (jobManager != null) { + jobManager.tell(new JobManagerMessages.StopJob(jobid)); + return "{}"; + } + else { + throw new Exception("No connection to the leading JobManager."); + } + } + catch (Exception e) { + throw new Exception("Failed to stop the job with id: " + pathParams.get("jobid") + ": " + e.getMessage(), e); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/testutils/HttpTestClient.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/testutils/HttpTestClient.java
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/testutils/HttpTestClient.java index d7d4457..9a396d3 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/testutils/HttpTestClient.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/testutils/HttpTestClient.java @@ -165,6 +165,25 @@ public class HttpTestClient implements AutoCloseable { } /** + * Sends a simple DELETE request to the given path. You only specify the $path part of + * http://$host:$port/$path. + * + * @param path The $path to DELETE (http://$host:$port/$path) + */ + public void sendDeleteRequest(String path, FiniteDuration timeout) throws TimeoutException, InterruptedException { + if (!path.startsWith("/")) { + path = "/" + path; + } + + HttpRequest deleteRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, + HttpMethod.DELETE, path); + deleteRequest.headers().set(HttpHeaders.Names.HOST, host); + deleteRequest.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE); + + sendRequest(deleteRequest, timeout); + } + + /** * Returns the next available HTTP response. A call to this method blocks until a response * becomes available.
* http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime-web/web-dashboard/app/partials/jobs/job.jade ---------------------------------------------------------------------- diff --git a/flink-runtime-web/web-dashboard/app/partials/jobs/job.jade b/flink-runtime-web/web-dashboard/app/partials/jobs/job.jade index 5b541ae..fe3e0fc 100644 --- a/flink-runtime-web/web-dashboard/app/partials/jobs/job.jade +++ b/flink-runtime-web/web-dashboard/app/partials/jobs/job.jade @@ -43,6 +43,10 @@ nav.navbar.navbar-default.navbar-fixed-top.navbar-main(ng-if="job") span.navbar-info-button.btn.btn-default(ng-click="cancelJob($event)") | Cancel + .navbar-info.last.first(ng-if!="job.isStoppable && (job.state=='CREATED' || job.state=='RUNNING' || job.state=='RESTARTING')") + span.navbar-info-button.btn.btn-default(ng-click="stopJob($event)") + | Stop + nav.navbar.navbar-default.navbar-fixed-top.navbar-main-additional(ng-if="job") ul.nav.nav-tabs li(ui-sref-active='active') http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.ctrl.coffee ---------------------------------------------------------------------- diff --git a/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.ctrl.coffee b/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.ctrl.coffee index f0ce892..931976d 100644 --- a/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.ctrl.coffee +++ b/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.ctrl.coffee @@ -80,6 +80,11 @@ angular.module('flinkApp') JobsService.cancelJob($stateParams.jobid).then (data) -> {} + $scope.stopJob = (stopEvent) -> + angular.element(stopEvent.currentTarget).removeClass("btn").removeClass("btn-default").html('Stopping...') + JobsService.stopJob($stateParams.jobid).then (data) -> + {} + $scope.toggleHistory = -> $scope.showHistory = !$scope.showHistory 
http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.svc.coffee ---------------------------------------------------------------------- diff --git a/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.svc.coffee b/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.svc.coffee index 65ae5cb..71f0921 100644 --- a/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.svc.coffee +++ b/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.svc.coffee @@ -282,4 +282,9 @@ angular.module('flinkApp') # proper "DELETE jobs/<jobid>/" $http.get flinkConfig.jobServer + "jobs/" + jobid + "/yarn-cancel" + @stopJob = (jobid) -> + # uses the non REST-compliant GET yarn-stop handler which is available in addition to the + # proper "DELETE jobs/<jobid>/stop" + $http.get "jobs/" + jobid + "/yarn-stop" + @ http://git-wip-us.apache.org/repos/asf/flink/blob/bdd4024e/flink-runtime-web/web-dashboard/web/css/vendor.css ---------------------------------------------------------------------- diff --git a/flink-runtime-web/web-dashboard/web/css/vendor.css b/flink-runtime-web/web-dashboard/web/css/vendor.css index 2a8d00f..e0c9259 100644 --- a/flink-runtime-web/web-dashboard/web/css/vendor.css +++ b/flink-runtime-web/web-dashboard/web/css/vendor.css @@ -5902,6 +5902,7 @@ button.close { .modal-header { padding: 15px; border-bottom: 1px solid #e5e5e5; + min-height: 16.42857143px; } .modal-header .close { margin-top: -2px;
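
Note on the stop() contract: the StoppableFunction Javadoc in this commit recommends a volatile "isRunning" field that the run() loop checks and that stop() sets to false. The pattern can be sketched roughly as below. This is a minimal illustration, not code from the commit: it does not depend on Flink, the Stoppable interface is a local stand-in for StoppableFunction, and CountingSource and emittedCount() are hypothetical names made up for the example.

```java
import java.util.concurrent.atomic.AtomicLong;

public class StoppableSourceSketch {

    /** Local stand-in for the StoppableFunction interface (assumption: same single stop() method). */
    interface Stoppable {
        void stop();
    }

    /** Hypothetical source that "emits" a counter value per loop iteration until it is stopped. */
    static final class CountingSource implements Runnable, Stoppable {
        // volatile, so a stop() call from another thread becomes visible to the run() loop
        private volatile boolean isRunning = true;
        private final AtomicLong emitted = new AtomicLong();

        @Override
        public void run() {
            while (isRunning) {
                emitted.incrementAndGet(); // stands in for emitting one record
                Thread.yield();
            }
            // The loop exits on its own; nothing is interrupted or discarded.
        }

        @Override
        public void stop() {
            // Neither blocks nor throws: it only flips the flag.
            isRunning = false;
        }

        long emittedCount() {
            return emitted.get();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        CountingSource source = new CountingSource();
        Thread worker = new Thread(source, "source-thread");
        worker.start();

        Thread.sleep(50);   // let the source run briefly
        source.stop();      // graceful stop request
        worker.join(5_000); // the run() loop should exit shortly afterwards

        System.out.println("source terminated: " + !worker.isAlive());
    }
}
```

SpoutWrapper.stop() in this commit follows the same idea: it only sets isRunning to false and leaves the loop in run() to terminate by itself.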