This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch master in repository
https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 566d10d [FLINK-12312][runtime] Remove CLI command for rescaling 566d10d is described below commit 566d10d958d71fd113f8f7ecc08fa9b63a072919 Author: Gary Yao <g...@apache.org> AuthorDate: Wed Apr 24 21:50:22 2019 +0200 [FLINK-12312][runtime] Remove CLI command for rescaling This closes #8260. --- docs/ops/cli.md | 23 -- docs/ops/cli.zh.md | 23 -- .../org/apache/flink/client/cli/CliFrontend.java | 59 ----- .../apache/flink/client/cli/CliFrontendParser.java | 27 -- .../apache/flink/client/program/ClusterClient.java | 11 - .../client/program/rest/RestClusterClient.java | 41 --- .../flink/client/cli/CliFrontendModifyTest.java | 136 ---------- .../flink/runtime/dispatcher/Dispatcher.java | 10 - .../apache/flink/runtime/jobmaster/JobMaster.java | 289 +-------------------- .../flink/runtime/jobmaster/JobMasterGateway.java | 36 +-- .../runtime/jobmaster/RescalingBehaviour.java | 49 ---- .../handler/job/rescaling/RescalingHandlers.java | 56 ++-- .../flink/runtime/webmonitor/RestfulGateway.java | 18 -- .../jobmaster/utils/TestingJobMasterGateway.java | 21 -- .../utils/TestingJobMasterGatewayBuilder.java | 15 +- 15 files changed, 30 insertions(+), 784 deletions(-) diff --git a/docs/ops/cli.md b/docs/ops/cli.md index 42698e8..b414b30 100644 --- a/docs/ops/cli.md +++ b/docs/ops/cli.md @@ -37,7 +37,6 @@ The command line can be used to - provide information about a job, - list running and waiting jobs, - trigger and dispose savepoints, and -- modify a running job A prerequisite to using the command line interface is that the Flink master (JobManager) has been started (via @@ -126,10 +125,6 @@ available. 
./bin/flink stop -s [targetDirectory] -d <jobID> -- Modify a running job (streaming jobs only): - - ./bin/flink modify <jobID> -p <newParallelism> - **NOTE**: The difference between cancelling and stopping a (streaming) job is the following: @@ -429,24 +424,6 @@ Action "savepoint" triggers savepoints for a running job or disposes existing on in the configuration. -z,--zookeeperNamespace <arg> Namespace to create the Zookeeper sub-paths for high availability mode - - - -Action "modify" modifies a running job (e.g. change of parallelism). - - Syntax: modify <Job ID> [OPTIONS] - "modify" action options: - -h,--help Show the help message for the CLI - Frontend or the action. - -p,--parallelism <newParallelism> New parallelism for the specified job. - -v,--verbose This option is deprecated. - Options for default mode: - -m,--jobmanager <arg> Address of the JobManager (master) to which - to connect. Use this flag to connect to a - different JobManager than the one specified - in the configuration. - -z,--zookeeperNamespace <arg> Namespace to create the Zookeeper sub-paths - for high availability mode {% endhighlight %} {% top %} diff --git a/docs/ops/cli.zh.md b/docs/ops/cli.zh.md index bf995b4..7c02047 100644 --- a/docs/ops/cli.zh.md +++ b/docs/ops/cli.zh.md @@ -37,7 +37,6 @@ The command line can be used to - provide information about a job, - list running and waiting jobs, - trigger and dispose savepoints, and -- modify a running job A prerequisite to using the command line interface is that the Flink master (JobManager) has been started (via @@ -126,10 +125,6 @@ available. ./bin/flink stop -s [targetDirectory] -d <jobID> -- Modify a running job (streaming jobs only): - - ./bin/flink modify <jobID> -p <newParallelism> - **NOTE**: The difference between cancelling and stopping a (streaming) job is the following: @@ -429,24 +424,6 @@ Action "savepoint" triggers savepoints for a running job or disposes existing on in the configuration. 
-z,--zookeeperNamespace <arg> Namespace to create the Zookeeper sub-paths for high availability mode - - - -Action "modify" modifies a running job (e.g. change of parallelism). - - Syntax: modify <Job ID> [OPTIONS] - "modify" action options: - -h,--help Show the help message for the CLI - Frontend or the action. - -p,--parallelism <newParallelism> New parallelism for the specified job. - -v,--verbose This option is deprecated. - Options for default mode: - -m,--jobmanager <arg> Address of the JobManager (master) to which - to connect. Use this flag to connect to a - different JobManager than the one specified - in the configuration. - -z,--zookeeperNamespace <arg> Namespace to create the Zookeeper sub-paths - for high availability mode {% endhighlight %} {% top %} diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java index cdd7616..c6b5c9a 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java @@ -87,9 +87,6 @@ import java.util.stream.Collectors; import scala.concurrent.duration.FiniteDuration; -import static org.apache.flink.client.cli.CliFrontendParser.HELP_OPTION; -import static org.apache.flink.client.cli.CliFrontendParser.MODIFY_PARALLELISM_OPTION; - /** * Implementation of a simple command line frontend for executing programs. 
*/ @@ -104,7 +101,6 @@ public class CliFrontend { private static final String ACTION_CANCEL = "cancel"; private static final String ACTION_STOP = "stop"; private static final String ACTION_SAVEPOINT = "savepoint"; - private static final String ACTION_MODIFY = "modify"; // configuration dir parameters private static final String CONFIG_DIRECTORY_FALLBACK_1 = "../conf"; @@ -736,58 +732,6 @@ public class CliFrontend { logAndSysout("Savepoint '" + savepointPath + "' disposed."); } - protected void modify(String[] args) throws CliArgsException, FlinkException { - LOG.info("Running 'modify' command."); - - final Options commandOptions = CliFrontendParser.getModifyOptions(); - - final Options commandLineOptions = CliFrontendParser.mergeOptions(commandOptions, customCommandLineOptions); - - final CommandLine commandLine = CliFrontendParser.parse(commandLineOptions, args, false); - - if (commandLine.hasOption(HELP_OPTION.getOpt())) { - CliFrontendParser.printHelpForModify(customCommandLines); - } - - final JobID jobId; - final String[] modifyArgs = commandLine.getArgs(); - - if (modifyArgs.length > 0) { - jobId = parseJobId(modifyArgs[0]); - } else { - throw new CliArgsException("Missing JobId"); - } - - final int newParallelism; - if (commandLine.hasOption(MODIFY_PARALLELISM_OPTION.getOpt())) { - try { - newParallelism = Integer.parseInt(commandLine.getOptionValue(MODIFY_PARALLELISM_OPTION.getOpt())); - } catch (NumberFormatException e) { - throw new CliArgsException("Could not parse the parallelism which is supposed to be an integer.", e); - } - } else { - throw new CliArgsException("Missing new parallelism."); - } - - final CustomCommandLine<?> activeCommandLine = getActiveCustomCommandLine(commandLine); - - logAndSysout("Modify job " + jobId + '.'); - runClusterAction( - activeCommandLine, - commandLine, - clusterClient -> { - CompletableFuture<Acknowledge> rescaleFuture = clusterClient.rescaleJob(jobId, newParallelism); - - try { - rescaleFuture.get(); - } catch 
(Exception e) { - throw new FlinkException("Could not rescale job " + jobId + '.', ExceptionUtils.stripExecutionException(e)); - } - logAndSysout("Rescaled job " + jobId + ". Its new parallelism is " + newParallelism + '.'); - } - ); - } - // -------------------------------------------------------------------------------------------- // Interaction with programs and JobManager // -------------------------------------------------------------------------------------------- @@ -1053,9 +997,6 @@ public class CliFrontend { case ACTION_SAVEPOINT: savepoint(params); return 0; - case ACTION_MODIFY: - modify(params); - return 0; case "-h": case "--help": CliFrontendParser.printHelp(customCommandLines); diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java index 398efd2..cea3998 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java @@ -118,8 +118,6 @@ public class CliFrontendParser { public static final Option STOP_AND_DRAIN = new Option("d", "drain", false, "Send MAX_WATERMARK before taking the savepoint and stopping the pipelne."); - static final Option MODIFY_PARALLELISM_OPTION = new Option("p", "parallelism", true, "New parallelism for the specified job."); - static { HELP_OPTION.setRequired(false); @@ -162,9 +160,6 @@ public class CliFrontendParser { CANCEL_WITH_SAVEPOINT_OPTION.setArgName("targetDirectory"); CANCEL_WITH_SAVEPOINT_OPTION.setOptionalArg(true); - MODIFY_PARALLELISM_OPTION.setRequired(false); - MODIFY_PARALLELISM_OPTION.setArgName("newParallelism"); - STOP_WITH_SAVEPOINT.setRequired(false); STOP_WITH_SAVEPOINT.setArgName("withSavepoint"); STOP_WITH_SAVEPOINT.setOptionalArg(true); @@ -240,12 +235,6 @@ public class CliFrontendParser { return options.addOption(JAR_OPTION); } - static Options getModifyOptions() { - final 
Options options = buildGeneralOptions(new Options()); - options.addOption(MODIFY_PARALLELISM_OPTION); - return options; - } - // -------------------------------------------------------------------------------------------- // Help // -------------------------------------------------------------------------------------------- @@ -297,7 +286,6 @@ public class CliFrontendParser { printHelpForStop(customCommandLines); printHelpForCancel(customCommandLines); printHelpForSavepoint(customCommandLines); - printHelpForModify(customCommandLines); System.out.println(); } @@ -390,21 +378,6 @@ public class CliFrontendParser { System.out.println(); } - public static void printHelpForModify(Collection<CustomCommandLine<?>> customCommandLines) { - HelpFormatter formatter = new HelpFormatter(); - formatter.setLeftPadding(5); - formatter.setWidth(80); - - System.out.println("\nAction \"modify\" modifies a running job (e.g. change of parallelism)."); - System.out.println("\n Syntax: modify <Job ID> [OPTIONS]"); - formatter.setSyntaxPrefix(" \"modify\" action options:"); - formatter.printHelp(" ", getModifyOptions()); - - printCustomCliOptions(customCommandLines, formatter, false); - - System.out.println(); - } - /** * Prints custom cli options. * @param formatter The formatter to use for printing diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java index 0be7dff..0da7997 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java @@ -499,17 +499,6 @@ public abstract class ClusterClient<T> { public abstract JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException; - /** - * Rescales the specified job such that it will have the new parallelism. 
- * - * @param jobId specifying the job to modify - * @param newParallelism specifying the new parallelism of the rescaled job - * @return Future which is completed once the rescaling has been completed - */ - public CompletableFuture<Acknowledge> rescaleJob(JobID jobId, int newParallelism) { - throw new UnsupportedOperationException("The " + getClass().getSimpleName() + " does not support rescaling."); - } - public void shutDownCluster() { throw new UnsupportedOperationException("The " + getClass().getSimpleName() + " does not support shutDownCluster."); } diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java index 269462a..1d08992 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java @@ -46,10 +46,6 @@ import org.apache.flink.runtime.rest.FileUpload; import org.apache.flink.runtime.rest.RestClient; import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationInfo; import org.apache.flink.runtime.rest.handler.async.TriggerResponse; -import org.apache.flink.runtime.rest.handler.job.rescaling.RescalingStatusHeaders; -import org.apache.flink.runtime.rest.handler.job.rescaling.RescalingStatusMessageParameters; -import org.apache.flink.runtime.rest.handler.job.rescaling.RescalingTriggerHeaders; -import org.apache.flink.runtime.rest.handler.job.rescaling.RescalingTriggerMessageParameters; import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; import org.apache.flink.runtime.rest.messages.EmptyRequestBody; import org.apache.flink.runtime.rest.messages.EmptyResponseBody; @@ -532,43 +528,6 @@ public class RestClusterClient<T> extends ClusterClient<T> implements NewCluster } @Override - public CompletableFuture<Acknowledge> rescaleJob(JobID jobId, int newParallelism) { - - 
final RescalingTriggerHeaders rescalingTriggerHeaders = RescalingTriggerHeaders.getInstance(); - final RescalingTriggerMessageParameters rescalingTriggerMessageParameters = rescalingTriggerHeaders.getUnresolvedMessageParameters(); - rescalingTriggerMessageParameters.jobPathParameter.resolve(jobId); - rescalingTriggerMessageParameters.rescalingParallelismQueryParameter.resolve(Collections.singletonList(newParallelism)); - - final CompletableFuture<TriggerResponse> rescalingTriggerResponseFuture = sendRequest( - rescalingTriggerHeaders, - rescalingTriggerMessageParameters); - - final CompletableFuture<AsynchronousOperationInfo> rescalingOperationFuture = rescalingTriggerResponseFuture.thenCompose( - (TriggerResponse triggerResponse) -> { - final TriggerId triggerId = triggerResponse.getTriggerId(); - final RescalingStatusHeaders rescalingStatusHeaders = RescalingStatusHeaders.getInstance(); - final RescalingStatusMessageParameters rescalingStatusMessageParameters = rescalingStatusHeaders.getUnresolvedMessageParameters(); - - rescalingStatusMessageParameters.jobPathParameter.resolve(jobId); - rescalingStatusMessageParameters.triggerIdPathParameter.resolve(triggerId); - - return pollResourceAsync( - () -> sendRequest( - rescalingStatusHeaders, - rescalingStatusMessageParameters)); - }); - - return rescalingOperationFuture.thenApply( - (AsynchronousOperationInfo asynchronousOperationInfo) -> { - if (asynchronousOperationInfo.getFailureCause() == null) { - return Acknowledge.get(); - } else { - throw new CompletionException(asynchronousOperationInfo.getFailureCause()); - } - }); - } - - @Override public CompletableFuture<Acknowledge> disposeSavepoint(String savepointPath) { final SavepointDisposalRequest savepointDisposalRequest = new SavepointDisposalRequest(savepointPath); diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendModifyTest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendModifyTest.java deleted file mode 
100644 index 05dacec..0000000 --- a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendModifyTest.java +++ /dev/null @@ -1,136 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.client.cli; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.client.cli.util.MockedCliFrontend; -import org.apache.flink.client.deployment.StandaloneClusterId; -import org.apache.flink.client.program.rest.RestClusterClient; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.messages.Acknowledge; - -import org.hamcrest.Matchers; -import org.junit.Test; - -import java.util.concurrent.CompletableFuture; - -import static org.junit.Assert.assertThat; -import static org.junit.Assert.fail; - -/** - * Tests for the modify command. 
- */ -public class CliFrontendModifyTest extends CliFrontendTestBase { - - @Test - public void testModifyJob() throws Exception { - final JobID jobId = new JobID(); - final int parallelism = 42; - String[] args = {jobId.toString(), "-p", String.valueOf(parallelism)}; - - Tuple2<JobID, Integer> jobIdParallelism = callModify(args); - - assertThat(jobIdParallelism.f0, Matchers.is(jobId)); - assertThat(jobIdParallelism.f1, Matchers.is(parallelism)); - } - - @Test - public void testMissingJobId() throws Exception { - final int parallelism = 42; - final String[] args = {"-p", String.valueOf(parallelism)}; - - try { - callModify(args); - fail("Expected CliArgsException"); - } catch (CliArgsException expected) { - // expected - } - } - - @Test - public void testMissingParallelism() throws Exception { - final JobID jobId = new JobID(); - final String[] args = {jobId.toString()}; - - try { - callModify(args); - fail("Expected CliArgsException"); - } catch (CliArgsException expected) { - // expected - } - } - - @Test - public void testUnparsableParalllelism() throws Exception { - final JobID jobId = new JobID(); - final String[] args = {jobId.toString(), "-p", "foobar"}; - - try { - callModify(args); - fail("Expected CliArgsException"); - } catch (CliArgsException expected) { - // expected - } - } - - @Test - public void testUnparsableJobId() throws Exception { - final int parallelism = 42; - final String[] args = {"foobar", "-p", String.valueOf(parallelism)}; - - try { - callModify(args); - fail("Expected CliArgsException"); - } catch (CliArgsException expected) { - // expected - } - } - - private Tuple2<JobID, Integer> callModify(String[] args) throws Exception { - final CompletableFuture<Tuple2<JobID, Integer>> rescaleJobFuture = new CompletableFuture<>(); - final TestingClusterClient clusterClient = new TestingClusterClient(rescaleJobFuture, getConfiguration()); - final MockedCliFrontend cliFrontend = new MockedCliFrontend(clusterClient); - - cliFrontend.modify(args); - - 
assertThat(rescaleJobFuture.isDone(), Matchers.is(true)); - - return rescaleJobFuture.get(); - } - - private static final class TestingClusterClient extends RestClusterClient<StandaloneClusterId> { - - private final CompletableFuture<Tuple2<JobID, Integer>> rescaleJobFuture; - - TestingClusterClient(CompletableFuture<Tuple2<JobID, Integer>> rescaleJobFuture, Configuration configuration) throws Exception { - super(configuration, StandaloneClusterId.getInstance()); - - this.rescaleJobFuture = rescaleJobFuture; - } - - @Override - public CompletableFuture<Acknowledge> rescaleJob(JobID jobId, int newParallelism) { - rescaleJobFuture.complete(Tuple2.of(jobId, newParallelism)); - - return CompletableFuture.completedFuture(Acknowledge.get()); - } - } - -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java index 3c4028c..1f1fc62 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java @@ -43,7 +43,6 @@ import org.apache.flink.runtime.jobmaster.JobManagerSharedServices; import org.apache.flink.runtime.jobmaster.JobMasterGateway; import org.apache.flink.runtime.jobmaster.JobNotFinishedException; import org.apache.flink.runtime.jobmaster.JobResult; -import org.apache.flink.runtime.jobmaster.RescalingBehaviour; import org.apache.flink.runtime.jobmaster.factories.DefaultJobManagerJobMetricGroupFactory; import org.apache.flink.runtime.leaderelection.LeaderContender; import org.apache.flink.runtime.leaderelection.LeaderElectionService; @@ -423,15 +422,6 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme } @Override - public CompletableFuture<Acknowledge> rescaleJob(JobID jobId, int newParallelism, RescalingBehaviour rescalingBehaviour, Time timeout) { - final CompletableFuture<JobMasterGateway> 
jobMasterGatewayFuture = getJobMasterGatewayFuture(jobId); - - return jobMasterGatewayFuture.thenCompose( - (JobMasterGateway jobMasterGateway) -> - jobMasterGateway.rescaleJob(newParallelism, rescalingBehaviour, timeout)); - } - - @Override public CompletableFuture<ClusterOverview> requestClusterOverview(Time timeout) { CompletableFuture<ResourceOverview> taskManagerOverviewFuture = runResourceManagerCommand(resourceManagerGateway -> resourceManagerGateway.requestResourceOverview(timeout)); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index f912f45..6c83c7a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -31,10 +31,7 @@ import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.accumulators.AccumulatorSnapshot; import org.apache.flink.runtime.blob.BlobWriter; import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; -import org.apache.flink.runtime.checkpoint.CheckpointException; -import org.apache.flink.runtime.checkpoint.CheckpointFailureReason; import org.apache.flink.runtime.checkpoint.CheckpointMetrics; -import org.apache.flink.runtime.checkpoint.Checkpoints; import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; import org.apache.flink.runtime.client.JobExecutionException; @@ -42,7 +39,6 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.execution.ExecutionState; -import org.apache.flink.runtime.execution.SuppressRestartsException; import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; import 
org.apache.flink.runtime.executiongraph.Execution; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; @@ -62,12 +58,10 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobStatus; -import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; import org.apache.flink.runtime.jobmanager.OnCompletionActions; import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException; -import org.apache.flink.runtime.jobmaster.exceptions.JobModificationException; import org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory; import org.apache.flink.runtime.jobmaster.slotpool.Scheduler; import org.apache.flink.runtime.jobmaster.slotpool.SchedulerFactory; @@ -115,9 +109,7 @@ import javax.annotation.Nullable; import java.io.IOException; import java.net.InetSocketAddress; -import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -208,9 +200,6 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast private JobManagerJobMetricGroup jobManagerJobMetricGroup; @Nullable - private String lastInternalSavepoint; - - @Nullable private ResourceManagerAddress resourceManagerAddress; @Nullable @@ -291,7 +280,6 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast this.registeredTaskManagers = new HashMap<>(4); this.backPressureStatsTracker = checkNotNull(jobManagerSharedServices.getBackPressureStatsTracker()); - this.lastInternalSavepoint = null; this.jobManagerJobMetricGroup = jobMetricGroupFactory.create(jobGraph); this.executionGraph = createAndRestoreExecutionGraph(jobManagerJobMetricGroup); 
@@ -365,15 +353,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast // shut down will internally release all registered slots slotPool.close(); - final CompletableFuture<Void> disposeInternalSavepointFuture; - - if (lastInternalSavepoint != null) { - disposeInternalSavepointFuture = CompletableFuture.runAsync(() -> disposeSavepoint(lastInternalSavepoint)); - } else { - disposeInternalSavepointFuture = CompletableFuture.completedFuture(null); - } - - return FutureUtils.completeAll(Collections.singletonList(disposeInternalSavepointFuture)); + return CompletableFuture.completedFuture(null); } //---------------------------------------------------------------------------------------------- @@ -387,130 +367,6 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast return CompletableFuture.completedFuture(Acknowledge.get()); } - @Override - public CompletableFuture<Acknowledge> rescaleJob( - int newParallelism, - RescalingBehaviour rescalingBehaviour, - Time timeout) { - final ArrayList<JobVertexID> allOperators = new ArrayList<>(jobGraph.getNumberOfVertices()); - - for (JobVertex jobVertex : jobGraph.getVertices()) { - allOperators.add(jobVertex.getID()); - } - - return rescaleOperators(allOperators, newParallelism, rescalingBehaviour, timeout); - } - - @Override - public CompletableFuture<Acknowledge> rescaleOperators( - Collection<JobVertexID> operators, - int newParallelism, - RescalingBehaviour rescalingBehaviour, - Time timeout) { - - if (newParallelism <= 0) { - return FutureUtils.completedExceptionally( - new JobModificationException("The target parallelism of a rescaling operation must be larger than 0.")); - } - - // 1. 
Check whether we can rescale the job & rescale the respective vertices - try { - rescaleJobGraph(operators, newParallelism, rescalingBehaviour); - } catch (FlinkException e) { - final String msg = String.format("Cannot rescale job %s.", jobGraph.getName()); - - log.info(msg, e); - return FutureUtils.completedExceptionally(new JobModificationException(msg, e)); - } - - final ExecutionGraph currentExecutionGraph = executionGraph; - - final JobManagerJobMetricGroup newJobManagerJobMetricGroup = jobMetricGroupFactory.create(jobGraph); - final ExecutionGraph newExecutionGraph; - - try { - newExecutionGraph = createExecutionGraph(newJobManagerJobMetricGroup); - } catch (JobExecutionException | JobException e) { - return FutureUtils.completedExceptionally( - new JobModificationException("Could not create rescaled ExecutionGraph.", e)); - } - - // 3. disable checkpoint coordinator to suppress subsequent checkpoints - final CheckpointCoordinator checkpointCoordinator = currentExecutionGraph.getCheckpointCoordinator(); - checkpointCoordinator.stopCheckpointScheduler(); - - // 4. take a savepoint - final CompletableFuture<String> savepointFuture = getJobModificationSavepoint(timeout); - - final CompletableFuture<ExecutionGraph> executionGraphFuture = restoreExecutionGraphFromRescalingSavepoint( - newExecutionGraph, - savepointFuture) - .handleAsync( - (ExecutionGraph executionGraph, Throwable failure) -> { - if (failure != null) { - // in case that we couldn't take a savepoint or restore from it, let's restart the checkpoint - // coordinator and abort the rescaling operation - if (checkpointCoordinator.isPeriodicCheckpointingConfigured()) { - checkpointCoordinator.startCheckpointScheduler(); - } - - throw new CompletionException(ExceptionUtils.stripCompletionException(failure)); - } else { - return executionGraph; - } - }, - getMainThreadExecutor()); - - // 5. 
suspend the current job - final CompletableFuture<JobStatus> terminationFuture = executionGraphFuture.thenComposeAsync( - (ExecutionGraph ignored) -> { - suspendExecutionGraph(new FlinkException("Job is being rescaled.")); - return currentExecutionGraph.getTerminationFuture(); - }, - getMainThreadExecutor()); - - final CompletableFuture<Void> suspendedFuture = terminationFuture.thenAccept( - (JobStatus jobStatus) -> { - if (jobStatus != JobStatus.SUSPENDED) { - final String msg = String.format("Job %s rescaling failed because we could not suspend the execution graph.", jobGraph.getName()); - log.info(msg); - throw new CompletionException(new JobModificationException(msg)); - } - }); - - // 6. resume the new execution graph from the taken savepoint - final CompletableFuture<Acknowledge> rescalingFuture = suspendedFuture.thenCombineAsync( - executionGraphFuture, - (Void ignored, ExecutionGraph restoredExecutionGraph) -> { - // check if the ExecutionGraph is still the same - if (executionGraph == currentExecutionGraph) { - clearExecutionGraphFields(); - assignExecutionGraph(restoredExecutionGraph, newJobManagerJobMetricGroup); - scheduleExecutionGraph(); - - return Acknowledge.get(); - } else { - throw new CompletionException(new JobModificationException("Detected concurrent modification of ExecutionGraph. Aborting the rescaling.")); - } - - }, - getMainThreadExecutor()); - - rescalingFuture.whenCompleteAsync( - (Acknowledge ignored, Throwable throwable) -> { - if (throwable != null) { - // fail the newly created execution graph - newExecutionGraph.failGlobal( - new SuppressRestartsException( - new FlinkException( - String.format("Failed to rescale the job %s.", jobGraph.getJobID()), - throwable))); - } - }, getMainThreadExecutor()); - - return rescalingFuture; - } - /** * Updates the task execution state for a given task. 
* @@ -1261,24 +1117,6 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast } /** - * Dispose the savepoint stored under the given path. - * - * @param savepointPath path where the savepoint is stored - */ - private void disposeSavepoint(String savepointPath) { - try { - // delete the temporary savepoint - Checkpoints.disposeSavepoint( - savepointPath, - jobMasterConfiguration.getConfiguration(), - userCodeLoader, - log); - } catch (FlinkException | IOException e) { - log.info("Could not dispose temporary rescaling savepoint under {}.", savepointPath, e); - } - } - - /** * Tries to restore the given {@link ExecutionGraph} from the provided {@link SavepointRestoreSettings}. * * @param executionGraphToRestore {@link ExecutionGraph} which is supposed to be restored @@ -1435,131 +1273,6 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast slotPool.disconnectResourceManager(); } - /** - * Restore the given {@link ExecutionGraph} from the rescaling savepoint. If the {@link ExecutionGraph} could - * be restored, then this savepoint will be recorded as the latest successful modification savepoint. A previous - * savepoint will be disposed. If the rescaling savepoint is empty, the job will be restored from the initially - * provided savepoint. 
- * - * @param newExecutionGraph to restore - * @param savepointFuture containing the path to the internal modification savepoint - * @return Future which is completed with the restored {@link ExecutionGraph} - */ - private CompletableFuture<ExecutionGraph> restoreExecutionGraphFromRescalingSavepoint(ExecutionGraph newExecutionGraph, CompletableFuture<String> savepointFuture) { - return savepointFuture - .thenApplyAsync( - (@Nullable String savepointPath) -> { - if (savepointPath != null) { - try { - tryRestoreExecutionGraphFromSavepoint(newExecutionGraph, SavepointRestoreSettings.forPath(savepointPath, false)); - } catch (Exception e) { - final String message = String.format("Could not restore from temporary rescaling savepoint. This might indicate " + - "that the savepoint %s got corrupted. Deleting this savepoint as a precaution.", - savepointPath); - - log.info(message); - - CompletableFuture - .runAsync( - () -> { - if (savepointPath.equals(lastInternalSavepoint)) { - lastInternalSavepoint = null; - } - }, - getMainThreadExecutor()) - .thenRunAsync( - () -> disposeSavepoint(savepointPath), - scheduledExecutorService); - - throw new CompletionException(new JobModificationException(message, e)); - } - } else { - // No rescaling savepoint, restart from the initial savepoint or none - try { - tryRestoreExecutionGraphFromSavepoint(newExecutionGraph, jobGraph.getSavepointRestoreSettings()); - } catch (Exception e) { - final String message = String.format("Could not restore from initial savepoint. This might indicate " + - "that the savepoint %s got corrupted.", jobGraph.getSavepointRestoreSettings().getRestorePath()); - - log.info(message); - - throw new CompletionException(new JobModificationException(message, e)); - } - } - - return newExecutionGraph; - }, scheduledExecutorService); - } - - /** - * Takes an internal savepoint for job modification purposes. 
If the savepoint was not successful because - * not all tasks were running, it returns the last successful modification savepoint. - * - * @param timeout for the operation - * @return Future which is completed with the savepoint path or the last successful modification savepoint if the - * former was not successful - */ - private CompletableFuture<String> getJobModificationSavepoint(Time timeout) { - return triggerSavepoint( - null, - false, - timeout) - .handleAsync( - (String savepointPath, Throwable throwable) -> { - if (throwable != null) { - final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable); - if (strippedThrowable instanceof CheckpointException) { - final CheckpointException checkpointException = (CheckpointException) strippedThrowable; - - if (checkpointException.getCheckpointFailureReason() == CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING) { - return lastInternalSavepoint; - } else { - throw new CompletionException(checkpointException); - } - } else { - throw new CompletionException(strippedThrowable); - } - } else { - final String savepointToDispose = lastInternalSavepoint; - lastInternalSavepoint = savepointPath; - - if (savepointToDispose != null) { - // dispose the old savepoint asynchronously - CompletableFuture.runAsync( - () -> disposeSavepoint(savepointToDispose), - scheduledExecutorService); - } - - return lastInternalSavepoint; - } - }, - getMainThreadExecutor()); - } - - /** - * Rescales the given operators of the {@link JobGraph} of this {@link JobMaster} with respect to given - * parallelism and {@link RescalingBehaviour}. 
- * - * @param operators to rescale - * @param newParallelism new parallelism for these operators - * @param rescalingBehaviour of the rescaling operation - * @throws FlinkException if the {@link JobGraph} could not be rescaled - */ - private void rescaleJobGraph(Collection<JobVertexID> operators, int newParallelism, RescalingBehaviour rescalingBehaviour) throws FlinkException { - for (JobVertexID jobVertexId : operators) { - final JobVertex jobVertex = jobGraph.findVertexByID(jobVertexId); - - // update max parallelism in case that it has not been configured - final ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(jobVertexId); - - if (executionJobVertex != null) { - jobVertex.setMaxParallelism(executionJobVertex.getMaxParallelism()); - } - - rescalingBehaviour.accept(jobVertex, newParallelism); - } - } - //---------------------------------------------------------------------------------------------- // Service methods //---------------------------------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java index b15d704..b2a282d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java @@ -18,9 +18,6 @@ package org.apache.flink.runtime.jobmaster; -import java.util.Collection; -import java.util.concurrent.CompletableFuture; -import javax.annotation.Nullable; import org.apache.flink.api.common.functions.AggregateFunction; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorGateway; @@ -48,6 +45,11 @@ import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; import org.apache.flink.runtime.taskmanager.TaskExecutionState; import 
org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import javax.annotation.Nullable; + +import java.util.Collection; +import java.util.concurrent.CompletableFuture; + /** * {@link JobMaster} rpc gateway interface. */ @@ -66,34 +68,6 @@ public interface JobMasterGateway extends CompletableFuture<Acknowledge> cancel(@RpcTimeout Time timeout); /** - * Triggers rescaling of the executed job. - * - * @param newParallelism new parallelism of the job - * @param rescalingBehaviour defining how strict the rescaling has to be executed - * @param timeout of this operation - * @return Future which is completed with {@link Acknowledge} once the rescaling was successful - */ - CompletableFuture<Acknowledge> rescaleJob( - int newParallelism, - RescalingBehaviour rescalingBehaviour, - @RpcTimeout Time timeout); - - /** - * Triggers rescaling of the given set of operators. - * - * @param operators set of operators which shall be rescaled - * @param newParallelism new parallelism of the given set of operators - * @param rescalingBehaviour defining how strict the rescaling has to be executed - * @param timeout of this operation - * @return Future which is completed with {@link Acknowledge} once the rescaling was successful - */ - CompletableFuture<Acknowledge> rescaleOperators( - Collection<JobVertexID> operators, - int newParallelism, - RescalingBehaviour rescalingBehaviour, - @RpcTimeout Time timeout); - - /** * Updates the task execution state for a given task. 
* * @param taskExecutionState New task execution state for a given task diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RescalingBehaviour.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RescalingBehaviour.java deleted file mode 100644 index 64e2ffa..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RescalingBehaviour.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.jobmaster; - -import org.apache.flink.runtime.jobgraph.JobVertex; -import org.apache.flink.util.FlinkException; -import org.apache.flink.util.function.BiConsumerWithException; - -/** - * Definition of the rescaling behaviour. 
- */ -public enum RescalingBehaviour implements BiConsumerWithException<JobVertex, Integer, FlinkException> { - // rescaling is only executed if the operator can be set to the given parallelism - STRICT { - @Override - public void accept(JobVertex jobVertex, Integer newParallelism) throws FlinkException { - if (jobVertex.getMaxParallelism() < newParallelism) { - throw new FlinkException("Cannot rescale vertex " + jobVertex.getName() + - " because its maximum parallelism " + jobVertex.getMaxParallelism() + - " is smaller than the new parallelism " + newParallelism + '.'); - } else { - jobVertex.setParallelism(newParallelism); - } - } - }, - // the new parallelism will be the minimum of the given parallelism and the maximum parallelism - RELAXED { - @Override - public void accept(JobVertex jobVertex, Integer newParallelism) { - jobVertex.setParallelism(Math.min(jobVertex.getMaxParallelism(), newParallelism)); - } - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescaling/RescalingHandlers.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescaling/RescalingHandlers.java index 8860e5b..2b0f156 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescaling/RescalingHandlers.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescaling/RescalingHandlers.java @@ -18,28 +18,23 @@ package org.apache.flink.runtime.rest.handler.job.rescaling; -import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; -import org.apache.flink.runtime.jobmaster.RescalingBehaviour; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.rest.handler.HandlerRequest; import org.apache.flink.runtime.rest.handler.RestHandlerException; import org.apache.flink.runtime.rest.handler.async.AbstractAsynchronousOperationHandlers; import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationInfo; +import 
org.apache.flink.runtime.rest.handler.async.AsynchronousOperationResult; +import org.apache.flink.runtime.rest.handler.async.TriggerResponse; import org.apache.flink.runtime.rest.handler.job.AsynchronousJobOperationKey; import org.apache.flink.runtime.rest.messages.EmptyRequestBody; -import org.apache.flink.runtime.rest.messages.JobIDPathParameter; -import org.apache.flink.runtime.rest.messages.RescalingParallelismQueryParameter; -import org.apache.flink.runtime.rest.messages.TriggerId; -import org.apache.flink.runtime.rest.messages.TriggerIdPathParameter; -import org.apache.flink.runtime.rpc.RpcUtils; import org.apache.flink.runtime.webmonitor.RestfulGateway; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; -import org.apache.flink.util.SerializedThrowable; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; -import java.util.List; +import javax.annotation.Nonnull; + import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -48,6 +43,10 @@ import java.util.concurrent.CompletableFuture; */ public class RescalingHandlers extends AbstractAsynchronousOperationHandlers<AsynchronousJobOperationKey, Acknowledge> { + private static RestHandlerException featureDisabledException() { + return new RestHandlerException("Rescaling is temporarily disabled. See FLINK-12312.", HttpResponseStatus.SERVICE_UNAVAILABLE); + } + /** * Handler which triggers the rescaling of the specified job. 
*/ @@ -65,29 +64,18 @@ public class RescalingHandlers extends AbstractAsynchronousOperationHandlers<Asy } @Override - protected CompletableFuture<Acknowledge> triggerOperation(HandlerRequest<EmptyRequestBody, RescalingTriggerMessageParameters> request, RestfulGateway gateway) throws RestHandlerException { - final JobID jobId = request.getPathParameter(JobIDPathParameter.class); - final List<Integer> queryParameter = request.getQueryParameter(RescalingParallelismQueryParameter.class); - - if (queryParameter.isEmpty()) { - throw new RestHandlerException("No new parallelism was specified.", HttpResponseStatus.BAD_REQUEST); - } - - final int newParallelism = queryParameter.get(0); - - final CompletableFuture<Acknowledge> rescalingFuture = gateway.rescaleJob( - jobId, - newParallelism, - RescalingBehaviour.STRICT, - RpcUtils.INF_TIMEOUT); + public CompletableFuture<TriggerResponse> handleRequest(@Nonnull final HandlerRequest<EmptyRequestBody, RescalingTriggerMessageParameters> request, @Nonnull final RestfulGateway gateway) throws RestHandlerException { + throw featureDisabledException(); + } - return rescalingFuture; + @Override + protected CompletableFuture<Acknowledge> triggerOperation(HandlerRequest<EmptyRequestBody, RescalingTriggerMessageParameters> request, RestfulGateway gateway) { + throw new UnsupportedOperationException(); } @Override protected AsynchronousJobOperationKey createOperationKey(HandlerRequest<EmptyRequestBody, RescalingTriggerMessageParameters> request) { - final JobID jobId = request.getPathParameter(JobIDPathParameter.class); - return AsynchronousJobOperationKey.of(new TriggerId(), jobId); + throw new UnsupportedOperationException(); } } @@ -108,21 +96,23 @@ public class RescalingHandlers extends AbstractAsynchronousOperationHandlers<Asy } @Override - protected AsynchronousJobOperationKey getOperationKey(HandlerRequest<EmptyRequestBody, RescalingStatusMessageParameters> request) { - final JobID jobId = 
request.getPathParameter(JobIDPathParameter.class); - final TriggerId triggerId = request.getPathParameter(TriggerIdPathParameter.class); + public CompletableFuture<AsynchronousOperationResult<AsynchronousOperationInfo>> handleRequest(@Nonnull final HandlerRequest<EmptyRequestBody, RescalingStatusMessageParameters> request, @Nonnull final RestfulGateway gateway) throws RestHandlerException { + throw featureDisabledException(); + } - return AsynchronousJobOperationKey.of(triggerId, jobId); + @Override + protected AsynchronousJobOperationKey getOperationKey(HandlerRequest<EmptyRequestBody, RescalingStatusMessageParameters> request) { + throw new UnsupportedOperationException(); } @Override protected AsynchronousOperationInfo exceptionalOperationResultResponse(Throwable throwable) { - return AsynchronousOperationInfo.completeExceptional(new SerializedThrowable(throwable)); + throw new UnsupportedOperationException(); } @Override protected AsynchronousOperationInfo operationResultResponse(Acknowledge operationResult) { - return AsynchronousOperationInfo.complete(); + throw new UnsupportedOperationException(); } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java index dcfbf6b..33bb50a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java @@ -27,7 +27,6 @@ import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobmaster.JobResult; -import org.apache.flink.runtime.jobmaster.RescalingBehaviour; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.messages.FlinkJobNotFoundException; import 
org.apache.flink.runtime.messages.webmonitor.ClusterOverview; @@ -184,23 +183,6 @@ public interface RestfulGateway extends RpcGateway { throw new UnsupportedOperationException(); } - /** - * Trigger rescaling of the given job. - * - * @param jobId specifying the job to rescale - * @param newParallelism new parallelism of the job - * @param rescalingBehaviour defining how strict the rescaling has to be executed - * @param timeout of this operation - * @return Future which is completed with {@link Acknowledge} once the rescaling was successful - */ - default CompletableFuture<Acknowledge> rescaleJob( - JobID jobId, - int newParallelism, - RescalingBehaviour rescalingBehaviour, - @RpcTimeout Time timeout) { - throw new UnsupportedOperationException(); - } - default CompletableFuture<Acknowledge> shutDownCluster() { throw new UnsupportedOperationException(); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java index 13ffe52..a2d3c3b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java @@ -37,7 +37,6 @@ import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobmaster.JobMasterGateway; import org.apache.flink.runtime.jobmaster.JobMasterId; -import org.apache.flink.runtime.jobmaster.RescalingBehaviour; import org.apache.flink.runtime.jobmaster.SerializedInputSplit; import org.apache.flink.runtime.jobmaster.message.ClassloadingProps; import org.apache.flink.runtime.messages.Acknowledge; @@ -82,12 +81,6 @@ public class TestingJobMasterGateway implements JobMasterGateway { private final Supplier<CompletableFuture<Acknowledge>> cancelFunction; @Nonnull - private final 
BiFunction<Integer, RescalingBehaviour, CompletableFuture<Acknowledge>> rescalingJobFunction; - - @Nonnull - private final TriFunction<Collection<JobVertexID>, Integer, RescalingBehaviour, CompletableFuture<Acknowledge>> rescalingOperatorsFunction; - - @Nonnull private final Function<TaskExecutionState, CompletableFuture<Acknowledge>> updateTaskExecutionStateFunction; @Nonnull @@ -166,8 +159,6 @@ public class TestingJobMasterGateway implements JobMasterGateway { @Nonnull String address, @Nonnull String hostname, @Nonnull Supplier<CompletableFuture<Acknowledge>> cancelFunction, - @Nonnull BiFunction<Integer, RescalingBehaviour, CompletableFuture<Acknowledge>> rescalingJobFunction, - @Nonnull TriFunction<Collection<JobVertexID>, Integer, RescalingBehaviour, CompletableFuture<Acknowledge>> rescalingOperatorsFunction, @Nonnull Function<TaskExecutionState, CompletableFuture<Acknowledge>> updateTaskExecutionStateFunction, @Nonnull BiFunction<JobVertexID, ExecutionAttemptID, CompletableFuture<SerializedInputSplit>> requestNextInputSplitFunction, @Nonnull BiFunction<IntermediateDataSetID, ResultPartitionID, CompletableFuture<ExecutionState>> requestPartitionStateFunction, @@ -196,8 +187,6 @@ public class TestingJobMasterGateway implements JobMasterGateway { this.address = address; this.hostname = hostname; this.cancelFunction = cancelFunction; - this.rescalingJobFunction = rescalingJobFunction; - this.rescalingOperatorsFunction = rescalingOperatorsFunction; this.updateTaskExecutionStateFunction = updateTaskExecutionStateFunction; this.requestNextInputSplitFunction = requestNextInputSplitFunction; this.requestPartitionStateFunction = requestPartitionStateFunction; @@ -231,16 +220,6 @@ public class TestingJobMasterGateway implements JobMasterGateway { } @Override - public CompletableFuture<Acknowledge> rescaleJob(int newParallelism, RescalingBehaviour rescalingBehaviour, Time timeout) { - return rescalingJobFunction.apply(newParallelism, rescalingBehaviour); - } - - 
@Override - public CompletableFuture<Acknowledge> rescaleOperators(Collection<JobVertexID> operators, int newParallelism, RescalingBehaviour rescalingBehaviour, Time timeout) { - return rescalingOperatorsFunction.apply(operators, newParallelism, rescalingBehaviour); - } - - @Override public CompletableFuture<Acknowledge> updateTaskExecutionState(TaskExecutionState taskExecutionState) { return updateTaskExecutionStateFunction.apply(taskExecutionState); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java index c13cbf6..a023917 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java @@ -36,7 +36,6 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess; import org.apache.flink.runtime.jobmaster.JobMasterId; -import org.apache.flink.runtime.jobmaster.RescalingBehaviour; import org.apache.flink.runtime.jobmaster.SerializedInputSplit; import org.apache.flink.runtime.jobmaster.message.ClassloadingProps; import org.apache.flink.runtime.messages.Acknowledge; @@ -76,8 +75,6 @@ public class TestingJobMasterGatewayBuilder { private String address = "akka.tcp://flink@localhost:6130/user/jobmanager"; private String hostname = "localhost"; private Supplier<CompletableFuture<Acknowledge>> cancelFunction = () -> CompletableFuture.completedFuture(Acknowledge.get()); - private BiFunction<Integer, RescalingBehaviour, CompletableFuture<Acknowledge>> rescalingJobFunction = (ignoredA, ignoredB) -> CompletableFuture.completedFuture(Acknowledge.get()); - private TriFunction<Collection<JobVertexID>, Integer, 
RescalingBehaviour, CompletableFuture<Acknowledge>> rescalingOperatorsFunction = (ignoredA, ignoredB, ignoredC) -> CompletableFuture.completedFuture(Acknowledge.get()); private Function<TaskExecutionState, CompletableFuture<Acknowledge>> updateTaskExecutionStateFunction = ignored -> CompletableFuture.completedFuture(Acknowledge.get()); private BiFunction<JobVertexID, ExecutionAttemptID, CompletableFuture<SerializedInputSplit>> requestNextInputSplitFunction = (ignoredA, ignoredB) -> CompletableFuture.completedFuture(new SerializedInputSplit(null)); private BiFunction<IntermediateDataSetID, ResultPartitionID, CompletableFuture<ExecutionState>> requestPartitionStateFunction = (ignoredA, ignoredB) -> CompletableFuture.completedFuture(ExecutionState.RUNNING); @@ -119,16 +116,6 @@ public class TestingJobMasterGatewayBuilder { return this; } - public TestingJobMasterGatewayBuilder setRescalingJobFunction(BiFunction<Integer, RescalingBehaviour, CompletableFuture<Acknowledge>> rescalingJobFunction) { - this.rescalingJobFunction = rescalingJobFunction; - return this; - } - - public TestingJobMasterGatewayBuilder setRescalingOperatorsFunction(TriFunction<Collection<JobVertexID>, Integer, RescalingBehaviour, CompletableFuture<Acknowledge>> rescalingOperatorsFunction) { - this.rescalingOperatorsFunction = rescalingOperatorsFunction; - return this; - } - public TestingJobMasterGatewayBuilder setUpdateTaskExecutionStateFunction(Function<TaskExecutionState, CompletableFuture<Acknowledge>> updateTaskExecutionStateFunction) { this.updateTaskExecutionStateFunction = updateTaskExecutionStateFunction; return this; @@ -255,6 +242,6 @@ public class TestingJobMasterGatewayBuilder { } public TestingJobMasterGateway build() { - return new TestingJobMasterGateway(address, hostname, cancelFunction, rescalingJobFunction, rescalingOperatorsFunction, updateTaskExecutionStateFunction, requestNextInputSplitFunction, requestPartitionStateFunction, scheduleOrUpdateConsumersFunction, 
disconnectTaskManagerFunction, disconnectResourceManagerConsumer, classloadingPropsSupplier, offerSlotsFunction, failSlotConsumer, registerTaskManagerFunction, taskManagerHeartbeatConsumer, resourceManagerHeartbeatConsumer, requestJobDetai [...] + return new TestingJobMasterGateway(address, hostname, cancelFunction, updateTaskExecutionStateFunction, requestNextInputSplitFunction, requestPartitionStateFunction, scheduleOrUpdateConsumersFunction, disconnectTaskManagerFunction, disconnectResourceManagerConsumer, classloadingPropsSupplier, offerSlotsFunction, failSlotConsumer, registerTaskManagerFunction, taskManagerHeartbeatConsumer, resourceManagerHeartbeatConsumer, requestJobDetailsSupplier, requestJobSupplier, triggerSavepointFu [...] } }