The thread is blocked in your user-code, so whether we can unblock it depends on what said user-code is doing.

On 05/12/2021 19:13, Yuval Itzchakov wrote:
Hi,
Flink 1.14.0,
Scala 2.12
Flink on Kubernetes

I use Lyfts FlinkOperator, which sets up a job cluster in Kubernetes and then submits the job via the REST API. At times, the job fails. Specifically one case I am analyzing fails due to invalid state migration. I see the following error when
executing the job:

2021-12-05 20:01:18,489 WARN o.a.f.r.t.Task GroupWindowAggregate must not be 
incompatible with the old state serializer 
(org.apache.flink.table.runtime.typeutils.RowDataSerializer@1b2d793e).
        at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:704)
        at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:624)
        at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:837)
        at 
org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
        at 
org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:73)
        at 
org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:302)
        at 
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.getOrCreateKeyedState(StreamOperatorStateHandler.java:316)
        at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.getOrCreateKeyedState(AbstractStreamOperator.java:471)
        at 
org.apache.flink.table.runtime.operators.window.WindowOperator.open(WindowOperator.java:263)
This is thrown when the TaskManger attempts to load the state. When this exception is thrown, I see the following error propagated to the JM:
2021-12-05 20:01:19,130 INFO o.a.f.r.d.Dispatcher Job 
6f631fe929aa2d7c9642cb3a04412b0e reached terminal state FAILED.
org.apache.flink.runtime.JobException: Recovery is suppressed by 
FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=1, 
backoffTimeMS=10000)
        at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
        at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
        at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:228)
        at 
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:218)
        at 
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:209)
        at 
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:679)
        at 
org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
        at 
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:444)
        at jdk.internal.reflect.GeneratedMethodAccessor33.invoke(Unknown Source)
        at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:316)
        at 
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:314)
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)
        at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
        at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
        at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
        at akka.actor.Actor.aroundReceive(Actor.scala:537)
        at akka.actor.Actor.aroundReceive$(Actor.scala:535)
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
        at akka.actor.ActorCell.invoke(ActorCell.scala:548)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
        at akka.dispatch.Mailbox.run(Mailbox.scala:231)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
        at 
java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
        at 
java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
        at 
java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
        at 
java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
        at 
java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
Caused by: org.apache.flink.util.StateMigrationException: The new state 
serializer 
(org.apache.flink.table.runtime.typeutils.RowDataSerializer@1b2d793e) must not 
be incompatible with the old state serializer 
(org.apache.flink.table.runtime.typeutils.RowDataSerializer@1b2d793e).
        at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:704)
        at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:624)
        at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:837)
        at 
org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
        at 
org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:73)
        at 
org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:302)
        at 
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.getOrCreateKeyedState(StreamOperatorStateHandler.java:316)
        at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.getOrCreateKeyedState(AbstractStreamOperator.java:471)
        at 
org.apache.flink.table.runtime.operators.window.WindowOperator.open(WindowOperator.java:263)
However, when taking a thread dump, I see the thread of the REST API which accepted the request is still stuck on invoking the main function:

"Flink-DispatcherRestEndpoint-thread-2" #60 daemon prio=5 os_prio=31 
cpu=1524.33ms elapsed=134.31s tid=0x00007fe96c458800 nid=0x12403 in Object.wait()  
[0x00007000068cc000]
    java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.Object.wait(java.base@11.0.9.1 <http://11.0.9.1>/Native 
Method)
        - waiting on <0x0000000612f5a3d8> (a zio.internal.OneShot)
        at java.lang.Object.wait(java.base@11.0.9.1 
<http://11.0.9.1>/Object.java:328)
        at zio.internal.OneShot.get(OneShot.scala:79)
        - waiting to re-lock in wait() <0x0000000612f5a3d8> (a 
zio.internal.OneShot)
        at zio.Runtime.unsafeRunSync(Runtime.scala:85)
        at zio.Runtime.unsafeRunSync$(Runtime.scala:80)
        at Pipeline$.unsafeRunSync(Pipeline.scala:35)
        at zio.Runtime.unsafeRun(Runtime.scala:58)
        at zio.Runtime.unsafeRun$(Runtime.scala:57)
        at Pipeline$.unsafeRun(Pipeline.scala:35)
        at zio.App.main(App.scala:58)
        at zio.App.main$(App.scala:54)
        at Pipeline$.main(Pipeline.scala:35)
        at Pipeline.main(Pipeline.scala)
        at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(java.base@11.0.9.1 
<http://11.0.9.1>/Native Method)
        at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(java.base@11.0.9.1 
<http://11.0.9.1>/NativeMethodAccessorImpl.java:62)
        at 
jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(java.base@11.0.9.1 
<http://11.0.9.1>/DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(java.base@11.0.9.1 
<http://11.0.9.1>/Method.java:566)
        at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
        at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
        at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
        at 
org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84)
        at 
org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:70)
        at 
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:102)
        at 
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler$$Lambda$767/0x00000008007b2c40.get(Unknown
 Source)
        at java.util.concurrent.CompletableFuture$AsyncSupply.run(java.base@11.0.9.1 
<http://11.0.9.1>/CompletableFuture.java:1700)
        at java.util.concurrent.Executors$RunnableAdapter.call(java.base@11.0.9.1 
<http://11.0.9.1>/Executors.java:515)
        at java.util.concurrent.FutureTask.run(java.base@11.0.9.1 
<http://11.0.9.1>/FutureTask.java:264)
        at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(java.base@11.0.9.1
 <http://11.0.9.1>/ScheduledThreadPoolExecutor.java:304)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@11.0.9.1 
<http://11.0.9.1>/ThreadPoolExecutor.java:1128)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@11.0.9.1 
<http://11.0.9.1>/ThreadPoolExecutor.java:628)
        at java.lang.Thread.run(java.base@11.0.9.1 
<http://11.0.9.1>/Thread.java:834)
The problem I have with this, is that the operator will attempt to re-submit the job as it did not receive any response, but a timeout. This will happen continuously until it exhausts
the available threads defined by rest.server.numThreads.
How can I make sure the exception thrown from the JM causes the REST API thread to be terminated?
--
Best Regards,
Yuval Itzchakov.

Reply via email to