yazgoo created FLINK-35639:
------------------------------

             Summary: upgrading to 1.19 with job in HA state with restart strategy crashes job manager
                 Key: FLINK-35639
                 URL: https://issues.apache.org/jira/browse/FLINK-35639
             Project: Flink
          Issue Type: Bug
          Components: API / Core
    Affects Versions: 1.19.1
         Environment: Download the 1.18 and 1.19 binary releases.

Add the following to flink-1.19.0/conf/config.yaml and flink-1.18.1/conf/flink-conf.yaml:

```yaml
high-availability: zookeeper
high-availability.zookeeper.quorum: localhost
high-availability.storageDir: file:///tmp/flink/recovery
```

Launch ZooKeeper:
docker run --network host zookeeper:latest

Launch the 1.18 task manager:
./flink-1.18.1/bin/taskmanager.sh start-foreground

Launch the 1.18 job manager:
./flink-1.18.1/bin/jobmanager.sh start-foreground

Prepare the following job:

```java
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import java.util.concurrent.TimeUnit;

public class FlinkJob {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // The restart strategy configuration is what ends up serialized in the HA state.
        env.setRestartStrategy(
            RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, Time.of(20, TimeUnit.SECONDS))
        );
        env.fromElements("Hello World", "Hello Flink")
            .flatMap(new LineSplitter())
            .groupBy(0)
            .sum(1)
            .print();
    }

    public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            for (String word : value.split(" ")) {
                try {
                    // Sleep so the job is still running when the cluster is upgraded.
                    Thread.sleep(120000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                out.collect(new Tuple2<>(word, 1));
            }
        }
    }
}
```

```xml
<project>
  <modelVersion>4.0.0</modelVersion>

  <groupId>org.apache.flink</groupId>
  <artifactId>myflinkjob</artifactId>
  <version>1.0-SNAPSHOT</version>

  <properties>
    <flink.version>1.18.1</flink.version>
    <java.version>1.8</java.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java</artifactId>
      <version>${flink.version}</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.8.1</version>
        <configuration>
          <source>${java.version}</source>
          <target>${java.version}</target>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-jar-plugin</artifactId>
        <version>3.1.0</version>
        <configuration>
          <archive>
            <manifest>
              <addClasspath>true</addClasspath>
              <classpathPrefix>lib/</classpathPrefix>
              <mainClass>FlinkJob</mainClass>
            </manifest>
          </archive>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>
```

Launch the job:
./flink-1.18.1/bin/flink run ../flink-job/target/myflinkjob-1.0-SNAPSHOT.jar
Job has been submitted with JobID 5f0898c964a93a47aa480427f3e2c6c0

Kill the job manager and the task manager. Then launch the 1.19.0 job manager:
./flink-1.19.0/bin/jobmanager.sh start-foreground

Root cause
==========

It looks like the type of delayBetweenAttemptsInterval was changed in 1.19 (https://github.com/apache/flink/pull/22984/files#diff-d174f32ffdea69de610c4f37c545bd22a253b9846434f83397f1bbc2aaa399faR239), introducing an incompatibility which is not handled by Flink 1.19.
In my opinion, the job manager should not crash on startup in that case.
            Reporter: yazgoo

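The kind of incompatibility named under "Root cause" can be illustrated outside Flink. Below is a minimal sketch, assuming plain Java serialization, of one way a class can tolerate a stored field value whose type differs from the current field type: a custom readObject reads the raw value through ObjectInputStream.GetField and converts it. This is not Flink's code and not a proposed patch. `TolerantDelayDemo`, `LegacyTime`, `RestartConfig` and the `writeLegacy` switch are hypothetical names invented for illustration; `LegacyTime` only stands in for the pre-1.19 Time type, and `writeLegacy` merely simulates an old writer.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.time.Duration;

public class TolerantDelayDemo {

    /** Hypothetical stand-in for the old field type (not Flink's Time class). */
    static final class LegacyTime implements Serializable {
        private static final long serialVersionUID = 1L;
        final long millis;

        LegacyTime(long millis) {
            this.millis = millis;
        }
    }

    /** Hypothetical stand-in for a configuration class whose field type changed. */
    static final class RestartConfig implements Serializable {
        private static final long serialVersionUID = 1L;

        private Duration delayBetweenAttemptsInterval;
        // Illustration only: makes writeObject emit the old representation.
        private transient boolean writeLegacy;

        RestartConfig(Duration delay, boolean writeLegacy) {
            this.delayBetweenAttemptsInterval = delay;
            this.writeLegacy = writeLegacy;
        }

        Duration getDelay() {
            return delayBetweenAttemptsInterval;
        }

        private void writeObject(ObjectOutputStream out) throws IOException {
            ObjectOutputStream.PutField fields = out.putFields();
            Object value = writeLegacy
                    ? new LegacyTime(delayBetweenAttemptsInterval.toMillis())
                    : delayBetweenAttemptsInterval;
            fields.put("delayBetweenAttemptsInterval", value);
            out.writeFields();
        }

        private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
            // Read the raw value without assigning it to the typed field directly.
            ObjectInputStream.GetField fields = in.readFields();
            Object raw = fields.get("delayBetweenAttemptsInterval", null);
            if (raw instanceof LegacyTime) {
                // Old representation: convert instead of failing with a ClassCastException.
                delayBetweenAttemptsInterval = Duration.ofMillis(((LegacyTime) raw).millis);
            } else {
                delayBetweenAttemptsInterval = (Duration) raw;
            }
        }
    }

    /** Serializes a config (old or new representation) and reads the delay back. */
    public static Duration roundTripSeconds(long seconds, boolean legacy) {
        RestartConfig config = new RestartConfig(Duration.ofSeconds(seconds), legacy);
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(config);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return ((RestartConfig) in.readObject()).getDelay();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("legacy payload  -> " + roundTripSeconds(20, true));
        System.out.println("current payload -> " + roundTripSeconds(20, false));
    }
}
```

Both calls in main should yield the same 20-second Duration. Whether a hook like this is appropriate for Flink's own classes is a separate question that this sketch does not answer.
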
When trying to upgrade a Flink cluster from 1.18 to 1.19 with a 1.18 job stored in the ZooKeeper HA state, the job manager fails with a ClassCastException; see the log below.

```log
2024-06-18 16:58:14,401 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Fatal error occurred in the cluster entrypoint.
org.apache.flink.util.FlinkException: JobMaster for job 5f0898c964a93a47aa480427f3e2c6c0 failed.
	at org.apache.flink.runtime.dispatcher.Dispatcher.jobMasterFailed(Dispatcher.java:1484) ~[flink-dist-1.19.0.jar:1.19.0]
	at org.apache.flink.runtime.dispatcher.Dispatcher.jobManagerRunnerFailed(Dispatcher.java:775) ~[flink-dist-1.19.0.jar:1.19.0]
	at org.apache.flink.runtime.dispatcher.Dispatcher.handleJobManagerRunnerResult(Dispatcher.java:738) ~[flink-dist-1.19.0.jar:1.19.0]
	at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$runJob$7(Dispatcher.java:693) ~[flink-dist-1.19.0.jar:1.19.0]
	at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:934) ~[?:?]
	at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:911) ~[?:?]
	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:482) ~[?:?]
	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRunAsync$4(PekkoRpcActor.java:451) ~[flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
	at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-dist-1.19.0.jar:1.19.0]
	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRunAsync(PekkoRpcActor.java:451) ~[flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:218) ~[flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
	at org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85) ~[flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168) ~[flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
	at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33) [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
	at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29) [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
	at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
	at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29) [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
	at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547) [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
	at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545) [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
	at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229) [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
	at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590) [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
	at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557) [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
	at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280) [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
	at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241) [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
	at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253) [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373) [?:?]
	at java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182) [?:?]
	at java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655) [?:?]
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622) [?:?]
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165) [?:?]
Caused by: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
	at org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97) ~[flink-dist-1.19.0.jar:1.19.0]
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) ~[?:?]
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841) ~[?:?]
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[?:?]
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1773) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
	at java.lang.Thread.run(Thread.java:840) ~[?:?]
Caused by: java.util.concurrent.CompletionException: java.lang.ClassCastException: cannot assign instance of org.apache.flink.api.common.time.Time to field org.apache.flink.api.common.restartstrategy.RestartStrategies$FixedDelayRestartStrategyConfiguration.delayBetweenAttemptsInterval of type java.time.Duration in instance of org.apache.flink.api.common.restartstrategy.RestartStrategies$FixedDelayRestartStrategyConfiguration
	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315) ~[?:?]
	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:320) ~[?:?]
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1770) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
	at java.lang.Thread.run(Thread.java:840) ~[?:?]
Caused by: java.lang.ClassCastException: cannot assign instance of org.apache.flink.api.common.time.Time to field org.apache.flink.api.common.restartstrategy.RestartStrategies$FixedDelayRestartStrategyConfiguration.delayBetweenAttemptsInterval of type java.time.Duration in instance of org.apache.flink.api.common.restartstrategy.RestartStrategies$FixedDelayRestartStrategyConfiguration
	at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2096) ~[?:?]
	at java.io.ObjectStreamClass$FieldReflector.checkObjectFieldValueTypes(ObjectStreamClass.java:2060) ~[?:?]
	at java.io.ObjectStreamClass.checkObjFieldValueTypes(ObjectStreamClass.java:1347) ~[?:?]
	at java.io.ObjectInputStream$FieldValues.defaultCheckFieldValues(ObjectInputStream.java:2679) ~[?:?]
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2486) ~[?:?]
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2257) ~[?:?]
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733) ~[?:?]
	at java.io.ObjectInputStream$FieldValues.<init>(ObjectInputStream.java:2606) ~[?:?]
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2457) ~[?:?]
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2257) ~[?:?]
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733) ~[?:?]
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:509) ~[?:?]
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:467) ~[?:?]
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:539) ~[flink-dist-1.19.0.jar:1.19.0]
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:527) ~[flink-dist-1.19.0.jar:1.19.0]
	at org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:67) ~[flink-dist-1.19.0.jar:1.19.0]
	at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:102) ~[flink-dist-1.19.0.jar:1.19.0]
	at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:121) ~[flink-dist-1.19.0.jar:1.19.0]
	at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:379) ~[flink-dist-1.19.0.jar:1.19.0]
	at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:356) ~[flink-dist-1.19.0.jar:1.19.0]
	at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:128) ~[flink-dist-1.19.0.jar:1.19.0]
	at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:100) ~[flink-dist-1.19.0.jar:1.19.0]
	at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112) ~[flink-dist-1.19.0.jar:1.19.0]
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
	at java.lang.Thread.run(Thread.java:840) ~[?:?]
2024-06-18 16:58:14,403 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Shutting StandaloneSessionClusterEntrypoint down with application status UNKNOWN. Diagnostics Cluster entrypoint has been closed externally..
2024-06-18 16:58:14,404 INFO org.apache.flink.runtime.blob.BlobServer [] - Stopped BLOB server at 127.0.0.1:40067
2024-06-18 16:58:14,431 INFO org.apache.flink.shaded.curator5.org.apache.curator.utils.Compatibility [] - Using org.apache.zookeeper.server.quorum.MultipleAddresses
```

--
This message was sent by Atlassian Jira
(v8.20.10#820010)