Hi, Flink 1.14.0, Scala 2.12 Flink on Kubernetes I use Lyfts FlinkOperator, which sets up a job cluster in Kubernetes and then submits the job via the REST API. At times, the job fails. Specifically one case I am analyzing fails due to invalid state migration. I see the following error when executing the job:
2021-12-05 20:01:18,489 WARN o.a.f.r.t.Task GroupWindowAggregate must not be incompatible with the old state serializer (org.apache.flink.table.runtime.typeutils.RowDataSerializer@1b2d793e). at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:704) at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:624) at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:837) at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47) at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:73) at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:302) at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.getOrCreateKeyedState(StreamOperatorStateHandler.java:316) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.getOrCreateKeyedState(AbstractStreamOperator.java:471) at org.apache.flink.table.runtime.operators.window.WindowOperator.open(WindowOperator.java:263) This is thrown when the TaskManger attempts to load the state. When this exception is thrown, I see the following error propagated to the JM: 2021-12-05 20:01:19,130 INFO o.a.f.r.d.Dispatcher Job 6f631fe929aa2d7c9642cb3a04412b0e reached terminal state FAILED. org.apache.flink.runtime.JobException: Recovery is suppressed by FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=1, backoffTimeMS=10000) at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138) at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82) at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:228) at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:218) at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:209) at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:679) at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79) at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:444) at jdk.internal.reflect.GeneratedMethodAccessor33.invoke(Unknown Source) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:566) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:316) at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:314) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) at akka.actor.Actor.aroundReceive(Actor.scala:537) at akka.actor.Actor.aroundReceive$(Actor.scala:535) at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) at akka.actor.ActorCell.invoke(ActorCell.scala:548) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) at akka.dispatch.Mailbox.run(Mailbox.scala:231) at akka.dispatch.Mailbox.exec(Mailbox.scala:243) at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290) at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020) at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656) at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594) at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183) Caused by: org.apache.flink.util.StateMigrationException: The new state serializer (org.apache.flink.table.runtime.typeutils.RowDataSerializer@1b2d793e) must not be incompatible with the old state serializer (org.apache.flink.table.runtime.typeutils.RowDataSerializer@1b2d793e). at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:704) at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:624) at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:837) at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47) at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:73) at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:302) at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.getOrCreateKeyedState(StreamOperatorStateHandler.java:316) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.getOrCreateKeyedState(AbstractStreamOperator.java:471) at org.apache.flink.table.runtime.operators.window.WindowOperator.open(WindowOperator.java:263) However, when taking a thread dump, I see the thread of the REST API which accepted the request is still stuck on invoking the main function: "Flink-DispatcherRestEndpoint-thread-2" #60 daemon prio=5 os_prio=31 cpu=1524.33ms elapsed=134.31s tid=0x00007fe96c458800 nid=0x12403 in Object.wait() [0x00007000068cc000] java.lang.Thread.State: WAITING (on object monitor) at java.lang.Object.wait(java.base@11.0.9.1/Native Method) - waiting on <0x0000000612f5a3d8> (a zio.internal.OneShot) at java.lang.Object.wait(java.base@11.0.9.1/Object.java:328) at zio.internal.OneShot.get(OneShot.scala:79) - waiting to re-lock in wait() <0x0000000612f5a3d8> (a zio.internal.OneShot) at zio.Runtime.unsafeRunSync(Runtime.scala:85) at zio.Runtime.unsafeRunSync$(Runtime.scala:80) at Pipeline$.unsafeRunSync(Pipeline.scala:35) at zio.Runtime.unsafeRun(Runtime.scala:58) at zio.Runtime.unsafeRun$(Runtime.scala:57) at Pipeline$.unsafeRun(Pipeline.scala:35) at zio.App.main(App.scala:58) at zio.App.main$(App.scala:54) at Pipeline$.main(Pipeline.scala:35) at Pipeline.main(Pipeline.scala) at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(java.base@11.0.9.1/Native Method) at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(java.base@11.0.9.1/NativeMethodAccessorImpl.java:62) at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(java.base@11.0.9.1/DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(java.base@11.0.9.1/Method.java:566) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84) at org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:70) at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:102) at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler$$Lambda$767/0x00000008007b2c40.get(Unknown Source) at java.util.concurrent.CompletableFuture$AsyncSupply.run(java.base@11.0.9.1/CompletableFuture.java:1700) at java.util.concurrent.Executors$RunnableAdapter.call(java.base@11.0.9.1/Executors.java:515) at java.util.concurrent.FutureTask.run(java.base@11.0.9.1/FutureTask.java:264) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(java.base@11.0.9.1/ScheduledThreadPoolExecutor.java:304) at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@11.0.9.1/ThreadPoolExecutor.java:1128) at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@11.0.9.1/ThreadPoolExecutor.java:628) at java.lang.Thread.run(java.base@11.0.9.1/Thread.java:834) The problem I have with this, is that the operator will attempt to re-submit the job as it did not receive any response, but a timeout. This will happen continuously until it exhausts the available threads defined by rest.server.numThreads. How can I make sure the exception thrown from the JM causes the REST API thread to be terminated? -- Best Regards, Yuval Itzchakov.