Thank you~

Xintong Song



On Sat, Jan 30, 2021 at 8:27 AM Xintong Song <tonysong...@gmail.com> wrote:

> There was indeed a ZK version upgrade between 1.9 and 1.11, but I'm not
> aware of any similar issue reported since the upgrade.
> I would suggest the following:
> - Turn on DEBUG logging and see if there are any valuable details.
> - Maybe try asking in the Apache ZooKeeper community to see if this is a
> known issue.
>
> Thank you~
> Xintong Song
>
>
>
> On Sat, Jan 30, 2021 at 6:47 AM Lu Niu <qqib...@gmail.com> wrote:
>
>> Hi, Xintong
>>
>> Thanks for replying. Could it be related to the ZK version? We are a platform
>> team at Pinterest in the middle of migrating from 1.9.1 to 1.11. Both 1.9
>> and 1.11 jobs talk to the same ZK for JM HA. This problem only surfaced
>> in 1.11 jobs. That's why we think it is related to the version upgrade.
>>
>> Best
>> Lu
>>
>> On Thu, Jan 28, 2021 at 7:56 PM Xintong Song <tonysong...@gmail.com>
>> wrote:
>>
>>> The ZK client side uses a 15s connection timeout and a 60s session timeout
>>> in Flink. Nothing like a heartbeat interval is configured; I assume that
>>> is up to ZK's internal implementation. These settings have not
>>> changed in Flink since at least 2017.
>>>
>>> If both the ZK client and server complain about timeouts, and no GC
>>> issue is spotted, I would suspect network instability.
>>>
>>> Thank you~
>>>
>>> Xintong Song
>>>
>>>
>>>
>>> On Fri, Jan 29, 2021 at 3:15 AM Lu Niu <qqib...@gmail.com> wrote:
>>>
>>>> After checking the log, I found that the root cause is a ZK client
>>>> timeout on the TM:
>>>> ```
>>>> 2021-01-25 14:01:49,600 WARN
>>>> org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn - Client
>>>> session timed out, have not heard from server in 40020ms for sessionid
>>>> 0x404f9ca531a5d6f
>>>> 2021-01-25 14:01:49,610 INFO
>>>> org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn - Client
>>>> session timed out, have not heard from server in 40020ms for sessionid
>>>> 0x404f9ca531a5d6f, closing socket connection and attempting reconnect
>>>> 2021-01-25 14:01:49,711 INFO
>>>> org.apache.flink.shaded.curator4.org.apache.curator.framework.state.ConnectionStateManager
>>>> - State change: SUSPENDED
>>>> 2021-01-25 14:01:49,711 WARN
>>>> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService -
>>>> Connection to ZooKeeper suspended. Can no longer retrieve the leader from
>>>> ZooKeeper.
>>>> 2021-01-25 14:01:49,712 INFO
>>>> org.apache.flink.runtime.taskexecutor.TaskExecutor - JobManager for job
>>>> 27ac39342913d29baac4cde13062c4a4 with leader id
>>>> b5af099c17a05fcf15e7bbfcb74e49ea lost leadership.
>>>> 2021-01-25 14:01:49,712 WARN
>>>> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService -
>>>> Connection to ZooKeeper suspended. Can no longer retrieve the leader from
>>>> ZooKeeper.
>>>> 2021-01-25 14:01:49,712 INFO
>>>> org.apache.flink.runtime.taskexecutor.TaskExecutor - Close JobManager
>>>> connection for job 27ac39342913d29baac4cde13062c4a4.
>>>> 2021-01-25 14:01:49,712 INFO org.apache.flink.runtime.taskmanager.Task
>>>> - Attempting to fail task externally Sink:
>>>> USER_EVENTS-spo_derived_event-SINK-SINKS.kafka (156/360)
>>>> (d5b5887e639874cb70d7fef939b957b7).
>>>> 2021-01-25 14:01:49,712 WARN org.apache.flink.runtime.taskmanager.Task
>>>> - Sink: USER_EVENTS-spo_derived_event-SINK-SINKS.kafka (156/360)
>>>> (d5b5887e639874cb70d7fef939b957b7) switched from RUNNING to FAILED.
>>>> org.apache.flink.util.FlinkException: JobManager responsible for
>>>> 27ac39342913d29baac4cde13062c4a4 lost the leadership.
>>>> ```
>>>>
>>>> I checked that TM's GC log and found no GC issues. The ZooKeeper server
>>>> log also shows a client timeout. How frequently does the ZK client sync
>>>> with the server side in Flink? The log says the client had not heard from
>>>> the server for 40s. Any help? Thanks!
>>>>
>>>> Best
>>>> Lu
>>>>
>>>>
>>>> On Thu, Dec 17, 2020 at 6:10 PM Xintong Song <tonysong...@gmail.com>
>>>> wrote:
>>>>
>>>>> I'm not aware of any significant changes to the HA components between
>>>>> 1.9/1.11.
>>>>> Would you mind sharing the complete jobmanager/taskmanager logs?
>>>>>
>>>>> Thank you~
>>>>>
>>>>> Xintong Song
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Dec 18, 2020 at 8:53 AM Lu Niu <qqib...@gmail.com> wrote:
>>>>>
>>>>>> Hi, Xintong
>>>>>>
>>>>>> Thanks for replying and for your suggestion. I did check the ZK side but
>>>>>> there is nothing interesting. The error message actually shows that only
>>>>>> one TM thought the JM lost leadership while the others ran fine. Also, this
>>>>>> happened only after we migrated from 1.9 to 1.11. Is it possible this was
>>>>>> introduced by 1.11?
>>>>>>
>>>>>> Best
>>>>>> Lu
>>>>>>
>>>>>> On Wed, Dec 16, 2020 at 5:56 PM Xintong Song <tonysong...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Lu,
>>>>>>>
>>>>>>> I assume you are using ZooKeeper as the HA service?
>>>>>>>
>>>>>>> A common cause of unexpected leadership loss is instability of the
>>>>>>> HA service. E.g., if ZK does not receive a heartbeat from the Flink RM for a
>>>>>>> certain period of time, it will revoke the leadership and notify
>>>>>>> other components. You can look into the ZooKeeper logs to check why the RM's
>>>>>>> leadership was revoked.
>>>>>>>
>>>>>>> Thank you~
>>>>>>>
>>>>>>> Xintong Song
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Dec 17, 2020 at 8:42 AM Lu Niu <qqib...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi, Flink users
>>>>>>>>
>>>>>>>> Recently we migrated to Flink 1.11 and now see exceptions like:
>>>>>>>> ```
>>>>>>>> 2020-12-15 12:41:01,199 INFO
>>>>>>>>  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source:
>>>>>>>> USER_MATERIALIZED_EVENT_SIGNAL-user_context-event ->
>>>>>>>> USER_MATERIALIZED_EVENT_SIGNAL-user_context-event-as_nrtgtuple (21/60)
>>>>>>>> (711d1d319691a4b80e30fe6ab7dfab5b) switched from RUNNING to FAILED on
>>>>>>>> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@50abf386.
>>>>>>>> java.lang.Exception: Job leader for job id
>>>>>>>> 47b1531f79ffe3b86bc5910f6071e40c lost leadership.
>>>>>>>> at
>>>>>>>> org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl.lambda$null$2(TaskExecutor.java:1852)
>>>>>>>> ~[nrtg-1.11_deploy.jar:?]
>>>>>>>> at java.util.Optional.ifPresent(Optional.java:159) ~[?:1.8.0_212-ga]
>>>>>>>> at
>>>>>>>> org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl.lambda$jobManagerLostLeadership$3(TaskExecutor.java:1851)
>>>>>>>> ~[nrtg-1.11_deploy.jar:?]
>>>>>>>> at
>>>>>>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
>>>>>>>> ~[nrtg-1.11_deploy.jar:?]
>>>>>>>> at
>>>>>>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
>>>>>>>> ~[nrtg-1.11_deploy.jar:?]
>>>>>>>> at
>>>>>>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
>>>>>>>> ~[nrtg-1.11_deploy.jar:?]
>>>>>>>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
>>>>>>>> [nrtg-1.11_deploy.jar:?]
>>>>>>>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
>>>>>>>> [nrtg-1.11_deploy.jar:?]
>>>>>>>> at
>>>>>>>> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>>>>>>>> [nrtg-1.11_deploy.jar:?]
>>>>>>>> at
>>>>>>>> akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
>>>>>>>> [nrtg-1.11_deploy.jar:?]
>>>>>>>> at
>>>>>>>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>>>>>>>> [nrtg-1.11_deploy.jar:?]
>>>>>>>> at
>>>>>>>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>>>>>>> [nrtg-1.11_deploy.jar:?]
>>>>>>>> at
>>>>>>>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>>>>>>> [nrtg-1.11_deploy.jar:?]
>>>>>>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:539)
>>>>>>>> [nrtg-1.11_deploy.jar:?]
>>>>>>>> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:227)
>>>>>>>> [nrtg-1.11_deploy.jar:?]
>>>>>>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:612)
>>>>>>>> [nrtg-1.11_deploy.jar:?]
>>>>>>>> at akka.actor.ActorCell.invoke(ActorCell.scala:581)
>>>>>>>> [nrtg-1.11_deploy.jar:?]
>>>>>>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:268)
>>>>>>>> [nrtg-1.11_deploy.jar:?]
>>>>>>>> at akka.dispatch.Mailbox.run(Mailbox.scala:229)
>>>>>>>> [nrtg-1.11_deploy.jar:?]
>>>>>>>> ```
>>>>>>>>
>>>>>>>> ```
>>>>>>>> 2020-12-15 01:01:39,531 INFO
>>>>>>>>  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] -
>>>>>>>> USER_MATERIALIZED_EVENT_SIGNAL.user_context.SINK-stream_joiner ->
>>>>>>>> USER_MATERIALIZED_EVENT_SIGNAL-user_context-SINK-SINKS.realpin (260/360)
>>>>>>>> (0c1f4495088ec9452c597f46a88a2c8e) switched from RUNNING to FAILED on
>>>>>>>> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@2362b2fd.
>>>>>>>> org.apache.flink.util.FlinkException: ResourceManager leader
>>>>>>>> changed to new address null
>>>>>>>> at
>>>>>>>> org.apache.flink.runtime.taskexecutor.TaskExecutor.notifyOfNewResourceManagerLeader(TaskExecutor.java:1093)
>>>>>>>> ~[nrtg-1.11_deploy.jar:?]
>>>>>>>> at
>>>>>>>> org.apache.flink.runtime.taskexecutor.TaskExecutor.access$800(TaskExecutor.java:173)
>>>>>>>> ~[nrtg-1.11_deploy.jar:?]
>>>>>>>> at
>>>>>>>> org.apache.flink.runtime.taskexecutor.TaskExecutor$ResourceManagerLeaderListener.lambda$notifyLeaderAddress$0(TaskExecutor.java:1816)
>>>>>>>> ~[nrtg-1.11_deploy.jar:?]
>>>>>>>> at
>>>>>>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
>>>>>>>> ~[nrtg-1.11_deploy.jar:?]
>>>>>>>> at
>>>>>>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
>>>>>>>> ~[nrtg-1.11_deploy.jar:?]
>>>>>>>> at
>>>>>>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
>>>>>>>> ~[nrtg-1.11_deploy.jar:?]
>>>>>>>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
>>>>>>>> [nrtg-1.11_deploy.jar:?]
>>>>>>>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
>>>>>>>> [nrtg-1.11_deploy.jar:?]
>>>>>>>> at
>>>>>>>> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>>>>>>>> [nrtg-1.11_deploy.jar:?]
>>>>>>>> at
>>>>>>>> akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
>>>>>>>> [nrtg-1.11_deploy.jar:?]
>>>>>>>> at
>>>>>>>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>>>>>>>> [nrtg-1.11_deploy.jar:?]
>>>>>>>> at
>>>>>>>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>>>>>>> [nrtg-1.11_deploy.jar:?]
>>>>>>>> at
>>>>>>>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>>>>>>> [nrtg-1.11_deploy.jar:?]
>>>>>>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:539)
>>>>>>>> [nrtg-1.11_deploy.jar:?]
>>>>>>>> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:227)
>>>>>>>> [nrtg-1.11_deploy.jar:?]
>>>>>>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:612)
>>>>>>>> [nrtg-1.11_deploy.jar:?]
>>>>>>>> at akka.actor.ActorCell.invoke(ActorCell.scala:581)
>>>>>>>> [nrtg-1.11_deploy.jar:?]
>>>>>>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:268)
>>>>>>>> [nrtg-1.11_deploy.jar:?]
>>>>>>>> at akka.dispatch.Mailbox.run(Mailbox.scala:229)
>>>>>>>> [nrtg-1.11_deploy.jar:?]
>>>>>>>> ```
>>>>>>>>
>>>>>>>> This happens a few times per week. It seems like one TaskManager
>>>>>>>> wrongly thought the JobManager was lost and triggered a full restart of the
>>>>>>>> whole job. Does anyone know how to resolve such errors? Thanks!
>>>>>>>>
>>>>>>>> Best
>>>>>>>> Lu
>>>>>>>>
>>>>>>>
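
On the question in the thread of how often the ZK client talks to the server: the 40s in the "have not heard from server in 40020ms" log line is consistent with the 60s session timeout mentioned above, if one assumes the usual ZooKeeper client behaviour. As far as I understand the 3.4/3.5 client, it gives up on a connection after two thirds of the negotiated session timeout without hearing from the server, and sends a ping once the connection has been idle for half of that read timeout, capped at 10s. The sketch below only reproduces that arithmetic. The class and method names are made up for illustration, and the 2/3 rule and the 10s cap are my assumptions about the client internals, not something Flink configures.

```java
public class ZkClientTimeouts {
    // Assumption: the ZooKeeper client caps the idle time between pings at 10 s.
    static final int MAX_SEND_PING_INTERVAL_MS = 10_000;

    // Assumption: the client treats the server as unresponsive after
    // two thirds of the negotiated session timeout.
    static int readTimeoutMs(int negotiatedSessionTimeoutMs) {
        return negotiatedSessionTimeoutMs * 2 / 3;
    }

    // Upper bound on how long the client stays silent before sending a ping:
    // half the read timeout, but never more than the 10 s cap.
    static int maxPingIntervalMs(int negotiatedSessionTimeoutMs) {
        return Math.min(readTimeoutMs(negotiatedSessionTimeoutMs) / 2,
                MAX_SEND_PING_INTERVAL_MS);
    }

    public static void main(String[] args) {
        int sessionTimeoutMs = 60_000; // the 60s session timeout mentioned in the thread
        System.out.println("read timeout (ms): " + readTimeoutMs(sessionTimeoutMs));
        System.out.println("max ping interval (ms): " + maxPingIntervalMs(sessionTimeoutMs));
    }
}
```

With a 60s session timeout this prints a read timeout of 40000 ms and a maximum ping interval of 10000 ms. If that reading is right, the client should have been pinging roughly every 10s, so a 40s silence means several pings in a row went unanswered. The session timeout itself can be changed through `high-availability.zookeeper.client.session-timeout` in the Flink configuration, and the ZK server may negotiate a lower value than the one requested.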
