Hi there, 

I have a stream where I reload a huge list from time to time. I know there are 
various Side-Input patterns, but none of them seem to be perfect so I stuck 
with an easy approach: I use a Guava Cache and if it expires and a new element 
comes in, processing of the element is blocked up until the new list is loaded. 

That approach runs in production for a while now and it works fine, as the 
cache has a mechanism to reload the list only on a real change. Now today, the 
list changed from a few hundred MB to multiple GB at a time where the network 
in general was a bit congested already. One TaskManager needed round about 
4minutes to load the list, but after 30seconds, it reported it lost connection 
to zookeeper and had thus no more information about the leading jobmanager, 
leading to a crashing loop. That crash & restart loop continued for 30minutes 
up until the list was rolled back and was then successfully loaded again. 

Now my question: 
* If processing of an element blocks, I understand that its also not possible 
to perform checkpoints at that time, but I didn't expect Zookeeper, Heartbeats 
or other threads of the taskmanager to timeout. Was that just a coincidence of 
the network being congested or is that something in the design of Flink that a 
long blocking call can lead to crashes? (Other than X checkpoints timed out and 
following a configured forced crash occured). Which threads can be blocked in 
Flink during a map in a MapFunction? 
* For this approach with kind of a cached reload, should I switch to async IO 
or just put loading of the list in a background thread? In my case, it's not 
really important that processing is blocked up until the list is loaded. And in 
case of async IO: 99,999% of the events would directly return and would thus 
not be async, it's always just a single one triggering reload of the list, so 
it doesn't seem to be perfectly suited here? 

Im running on Flink 1.11 and heres the relevant excerpt from the log: 

2020-11-05T09:41:40.933865+01:00 [WARN] Client session timed out, have not 
heard from server in 33897ms for sessionid 0x374de97ba0afac9 
2020-11-05T09:41:40.936488+01:00 [INFO] Client session timed out, have not 
heard from server in 33897ms for sessionid 0x374de97ba0afac9, closing socket 
connection and attempting reconnect 
2020-11-05T09:41:41.042032+01:00 [INFO] State change: SUSPENDED 
2020-11-05T09:41:41.168802+01:00 [WARN] Connection to ZooKeeper suspended. Can 
no longer retrieve the leader from ZooKeeper. 
2020-11-05T09:41:41.169276+01:00 [WARN] Connection to ZooKeeper suspended. Can 
no longer retrieve the leader from ZooKeeper. 
2020-11-05T09:41:41.169514+01:00 [INFO] Close ResourceManager connection 
e4d1f9acca4ea3c5a793877467218452. 
2020-11-05T09:41:41.169514+01:00 [INFO] JobManager for job 
0dcfb212136daefbcbfe480c6a260261 with leader id 
8440610cd5de998bd6c65f3717de42b8 lost leadership. 
2020-11-05T09:41:41.185104+01:00 [INFO] Close JobManager connection for job 
0dcfb212136daefbcbfe480c6a260261. 
2020-11-05T09:41:41.185354+01:00 [INFO] Attempting to fail task externally .... 
(c596aafa324b152911cb53ab4e6d1cc2). 
2020-11-05T09:41:41.187980+01:00 [WARN] ... (c596aafa324b152911cb53ab4e6d1cc2) 
switched from RUNNING to FAILED. 
org.apache.flink.util.FlinkException: JobManager responsible for 
0dcfb212136daefbcbfe480c6a260261 lost the leadership. 
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.disconnectJobManagerConnection(TaskExecutor.java:1415)
 
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.access$1300(TaskExecutor.java:173)
 
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl.lambda$null$2(TaskExecutor.java:1852)
 
at java.util.Optional.ifPresent(Optional.java:159) 
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl.lambda$jobManagerLostLeadership$3(TaskExecutor.java:1851)
 
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
 
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
 
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
 
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) 
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) 
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) 
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) 
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) 
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
at akka.actor.Actor$class.aroundReceive(Actor.scala:517) 
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) 
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) 
at akka.actor.ActorCell.invoke(ActorCell.scala:561) 
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) 
at akka.dispatch.Mailbox.run(Mailbox.scala:225) 
at akka.dispatch.Mailbox.exec(Mailbox.scala:235) 
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
Caused by: java.lang.Exception: Job leader for job id 
0dcfb212136daefbcbfe480c6a260261 lost leadership. 
... 24 more 

Best regards 
Theo 

Reply via email to