[ https://issues.apache.org/jira/browse/FLINK-35639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17855979#comment-17855979 ]
Matthias Pohl commented on FLINK-35639: --------------------------------------- I guess, you're right. Looks like there was an error being made when deprecating the {{@PublicEvolving}} API of {{{}RestartStrategies#FixedDelayRestartStrategyConfiguration{}}}. I will raise the priority for this one to blocker because it also affects 1.20. > upgrade to 1.19 with job in HA state with restart strategy crashes job manager > ------------------------------------------------------------------------------ > > Key: FLINK-35639 > URL: https://issues.apache.org/jira/browse/FLINK-35639 > Project: Flink > Issue Type: Bug > Components: API / Core > Affects Versions: 1.19.1 > Environment: Download 1.18 and 1.19 binary releases. Add the > following to flink-1.19.0/conf/config.yaml and > flink-1.18.1/conf/flink-conf.yaml ```yaml high-availability: zookeeper > high-availability.zookeeper.quorum: localhost high-availability.storageDir: > file:///tmp/flink/recovery ``` Launch zookeeper: docker run --network host > zookeeper:latest launch 1.18 task manager: ./flink-1.18.1/bin/taskmanager.sh > start-foreground launch 1.18 job manager: ./flink-1.18.1/bin/jobmanager.sh > start-foreground launch the following job: ```java import > org.apache.flink.api.java.ExecutionEnvironment; import > org.apache.flink.api.java.tuple.Tuple2; import > org.apache.flink.api.common.functions.FlatMapFunction; import > org.apache.flink.util.Collector; import > org.apache.flink.api.common.restartstrategy.RestartStrategies; import > org.apache.flink.api.common.time.Time; import java.util.concurrent.TimeUnit; > public class FlinkJob \{ public static void main(String[] args) throws > Exception { final ExecutionEnvironment env = > ExecutionEnvironment.getExecutionEnvironment(); env.setRestartStrategy( > RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, Time.of(20, > TimeUnit.SECONDS)) ); env.fromElements("Hello World", "Hello Flink") > .flatMap(new LineSplitter()) .groupBy(0) .sum(1) .print(); } public static > final class LineSplitter implements FlatMapFunction> \{ @Override public void > flatMap(String value, Collector> out) { for (String word : value.split(" ")) > { try { Thread.sleep(120000); } catch (InterruptedException e) \{ > e.printStackTrace(); } out.collect(new Tuple2<>(word, 1)); } } } } ``` ```xml > 4.0.0 org.apache.flink myflinkjob 1.0-SNAPSHOT 1.18.1 1.8 org.apache.flink > flink-java ${flink.version} org.apache.flink flink-streaming-java > ${flink.version} org.apache.maven.plugins maven-compiler-plugin 3.8.1 > ${java.version} ${java.version} org.apache.maven.plugins maven-jar-plugin > 3.1.0 true lib/ FlinkJob ``` Launch job: ./flink-1.18.1/bin/flink run > ../flink-job/target/myflinkjob-1.0-SNAPSHOT.jar Job has been submitted with > JobID 5f0898c964a93a47aa480427f3e2c6c0 Kill job manager and task manager. > Then launch job manager 1.19.0 ./flink-1.19.0/bin/jobmanager.sh > start-foreground Root cause ========== It looks like the type of > delayBetweenAttemptsInterval was changed in 1.19 > https://github.com/apache/flink/pull/22984/files#diff-d174f32ffdea69de610c4f37c545bd22a253b9846434f83397f1bbc2aaa399faR239 > , introducing an incompatibility which is not handled by flink 1.19. In my > opinion, job-maanger should not crash when starting in that case. > Reporter: yazgoo > Priority: Major > > When trying to upgrade a flink cluster from 1.18 to 1.19, with a 1.18 job in > zookeeper HA state, I have a jobmanager crash with a ClassCastException, see > log below > > {code:java} > 2024-06-18 16:58:14,401 ERROR > org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Fatal error > occurred in the cluster entrypoint. org.apache.flink.util.FlinkException: > JobMaster for job 5f0898c964a93a47aa480427f3e2c6c0 failed. at > org.apache.flink.runtime.dispatcher.Dispatcher.jobMasterFailed(Dispatcher.java:1484) > ~[flink-dist-1.19.0.jar:1.19.0] at > org.apache.flink.runtime.dispatcher.Dispatcher.jobManagerRunnerFailed(Dispatcher.java:775) > ~[flink-dist-1.19.0.jar:1.19.0] at > org.apache.flink.runtime.dispatcher.Dispatcher.handleJobManagerRunnerResult(Dispatcher.java:738) > ~[flink-dist-1.19.0.jar:1.19.0] at > org.apache.flink.runtime.dispatcher.Dispatcher.lambda$runJob$7(Dispatcher.java:693) > ~[flink-dist-1.19.0.jar:1.19.0] at > java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:934) > ~[?:?] at > java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:911) > ~[?:?] at > java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:482) > ~[?:?] at > org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRunAsync$4(PekkoRpcActor.java:451) > ~[flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0] at > org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) > ~[flink-dist-1.19.0.jar:1.19.0] at > org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRunAsync(PekkoRpcActor.java:451) > ~[flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0] at > org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:218) > ~[flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0] at > org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85) > ~[flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0] at > org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168) > ~[flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0] at > org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33) > [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0] at > org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29) > [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0] at > scala.PartialFunction.applyOrElse(PartialFunction.scala:127) > [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0] at > scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) > [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0] at > org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29) > [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0] at > scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) > [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0] at > scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) > [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0] at > scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) > [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0] at > org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547) > [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0] at > org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545) > [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0] at > org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229) > [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0] at > org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590) > [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0] at > org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557) > [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0] at > org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280) > [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0] at > org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241) > [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0] at > org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253) > [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0] at > java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373) [?:?] at > java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182) > [?:?] at java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655) > [?:?] at > java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622) [?:?] > at > java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165) > [?:?] Caused by: org.apache.flink.runtime.client.JobInitializationException: > Could not start the JobMaster. at > org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97) > ~[flink-dist-1.19.0.jar:1.19.0] at > java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) > ~[?:?] at > java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841) > ~[?:?] at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) > ~[?:?] at > java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1773) > ~[?:?] at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) > ~[?:?] at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) > ~[?:?] at java.lang.Thread.run(Thread.java:840) ~[?:?] Caused by: > java.util.concurrent.CompletionException: java.lang.ClassCastException: > cannot assign instance of org.apache.flink.api.common.time.Time to field > org.apache.flink.api.common.restartstrategy.RestartStrategies$FixedDelayRestartStrategyConfiguration.delayBetweenAttemptsInterval > of type java.time.Duration in instance of > org.apache.flink.api.common.restartstrategy.RestartStrategies$FixedDelayRestartStrategyConfiguration > at > java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315) > ~[?:?] at > java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:320) > ~[?:?] at > java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1770) > ~[?:?] at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) > ~[?:?] at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) > ~[?:?] at java.lang.Thread.run(Thread.java:840) ~[?:?] Caused by: > java.lang.ClassCastException: cannot assign instance of > org.apache.flink.api.common.time.Time to field > org.apache.flink.api.common.restartstrategy.RestartStrategies$FixedDelayRestartStrategyConfiguration.delayBetweenAttemptsInterval > of type java.time.Duration in instance of > org.apache.flink.api.common.restartstrategy.RestartStrategies$FixedDelayRestartStrategyConfiguration > at > java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2096) > ~[?:?] at > java.io.ObjectStreamClass$FieldReflector.checkObjectFieldValueTypes(ObjectStreamClass.java:2060) > ~[?:?] at > java.io.ObjectStreamClass.checkObjFieldValueTypes(ObjectStreamClass.java:1347) > ~[?:?] at > java.io.ObjectInputStream$FieldValues.defaultCheckFieldValues(ObjectInputStream.java:2679) > ~[?:?] at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2486) ~[?:?] > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2257) > ~[?:?] at > java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733) ~[?:?] > at java.io.ObjectInputStream$FieldValues.<init>(ObjectInputStream.java:2606) > ~[?:?] at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2457) ~[?:?] > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2257) > ~[?:?] at > java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733) ~[?:?] > at java.io.ObjectInputStream.readObject(ObjectInputStream.java:509) ~[?:?] > at java.io.ObjectInputStream.readObject(ObjectInputStream.java:467) ~[?:?] > at > org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:539) > ~[flink-dist-1.19.0.jar:1.19.0] at > org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:527) > ~[flink-dist-1.19.0.jar:1.19.0] at > org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:67) > ~[flink-dist-1.19.0.jar:1.19.0] at > org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:102) > ~[flink-dist-1.19.0.jar:1.19.0] at > org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:121) > ~[flink-dist-1.19.0.jar:1.19.0] at > org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:379) > ~[flink-dist-1.19.0.jar:1.19.0] at > org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:356) > ~[flink-dist-1.19.0.jar:1.19.0] at > org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:128) > ~[flink-dist-1.19.0.jar:1.19.0] at > org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:100) > ~[flink-dist-1.19.0.jar:1.19.0] at > org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112) > ~[flink-dist-1.19.0.jar:1.19.0] at > java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768) > ~[?:?] at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) > ~[?:?] at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) > ~[?:?] at java.lang.Thread.run(Thread.java:840) ~[?:?] 2024-06-18 > 16:58:14,403 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint > [] - Shutting StandaloneSessionClusterEntrypoint down with application > status UNKNOWN. Diagnostics Cluster entrypoint has been closed externally.. > 2024-06-18 16:58:14,404 INFO org.apache.flink.runtime.blob.BlobServer > [] - Stopped BLOB server at 127.0.0.1:40067 2024-06-18 > 16:58:14,431 INFO > org.apache.flink.shaded.curator5.org.apache.curator.utils.Compatibility [] - > Using org.apache.zookeeper.server.quorum.MultipleAddresses > {code} > *Reproducing* > Download 1.18 and 1.19 binary releases. > Add the following to flink-1.19.0/conf/config.yaml and > flink-1.18.1/conf/flink-conf.yaml > > {code:java} > high-availability: zookeeper > high-availability.zookeeper.quorum: localhost > high-availability.storageDir: file:///tmp/flink/recovery {code} > Launch zookeeper: > {code:java} > docker run --network host zookeeper:latest{code} > launch 1.18 job manager: > {code:java} > ./flink-1.18.1/bin/jobmanager.sh start-foreground{code} > launch 1.18 task manager: > {code:java} > ./flink-1.18.1/bin/taskmanager.sh start-foreground{code} > create the following job: > {code:java} > import org.apache.flink.api.java.ExecutionEnvironment; > import org.apache.flink.api.java.tuple.Tuple2; > import org.apache.flink.api.common.functions.FlatMapFunction; > import org.apache.flink.util.Collector; > import org.apache.flink.api.common.restartstrategy.RestartStrategies; > import org.apache.flink.api.common.time.Time; > import java.util.concurrent.TimeUnit;public class FlinkJob { > public static void main(String[] args) throws Exception { > final ExecutionEnvironment env = > ExecutionEnvironment.getExecutionEnvironment(); env.setRestartStrategy( > RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, > Time.of(20, TimeUnit.SECONDS)) > ); env.fromElements("Hello World", "Hello Flink") > .flatMap(new LineSplitter()) > .groupBy(0) > .sum(1) > .print(); > } public static final class LineSplitter implements > FlatMapFunction<String, Tuple2<String, Integer>> { > @Override > public void flatMap(String value, Collector<Tuple2<String, Integer>> > out) { > for (String word : value.split(" ")) { > try { > Thread.sleep(120000); > } catch (InterruptedException e) { > e.printStackTrace(); > } > out.collect(new Tuple2<>(word, 1)); > } > } > }} > {code} > pom.xml > {code:xml} > <project xmlns="http://maven.apache.org/POM/4.0.0" > xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" > xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 > http://maven.apache.org/xsd/maven-4.0.0.xsd"> > <modelVersion>4.0.0</modelVersion> > <groupId>org.apache.flink</groupId> > <artifactId>myflinkjob</artifactId> > <version>1.0-SNAPSHOT</version> > <properties> > <flink.version>1.18.1</flink.version> > <java.version>1.8</java.version> > </properties> > <dependencies> > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-java</artifactId> > <version>${flink.version}</version> > </dependency> > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-streaming-java</artifactId> > <version>${flink.version}</version> > </dependency> > </dependencies> > <build> > <plugins> > <plugin> > <groupId>org.apache.maven.plugins</groupId> > <artifactId>maven-compiler-plugin</artifactId> > <version>3.8.1</version> > <configuration> > <source>${java.version}</source> > <target>${java.version}</target> > </configuration> > </plugin> > <plugin> > <groupId>org.apache.maven.plugins</groupId> > <artifactId>maven-jar-plugin</artifactId> > <version>3.1.0</version> > <configuration> > <archive> > <manifest> > <addClasspath>true</addClasspath> > <classpathPrefix>lib/</classpathPrefix> > <mainClass>FlinkJob</mainClass> > </manifest> > </archive> > </configuration> > </plugin> > </plugins> > </build> > </project> > {code} > Launch job: > {code:java} > ./flink-1.18.1/bin/flink run ../flink-job/target/myflinkjob-1.0-SNAPSHOT.jar > Job has been submitted with JobID 5f0898c964a93a47aa480427f3e2c6c0{code} > Kill job manager and task manager. > Then launch job manager 1.19.0 > {code:java} > ./flink-1.19.0/bin/jobmanager.sh start-foreground{code} > job manager will crash with stack trace above. > *Root cause* > It looks like the type of delayBetweenAttemptsInterval was changed in 1.19 > [https://github.com/apache/flink/pull/22984/files#diff-d174f32ffdea69de610c4f37c545bd22a253b9846434f83397f1bbc2aaa399faR239|http://example.com/] > , introducing an incompatibility which is not handled by flink 1.19. > In my opinion, job-maanger should not crash when starting in that case. > -- This message was sent by Atlassian Jira (v8.20.10#820010)