[ 
https://issues.apache.org/jira/browse/FLINK-35639?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias Pohl updated FLINK-35639:
----------------------------------
    Priority: Major  (was: Blocker)

> upgrade to 1.19 with job in HA state with restart strategy crashes job manager
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-35639
>                 URL: https://issues.apache.org/jira/browse/FLINK-35639
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Core
>    Affects Versions: 1.20.0, 1.19.1
>         Environment: Download 1.18 and 1.19 binary releases. Add the 
> following to flink-1.19.0/conf/config.yaml and 
> flink-1.18.1/conf/flink-conf.yaml ```yaml high-availability: zookeeper 
> high-availability.zookeeper.quorum: localhost high-availability.storageDir: 
> file:///tmp/flink/recovery ``` Launch zookeeper: docker run --network host 
> zookeeper:latest launch 1.18 task manager: ./flink-1.18.1/bin/taskmanager.sh 
> start-foreground launch 1.18 job manager: ./flink-1.18.1/bin/jobmanager.sh 
> start-foreground launch the following job: ```java import 
> org.apache.flink.api.java.ExecutionEnvironment; import 
> org.apache.flink.api.java.tuple.Tuple2; import 
> org.apache.flink.api.common.functions.FlatMapFunction; import 
> org.apache.flink.util.Collector; import 
> org.apache.flink.api.common.restartstrategy.RestartStrategies; import 
> org.apache.flink.api.common.time.Time; import java.util.concurrent.TimeUnit; 
> public class FlinkJob \{ public static void main(String[] args) throws 
> Exception { final ExecutionEnvironment env = 
> ExecutionEnvironment.getExecutionEnvironment(); env.setRestartStrategy( 
> RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, Time.of(20, 
> TimeUnit.SECONDS)) ); env.fromElements("Hello World", "Hello Flink") 
> .flatMap(new LineSplitter()) .groupBy(0) .sum(1) .print(); } public static 
> final class LineSplitter implements FlatMapFunction> \{ @Override public void 
> flatMap(String value, Collector> out) { for (String word : value.split(" ")) 
> { try { Thread.sleep(120000); } catch (InterruptedException e) \{ 
> e.printStackTrace(); } out.collect(new Tuple2<>(word, 1)); } } } } ``` ```xml 
> 4.0.0 org.apache.flink myflinkjob 1.0-SNAPSHOT 1.18.1 1.8 org.apache.flink 
> flink-java ${flink.version} org.apache.flink flink-streaming-java 
> ${flink.version} org.apache.maven.plugins maven-compiler-plugin 3.8.1 
> ${java.version} ${java.version} org.apache.maven.plugins maven-jar-plugin 
> 3.1.0 true lib/ FlinkJob ``` Launch job: ./flink-1.18.1/bin/flink run 
> ../flink-job/target/myflinkjob-1.0-SNAPSHOT.jar Job has been submitted with 
> JobID 5f0898c964a93a47aa480427f3e2c6c0 Kill job manager and task manager. 
> Then launch job manager 1.19.0 ./flink-1.19.0/bin/jobmanager.sh 
> start-foreground Root cause ========== It looks like the type of 
> delayBetweenAttemptsInterval was changed in 1.19 
> https://github.com/apache/flink/pull/22984/files#diff-d174f32ffdea69de610c4f37c545bd22a253b9846434f83397f1bbc2aaa399faR239
>  , introducing an incompatibility which is not handled by flink 1.19. In my 
> opinion, job-maanger should not crash when starting in that case. 
>            Reporter: yazgoo
>            Assignee: Matthias Pohl
>            Priority: Major
>              Labels: pull-request-available
>
> When trying to upgrade a flink cluster from 1.18 to 1.19, with a 1.18 job in 
> zookeeper HA state, I have a jobmanager crash with a ClassCastException, see 
> log below  
>  
> {code:java}
> 2024-06-18 16:58:14,401 ERROR 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Fatal error 
> occurred in the cluster entrypoint. org.apache.flink.util.FlinkException: 
> JobMaster for job 5f0898c964a93a47aa480427f3e2c6c0 failed.     at 
> org.apache.flink.runtime.dispatcher.Dispatcher.jobMasterFailed(Dispatcher.java:1484)
>  ~[flink-dist-1.19.0.jar:1.19.0]     at 
> org.apache.flink.runtime.dispatcher.Dispatcher.jobManagerRunnerFailed(Dispatcher.java:775)
>  ~[flink-dist-1.19.0.jar:1.19.0]     at 
> org.apache.flink.runtime.dispatcher.Dispatcher.handleJobManagerRunnerResult(Dispatcher.java:738)
>  ~[flink-dist-1.19.0.jar:1.19.0]     at 
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$runJob$7(Dispatcher.java:693)
>  ~[flink-dist-1.19.0.jar:1.19.0]     at 
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:934) 
> ~[?:?]     at 
> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:911)
>  ~[?:?]     at 
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:482)
>  ~[?:?]     at 
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRunAsync$4(PekkoRpcActor.java:451)
>  ~[flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]     at 
> org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
>  ~[flink-dist-1.19.0.jar:1.19.0]     at 
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRunAsync(PekkoRpcActor.java:451)
>  ~[flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]     at 
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:218)
>  ~[flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]     at 
> org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85)
>  ~[flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]     at 
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168)
>  ~[flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]     at 
> org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33) 
> [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]     at 
> org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29) 
> [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]     at 
> scala.PartialFunction.applyOrElse(PartialFunction.scala:127) 
> [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]     at 
> scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) 
> [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]     at 
> org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
>  [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]     at 
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) 
> [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]     at 
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) 
> [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]     at 
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) 
> [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]     at 
> org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547) 
> [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]     at 
> org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545) 
> [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]     at 
> org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229) 
> [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]     at 
> org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590) 
> [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]     at 
> org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557) 
> [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]     at 
> org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280) 
> [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]     at 
> org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241) 
> [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]     at 
> org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253) 
> [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]     at 
> java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373) [?:?]     at 
> java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
>  [?:?]     at java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655) 
> [?:?]     at 
> java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622) [?:?]     
> at 
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165) 
> [?:?] Caused by: org.apache.flink.runtime.client.JobInitializationException: 
> Could not start the JobMaster.     at 
> org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
>  ~[flink-dist-1.19.0.jar:1.19.0]     at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
>  ~[?:?]     at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
>  ~[?:?]     at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
>  ~[?:?]     at 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1773)
>  ~[?:?]     at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
>  ~[?:?]     at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
>  ~[?:?]     at java.lang.Thread.run(Thread.java:840) ~[?:?] Caused by: 
> java.util.concurrent.CompletionException: java.lang.ClassCastException: 
> cannot assign instance of org.apache.flink.api.common.time.Time to field 
> org.apache.flink.api.common.restartstrategy.RestartStrategies$FixedDelayRestartStrategyConfiguration.delayBetweenAttemptsInterval
>  of type java.time.Duration in instance of 
> org.apache.flink.api.common.restartstrategy.RestartStrategies$FixedDelayRestartStrategyConfiguration
>      at 
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315)
>  ~[?:?]     at 
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:320)
>  ~[?:?]     at 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1770)
>  ~[?:?]     at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
>  ~[?:?]     at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
>  ~[?:?]     at java.lang.Thread.run(Thread.java:840) ~[?:?] Caused by: 
> java.lang.ClassCastException: cannot assign instance of 
> org.apache.flink.api.common.time.Time to field 
> org.apache.flink.api.common.restartstrategy.RestartStrategies$FixedDelayRestartStrategyConfiguration.delayBetweenAttemptsInterval
>  of type java.time.Duration in instance of 
> org.apache.flink.api.common.restartstrategy.RestartStrategies$FixedDelayRestartStrategyConfiguration
>      at 
> java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2096)
>  ~[?:?]     at 
> java.io.ObjectStreamClass$FieldReflector.checkObjectFieldValueTypes(ObjectStreamClass.java:2060)
>  ~[?:?]     at 
> java.io.ObjectStreamClass.checkObjFieldValueTypes(ObjectStreamClass.java:1347)
>  ~[?:?]     at 
> java.io.ObjectInputStream$FieldValues.defaultCheckFieldValues(ObjectInputStream.java:2679)
>  ~[?:?]     at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2486) ~[?:?]  
>    at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2257) 
> ~[?:?]     at 
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733) ~[?:?]     
> at java.io.ObjectInputStream$FieldValues.<init>(ObjectInputStream.java:2606) 
> ~[?:?]     at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2457) ~[?:?]  
>    at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2257) 
> ~[?:?]     at 
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733) ~[?:?]     
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:509) ~[?:?]    
>  at java.io.ObjectInputStream.readObject(ObjectInputStream.java:467) ~[?:?]   
>   at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:539)
>  ~[flink-dist-1.19.0.jar:1.19.0]     at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:527)
>  ~[flink-dist-1.19.0.jar:1.19.0]     at 
> org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:67)
>  ~[flink-dist-1.19.0.jar:1.19.0]     at 
> org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:102)
>  ~[flink-dist-1.19.0.jar:1.19.0]     at 
> org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:121)
>  ~[flink-dist-1.19.0.jar:1.19.0]     at 
> org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:379)
>  ~[flink-dist-1.19.0.jar:1.19.0]     at 
> org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:356) 
> ~[flink-dist-1.19.0.jar:1.19.0]     at 
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:128)
>  ~[flink-dist-1.19.0.jar:1.19.0]     at 
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:100)
>  ~[flink-dist-1.19.0.jar:1.19.0]     at 
> org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
>  ~[flink-dist-1.19.0.jar:1.19.0]     at 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768)
>  ~[?:?]     at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
>  ~[?:?]     at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
>  ~[?:?]     at java.lang.Thread.run(Thread.java:840) ~[?:?] 2024-06-18 
> 16:58:14,403 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint      
>   [] - Shutting StandaloneSessionClusterEntrypoint down with application 
> status UNKNOWN. Diagnostics Cluster entrypoint has been closed externally.. 
> 2024-06-18 16:58:14,404 INFO  org.apache.flink.runtime.blob.BlobServer        
>              [] - Stopped BLOB server at 127.0.0.1:40067 2024-06-18 
> 16:58:14,431 INFO  
> org.apache.flink.shaded.curator5.org.apache.curator.utils.Compatibility [] - 
> Using org.apache.zookeeper.server.quorum.MultipleAddresses
> {code}
> *Reproducing*
> Download 1.18 and 1.19 binary releases.
> Add the following to flink-1.19.0/conf/config.yaml and 
> flink-1.18.1/conf/flink-conf.yaml  
>  
> {code:java}
> high-availability: zookeeper
> high-availability.zookeeper.quorum: localhost
> high-availability.storageDir: file:///tmp/flink/recovery {code}
> Launch zookeeper:
> {code:java}
> docker run --network host zookeeper:latest{code}
> launch 1.18 job manager:
> {code:java}
> ./flink-1.18.1/bin/jobmanager.sh start-foreground{code}
> launch 1.18 task manager:
> {code:java}
> ./flink-1.18.1/bin/taskmanager.sh start-foreground{code}
> create the following job:
> {code:java}
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.api.common.functions.FlatMapFunction;
> import org.apache.flink.util.Collector;
> import org.apache.flink.api.common.restartstrategy.RestartStrategies;
> import org.apache.flink.api.common.time.Time;
> import java.util.concurrent.TimeUnit;public class FlinkJob {
>     public static void main(String[] args) throws Exception {
>         final ExecutionEnvironment env = 
> ExecutionEnvironment.getExecutionEnvironment();        env.setRestartStrategy(
>                 RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 
> Time.of(20, TimeUnit.SECONDS))
>                 );        env.fromElements("Hello World", "Hello Flink")
>             .flatMap(new LineSplitter())
>             .groupBy(0)
>             .sum(1)
>             .print();
>     }    public static final class LineSplitter implements 
> FlatMapFunction<String, Tuple2<String, Integer>> {
>         @Override
>         public void flatMap(String value, Collector<Tuple2<String, Integer>> 
> out) {
>             for (String word : value.split(" ")) {
>                 try {
>                     Thread.sleep(120000);
>                 } catch (InterruptedException e) {
>                     e.printStackTrace();
>                 }
>                 out.collect(new Tuple2<>(word, 1));
>             }
>         }
>     }}
>  {code}
> pom.xml
> {code:xml}
> <project xmlns="http://maven.apache.org/POM/4.0.0";
>          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
>          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
> http://maven.apache.org/xsd/maven-4.0.0.xsd";>
>     <modelVersion>4.0.0</modelVersion>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>myflinkjob</artifactId>
>     <version>1.0-SNAPSHOT</version>
>     <properties>
>         <flink.version>1.18.1</flink.version>
>         <java.version>1.8</java.version>
>     </properties>
>     <dependencies>
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-java</artifactId>
>             <version>${flink.version}</version>
>         </dependency>
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-streaming-java</artifactId>
>             <version>${flink.version}</version>
>         </dependency>
>     </dependencies>
>     <build>
>         <plugins>
>             <plugin>
>                 <groupId>org.apache.maven.plugins</groupId>
>                 <artifactId>maven-compiler-plugin</artifactId>
>                 <version>3.8.1</version>
>                 <configuration>
>                     <source>${java.version}</source>
>                     <target>${java.version}</target>
>                 </configuration>
>             </plugin>
>             <plugin>
>                 <groupId>org.apache.maven.plugins</groupId>
>                 <artifactId>maven-jar-plugin</artifactId>
>                 <version>3.1.0</version>
>                 <configuration>
>                     <archive>
>                         <manifest>
>                             <addClasspath>true</addClasspath>
>                             <classpathPrefix>lib/</classpathPrefix>
>                             <mainClass>FlinkJob</mainClass>
>                         </manifest>
>                     </archive>
>                 </configuration>
>             </plugin>
>         </plugins>
>     </build>
> </project>
> {code}
> Launch job:
> {code:java}
> ./flink-1.18.1/bin/flink run ../flink-job/target/myflinkjob-1.0-SNAPSHOT.jar
> Job has been submitted with JobID 5f0898c964a93a47aa480427f3e2c6c0{code}
> Kill job manager and task manager.
> Then launch job manager 1.19.0
> {code:java}
> ./flink-1.19.0/bin/jobmanager.sh start-foreground{code}
> job manager will crash with stack trace above.
> *Root cause*
> It looks like the type of delayBetweenAttemptsInterval was changed in 1.19 
> [https://github.com/apache/flink/pull/22984/files#diff-d174f32ffdea69de610c4f37c545bd22a253b9846434f83397f1bbc2aaa399faR239|http://example.com/]
>  , introducing an incompatibility which is not handled by flink 1.19.
> In my opinion, job-maanger should not crash when starting in that case.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to