[ https://issues.apache.org/jira/browse/FLINK-20376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17283507#comment-17283507 ]

Partha Pradeep Mishra edited comment on FLINK-20376 at 2/12/21, 5:25 AM:
-------------------------------------------------------------------------

[~pnowojski] I used your method above to calculate the hashes of all the operators for which I have specified a uid manually. I have also attached our DataStream API code snippet so you can see the different operator uids we set manually.

{color:#ffab00}Operator UID: a, Hashed: 897859f6655555855a890e51483ab5e6{color}
{color:#ffab00}Operator UID: b, Hashed: eed1d3b157a9987ae9944e541e132efa{color}
Operator UID: c, Hashed: d7741f4a6cdf388e747557749a0f0d21
Operator UID: d, Hashed: 76f74784cdf272cbdd1c37d471a532a0
Operator UID: e, Hashed: 94e9d5a34992b6c51461d69a7ca2eb56
Operator UID: f, Hashed: afa3664e2d13439221e8d041382a4dc1
Operator UID: g, Hashed: da9aa6f89ab75dbc6233a02db1b171fd
Operator UID: h, Hashed: 2345cb61bbb2fcd603d786389726830c
{color:#ffab00}Operator UID: z, Hashed: 936222da3bb558848f700bb01edb34c0{color}
Operator UID: i, Hashed: bdf3ca0e5e6bde27f22d80457a5a19a9
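For anyone reproducing the hashes above: as far as we understand, Flink's {{StreamGraphHasherV2}} derives the operator ID of an operator with an explicit uid by hashing the UTF-8 bytes of the uid with Guava's {{Hashing.murmur3_128(0)}} and printing the resulting 16 bytes as lowercase hex. The sketch below re-implements that (MurmurHash3 x64 128-bit, seed 0) without a Guava dependency. It is an assumption about Flink's behavior, not Flink's own code; the class name {{UidHash}} is hypothetical.

```java
import java.nio.charset.StandardCharsets;

/**
 * Hypothetical helper. ASSUMPTION: mirrors Flink's StreamGraphHasherV2 for operators with
 * an explicit uid, i.e. MurmurHash3 x64 128-bit (seed 0) over the uid's UTF-8 bytes,
 * with h1 and h2 each emitted as little-endian bytes and rendered as lowercase hex.
 */
final class UidHash {
    private static final long C1 = 0x87c37b91114253d5L;
    private static final long C2 = 0x4cf5ad432745937fL;

    static String operatorIdForUid(String uid) {
        byte[] data = uid.getBytes(StandardCharsets.UTF_8);
        int len = data.length;
        int blocks = len / 16;
        long h1 = 0L; // seed 0
        long h2 = 0L;

        // Body: mix each full 16-byte block as two little-endian 64-bit words.
        for (int i = 0; i < blocks; i++) {
            long k1 = readLongLE(data, i * 16);
            long k2 = readLongLE(data, i * 16 + 8);
            h1 ^= mixK1(k1);
            h1 = Long.rotateLeft(h1, 27) + h2;
            h1 = h1 * 5 + 0x52dce729;
            h2 ^= mixK2(k2);
            h2 = Long.rotateLeft(h2, 31) + h1;
            h2 = h2 * 5 + 0x38495ab5;
        }

        // Tail: remaining 0..15 bytes; bytes 0..7 go into k1, bytes 8..14 into k2.
        long k1 = 0L;
        long k2 = 0L;
        int tail = blocks * 16;
        int rem = len & 15;
        for (int i = rem - 1; i >= 8; i--) {
            k2 ^= (data[tail + i] & 0xffL) << (8 * (i - 8));
        }
        for (int i = Math.min(rem, 8) - 1; i >= 0; i--) {
            k1 ^= (data[tail + i] & 0xffL) << (8 * i);
        }
        if (rem > 8) h2 ^= mixK2(k2);
        if (rem > 0) h1 ^= mixK1(k1);

        // Finalization.
        h1 ^= len;
        h2 ^= len;
        h1 += h2;
        h2 += h1;
        h1 = fmix64(h1);
        h2 = fmix64(h2);
        h1 += h2;
        h2 += h1;

        StringBuilder sb = new StringBuilder(32);
        appendHexLE(sb, h1);
        appendHexLE(sb, h2);
        return sb.toString();
    }

    private static long mixK1(long k) {
        k *= C1;
        k = Long.rotateLeft(k, 31);
        return k * C2;
    }

    private static long mixK2(long k) {
        k *= C2;
        k = Long.rotateLeft(k, 33);
        return k * C1;
    }

    private static long fmix64(long k) {
        k ^= k >>> 33;
        k *= 0xff51afd7ed558ccdL;
        k ^= k >>> 33;
        k *= 0xc4ceb9fe1a85ec53L;
        k ^= k >>> 33;
        return k;
    }

    // Reads 8 bytes starting at off as a little-endian long.
    private static long readLongLE(byte[] b, int off) {
        long r = 0L;
        for (int i = 7; i >= 0; i--) {
            r = (r << 8) | (b[off + i] & 0xffL);
        }
        return r;
    }

    // Appends the 8 bytes of v in little-endian order as two hex digits each.
    private static void appendHexLE(StringBuilder sb, long v) {
        for (int i = 0; i < 8; i++) {
            sb.append(String.format("%02x", (v >>> (8 * i)) & 0xff));
        }
    }

    public static void main(String[] args) {
        for (String uid : args) {
            System.out.println("Operator UID: " + uid + ", Hashed: " + operatorIdForUid(uid));
        }
    }
}
```

Running it with the real uid strings as arguments should print lines in the same shape as the list above, if the assumption about {{StreamGraphHasherV2}} holds.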

But the metadata file contains the following hashed values:

{d7741f4a6cdf388e747557749a0f0d21=org.apache.flink.state.api.runtime.metadata.OperatorStateSpec@5af97850,
94e9d5a34992b6c51461d69a7ca2eb56=org.apache.flink.state.api.runtime.metadata.OperatorStateSpec@5ef60048,
{color:#00875a}647a0a5ff84846c52775ce89f51a5edc=org.apache.flink.state.api.runtime.metadata.OperatorStateSpec@1d548a08,{color}
2345cb61bbb2fcd603d786389726830c=org.apache.flink.state.api.runtime.metadata.OperatorStateSpec@16aa0a0a,
76f74784cdf272cbdd1c37d471a532a0=org.apache.flink.state.api.runtime.metadata.OperatorStateSpec@780cb77,
bdf3ca0e5e6bde27f22d80457a5a19a9=org.apache.flink.state.api.runtime.metadata.OperatorStateSpec@691a7f8f,
da9aa6f89ab75dbc6233a02db1b171fd=org.apache.flink.state.api.runtime.metadata.OperatorStateSpec@50a7bc6e,
{color:#00875a}43b792ffaf5a610180059cb432d4a71d=org.apache.flink.state.api.runtime.metadata.OperatorStateSpec@161b062a,{color}
afa3664e2d13439221e8d041382a4dc1=org.apache.flink.state.api.runtime.metadata.OperatorStateSpec@17c1bced}

As you can see, the green-highlighted IDs do not belong to any of our operators.

The yellow-highlighted IDs are the hashes of our operator uids that are not present in the metadata file.

Also, of the 10 hashes I generated, only 7 are present in the metadata file. What happened to the remaining 3, i.e. `897859f6655555855a890e51483ab5e6`, `eed1d3b157a9987ae9944e541e132efa` and `936222da3bb558848f700bb01edb34c0`?
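The comparison above can be reproduced mechanically. The sketch below hard-codes only the uid hashes and metadata keys quoted in this comment and computes both set differences; it is plain Java with no Flink dependency, and the class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical helper: cross-checks computed uid hashes against savepoint metadata keys. */
final class MetadataCrossCheck {
    /** uid -> computed hash, copied from the list in this comment. */
    static Map<String, String> computedHashes() {
        Map<String, String> m = new LinkedHashMap<>();
        m.put("a", "897859f6655555855a890e51483ab5e6");
        m.put("b", "eed1d3b157a9987ae9944e541e132efa");
        m.put("c", "d7741f4a6cdf388e747557749a0f0d21");
        m.put("d", "76f74784cdf272cbdd1c37d471a532a0");
        m.put("e", "94e9d5a34992b6c51461d69a7ca2eb56");
        m.put("f", "afa3664e2d13439221e8d041382a4dc1");
        m.put("g", "da9aa6f89ab75dbc6233a02db1b171fd");
        m.put("h", "2345cb61bbb2fcd603d786389726830c");
        m.put("z", "936222da3bb558848f700bb01edb34c0");
        m.put("i", "bdf3ca0e5e6bde27f22d80457a5a19a9");
        return m;
    }

    /** Operator IDs that appear as keys in the metadata printout in this comment. */
    static Set<String> metadataOperatorIds() {
        return new TreeSet<>(Arrays.asList(
                "d7741f4a6cdf388e747557749a0f0d21",
                "94e9d5a34992b6c51461d69a7ca2eb56",
                "647a0a5ff84846c52775ce89f51a5edc",
                "2345cb61bbb2fcd603d786389726830c",
                "76f74784cdf272cbdd1c37d471a532a0",
                "bdf3ca0e5e6bde27f22d80457a5a19a9",
                "da9aa6f89ab75dbc6233a02db1b171fd",
                "43b792ffaf5a610180059cb432d4a71d",
                "afa3664e2d13439221e8d041382a4dc1"));
    }

    /** uids whose computed hash has no entry in the metadata (the yellow entries). */
    static Set<String> uidsMissingFromMetadata(Map<String, String> computed, Set<String> metadataIds) {
        Set<String> missing = new TreeSet<>();
        for (Map.Entry<String, String> e : computed.entrySet()) {
            if (!metadataIds.contains(e.getValue())) {
                missing.add(e.getKey());
            }
        }
        return missing;
    }

    /** Metadata IDs not produced by any of our uids (the green entries). */
    static Set<String> unknownMetadataIds(Map<String, String> computed, Set<String> metadataIds) {
        Set<String> unknown = new TreeSet<>(metadataIds);
        unknown.removeAll(computed.values());
        return unknown;
    }

    public static void main(String[] args) {
        Map<String, String> computed = computedHashes();
        Set<String> ids = metadataOperatorIds();
        System.out.println("missing uids: " + uidsMissingFromMetadata(computed, ids));
        System.out.println("unknown ids:  " + unknownMetadataIds(computed, ids));
    }
}
```

Running {{main}} prints {{missing uids: [a, b, z]}} and {{unknown ids:  [43b792ffaf5a610180059cb432d4a71d, 647a0a5ff84846c52775ce89f51a5edc]}}, i.e. the same 3 missing and 2 unexplained IDs described above.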


> Error in restoring checkpoint/savepoint when Flink is upgraded from 1.9 to 1.11.2
> ---------------------------------------------------------------------------------
>
>                 Key: FLINK-20376
>                 URL: https://issues.apache.org/jira/browse/FLINK-20376
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing, Runtime / State Backends
>            Reporter: Partha Pradeep Mishra
>            Priority: Major
>         Attachments: MetaData.zip, image-2020-12-10-15-04-39-624.png, 
> image-2020-12-10-15-06-48-013.png, image-2020-12-10-15-09-13-527.png, 
> image-2021-01-18-14-42-49-814.png, image-2021-02-11-14-37-31-793.png, 
> image-2021-02-12-10-50-26-411.png
>
>
> We saved checkpoints for one of our Flink jobs (running on Flink 1.9) and then
> tried to import/restore them in the newer Flink version (1.11.2). The
> import/resume operation failed with the error below. Note that both jobs
> (i.e. the one running on 1.9 and the one on 1.11.2) use the same binary, with
> no code differences and no newly introduced operators. Still, we got the
> following error:
> _Cannot map checkpoint/savepoint state for operator
> fbb4ef531e002f8fb3a2052db255adf5 to the new program, because the operator is
> not available in the new program._
> *Complete Stack Trace:*
> {noformat}
> org.apache.flink.runtime.rest.handler.RestHandlerException: Could not execute application.
>     at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$1(JarRunHandler.java:103)
>     at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)
>     at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)
>     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>     at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.util.concurrent.CompletionException: org.apache.flink.util.FlinkRuntimeException: Could not execute application.
>     at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
>     at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
>     at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
>     ... 7 more
> Caused by: org.apache.flink.util.FlinkRuntimeException: Could not execute application.
>     at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:81)
>     at org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:67)
>     at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:100)
>     at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
>     ... 7 more
> Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Failed to execute job 'ST1_100Services-preprod-Tumbling-ProcessedBased'.
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
>     at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
>     at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:78)
>     ... 10 more
> Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'ST1_100Services-preprod-Tumbling-ProcessedBased'.
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1821)
>     at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:128)
>     at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1697)
>     at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:699)
>     at com.man.ceon.cep.jobs.AnalyticService$.main(AnalyticService.scala:108)
>     at com.man.ceon.cep.jobs.AnalyticService.main(AnalyticService.scala)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
>     ... 13 more
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit job.
>     at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$internalSubmitJob$3(Dispatcher.java:344)
>     at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)
>     at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)
>     at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
>     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not instantiate JobManager.
>     at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:398)
>     at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
>     ... 6 more
> Caused by: java.lang.IllegalStateException: Failed to rollback to checkpoint/savepoint s3://prodv2-flink-cluster/savepoints/savepoint-b76d18-d302cc7ca666. Cannot map checkpoint/savepoint state for operator fbb4ef531e002f8fb3a2052db255adf5 to the new program, because the operator is not available in the new program. If you want to allow to skip this, you can set the --allowNonRestoredState option on the CLI.
>     at org.apache.flink.runtime.checkpoint.Checkpoints.throwNonRestoredStateException(Checkpoints.java:210)
>     at org.apache.flink.runtime.checkpoint.Checkpoints.loadAndValidateCheckpoint(Checkpoints.java:180)
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1397)
>     at org.apache.flink.runtime.scheduler.SchedulerBase.tryRestoreExecutionGraphFromSavepoint(SchedulerBase.java:300)
>     at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:253)
>     at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:229)
>     at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:119)
>     at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:103)
>     at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:284)
>     at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:272)
>     at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
>     at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
>     at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:140)
>     at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:84)
>     at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:388)
>     ... 7 more
> {noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
