This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 566d10d  [FLINK-12312][runtime] Remove CLI command for rescaling
566d10d is described below

commit 566d10d958d71fd113f8f7ecc08fa9b63a072919
Author: Gary Yao <g...@apache.org>
AuthorDate: Wed Apr 24 21:50:22 2019 +0200

    [FLINK-12312][runtime] Remove CLI command for rescaling
    
    This closes #8260.
---
 docs/ops/cli.md                                    |  23 --
 docs/ops/cli.zh.md                                 |  23 --
 .../org/apache/flink/client/cli/CliFrontend.java   |  59 -----
 .../apache/flink/client/cli/CliFrontendParser.java |  27 --
 .../apache/flink/client/program/ClusterClient.java |  11 -
 .../client/program/rest/RestClusterClient.java     |  41 ---
 .../flink/client/cli/CliFrontendModifyTest.java    | 136 ----------
 .../flink/runtime/dispatcher/Dispatcher.java       |  10 -
 .../apache/flink/runtime/jobmaster/JobMaster.java  | 289 +--------------------
 .../flink/runtime/jobmaster/JobMasterGateway.java  |  36 +--
 .../runtime/jobmaster/RescalingBehaviour.java      |  49 ----
 .../handler/job/rescaling/RescalingHandlers.java   |  56 ++--
 .../flink/runtime/webmonitor/RestfulGateway.java   |  18 --
 .../jobmaster/utils/TestingJobMasterGateway.java   |  21 --
 .../utils/TestingJobMasterGatewayBuilder.java      |  15 +-
 15 files changed, 30 insertions(+), 784 deletions(-)

diff --git a/docs/ops/cli.md b/docs/ops/cli.md
index 42698e8..b414b30 100644
--- a/docs/ops/cli.md
+++ b/docs/ops/cli.md
@@ -37,7 +37,6 @@ The command line can be used to
 - provide information about a job,
 - list running and waiting jobs,
 - trigger and dispose savepoints, and
-- modify a running job
 
 A prerequisite to using the command line interface is that the Flink
 master (JobManager) has been started (via
@@ -126,10 +125,6 @@ available.
 
         ./bin/flink stop -s [targetDirectory] -d <jobID>
         
--   Modify a running job (streaming jobs only):
-
-        ./bin/flink modify <jobID> -p <newParallelism>
-
 
 **NOTE**: The difference between cancelling and stopping a (streaming) job is 
the following:
 
@@ -429,24 +424,6 @@ Action "savepoint" triggers savepoints for a running job or disposes existing on
                                      in the configuration.
      -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper 
sub-paths
                                      for high availability mode
-
-
-
-Action "modify" modifies a running job (e.g. change of parallelism).
-
-  Syntax: modify <Job ID> [OPTIONS]
-  "modify" action options:
-     -h,--help                           Show the help message for the CLI
-                                         Frontend or the action.
-     -p,--parallelism <newParallelism>   New parallelism for the specified job.
-     -v,--verbose                        This option is deprecated.
-  Options for default mode:
-     -m,--jobmanager <arg>           Address of the JobManager (master) to 
which
-                                     to connect. Use this flag to connect to a
-                                     different JobManager than the one 
specified
-                                     in the configuration.
-     -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper 
sub-paths
-                                     for high availability mode
 {% endhighlight %}
 
 {% top %}
diff --git a/docs/ops/cli.zh.md b/docs/ops/cli.zh.md
index bf995b4..7c02047 100644
--- a/docs/ops/cli.zh.md
+++ b/docs/ops/cli.zh.md
@@ -37,7 +37,6 @@ The command line can be used to
 - provide information about a job,
 - list running and waiting jobs,
 - trigger and dispose savepoints, and
-- modify a running job
 
 A prerequisite to using the command line interface is that the Flink
 master (JobManager) has been started (via
@@ -126,10 +125,6 @@ available.
 
         ./bin/flink stop -s [targetDirectory] -d <jobID>
 
--   Modify a running job (streaming jobs only):
-
-        ./bin/flink modify <jobID> -p <newParallelism>
-
 
 **NOTE**: The difference between cancelling and stopping a (streaming) job is 
the following:
 
@@ -429,24 +424,6 @@ Action "savepoint" triggers savepoints for a running job or disposes existing on
                                      in the configuration.
      -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper 
sub-paths
                                      for high availability mode
-
-
-
-Action "modify" modifies a running job (e.g. change of parallelism).
-
-  Syntax: modify <Job ID> [OPTIONS]
-  "modify" action options:
-     -h,--help                           Show the help message for the CLI
-                                         Frontend or the action.
-     -p,--parallelism <newParallelism>   New parallelism for the specified job.
-     -v,--verbose                        This option is deprecated.
-  Options for default mode:
-     -m,--jobmanager <arg>           Address of the JobManager (master) to 
which
-                                     to connect. Use this flag to connect to a
-                                     different JobManager than the one 
specified
-                                     in the configuration.
-     -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper 
sub-paths
-                                     for high availability mode
 {% endhighlight %}
 
 {% top %}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index cdd7616..c6b5c9a 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -87,9 +87,6 @@ import java.util.stream.Collectors;
 
 import scala.concurrent.duration.FiniteDuration;
 
-import static org.apache.flink.client.cli.CliFrontendParser.HELP_OPTION;
-import static 
org.apache.flink.client.cli.CliFrontendParser.MODIFY_PARALLELISM_OPTION;
-
 /**
  * Implementation of a simple command line frontend for executing programs.
  */
@@ -104,7 +101,6 @@ public class CliFrontend {
        private static final String ACTION_CANCEL = "cancel";
        private static final String ACTION_STOP = "stop";
        private static final String ACTION_SAVEPOINT = "savepoint";
-       private static final String ACTION_MODIFY = "modify";
 
        // configuration dir parameters
        private static final String CONFIG_DIRECTORY_FALLBACK_1 = "../conf";
@@ -736,58 +732,6 @@ public class CliFrontend {
                logAndSysout("Savepoint '" + savepointPath + "' disposed.");
        }
 
-       protected void modify(String[] args) throws CliArgsException, 
FlinkException {
-               LOG.info("Running 'modify' command.");
-
-               final Options commandOptions = 
CliFrontendParser.getModifyOptions();
-
-               final Options commandLineOptions = 
CliFrontendParser.mergeOptions(commandOptions, customCommandLineOptions);
-
-               final CommandLine commandLine = 
CliFrontendParser.parse(commandLineOptions, args, false);
-
-               if (commandLine.hasOption(HELP_OPTION.getOpt())) {
-                       
CliFrontendParser.printHelpForModify(customCommandLines);
-               }
-
-               final JobID jobId;
-               final String[] modifyArgs = commandLine.getArgs();
-
-               if (modifyArgs.length > 0) {
-                       jobId = parseJobId(modifyArgs[0]);
-               } else {
-                       throw new CliArgsException("Missing JobId");
-               }
-
-               final int newParallelism;
-               if (commandLine.hasOption(MODIFY_PARALLELISM_OPTION.getOpt())) {
-                       try {
-                               newParallelism = 
Integer.parseInt(commandLine.getOptionValue(MODIFY_PARALLELISM_OPTION.getOpt()));
-                       } catch (NumberFormatException e) {
-                               throw new CliArgsException("Could not parse the 
parallelism which is supposed to be an integer.", e);
-                       }
-               } else {
-                       throw new CliArgsException("Missing new parallelism.");
-               }
-
-               final CustomCommandLine<?> activeCommandLine = 
getActiveCustomCommandLine(commandLine);
-
-               logAndSysout("Modify job " + jobId + '.');
-               runClusterAction(
-                       activeCommandLine,
-                       commandLine,
-                       clusterClient -> {
-                               CompletableFuture<Acknowledge> rescaleFuture = 
clusterClient.rescaleJob(jobId, newParallelism);
-
-                               try {
-                                       rescaleFuture.get();
-                               } catch (Exception e) {
-                                       throw new FlinkException("Could not 
rescale job " + jobId + '.', ExceptionUtils.stripExecutionException(e));
-                               }
-                               logAndSysout("Rescaled job " + jobId + ". Its 
new parallelism is " + newParallelism + '.');
-                       }
-               );
-       }
-
        // 
--------------------------------------------------------------------------------------------
        //  Interaction with programs and JobManager
        // 
--------------------------------------------------------------------------------------------
@@ -1053,9 +997,6 @@ public class CliFrontend {
                                case ACTION_SAVEPOINT:
                                        savepoint(params);
                                        return 0;
-                               case ACTION_MODIFY:
-                                       modify(params);
-                                       return 0;
                                case "-h":
                                case "--help":
                                        
CliFrontendParser.printHelp(customCommandLines);
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
index 398efd2..cea3998 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
@@ -118,8 +118,6 @@ public class CliFrontendParser {
        public static final Option STOP_AND_DRAIN = new Option("d", "drain", 
false,
                        "Send MAX_WATERMARK before taking the savepoint and 
stopping the pipelne.");
 
-       static final Option MODIFY_PARALLELISM_OPTION = new Option("p", 
"parallelism", true, "New parallelism for the specified job.");
-
        static {
                HELP_OPTION.setRequired(false);
 
@@ -162,9 +160,6 @@ public class CliFrontendParser {
                CANCEL_WITH_SAVEPOINT_OPTION.setArgName("targetDirectory");
                CANCEL_WITH_SAVEPOINT_OPTION.setOptionalArg(true);
 
-               MODIFY_PARALLELISM_OPTION.setRequired(false);
-               MODIFY_PARALLELISM_OPTION.setArgName("newParallelism");
-
                STOP_WITH_SAVEPOINT.setRequired(false);
                STOP_WITH_SAVEPOINT.setArgName("withSavepoint");
                STOP_WITH_SAVEPOINT.setOptionalArg(true);
@@ -240,12 +235,6 @@ public class CliFrontendParser {
                return options.addOption(JAR_OPTION);
        }
 
-       static Options getModifyOptions() {
-               final Options options = buildGeneralOptions(new Options());
-               options.addOption(MODIFY_PARALLELISM_OPTION);
-               return options;
-       }
-
        // 
--------------------------------------------------------------------------------------------
        //  Help
        // 
--------------------------------------------------------------------------------------------
@@ -297,7 +286,6 @@ public class CliFrontendParser {
                printHelpForStop(customCommandLines);
                printHelpForCancel(customCommandLines);
                printHelpForSavepoint(customCommandLines);
-               printHelpForModify(customCommandLines);
 
                System.out.println();
        }
@@ -390,21 +378,6 @@ public class CliFrontendParser {
                System.out.println();
        }
 
-       public static void printHelpForModify(Collection<CustomCommandLine<?>> 
customCommandLines) {
-               HelpFormatter formatter = new HelpFormatter();
-               formatter.setLeftPadding(5);
-               formatter.setWidth(80);
-
-               System.out.println("\nAction \"modify\" modifies a running job 
(e.g. change of parallelism).");
-               System.out.println("\n  Syntax: modify <Job ID> [OPTIONS]");
-               formatter.setSyntaxPrefix("  \"modify\" action options:");
-               formatter.printHelp(" ", getModifyOptions());
-
-               printCustomCliOptions(customCommandLines, formatter, false);
-
-               System.out.println();
-       }
-
        /**
         * Prints custom cli options.
         * @param formatter The formatter to use for printing
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index 0be7dff..0da7997 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -499,17 +499,6 @@ public abstract class ClusterClient<T> {
        public abstract JobSubmissionResult submitJob(JobGraph jobGraph, 
ClassLoader classLoader)
                throws ProgramInvocationException;
 
-       /**
-        * Rescales the specified job such that it will have the new 
parallelism.
-        *
-        * @param jobId specifying the job to modify
-        * @param newParallelism specifying the new parallelism of the rescaled 
job
-        * @return Future which is completed once the rescaling has been 
completed
-        */
-       public CompletableFuture<Acknowledge> rescaleJob(JobID jobId, int 
newParallelism) {
-               throw new UnsupportedOperationException("The " + 
getClass().getSimpleName() + " does not support rescaling.");
-       }
-
        public void shutDownCluster() {
                throw new UnsupportedOperationException("The " + 
getClass().getSimpleName() + " does not support shutDownCluster.");
        }
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index 269462a..1d08992 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -46,10 +46,6 @@ import org.apache.flink.runtime.rest.FileUpload;
 import org.apache.flink.runtime.rest.RestClient;
 import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationInfo;
 import org.apache.flink.runtime.rest.handler.async.TriggerResponse;
-import 
org.apache.flink.runtime.rest.handler.job.rescaling.RescalingStatusHeaders;
-import 
org.apache.flink.runtime.rest.handler.job.rescaling.RescalingStatusMessageParameters;
-import 
org.apache.flink.runtime.rest.handler.job.rescaling.RescalingTriggerHeaders;
-import 
org.apache.flink.runtime.rest.handler.job.rescaling.RescalingTriggerMessageParameters;
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
@@ -532,43 +528,6 @@ public class RestClusterClient<T> extends ClusterClient<T> 
implements NewCluster
        }
 
        @Override
-       public CompletableFuture<Acknowledge> rescaleJob(JobID jobId, int 
newParallelism) {
-
-               final RescalingTriggerHeaders rescalingTriggerHeaders = 
RescalingTriggerHeaders.getInstance();
-               final RescalingTriggerMessageParameters 
rescalingTriggerMessageParameters = 
rescalingTriggerHeaders.getUnresolvedMessageParameters();
-               
rescalingTriggerMessageParameters.jobPathParameter.resolve(jobId);
-               
rescalingTriggerMessageParameters.rescalingParallelismQueryParameter.resolve(Collections.singletonList(newParallelism));
-
-               final CompletableFuture<TriggerResponse> 
rescalingTriggerResponseFuture = sendRequest(
-                       rescalingTriggerHeaders,
-                       rescalingTriggerMessageParameters);
-
-               final CompletableFuture<AsynchronousOperationInfo> 
rescalingOperationFuture = rescalingTriggerResponseFuture.thenCompose(
-                       (TriggerResponse triggerResponse) -> {
-                               final TriggerId triggerId = 
triggerResponse.getTriggerId();
-                               final RescalingStatusHeaders 
rescalingStatusHeaders = RescalingStatusHeaders.getInstance();
-                               final RescalingStatusMessageParameters 
rescalingStatusMessageParameters = 
rescalingStatusHeaders.getUnresolvedMessageParameters();
-
-                               
rescalingStatusMessageParameters.jobPathParameter.resolve(jobId);
-                               
rescalingStatusMessageParameters.triggerIdPathParameter.resolve(triggerId);
-
-                               return pollResourceAsync(
-                                       () -> sendRequest(
-                                               rescalingStatusHeaders,
-                                               
rescalingStatusMessageParameters));
-                       });
-
-               return rescalingOperationFuture.thenApply(
-                       (AsynchronousOperationInfo asynchronousOperationInfo) 
-> {
-                               if (asynchronousOperationInfo.getFailureCause() 
== null) {
-                                       return Acknowledge.get();
-                               } else {
-                                       throw new 
CompletionException(asynchronousOperationInfo.getFailureCause());
-                               }
-                       });
-       }
-
-       @Override
        public CompletableFuture<Acknowledge> disposeSavepoint(String 
savepointPath) {
                final SavepointDisposalRequest savepointDisposalRequest = new 
SavepointDisposalRequest(savepointPath);
 
diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendModifyTest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendModifyTest.java
deleted file mode 100644
index 05dacec..0000000
--- a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendModifyTest.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.client.cli;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.client.cli.util.MockedCliFrontend;
-import org.apache.flink.client.deployment.StandaloneClusterId;
-import org.apache.flink.client.program.rest.RestClusterClient;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.messages.Acknowledge;
-
-import org.hamcrest.Matchers;
-import org.junit.Test;
-
-import java.util.concurrent.CompletableFuture;
-
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
-
-/**
- * Tests for the modify command.
- */
-public class CliFrontendModifyTest extends CliFrontendTestBase {
-
-       @Test
-       public void testModifyJob() throws Exception {
-               final JobID jobId = new JobID();
-               final int parallelism = 42;
-               String[] args = {jobId.toString(), "-p", 
String.valueOf(parallelism)};
-
-               Tuple2<JobID, Integer> jobIdParallelism = callModify(args);
-
-               assertThat(jobIdParallelism.f0, Matchers.is(jobId));
-               assertThat(jobIdParallelism.f1, Matchers.is(parallelism));
-       }
-
-       @Test
-       public void testMissingJobId() throws Exception {
-               final int parallelism = 42;
-               final String[] args = {"-p", String.valueOf(parallelism)};
-
-               try {
-                       callModify(args);
-                       fail("Expected CliArgsException");
-               } catch (CliArgsException expected) {
-                       // expected
-               }
-       }
-
-       @Test
-       public void testMissingParallelism() throws Exception {
-               final JobID jobId = new JobID();
-               final String[] args = {jobId.toString()};
-
-               try {
-                       callModify(args);
-                       fail("Expected CliArgsException");
-               } catch (CliArgsException expected) {
-                       // expected
-               }
-       }
-
-       @Test
-       public void testUnparsableParalllelism() throws Exception {
-               final JobID jobId = new JobID();
-               final String[] args = {jobId.toString(), "-p", "foobar"};
-
-               try {
-                       callModify(args);
-                       fail("Expected CliArgsException");
-               } catch (CliArgsException expected) {
-                       // expected
-               }
-       }
-
-       @Test
-       public void testUnparsableJobId() throws Exception {
-               final int parallelism = 42;
-               final String[] args = {"foobar", "-p", 
String.valueOf(parallelism)};
-
-               try {
-                       callModify(args);
-                       fail("Expected CliArgsException");
-               } catch (CliArgsException expected) {
-                       // expected
-               }
-       }
-
-       private Tuple2<JobID, Integer> callModify(String[] args) throws 
Exception {
-               final CompletableFuture<Tuple2<JobID, Integer>> 
rescaleJobFuture = new CompletableFuture<>();
-               final TestingClusterClient clusterClient = new 
TestingClusterClient(rescaleJobFuture, getConfiguration());
-               final MockedCliFrontend cliFrontend = new 
MockedCliFrontend(clusterClient);
-
-               cliFrontend.modify(args);
-
-               assertThat(rescaleJobFuture.isDone(), Matchers.is(true));
-
-               return rescaleJobFuture.get();
-       }
-
-       private static final class TestingClusterClient extends 
RestClusterClient<StandaloneClusterId> {
-
-               private final CompletableFuture<Tuple2<JobID, Integer>> 
rescaleJobFuture;
-
-               TestingClusterClient(CompletableFuture<Tuple2<JobID, Integer>> 
rescaleJobFuture, Configuration configuration) throws Exception {
-                       super(configuration, StandaloneClusterId.getInstance());
-
-                       this.rescaleJobFuture = rescaleJobFuture;
-               }
-
-               @Override
-               public CompletableFuture<Acknowledge> rescaleJob(JobID jobId, 
int newParallelism) {
-                       rescaleJobFuture.complete(Tuple2.of(jobId, 
newParallelism));
-
-                       return 
CompletableFuture.completedFuture(Acknowledge.get());
-               }
-       }
-
-}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 3c4028c..1f1fc62 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -43,7 +43,6 @@ import 
org.apache.flink.runtime.jobmaster.JobManagerSharedServices;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
 import org.apache.flink.runtime.jobmaster.JobNotFinishedException;
 import org.apache.flink.runtime.jobmaster.JobResult;
-import org.apache.flink.runtime.jobmaster.RescalingBehaviour;
 import 
org.apache.flink.runtime.jobmaster.factories.DefaultJobManagerJobMetricGroupFactory;
 import org.apache.flink.runtime.leaderelection.LeaderContender;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
@@ -423,15 +422,6 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
        }
 
        @Override
-       public CompletableFuture<Acknowledge> rescaleJob(JobID jobId, int 
newParallelism, RescalingBehaviour rescalingBehaviour, Time timeout) {
-               final CompletableFuture<JobMasterGateway> 
jobMasterGatewayFuture = getJobMasterGatewayFuture(jobId);
-
-               return jobMasterGatewayFuture.thenCompose(
-                       (JobMasterGateway jobMasterGateway) ->
-                               jobMasterGateway.rescaleJob(newParallelism, 
rescalingBehaviour, timeout));
-       }
-
-       @Override
        public CompletableFuture<ClusterOverview> requestClusterOverview(Time 
timeout) {
                CompletableFuture<ResourceOverview> taskManagerOverviewFuture = 
runResourceManagerCommand(resourceManagerGateway -> 
resourceManagerGateway.requestResourceOverview(timeout));
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index f912f45..6c83c7a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -31,10 +31,7 @@ import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
 import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
-import org.apache.flink.runtime.checkpoint.CheckpointException;
-import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
-import org.apache.flink.runtime.checkpoint.Checkpoints;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.client.JobExecutionException;
@@ -42,7 +39,6 @@ import 
org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.execution.SuppressRestartsException;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -62,12 +58,10 @@ import 
org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
-import org.apache.flink.runtime.jobmaster.exceptions.JobModificationException;
 import 
org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory;
 import org.apache.flink.runtime.jobmaster.slotpool.Scheduler;
 import org.apache.flink.runtime.jobmaster.slotpool.SchedulerFactory;
@@ -115,9 +109,7 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -208,9 +200,6 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
        private JobManagerJobMetricGroup jobManagerJobMetricGroup;
 
        @Nullable
-       private String lastInternalSavepoint;
-
-       @Nullable
        private ResourceManagerAddress resourceManagerAddress;
 
        @Nullable
@@ -291,7 +280,6 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
                this.registeredTaskManagers = new HashMap<>(4);
 
                this.backPressureStatsTracker = 
checkNotNull(jobManagerSharedServices.getBackPressureStatsTracker());
-               this.lastInternalSavepoint = null;
 
                this.jobManagerJobMetricGroup = 
jobMetricGroupFactory.create(jobGraph);
                this.executionGraph = 
createAndRestoreExecutionGraph(jobManagerJobMetricGroup);
@@ -365,15 +353,7 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
                // shut down will internally release all registered slots
                slotPool.close();
 
-               final CompletableFuture<Void> disposeInternalSavepointFuture;
-
-               if (lastInternalSavepoint != null) {
-                       disposeInternalSavepointFuture = 
CompletableFuture.runAsync(() -> disposeSavepoint(lastInternalSavepoint));
-               } else {
-                       disposeInternalSavepointFuture = 
CompletableFuture.completedFuture(null);
-               }
-
-               return 
FutureUtils.completeAll(Collections.singletonList(disposeInternalSavepointFuture));
+               return CompletableFuture.completedFuture(null);
        }
 
        
//----------------------------------------------------------------------------------------------
@@ -387,130 +367,6 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
                return CompletableFuture.completedFuture(Acknowledge.get());
        }
 
-       @Override
-       public CompletableFuture<Acknowledge> rescaleJob(
-                       int newParallelism,
-                       RescalingBehaviour rescalingBehaviour,
-                       Time timeout) {
-               final ArrayList<JobVertexID> allOperators = new 
ArrayList<>(jobGraph.getNumberOfVertices());
-
-               for (JobVertex jobVertex : jobGraph.getVertices()) {
-                       allOperators.add(jobVertex.getID());
-               }
-
-               return rescaleOperators(allOperators, newParallelism, 
rescalingBehaviour, timeout);
-       }
-
-       @Override
-       public CompletableFuture<Acknowledge> rescaleOperators(
-                       Collection<JobVertexID> operators,
-                       int newParallelism,
-                       RescalingBehaviour rescalingBehaviour,
-                       Time timeout) {
-
-               if (newParallelism <= 0) {
-                       return FutureUtils.completedExceptionally(
-                               new JobModificationException("The target 
parallelism of a rescaling operation must be larger than 0."));
-               }
-
-               // 1. Check whether we can rescale the job & rescale the 
respective vertices
-               try {
-                       rescaleJobGraph(operators, newParallelism, 
rescalingBehaviour);
-               } catch (FlinkException e) {
-                       final String msg = String.format("Cannot rescale job 
%s.", jobGraph.getName());
-
-                       log.info(msg, e);
-                       return FutureUtils.completedExceptionally(new 
JobModificationException(msg, e));
-               }
-
-               final ExecutionGraph currentExecutionGraph = executionGraph;
-
-               final JobManagerJobMetricGroup newJobManagerJobMetricGroup = 
jobMetricGroupFactory.create(jobGraph);
-               final ExecutionGraph newExecutionGraph;
-
-               try {
-                       newExecutionGraph = 
createExecutionGraph(newJobManagerJobMetricGroup);
-               } catch (JobExecutionException | JobException e) {
-                       return FutureUtils.completedExceptionally(
-                               new JobModificationException("Could not create 
rescaled ExecutionGraph.", e));
-               }
-
-               // 3. disable checkpoint coordinator to suppress subsequent 
checkpoints
-               final CheckpointCoordinator checkpointCoordinator = 
currentExecutionGraph.getCheckpointCoordinator();
-               checkpointCoordinator.stopCheckpointScheduler();
-
-               // 4. take a savepoint
-               final CompletableFuture<String> savepointFuture = 
getJobModificationSavepoint(timeout);
-
-               final CompletableFuture<ExecutionGraph> executionGraphFuture = 
restoreExecutionGraphFromRescalingSavepoint(
-                       newExecutionGraph,
-                       savepointFuture)
-                       .handleAsync(
-                               (ExecutionGraph executionGraph, Throwable 
failure) -> {
-                                       if (failure != null) {
-                                               // in case that we couldn't 
take a savepoint or restore from it, let's restart the checkpoint
-                                               // coordinator and abort the 
rescaling operation
-                                               if 
(checkpointCoordinator.isPeriodicCheckpointingConfigured()) {
-                                                       
checkpointCoordinator.startCheckpointScheduler();
-                                               }
-
-                                               throw new 
CompletionException(ExceptionUtils.stripCompletionException(failure));
-                                       } else {
-                                               return executionGraph;
-                                       }
-                               },
-                               getMainThreadExecutor());
-
-               // 5. suspend the current job
-               final CompletableFuture<JobStatus> terminationFuture = 
executionGraphFuture.thenComposeAsync(
-                       (ExecutionGraph ignored) -> {
-                               suspendExecutionGraph(new FlinkException("Job 
is being rescaled."));
-                               return 
currentExecutionGraph.getTerminationFuture();
-                       },
-                       getMainThreadExecutor());
-
-               final CompletableFuture<Void> suspendedFuture = 
terminationFuture.thenAccept(
-                       (JobStatus jobStatus) -> {
-                               if (jobStatus != JobStatus.SUSPENDED) {
-                                       final String msg = String.format("Job 
%s rescaling failed because we could not suspend the execution graph.", 
jobGraph.getName());
-                                       log.info(msg);
-                                       throw new CompletionException(new 
JobModificationException(msg));
-                               }
-                       });
-
-               // 6. resume the new execution graph from the taken savepoint
-               final CompletableFuture<Acknowledge> rescalingFuture = 
suspendedFuture.thenCombineAsync(
-                       executionGraphFuture,
-                       (Void ignored, ExecutionGraph restoredExecutionGraph) 
-> {
-                               // check if the ExecutionGraph is still the same
-                               if (executionGraph == currentExecutionGraph) {
-                                       clearExecutionGraphFields();
-                                       
assignExecutionGraph(restoredExecutionGraph, newJobManagerJobMetricGroup);
-                                       scheduleExecutionGraph();
-
-                                       return Acknowledge.get();
-                               } else {
-                                       throw new CompletionException(new 
JobModificationException("Detected concurrent modification of ExecutionGraph. 
Aborting the rescaling."));
-                               }
-
-                       },
-                       getMainThreadExecutor());
-
-               rescalingFuture.whenCompleteAsync(
-                       (Acknowledge ignored, Throwable throwable) -> {
-                               if (throwable != null) {
-                                       // fail the newly created execution 
graph
-                                       newExecutionGraph.failGlobal(
-                                               new SuppressRestartsException(
-                                                       new FlinkException(
-                                                               
String.format("Failed to rescale the job %s.", jobGraph.getJobID()),
-                                                               throwable)));
-                               }
-                       }, getMainThreadExecutor());
-
-               return rescalingFuture;
-       }
-
        /**
         * Updates the task execution state for a given task.
         *
@@ -1261,24 +1117,6 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
        }
 
        /**
-        * Dispose the savepoint stored under the given path.
-        *
-        * @param savepointPath path where the savepoint is stored
-        */
-       private void disposeSavepoint(String savepointPath) {
-               try {
-                       // delete the temporary savepoint
-                       Checkpoints.disposeSavepoint(
-                               savepointPath,
-                               jobMasterConfiguration.getConfiguration(),
-                               userCodeLoader,
-                               log);
-               } catch (FlinkException | IOException e) {
-                       log.info("Could not dispose temporary rescaling 
savepoint under {}.", savepointPath, e);
-               }
-       }
-
-       /**
         * Tries to restore the given {@link ExecutionGraph} from the provided 
{@link SavepointRestoreSettings}.
         *
         * @param executionGraphToRestore {@link ExecutionGraph} which is 
supposed to be restored
@@ -1435,131 +1273,6 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
                slotPool.disconnectResourceManager();
        }
 
-       /**
-        * Restore the given {@link ExecutionGraph} from the rescaling 
savepoint. If the {@link ExecutionGraph} could
-        * be restored, then this savepoint will be recorded as the latest 
successful modification savepoint. A previous
-        * savepoint will be disposed. If the rescaling savepoint is empty, the 
job will be restored from the initially
-        * provided savepoint.
-        *
-        * @param newExecutionGraph to restore
-        * @param savepointFuture containing the path to the internal 
modification savepoint
-        * @return Future which is completed with the restored {@link 
ExecutionGraph}
-        */
-       private CompletableFuture<ExecutionGraph> 
restoreExecutionGraphFromRescalingSavepoint(ExecutionGraph newExecutionGraph, 
CompletableFuture<String> savepointFuture) {
-               return savepointFuture
-                       .thenApplyAsync(
-                               (@Nullable String savepointPath) -> {
-                                       if (savepointPath != null) {
-                                               try {
-                                                       
tryRestoreExecutionGraphFromSavepoint(newExecutionGraph, 
SavepointRestoreSettings.forPath(savepointPath, false));
-                                               } catch (Exception e) {
-                                                       final String message = 
String.format("Could not restore from temporary rescaling savepoint. This might 
indicate " +
-                                                                       "that 
the savepoint %s got corrupted. Deleting this savepoint as a precaution.",
-                                                               savepointPath);
-
-                                                       log.info(message);
-
-                                                       CompletableFuture
-                                                               .runAsync(
-                                                                       () -> {
-                                                                               
if (savepointPath.equals(lastInternalSavepoint)) {
-                                                                               
        lastInternalSavepoint = null;
-                                                                               
}
-                                                                       },
-                                                                       
getMainThreadExecutor())
-                                                               .thenRunAsync(
-                                                                       () -> 
disposeSavepoint(savepointPath),
-                                                                       
scheduledExecutorService);
-
-                                                       throw new 
CompletionException(new JobModificationException(message, e));
-                                               }
-                                       } else {
-                                               // No rescaling savepoint, 
restart from the initial savepoint or none
-                                               try {
-                                                       
tryRestoreExecutionGraphFromSavepoint(newExecutionGraph, 
jobGraph.getSavepointRestoreSettings());
-                                               } catch (Exception e) {
-                                                       final String message = 
String.format("Could not restore from initial savepoint. This might indicate " +
-                                                               "that the 
savepoint %s got corrupted.", 
jobGraph.getSavepointRestoreSettings().getRestorePath());
-
-                                                       log.info(message);
-
-                                                       throw new 
CompletionException(new JobModificationException(message, e));
-                                               }
-                                       }
-
-                                       return newExecutionGraph;
-                               }, scheduledExecutorService);
-       }
-
-       /**
-        * Takes an internal savepoint for job modification purposes. If the 
savepoint was not successful because
-        * not all tasks were running, it returns the last successful 
modification savepoint.
-        *
-        * @param timeout for the operation
-        * @return Future which is completed with the savepoint path or the 
last successful modification savepoint if the
-        * former was not successful
-        */
-       private CompletableFuture<String> getJobModificationSavepoint(Time 
timeout) {
-               return triggerSavepoint(
-                       null,
-                       false,
-                       timeout)
-                       .handleAsync(
-                               (String savepointPath, Throwable throwable) -> {
-                                       if (throwable != null) {
-                                               final Throwable 
strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
-                                               if (strippedThrowable 
instanceof CheckpointException) {
-                                                       final 
CheckpointException checkpointException = (CheckpointException) 
strippedThrowable;
-
-                                                       if 
(checkpointException.getCheckpointFailureReason() == 
CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING) {
-                                                               return 
lastInternalSavepoint;
-                                                       } else {
-                                                               throw new 
CompletionException(checkpointException);
-                                                       }
-                                               } else {
-                                                       throw new 
CompletionException(strippedThrowable);
-                                               }
-                                       } else {
-                                               final String savepointToDispose 
= lastInternalSavepoint;
-                                               lastInternalSavepoint = 
savepointPath;
-
-                                               if (savepointToDispose != null) 
{
-                                                       // dispose the old 
savepoint asynchronously
-                                                       
CompletableFuture.runAsync(
-                                                               () -> 
disposeSavepoint(savepointToDispose),
-                                                               
scheduledExecutorService);
-                                               }
-
-                                               return lastInternalSavepoint;
-                                       }
-                               },
-                               getMainThreadExecutor());
-       }
-
-       /**
-        * Rescales the given operators of the {@link JobGraph} of this {@link 
JobMaster} with respect to given
-        * parallelism and {@link RescalingBehaviour}.
-        *
-        * @param operators to rescale
-        * @param newParallelism new parallelism for these operators
-        * @param rescalingBehaviour of the rescaling operation
-        * @throws FlinkException if the {@link JobGraph} could not be rescaled
-        */
-       private void rescaleJobGraph(Collection<JobVertexID> operators, int 
newParallelism, RescalingBehaviour rescalingBehaviour) throws FlinkException {
-               for (JobVertexID jobVertexId : operators) {
-                       final JobVertex jobVertex = 
jobGraph.findVertexByID(jobVertexId);
-
-                       // update max parallelism in case that it has not been 
configured
-                       final ExecutionJobVertex executionJobVertex = 
executionGraph.getJobVertex(jobVertexId);
-
-                       if (executionJobVertex != null) {
-                               
jobVertex.setMaxParallelism(executionJobVertex.getMaxParallelism());
-                       }
-
-                       rescalingBehaviour.accept(jobVertex, newParallelism);
-               }
-       }
-
        
//----------------------------------------------------------------------------------------------
        // Service methods
        
//----------------------------------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index b15d704..b2a282d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -18,9 +18,6 @@
 
 package org.apache.flink.runtime.jobmaster;
 
-import java.util.Collection;
-import java.util.concurrent.CompletableFuture;
-import javax.annotation.Nullable;
 import org.apache.flink.api.common.functions.AggregateFunction;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorGateway;
@@ -48,6 +45,11 @@ import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+
 /**
  * {@link JobMaster} rpc gateway interface.
  */
@@ -66,34 +68,6 @@ public interface JobMasterGateway extends
        CompletableFuture<Acknowledge> cancel(@RpcTimeout Time timeout);
 
        /**
-        * Triggers rescaling of the executed job.
-        *
-        * @param newParallelism new parallelism of the job
-        * @param rescalingBehaviour defining how strict the rescaling has to 
be executed
-        * @param timeout of this operation
-        * @return Future which is completed with {@link Acknowledge} once the 
rescaling was successful
-        */
-       CompletableFuture<Acknowledge> rescaleJob(
-               int newParallelism,
-               RescalingBehaviour rescalingBehaviour,
-               @RpcTimeout Time timeout);
-
-       /**
-        * Triggers rescaling of the given set of operators.
-        *
-        * @param operators set of operators which shall be rescaled
-        * @param newParallelism new parallelism of the given set of operators
-        * @param rescalingBehaviour defining how strict the rescaling has to 
be executed
-        * @param timeout of this operation
-        * @return Future which is completed with {@link Acknowledge} once the 
rescaling was successful
-        */
-       CompletableFuture<Acknowledge> rescaleOperators(
-               Collection<JobVertexID> operators,
-               int newParallelism,
-               RescalingBehaviour rescalingBehaviour,
-               @RpcTimeout Time timeout);
-
-       /**
         * Updates the task execution state for a given task.
         *
         * @param taskExecutionState New task execution state for a given task
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RescalingBehaviour.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RescalingBehaviour.java
deleted file mode 100644
index 64e2ffa..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RescalingBehaviour.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmaster;
-
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.function.BiConsumerWithException;
-
-/**
- * Definition of the rescaling behaviour.
- */
-public enum RescalingBehaviour implements BiConsumerWithException<JobVertex, 
Integer, FlinkException> {
-       // rescaling is only executed if the operator can be set to the given 
parallelism
-       STRICT {
-               @Override
-               public void accept(JobVertex jobVertex, Integer newParallelism) 
throws FlinkException {
-                       if (jobVertex.getMaxParallelism() < newParallelism) {
-                               throw new FlinkException("Cannot rescale vertex 
" + jobVertex.getName() +
-                                       " because its maximum parallelism " + 
jobVertex.getMaxParallelism() +
-                                       " is smaller than the new parallelism " 
+ newParallelism + '.');
-                       } else {
-                               jobVertex.setParallelism(newParallelism);
-                       }
-               }
-       },
-       // the new parallelism will be the minimum of the given parallelism and 
the maximum parallelism
-       RELAXED {
-               @Override
-               public void accept(JobVertex jobVertex, Integer newParallelism) 
{
-                       
jobVertex.setParallelism(Math.min(jobVertex.getMaxParallelism(), 
newParallelism));
-               }
-       }
-}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescaling/RescalingHandlers.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescaling/RescalingHandlers.java
index 8860e5b..2b0f156 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescaling/RescalingHandlers.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescaling/RescalingHandlers.java
@@ -18,28 +18,23 @@
 
 package org.apache.flink.runtime.rest.handler.job.rescaling;
 
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.jobmaster.RescalingBehaviour;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
 import 
org.apache.flink.runtime.rest.handler.async.AbstractAsynchronousOperationHandlers;
 import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationInfo;
+import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationResult;
+import org.apache.flink.runtime.rest.handler.async.TriggerResponse;
 import org.apache.flink.runtime.rest.handler.job.AsynchronousJobOperationKey;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
-import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
-import 
org.apache.flink.runtime.rest.messages.RescalingParallelismQueryParameter;
-import org.apache.flink.runtime.rest.messages.TriggerId;
-import org.apache.flink.runtime.rest.messages.TriggerIdPathParameter;
-import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
-import org.apache.flink.util.SerializedThrowable;
 
 import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
-import java.util.List;
+import javax.annotation.Nonnull;
+
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
@@ -48,6 +43,10 @@ import java.util.concurrent.CompletableFuture;
  */
 public class RescalingHandlers extends 
AbstractAsynchronousOperationHandlers<AsynchronousJobOperationKey, Acknowledge> 
{
 
+       private static RestHandlerException featureDisabledException() {
+               return new RestHandlerException("Rescaling is temporarily 
disabled. See FLINK-12312.", HttpResponseStatus.SERVICE_UNAVAILABLE);
+       }
+
        /**
         * Handler which triggers the rescaling of the specified job.
         */
@@ -65,29 +64,18 @@ public class RescalingHandlers extends 
AbstractAsynchronousOperationHandlers<Asy
                }
 
                @Override
-               protected CompletableFuture<Acknowledge> 
triggerOperation(HandlerRequest<EmptyRequestBody, 
RescalingTriggerMessageParameters> request, RestfulGateway gateway) throws 
RestHandlerException {
-                       final JobID jobId = 
request.getPathParameter(JobIDPathParameter.class);
-                       final List<Integer> queryParameter = 
request.getQueryParameter(RescalingParallelismQueryParameter.class);
-
-                       if (queryParameter.isEmpty()) {
-                               throw new RestHandlerException("No new 
parallelism was specified.", HttpResponseStatus.BAD_REQUEST);
-                       }
-
-                       final int newParallelism = queryParameter.get(0);
-
-                       final CompletableFuture<Acknowledge> rescalingFuture = 
gateway.rescaleJob(
-                               jobId,
-                               newParallelism,
-                               RescalingBehaviour.STRICT,
-                               RpcUtils.INF_TIMEOUT);
+               public CompletableFuture<TriggerResponse> 
handleRequest(@Nonnull final HandlerRequest<EmptyRequestBody, 
RescalingTriggerMessageParameters> request, @Nonnull final RestfulGateway 
gateway) throws RestHandlerException {
+                       throw featureDisabledException();
+               }
 
-                       return rescalingFuture;
+               @Override
+               protected CompletableFuture<Acknowledge> 
triggerOperation(HandlerRequest<EmptyRequestBody, 
RescalingTriggerMessageParameters> request, RestfulGateway gateway) {
+                       throw new UnsupportedOperationException();
                }
 
                @Override
                protected AsynchronousJobOperationKey 
createOperationKey(HandlerRequest<EmptyRequestBody, 
RescalingTriggerMessageParameters> request) {
-                       final JobID jobId = 
request.getPathParameter(JobIDPathParameter.class);
-                       return AsynchronousJobOperationKey.of(new TriggerId(), 
jobId);
+                       throw new UnsupportedOperationException();
                }
        }
 
@@ -108,21 +96,23 @@ public class RescalingHandlers extends 
AbstractAsynchronousOperationHandlers<Asy
                }
 
                @Override
-               protected AsynchronousJobOperationKey 
getOperationKey(HandlerRequest<EmptyRequestBody, 
RescalingStatusMessageParameters> request) {
-                       final JobID jobId = 
request.getPathParameter(JobIDPathParameter.class);
-                       final TriggerId triggerId = 
request.getPathParameter(TriggerIdPathParameter.class);
+               public 
CompletableFuture<AsynchronousOperationResult<AsynchronousOperationInfo>> 
handleRequest(@Nonnull final HandlerRequest<EmptyRequestBody, 
RescalingStatusMessageParameters> request, @Nonnull final RestfulGateway 
gateway) throws RestHandlerException {
+                       throw featureDisabledException();
+               }
 
-                       return AsynchronousJobOperationKey.of(triggerId, jobId);
+               @Override
+               protected AsynchronousJobOperationKey 
getOperationKey(HandlerRequest<EmptyRequestBody, 
RescalingStatusMessageParameters> request) {
+                       throw new UnsupportedOperationException();
                }
 
                @Override
                protected AsynchronousOperationInfo 
exceptionalOperationResultResponse(Throwable throwable) {
-                       return 
AsynchronousOperationInfo.completeExceptional(new 
SerializedThrowable(throwable));
+                       throw new UnsupportedOperationException();
                }
 
                @Override
                protected AsynchronousOperationInfo 
operationResultResponse(Acknowledge operationResult) {
-                       return AsynchronousOperationInfo.complete();
+                       throw new UnsupportedOperationException();
                }
        }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
index dcfbf6b..33bb50a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
@@ -27,7 +27,6 @@ import 
org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmaster.JobResult;
-import org.apache.flink.runtime.jobmaster.RescalingBehaviour;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
 import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
@@ -184,23 +183,6 @@ public interface RestfulGateway extends RpcGateway {
                throw new UnsupportedOperationException();
        }
 
-       /**
-        * Trigger rescaling of the given job.
-        *
-        * @param jobId specifying the job to rescale
-        * @param newParallelism new parallelism of the job
-        * @param rescalingBehaviour defining how strict the rescaling has to 
be executed
-        * @param timeout of this operation
-        * @return Future which is completed with {@link Acknowledge} once the 
rescaling was successful
-        */
-       default CompletableFuture<Acknowledge> rescaleJob(
-                       JobID jobId,
-                       int newParallelism,
-                       RescalingBehaviour rescalingBehaviour,
-                       @RpcTimeout Time timeout) {
-               throw new UnsupportedOperationException();
-       }
-
        default CompletableFuture<Acknowledge> shutDownCluster() {
                throw new UnsupportedOperationException();
        }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
index 13ffe52..a2d3c3b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
@@ -37,7 +37,6 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
 import org.apache.flink.runtime.jobmaster.JobMasterId;
-import org.apache.flink.runtime.jobmaster.RescalingBehaviour;
 import org.apache.flink.runtime.jobmaster.SerializedInputSplit;
 import org.apache.flink.runtime.jobmaster.message.ClassloadingProps;
 import org.apache.flink.runtime.messages.Acknowledge;
@@ -82,12 +81,6 @@ public class TestingJobMasterGateway implements 
JobMasterGateway {
        private final Supplier<CompletableFuture<Acknowledge>> cancelFunction;
 
        @Nonnull
-       private final BiFunction<Integer, RescalingBehaviour, 
CompletableFuture<Acknowledge>> rescalingJobFunction;
-
-       @Nonnull
-       private final TriFunction<Collection<JobVertexID>, Integer, 
RescalingBehaviour, CompletableFuture<Acknowledge>> rescalingOperatorsFunction;
-
-       @Nonnull
        private final Function<TaskExecutionState, 
CompletableFuture<Acknowledge>> updateTaskExecutionStateFunction;
 
        @Nonnull
@@ -166,8 +159,6 @@ public class TestingJobMasterGateway implements 
JobMasterGateway {
                        @Nonnull String address,
                        @Nonnull String hostname,
                        @Nonnull Supplier<CompletableFuture<Acknowledge>> 
cancelFunction,
-                       @Nonnull BiFunction<Integer, RescalingBehaviour, 
CompletableFuture<Acknowledge>> rescalingJobFunction,
-                       @Nonnull TriFunction<Collection<JobVertexID>, Integer, 
RescalingBehaviour, CompletableFuture<Acknowledge>> rescalingOperatorsFunction,
                        @Nonnull Function<TaskExecutionState, 
CompletableFuture<Acknowledge>> updateTaskExecutionStateFunction,
                        @Nonnull BiFunction<JobVertexID, ExecutionAttemptID, 
CompletableFuture<SerializedInputSplit>> requestNextInputSplitFunction,
                        @Nonnull BiFunction<IntermediateDataSetID, 
ResultPartitionID, CompletableFuture<ExecutionState>> 
requestPartitionStateFunction,
@@ -196,8 +187,6 @@ public class TestingJobMasterGateway implements 
JobMasterGateway {
                this.address = address;
                this.hostname = hostname;
                this.cancelFunction = cancelFunction;
-               this.rescalingJobFunction = rescalingJobFunction;
-               this.rescalingOperatorsFunction = rescalingOperatorsFunction;
                this.updateTaskExecutionStateFunction = 
updateTaskExecutionStateFunction;
                this.requestNextInputSplitFunction = 
requestNextInputSplitFunction;
                this.requestPartitionStateFunction = 
requestPartitionStateFunction;
@@ -231,16 +220,6 @@ public class TestingJobMasterGateway implements 
JobMasterGateway {
        }
 
        @Override
-       public CompletableFuture<Acknowledge> rescaleJob(int newParallelism, 
RescalingBehaviour rescalingBehaviour, Time timeout) {
-               return rescalingJobFunction.apply(newParallelism, 
rescalingBehaviour);
-       }
-
-       @Override
-       public CompletableFuture<Acknowledge> 
rescaleOperators(Collection<JobVertexID> operators, int newParallelism, 
RescalingBehaviour rescalingBehaviour, Time timeout) {
-               return rescalingOperatorsFunction.apply(operators, 
newParallelism, rescalingBehaviour);
-       }
-
-       @Override
        public CompletableFuture<Acknowledge> 
updateTaskExecutionState(TaskExecutionState taskExecutionState) {
                return 
updateTaskExecutionStateFunction.apply(taskExecutionState);
        }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java
index c13cbf6..a023917 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java
@@ -36,7 +36,6 @@ import 
org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
 import org.apache.flink.runtime.jobmaster.JobMasterId;
-import org.apache.flink.runtime.jobmaster.RescalingBehaviour;
 import org.apache.flink.runtime.jobmaster.SerializedInputSplit;
 import org.apache.flink.runtime.jobmaster.message.ClassloadingProps;
 import org.apache.flink.runtime.messages.Acknowledge;
@@ -76,8 +75,6 @@ public class TestingJobMasterGatewayBuilder {
        private String address = 
"akka.tcp://flink@localhost:6130/user/jobmanager";
        private String hostname = "localhost";
        private Supplier<CompletableFuture<Acknowledge>> cancelFunction = () -> 
CompletableFuture.completedFuture(Acknowledge.get());
-       private BiFunction<Integer, RescalingBehaviour, 
CompletableFuture<Acknowledge>> rescalingJobFunction = (ignoredA, ignoredB) -> 
CompletableFuture.completedFuture(Acknowledge.get());
-       private TriFunction<Collection<JobVertexID>, Integer, 
RescalingBehaviour, CompletableFuture<Acknowledge>> rescalingOperatorsFunction 
= (ignoredA, ignoredB, ignoredC) -> 
CompletableFuture.completedFuture(Acknowledge.get());
        private Function<TaskExecutionState, CompletableFuture<Acknowledge>> 
updateTaskExecutionStateFunction = ignored -> 
CompletableFuture.completedFuture(Acknowledge.get());
        private BiFunction<JobVertexID, ExecutionAttemptID, 
CompletableFuture<SerializedInputSplit>> requestNextInputSplitFunction = 
(ignoredA, ignoredB) -> CompletableFuture.completedFuture(new 
SerializedInputSplit(null));
        private BiFunction<IntermediateDataSetID, ResultPartitionID, 
CompletableFuture<ExecutionState>> requestPartitionStateFunction = (ignoredA, 
ignoredB) -> CompletableFuture.completedFuture(ExecutionState.RUNNING);
@@ -119,16 +116,6 @@ public class TestingJobMasterGatewayBuilder {
                return this;
        }
 
-       public TestingJobMasterGatewayBuilder 
setRescalingJobFunction(BiFunction<Integer, RescalingBehaviour, 
CompletableFuture<Acknowledge>> rescalingJobFunction) {
-               this.rescalingJobFunction = rescalingJobFunction;
-               return this;
-       }
-
-       public TestingJobMasterGatewayBuilder 
setRescalingOperatorsFunction(TriFunction<Collection<JobVertexID>, Integer, 
RescalingBehaviour, CompletableFuture<Acknowledge>> rescalingOperatorsFunction) 
{
-               this.rescalingOperatorsFunction = rescalingOperatorsFunction;
-               return this;
-       }
-
        public TestingJobMasterGatewayBuilder 
setUpdateTaskExecutionStateFunction(Function<TaskExecutionState, 
CompletableFuture<Acknowledge>> updateTaskExecutionStateFunction) {
                this.updateTaskExecutionStateFunction = 
updateTaskExecutionStateFunction;
                return this;
@@ -255,6 +242,6 @@ public class TestingJobMasterGatewayBuilder {
        }
 
        public TestingJobMasterGateway build() {
-               return new TestingJobMasterGateway(address, hostname, 
cancelFunction, rescalingJobFunction, rescalingOperatorsFunction, 
updateTaskExecutionStateFunction, requestNextInputSplitFunction, 
requestPartitionStateFunction, scheduleOrUpdateConsumersFunction, 
disconnectTaskManagerFunction, disconnectResourceManagerConsumer, 
classloadingPropsSupplier, offerSlotsFunction, failSlotConsumer, 
registerTaskManagerFunction, taskManagerHeartbeatConsumer, 
resourceManagerHeartbeatConsumer, requestJobDetai [...]
+               return new TestingJobMasterGateway(address, hostname, 
cancelFunction, updateTaskExecutionStateFunction, 
requestNextInputSplitFunction, requestPartitionStateFunction, 
scheduleOrUpdateConsumersFunction, disconnectTaskManagerFunction, 
disconnectResourceManagerConsumer, classloadingPropsSupplier, 
offerSlotsFunction, failSlotConsumer, registerTaskManagerFunction, 
taskManagerHeartbeatConsumer, resourceManagerHeartbeatConsumer, 
requestJobDetailsSupplier, requestJobSupplier, triggerSavepointFu [...]
        }
 }

Reply via email to