I'd go with the network congestion theory for the time being; then the only remedy is throttling the download of said list, or somehow reducing the size of it significantly
.
What the task thread is doing doesn't matter in regards to HA; it may cause checkpoints to time out, but should have no other effects. (unless it magically consumes all CPU resources of system) If you block in any function, then you're just blocking the task thread; nothing else.

On 11/5/2020 10:40 PM, Theo Diefenthal wrote:
Hi there,

I have a stream where I reload a huge list from time to time. I know there are various Side-Input patterns, but none of them seem to be perfect so I stuck with an easy approach: I use a Guava Cache and if it expires and a new element comes in, processing of the element is blocked up until the new list is loaded.

That approach runs in production for a while now and it works fine, as the cache has a mechanism to reload the list only on a real change. Now today, the list changed from a few hundred MB to multiple GB at a time where the network in general was a bit congested already. One TaskManager needed round about 4minutes to load the list, but after 30seconds, it reported it lost connection to zookeeper and had thus no more information about the leading jobmanager, leading to a crashing loop. That crash & restart loop continued for 30minutes up until the list was rolled back and was then successfully loaded again.

Now my question:
* If processing of an element blocks, I understand that its also not possible to perform checkpoints at that time, but I didn't expect Zookeeper, Heartbeats or other threads of the taskmanager to timeout. Was that just a coincidence of the network being congested or is that something in the design of Flink that a long blocking call can lead to crashes? (Other than X checkpoints timed out and following a configured forced crash occured). Which threads can be blocked in Flink during a map in a MapFunction? * For this approach with kind of a cached reload, should I switch to async IO or just put loading of the list in a background thread? In my case, it's not really important that processing is blocked up until the list is loaded. And in case of async IO: 99,999% of the events would directly return and would thus not be async, it's always just a single one triggering reload of the list, so it doesn't seem to be perfectly suited here?

Im running on Flink 1.11 and heres the relevant excerpt from the log:

2020-11-05T09:41:40.933865+01:00 [WARN] Client session timed out, have not heard from server in 33897ms for sessionid 0x374de97ba0afac9 2020-11-05T09:41:40.936488+01:00 [INFO] Client session timed out, have not heard from server in 33897ms for sessionid 0x374de97ba0afac9, closing socket connection and attempting reconnect
2020-11-05T09:41:41.042032+01:00 [INFO] State change: SUSPENDED
2020-11-05T09:41:41.168802+01:00 [WARN] Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper. 2020-11-05T09:41:41.169276+01:00 [WARN] Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper. 2020-11-05T09:41:41.169514+01:00 [INFO] Close ResourceManager connection e4d1f9acca4ea3c5a793877467218452. 2020-11-05T09:41:41.169514+01:00 [INFO] JobManager for job 0dcfb212136daefbcbfe480c6a260261 with leader id 8440610cd5de998bd6c65f3717de42b8 lost leadership. 2020-11-05T09:41:41.185104+01:00 [INFO] Close JobManager connection for job 0dcfb212136daefbcbfe480c6a260261. 2020-11-05T09:41:41.185354+01:00 [INFO] Attempting to fail task externally .... (c596aafa324b152911cb53ab4e6d1cc2). 2020-11-05T09:41:41.187980+01:00 [WARN] ... (c596aafa324b152911cb53ab4e6d1cc2) switched from RUNNING to FAILED. org.apache.flink.util.FlinkException: JobManager responsible for 0dcfb212136daefbcbfe480c6a260261 lost the leadership.     at org.apache.flink.runtime.taskexecutor.TaskExecutor.disconnectJobManagerConnection(TaskExecutor.java:1415)     at org.apache.flink.runtime.taskexecutor.TaskExecutor.access$1300(TaskExecutor.java:173)     at org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl.lambda$null$2(TaskExecutor.java:1852)
    at java.util.Optional.ifPresent(Optional.java:159)
    at org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl.lambda$jobManagerLostLeadership$3(TaskExecutor.java:1851)     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: java.lang.Exception: Job leader for job id 0dcfb212136daefbcbfe480c6a260261 lost leadership.
    ... 24 more

Best regards
Theo


Reply via email to