yazgoo created FLINK-35639:
------------------------------

             Summary: upgrading to 1.19 with job in HA state with restart strategy crashes job manager
                 Key: FLINK-35639
                 URL: https://issues.apache.org/jira/browse/FLINK-35639
             Project: Flink
          Issue Type: Bug
          Components: API / Core
    Affects Versions: 1.19.1
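         Environment: Download the 1.18 and 1.19 binary releases.
Add the following to flink-1.19.0/conf/config.yaml and flink-1.18.1/conf/flink-conf.yaml:
```yaml
high-availability: zookeeper
high-availability.zookeeper.quorum: localhost
high-availability.storageDir: file:///tmp/flink/recovery
```
Launch ZooKeeper:
docker run --network host zookeeper:latest
Launch the 1.18 task manager:
./flink-1.18.1/bin/taskmanager.sh start-foreground
Launch the 1.18 job manager:
./flink-1.18.1/bin/jobmanager.sh start-foreground
Build the following job (FlinkJob.java and its pom.xml):
```java
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import java.util.concurrent.TimeUnit;

public class FlinkJob {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Fixed-delay restart strategy whose delay is a Time value
        // (the value 1.19 later fails to deserialize)
        env.setRestartStrategy(
                RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, Time.of(20, TimeUnit.SECONDS)));
        env.fromElements("Hello World", "Hello Flink")
                .flatMap(new LineSplitter())
                .groupBy(0)
                .sum(1)
                .print();
    }

    public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            for (String word : value.split(" ")) {
                try {
                    // Slow processing down so the job is still running when the cluster is killed
                    Thread.sleep(120000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                out.collect(new Tuple2<>(word, 1));
            }
        }
    }
}
```
```xml
<project>
  <modelVersion>4.0.0</modelVersion>
  <groupId>org.apache.flink</groupId>
  <artifactId>myflinkjob</artifactId>
  <version>1.0-SNAPSHOT</version>
  <properties>
    <flink.version>1.18.1</flink.version>
    <java.version>1.8</java.version>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java</artifactId>
      <version>${flink.version}</version>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.8.1</version>
        <configuration>
          <source>${java.version}</source>
          <target>${java.version}</target>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-jar-plugin</artifactId>
        <version>3.1.0</version>
        <configuration>
          <archive>
            <manifest>
              <addClasspath>true</addClasspath>
              <classpathPrefix>lib/</classpathPrefix>
              <mainClass>FlinkJob</mainClass>
            </manifest>
          </archive>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>
```
Launch the job:
./flink-1.18.1/bin/flink run ../flink-job/target/myflinkjob-1.0-SNAPSHOT.jar
Output: Job has been submitted with JobID 5f0898c964a93a47aa480427f3e2c6c0
Kill the job manager and the task manager, then launch the 1.19.0 job manager:
./flink-1.19.0/bin/jobmanager.sh start-foreground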
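Root cause
==========
It looks like the type of RestartStrategies.FixedDelayRestartStrategyConfiguration#delayBetweenAttemptsInterval
was changed from org.apache.flink.api.common.time.Time to java.time.Duration in 1.19
(https://github.com/apache/flink/pull/22984/files#diff-d174f32ffdea69de610c4f37c545bd22a253b9846434f83397f1bbc2aaa399faR239),
introducing a serialization incompatibility that Flink 1.19 does not handle: the 1.19 job manager
cannot deserialize the restart strategy configuration that 1.18 serialized for the job kept in the HA state.
In my opinion, the job manager should not crash on startup in that case.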
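The same failure mode can be reproduced with plain JDK serialization. This is a minimal sketch under stated assumptions, not Flink code: `FixedDelayConfig` and `LegacyTime` are hypothetical stand-ins for `FixedDelayRestartStrategyConfiguration` and `Time`, and bytes from the "old" class version are simulated by writing a `LegacyTime` value into the `Duration`-typed field through `ObjectOutputStream.PutField`. Default deserialization matches fields by name and then checks each value against the declared field type, so reading the bytes back fails with a `ClassCastException` of the same form as in the log.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.time.Duration;

public class FieldTypeChangeDemo {

    // Hypothetical stand-in for the old field type (Flink's Time in the report).
    static final class LegacyTime implements Serializable {
        private static final long serialVersionUID = 1L;
        final long millis;

        LegacyTime(long millis) {
            this.millis = millis;
        }
    }

    // Hypothetical class whose field is now declared as java.time.Duration.
    static final class FixedDelayConfig implements Serializable {
        private static final long serialVersionUID = 1L;
        private Duration delayBetweenAttemptsInterval = Duration.ofSeconds(20);

        // Simulates bytes written by an older version of this class: same field
        // name, but the value stored under it has the legacy type.
        private void writeObject(ObjectOutputStream out) throws IOException {
            ObjectOutputStream.PutField fields = out.putFields();
            fields.put("delayBetweenAttemptsInterval",
                    new LegacyTime(delayBetweenAttemptsInterval.toMillis()));
            out.writeFields();
        }
        // No readObject: default deserialization restores fields by name and
        // type-checks each value against the declared field type.
    }

    // Serializes and deserializes one instance; returns the failure message, if any.
    static String roundTrip() throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(new FixedDelayConfig());
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            in.readObject();
            return "deserialized without error";
        } catch (ClassCastException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(roundTrip());
    }
}
```

Running `main` prints a `cannot assign instance of ... of type java.time.Duration ...` message instead of returning a deserialized object.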
            Reporter: yazgoo


When upgrading a Flink cluster from 1.18 to 1.19 while a job submitted with 1.18 is kept in the ZooKeeper HA state, the 1.19 job manager fails with a ClassCastException and shuts down; see the log below:

```log
2024-06-18 16:58:14,401 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Fatal error occurred in the cluster entrypoint.
org.apache.flink.util.FlinkException: JobMaster for job 5f0898c964a93a47aa480427f3e2c6c0 failed.
    at org.apache.flink.runtime.dispatcher.Dispatcher.jobMasterFailed(Dispatcher.java:1484) ~[flink-dist-1.19.0.jar:1.19.0]
    at org.apache.flink.runtime.dispatcher.Dispatcher.jobManagerRunnerFailed(Dispatcher.java:775) ~[flink-dist-1.19.0.jar:1.19.0]
    at org.apache.flink.runtime.dispatcher.Dispatcher.handleJobManagerRunnerResult(Dispatcher.java:738) ~[flink-dist-1.19.0.jar:1.19.0]
    at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$runJob$7(Dispatcher.java:693) ~[flink-dist-1.19.0.jar:1.19.0]
    at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:934) ~[?:?]
    at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:911) ~[?:?]
    at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:482) ~[?:?]
    at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRunAsync$4(PekkoRpcActor.java:451) ~[flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
    at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-dist-1.19.0.jar:1.19.0]
    at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRunAsync(PekkoRpcActor.java:451) ~[flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
    at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:218) ~[flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
    at org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85) ~[flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
    at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168) ~[flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
    at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33) [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
    at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29) [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
    at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29) [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
    at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547) [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
    at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545) [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
    at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229) [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
    at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590) [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
    at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557) [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
    at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280) [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
    at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241) [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
    at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253) [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373) [?:?]
    at java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182) [?:?]
    at java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655) [?:?]
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622) [?:?]
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165) [?:?]
Caused by: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
    at org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97) ~[flink-dist-1.19.0.jar:1.19.0]
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) ~[?:?]
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841) ~[?:?]
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[?:?]
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1773) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
    at java.lang.Thread.run(Thread.java:840) ~[?:?]
Caused by: java.util.concurrent.CompletionException: java.lang.ClassCastException: cannot assign instance of org.apache.flink.api.common.time.Time to field org.apache.flink.api.common.restartstrategy.RestartStrategies$FixedDelayRestartStrategyConfiguration.delayBetweenAttemptsInterval of type java.time.Duration in instance of org.apache.flink.api.common.restartstrategy.RestartStrategies$FixedDelayRestartStrategyConfiguration
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315) ~[?:?]
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:320) ~[?:?]
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1770) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
    at java.lang.Thread.run(Thread.java:840) ~[?:?]
Caused by: java.lang.ClassCastException: cannot assign instance of org.apache.flink.api.common.time.Time to field org.apache.flink.api.common.restartstrategy.RestartStrategies$FixedDelayRestartStrategyConfiguration.delayBetweenAttemptsInterval of type java.time.Duration in instance of org.apache.flink.api.common.restartstrategy.RestartStrategies$FixedDelayRestartStrategyConfiguration
    at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2096) ~[?:?]
    at java.io.ObjectStreamClass$FieldReflector.checkObjectFieldValueTypes(ObjectStreamClass.java:2060) ~[?:?]
    at java.io.ObjectStreamClass.checkObjFieldValueTypes(ObjectStreamClass.java:1347) ~[?:?]
    at java.io.ObjectInputStream$FieldValues.defaultCheckFieldValues(ObjectInputStream.java:2679) ~[?:?]
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2486) ~[?:?]
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2257) ~[?:?]
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733) ~[?:?]
    at java.io.ObjectInputStream$FieldValues.<init>(ObjectInputStream.java:2606) ~[?:?]
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2457) ~[?:?]
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2257) ~[?:?]
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733) ~[?:?]
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:509) ~[?:?]
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:467) ~[?:?]
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:539) ~[flink-dist-1.19.0.jar:1.19.0]
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:527) ~[flink-dist-1.19.0.jar:1.19.0]
    at org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:67) ~[flink-dist-1.19.0.jar:1.19.0]
    at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:102) ~[flink-dist-1.19.0.jar:1.19.0]
    at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:121) ~[flink-dist-1.19.0.jar:1.19.0]
    at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:379) ~[flink-dist-1.19.0.jar:1.19.0]
    at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:356) ~[flink-dist-1.19.0.jar:1.19.0]
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:128) ~[flink-dist-1.19.0.jar:1.19.0]
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:100) ~[flink-dist-1.19.0.jar:1.19.0]
    at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112) ~[flink-dist-1.19.0.jar:1.19.0]
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
    at java.lang.Thread.run(Thread.java:840) ~[?:?]
2024-06-18 16:58:14,403 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Shutting StandaloneSessionClusterEntrypoint down with application status UNKNOWN. Diagnostics Cluster entrypoint has been closed externally..
2024-06-18 16:58:14,404 INFO  org.apache.flink.runtime.blob.BlobServer                     [] - Stopped BLOB server at 127.0.0.1:40067
2024-06-18 16:58:14,431 INFO  org.apache.flink.shaded.curator5.org.apache.curator.utils.Compatibility [] - Using org.apache.zookeeper.server.quorum.MultipleAddresses
```
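The trace shows the exception being raised by default field restoration (`FieldValues.defaultCheckFieldValues`). A general Java technique that lets a `Serializable` class accept both an old and a new field type is a custom `readObject` that reads the raw value through `ObjectInputStream.GetField` and converts legacy values. The sketch below illustrates that technique only; it is not Flink's code or a proposed patch. `FixedDelayConfig`, `LegacyTime` and the `simulateLegacyWriter` flag are hypothetical, and the flag exists only so that one program can produce both old-style and new-style bytes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InvalidObjectException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.time.Duration;

public class TolerantReadDemo {

    // Hypothetical stand-in for the legacy field type (Flink's Time in the report).
    static final class LegacyTime implements Serializable {
        private static final long serialVersionUID = 1L;
        final long millis;

        LegacyTime(long millis) {
            this.millis = millis;
        }
    }

    // Hypothetical config class, not Flink's FixedDelayRestartStrategyConfiguration.
    static final class FixedDelayConfig implements Serializable {
        private static final long serialVersionUID = 1L;
        private Duration delayBetweenAttemptsInterval;
        // Not serialized: when true, writeObject emits the legacy value type to
        // simulate bytes produced by an older version of this class.
        private transient boolean simulateLegacyWriter;

        FixedDelayConfig(Duration delay, boolean simulateLegacyWriter) {
            this.delayBetweenAttemptsInterval = delay;
            this.simulateLegacyWriter = simulateLegacyWriter;
        }

        Duration delay() {
            return delayBetweenAttemptsInterval;
        }

        private void writeObject(ObjectOutputStream out) throws IOException {
            ObjectOutputStream.PutField fields = out.putFields();
            Object value = simulateLegacyWriter
                    ? new LegacyTime(delayBetweenAttemptsInterval.toMillis())
                    : delayBetweenAttemptsInterval;
            fields.put("delayBetweenAttemptsInterval", value);
            out.writeFields();
        }

        // Reads the raw field value and accepts both the current and the legacy
        // type, instead of letting default deserialization throw ClassCastException.
        private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
            ObjectInputStream.GetField fields = in.readFields();
            Object raw = fields.get("delayBetweenAttemptsInterval", null);
            if (raw instanceof Duration) {
                delayBetweenAttemptsInterval = (Duration) raw;
            } else if (raw instanceof LegacyTime) {
                delayBetweenAttemptsInterval = Duration.ofMillis(((LegacyTime) raw).millis);
            } else {
                throw new InvalidObjectException("Unsupported delay value: " + raw);
            }
        }
    }

    // Serializes a config (optionally in the simulated legacy form) and reads it back.
    static Duration roundTrip(Duration delay, boolean legacyWriter)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(new FixedDelayConfig(delay, legacyWriter));
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return ((FixedDelayConfig) in.readObject()).delay();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(roundTrip(Duration.ofSeconds(20), true));  // legacy-style bytes
        System.out.println(roundTrip(Duration.ofSeconds(20), false)); // current-style bytes
    }
}
```

Both calls in `main` print `PT20S`. A value of any other type still fails, but with an `InvalidObjectException` (a checked `IOException`) describing the value rather than an unchecked `ClassCastException`.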



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
