Hi Lei,

setting explicit operator ID should solve this issue.
As far as I know, the auto-generated operator id also depended on the
operator parallelism in previous versions of Flink (not sure until which
point).

Which version are you running?

Best, Fabian


2017-10-17 3:15 GMT+02:00 Lei Chen <leyn...@gmail.com>:

> Hi,
>
> We're trying to implement some module to help autoscale our pipeline which
> is built  with Flink on YARN. According to the document, the suggested
> procedure seems to be:
>
> 1. cancel job with savepoint
> 2. start new job with increased YARN TM number and parallelism.
>
> However, step 2 always gave error
>
> Caused by: java.lang.IllegalStateException: Failed to rollback to
> savepoint hdfs://10.106.238.14:/tmp/savepoint-767421-20907d234655. Cannot
> map savepoint state for operator 37dfe905df17858e07858039ce3d8ae1 to the
> new program, because the operator is not available in the new program. If
> you want to allow to skip this, you can set the --allowNonRestoredState
> option on the CLI.
> at org.apache.flink.runtime.checkpoint.savepoint.SavepointLoade
> r.loadAndValidateSavepoint(SavepointLoader.java:130)
> at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.re
> storeSavepoint(CheckpointCoordinator.java:1140)
> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$
> apache$flink$runtime$jobmanager$JobManager$$submitJob$1.
> apply$mcV$sp(JobManager.scala:1386)
> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$
> apache$flink$runtime$jobmanager$JobManager$$submitJob$1.
> apply(JobManager.scala:1372)
> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$
> apache$flink$runtime$jobmanager$JobManager$$submitJob$1.
> apply(JobManager.scala:1372)
> at scala.concurrent.impl.Future$PromiseCompletingRunnable.lifte
> dTree1$1(Future.scala:24)
> at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(F
> uture.scala:24)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.
> exec(AbstractDispatcher.scala:397)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(For
> kJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPoo
> l.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinW
> orkerThread.java:107)
>
> The procedure worked fine if parallelism was not changed.
>
> Also want to mention that I didn't manually specify OperatorID in my job. The
> document does mentioned manually OperatorID assignment is suggested, just
> curious is that mandatory in my case to fix the problem I'm seeing, given
> that my program doesn't change at all so the autogenerated operatorID
> should be unchanged after parallelism increase?
>
> thanks,
> Lei
>

Reply via email to