Can you share the logs with us (ideally at DEBUG level, if available) from the affected TaskManager and JobManager?
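
For reference, a minimal sketch of raising the log level, assuming the stock Log4j 2 properties configuration that Flink 1.13 ships with (on Kubernetes this is usually log4j-console.properties, but which file or ConfigMap applies depends on how the cluster was set up):

```properties
# Assumption: the default file already contains "rootLogger.level = INFO";
# change that entry rather than adding a second one.
rootLogger.level = DEBUG
```

DEBUG output from a job with this many tasks can be large, so it may be worth switching back to INFO once the failure has been captured.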

On 19/08/2021 08:29, Ivan Yang wrote:
Dear Flink community,

I have recently been running into this issue at job startup. It happens from 
time to time. Here is the exception from the job manager:

2021-08-17 01:21:01,944 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: Defence raw event prod05_analytics_output -> String to JSON -> (Sink: Malformed JSON Sink, ThreatSight Filter -> Raw Json to ThreatSight Json -> ThreatSight Json to String -> Sink: ThreatSight Sink, Json to CDCA -> (mssp eventlog filter 1 -> mssp eventlog filter 2 -> CDCA to eventlog json -> Flat Map, Sink: Parquet Sink Event Time), mssp json filter -> action filter -> raw json to action json, Filter -> Json to ResponseAlarm, Filter -> json to SecurityEvent) (542/626) (a7be17221c0726a67679091062cfa8dc) switched from DEPLOYING to FAILED on 172.1.200.173:6122-856ad2 @ ip-172-1-200-173.ec2.internal (dataPort=6121).
org.apache.flink.util.FlinkException: Could not mark slot 58af05c3109a0fe8f96ea8936c0783a4 active.
        at org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$handleAcceptedSlotOffers$18(TaskExecutor.java:1561) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) ~[?:1.8.0_302]
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) ~[?:1.8.0_302]
        at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) ~[?:1.8.0_302]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at akka.actor.Actor$class.aroundReceive(Actor.scala:517) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.11-1.13.1.jar:1.13.1]
2021-08-17 01:21:01,945 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Discarding the results produced by task execution a7be17221c0726a67679091062cfa8dc.
2021-08-17 01:21:01,945 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Discarding the results produced by task execution a7be17221c0726a67679091062cfa8dc.
2021-08-17 01:21:01,945 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - Calculating tasks to restart to recover the failed task 7ac3b50525c642dc419b976cfaf0ee0e_541.
2021-08-17 01:21:01,999 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - 3150 tasks should be restarted to recover the failed task 7ac3b50525c642dc419b976cfaf0ee0e_541.
2021-08-17 01:21:02,012 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job Event Router prod05 (0f10bafc07e48918f955fb22cbaa5735) switched from state RUNNING to RESTARTING.

I am using a Kubernetes deployment in session mode. We have 4 jobs running in 
the cluster; the 3 small jobs have never had the problem. The issue only happens 
with the one large job, which has 10 times the parallelism of the smaller jobs. 
Can someone help me understand the root cause of the issue and how to avoid it?

Thanks,
Ivan


