Hello,

I am trying to modify the parallelism of a streaming Flink job (the wiki-edits
example) multiple times on a standalone cluster (one local machine) that has
two TaskManagers with 3 slots each (i.e. 6 slots in total). However, the
"modify" command works only once (e.g. when I change the parallelism
from 2 to 4). The second time (e.g. changing the parallelism to 6, or even
back to 2), it fails with an error.

I am using Flink 1.8.1 (since I found that the modify parallelism command
has been removed from the v1.9 documentation) and have configured savepoints
to be written to file:///home/pankaj/flink-checkpoints. The output of the
first "modify <jobid> -p 4" command and the second "modify <jobid> -p 6"
command is copied below.

Please tell me how to modify parallelism multiple times at runtime.

Thanks,

Pankaj


$ ./bin/flink modify 94831ca34951975dbee3335a384ee935 -p 4
Modify job 94831ca34951975dbee3335a384ee935.
Rescaled job 94831ca34951975dbee3335a384ee935. Its new parallelism is 4.

$ ./bin/flink modify 94831ca34951975dbee3335a384ee935 -p 6
Modify job 94831ca34951975dbee3335a384ee935.

------------------------------------------------------------
 The program finished with the following exception:

org.apache.flink.util.FlinkException: Could not rescale job 94831ca34951975dbee3335a384ee935.
at org.apache.flink.client.cli.CliFrontend.lambda$modify$10(CliFrontend.java:799)
at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:985)
at org.apache.flink.client.cli.CliFrontend.modify(CliFrontend.java:790)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1068)
at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
Caused by: java.util.concurrent.CompletionException: java.lang.IllegalStateException: Suspend needs to happen atomically
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:961)
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:392)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:185)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:147)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
at akka.actor.Actor.aroundReceive(Actor.scala:502)
at akka.actor.Actor.aroundReceive$(Actor.scala:500)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: java.lang.IllegalStateException: Suspend needs to happen atomically
at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
at org.apache.flink.runtime.executiongraph.ExecutionGraph.suspend(ExecutionGraph.java:1172)
at org.apache.flink.runtime.jobmaster.JobMaster.suspendExecutionGraph(JobMaster.java:1221)
at org.apache.flink.runtime.jobmaster.JobMaster.lambda$rescaleOperators$5(JobMaster.java:465)
at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
... 20 more
