yazgoo created FLINK-35639:
------------------------------
Summary: upgrading to 1.19 with job in HA state with restart
strategy crashes job manager
Key: FLINK-35639
URL: https://issues.apache.org/jira/browse/FLINK-35639
Project: Flink
Issue Type: Bug
Components: API / Core
Affects Versions: 1.19.1
Environment: Download the 1.18 and 1.19 binary releases. Add the following
to flink-1.19.0/conf/config.yaml and flink-1.18.1/conf/flink-conf.yaml:
```yaml
high-availability: zookeeper
high-availability.zookeeper.quorum: localhost
high-availability.storageDir: file:///tmp/flink/recovery
```
Launch ZooKeeper:
```shell
docker run --network host zookeeper:latest
```
Launch the 1.18 task manager:
```shell
./flink-1.18.1/bin/taskmanager.sh start-foreground
```
Launch the 1.18 job manager:
```shell
./flink-1.18.1/bin/jobmanager.sh start-foreground
```
Launch the following job:
```java
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import java.util.concurrent.TimeUnit;

public class FlinkJob {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Programmatic fixed-delay restart strategy; the delay is a Time object.
        env.setRestartStrategy(
                RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, Time.of(20, TimeUnit.SECONDS)));
        env.fromElements("Hello World", "Hello Flink")
                .flatMap(new LineSplitter())
                .groupBy(0)
                .sum(1)
                .print();
    }

    public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            for (String word : value.split(" ")) {
                try {
                    // Sleep so the job is still running when the cluster is killed.
                    Thread.sleep(120000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                out.collect(new Tuple2<>(word, 1));
            }
        }
    }
}
```
pom.xml:
```xml
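<project>
  <modelVersion>4.0.0</modelVersion>
  <groupId>org.apache.flink</groupId>
  <artifactId>myflinkjob</artifactId>
  <version>1.0-SNAPSHOT</version>
  <properties>
    <flink.version>1.18.1</flink.version>
    <java.version>1.8</java.version>
  </properties>
  <dependencies>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.8.1</version>
        <configuration><source>${java.version}</source><target>${java.version}</target></configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId><artifactId>maven-jar-plugin</artifactId><version>3.1.0</version>
        <configuration>
          <archive><manifest><addClasspath>true</addClasspath><classpathPrefix>lib/</classpathPrefix><mainClass>FlinkJob</mainClass></manifest></archive>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>
```
Launch the job:
```shell
./flink-1.18.1/bin/flink run ../flink-job/target/myflinkjob-1.0-SNAPSHOT.jar
```
which prints:
```
Job has been submitted with JobID 5f0898c964a93a47aa480427f3e2c6c0
```
Kill the job manager and the task manager, then launch the 1.19.0 job manager:
```shell
./flink-1.19.0/bin/jobmanager.sh start-foreground
```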
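Root cause
==========
It looks like the type of delayBetweenAttemptsInterval in
RestartStrategies$FixedDelayRestartStrategyConfiguration was changed from
org.apache.flink.api.common.time.Time to java.time.Duration in 1.19
(https://github.com/apache/flink/pull/22984/files#diff-d174f32ffdea69de610c4f37c545bd22a253b9846434f83397f1bbc2aaa399faR239).
This introduces an incompatibility that Flink 1.19 does not handle: the restart
strategy configuration serialized by 1.18 into the HA state can no longer be
deserialized by 1.19. In my opinion, the job manager should not crash when
starting in that case.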
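The same failure mode can be reproduced with plain JDK serialization, without Flink. This is a minimal sketch under stated assumptions, not Flink code: `FieldTypeChangeDemo`, `LegacyTime`, `CfgA` and `CfgB` are hypothetical stand-ins for `Time` and for the old and new shapes of `FixedDelayRestartStrategyConfiguration`, and overwriting the class name inside the byte stream is only a trick to simulate "bytes written by the old class version, read by the new one" within a single program.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.time.Duration;

final class FieldTypeChangeDemo {

    // Hypothetical stand-in for org.apache.flink.api.common.time.Time.
    static final class LegacyTime implements Serializable {
        private static final long serialVersionUID = 1L;
        final long millis;

        LegacyTime(long millis) {
            this.millis = millis;
        }
    }

    // "Old" class version (hypothetical): the delay field holds a LegacyTime.
    static final class CfgA implements Serializable {
        private static final long serialVersionUID = 1L;
        LegacyTime delay = new LegacyTime(20_000);
    }

    // "New" class version (hypothetical): same field name and serialVersionUID, but a Duration field.
    static final class CfgB implements Serializable {
        private static final long serialVersionUID = 1L;
        Duration delay;
    }

    // Serializes a CfgA, then renames the class in the stream so the bytes claim to be a CfgB.
    static byte[] oldBytesForNewClass() throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(new CfgA());
        }
        byte[] bytes = bos.toByteArray();
        // Both names have the same length, so an in-place overwrite keeps the stream well-formed.
        byte[] from = CfgA.class.getName().getBytes(StandardCharsets.UTF_8);
        byte[] to = CfgB.class.getName().getBytes(StandardCharsets.UTF_8);
        int idx = indexOf(bytes, from);
        if (idx < 0) {
            throw new IllegalStateException("class name not found in stream");
        }
        System.arraycopy(to, 0, bytes, idx, to.length);
        return bytes;
    }

    // Naive byte-array search; returns the first match index or -1.
    static int indexOf(byte[] haystack, byte[] needle) {
        for (int i = 0; i <= haystack.length - needle.length; i++) {
            int j = 0;
            while (j < needle.length && haystack[i + j] == needle[j]) {
                j++;
            }
            if (j == needle.length) {
                return i;
            }
        }
        return -1;
    }

    // Reads the old bytes into the new class; returns the ClassCastException message, or null on success.
    static String tryRestore() {
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(oldBytesForNewClass()))) {
            in.readObject();
            return null;
        } catch (ClassCastException e) {
            return e.getMessage();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(tryRestore());
    }
}
```

Running `main` prints a `cannot assign instance of ... of type java.time.Duration in instance of ...` message with the same shape as the one in the log. Because the error is a `ClassCastException` rather than an `InvalidClassException`, the class descriptors evidently matched (same class name and `serialVersionUID`); Java serialization does not compare the declared types of reference fields when matching descriptors, so the mismatch only surfaces when the stored value is assigned to the field.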
Reporter: yazgoo
When upgrading a Flink cluster from 1.18 to 1.19 with a 1.18 job in ZooKeeper
HA state, the 1.19 job manager fails with a ClassCastException; see the log below.
```log
2024-06-18 16:58:14,401 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Fatal error occurred in the cluster entrypoint.
org.apache.flink.util.FlinkException: JobMaster for job 5f0898c964a93a47aa480427f3e2c6c0 failed.
    at org.apache.flink.runtime.dispatcher.Dispatcher.jobMasterFailed(Dispatcher.java:1484) ~[flink-dist-1.19.0.jar:1.19.0]
    at org.apache.flink.runtime.dispatcher.Dispatcher.jobManagerRunnerFailed(Dispatcher.java:775) ~[flink-dist-1.19.0.jar:1.19.0]
    at org.apache.flink.runtime.dispatcher.Dispatcher.handleJobManagerRunnerResult(Dispatcher.java:738) ~[flink-dist-1.19.0.jar:1.19.0]
    at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$runJob$7(Dispatcher.java:693) ~[flink-dist-1.19.0.jar:1.19.0]
    at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:934) ~[?:?]
    at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:911) ~[?:?]
    at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:482) ~[?:?]
    at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRunAsync$4(PekkoRpcActor.java:451) ~[flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
    at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-dist-1.19.0.jar:1.19.0]
    at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRunAsync(PekkoRpcActor.java:451) ~[flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
    at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:218) ~[flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
    at org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85) ~[flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
    at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168) ~[flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
    at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33) [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
    at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29) [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
    at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29) [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
    at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547) [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
    at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545) [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
    at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229) [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
    at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590) [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
    at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557) [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
    at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280) [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
    at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241) [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
    at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253) [flink-rpc-akka84eb9e64-a1ce-450c-ad53-d9fa579b67e1.jar:1.19.0]
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373) [?:?]
    at java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182) [?:?]
    at java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655) [?:?]
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622) [?:?]
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165) [?:?]
Caused by: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
    at org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97) ~[flink-dist-1.19.0.jar:1.19.0]
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) ~[?:?]
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841) ~[?:?]
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[?:?]
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1773) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
    at java.lang.Thread.run(Thread.java:840) ~[?:?]
Caused by: java.util.concurrent.CompletionException: java.lang.ClassCastException: cannot assign instance of org.apache.flink.api.common.time.Time to field org.apache.flink.api.common.restartstrategy.RestartStrategies$FixedDelayRestartStrategyConfiguration.delayBetweenAttemptsInterval of type java.time.Duration in instance of org.apache.flink.api.common.restartstrategy.RestartStrategies$FixedDelayRestartStrategyConfiguration
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315) ~[?:?]
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:320) ~[?:?]
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1770) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
    at java.lang.Thread.run(Thread.java:840) ~[?:?]
Caused by: java.lang.ClassCastException: cannot assign instance of org.apache.flink.api.common.time.Time to field org.apache.flink.api.common.restartstrategy.RestartStrategies$FixedDelayRestartStrategyConfiguration.delayBetweenAttemptsInterval of type java.time.Duration in instance of org.apache.flink.api.common.restartstrategy.RestartStrategies$FixedDelayRestartStrategyConfiguration
    at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2096) ~[?:?]
    at java.io.ObjectStreamClass$FieldReflector.checkObjectFieldValueTypes(ObjectStreamClass.java:2060) ~[?:?]
    at java.io.ObjectStreamClass.checkObjFieldValueTypes(ObjectStreamClass.java:1347) ~[?:?]
    at java.io.ObjectInputStream$FieldValues.defaultCheckFieldValues(ObjectInputStream.java:2679) ~[?:?]
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2486) ~[?:?]
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2257) ~[?:?]
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733) ~[?:?]
    at java.io.ObjectInputStream$FieldValues.<init>(ObjectInputStream.java:2606) ~[?:?]
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2457) ~[?:?]
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2257) ~[?:?]
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733) ~[?:?]
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:509) ~[?:?]
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:467) ~[?:?]
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:539) ~[flink-dist-1.19.0.jar:1.19.0]
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:527) ~[flink-dist-1.19.0.jar:1.19.0]
    at org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:67) ~[flink-dist-1.19.0.jar:1.19.0]
    at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:102) ~[flink-dist-1.19.0.jar:1.19.0]
    at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:121) ~[flink-dist-1.19.0.jar:1.19.0]
    at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:379) ~[flink-dist-1.19.0.jar:1.19.0]
    at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:356) ~[flink-dist-1.19.0.jar:1.19.0]
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:128) ~[flink-dist-1.19.0.jar:1.19.0]
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:100) ~[flink-dist-1.19.0.jar:1.19.0]
    at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112) ~[flink-dist-1.19.0.jar:1.19.0]
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
    at java.lang.Thread.run(Thread.java:840) ~[?:?]
2024-06-18 16:58:14,403 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Shutting StandaloneSessionClusterEntrypoint down with application status UNKNOWN. Diagnostics Cluster entrypoint has been closed externally..
2024-06-18 16:58:14,404 INFO org.apache.flink.runtime.blob.BlobServer [] - Stopped BLOB server at 127.0.0.1:40067
2024-06-18 16:58:14,431 INFO org.apache.flink.shaded.curator5.org.apache.curator.utils.Compatibility [] - Using org.apache.zookeeper.server.quorum.MultipleAddresses
```
--
This message was sent by Atlassian Jira
(v8.20.10#820010)