Zhenzhong Xu created FLINK-7278:
-----------------------------------

             Summary: Flink job can get stuck while the ZK leader is re-elected 
during a ZK cluster migration 
                 Key: FLINK-7278
                 URL: https://issues.apache.org/jira/browse/FLINK-7278
             Project: Flink
          Issue Type: Bug
          Components: Distributed Coordination
            Reporter: Zhenzhong Xu
            Priority: Minor


We have observed a potential failure case while a Flink job was running during a 
ZK migration. The scenario is described below.

1. The Flink cluster was running in standalone mode on the Netflix Titus container 
runtime.
2. We performed a ZK migration by rolling out a new OS image one node at a time.
3. During the ZK leader re-election, the Flink cluster started to exhibit failures 
and eventually ended up in a non-recoverable failure mode (a small Curator probe 
for observing these re-elections is sketched after this list).
4. This behavior does not reproduce every time; it may be caused by an edge-case 
race condition.
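
Flink's ZooKeeper HA leader election goes through Curator, so a minimal probe run 
alongside the migration can make the leader hand-offs visible. The sketch below 
uses plain Apache Curator rather than Flink's shaded copy; the connection string 
and latch path are hypothetical:

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class LeaderElectionProbe {
    public static void main(String[] args) throws Exception {
        // Hypothetical connection string; point it at the ensemble being migrated.
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "100.83.64.121:2181,100.83.104.81:2181",
                new ExponentialBackoffRetry(1000, 3));
        client.start();

        // Join a throw-away election so that any leadership loss caused by
        // connection/session disruption during the rolling migration shows up
        // as a lost/acquired pair in the output.
        LeaderLatch latch = new LeaderLatch(client, "/flink-7278-probe");
        latch.addListener(new LeaderLatchListener() {
            @Override public void isLeader()  { System.out.println("acquired leadership"); }
            @Override public void notLeader() { System.out.println("lost leadership"); }
        });
        latch.start();

        Thread.sleep(Long.MAX_VALUE); // keep the probe alive for the whole migration
    }
}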

Below is a list of error messages ordered by event time:
2017-07-22 02:47:44,535 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Source -> 
Sink: Sink (67/176) (0442d63c89809ad86f38874c845ba83f) switched from RUNNING to 
FAILED.
java.lang.Exception: TaskManager was lost/killed: 
ResourceID{resourceId='f519795dfabcecfd7863ed587efdb398'} 
@ titus-123072-worker-3-39 (dataPort=46879)
at org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:217)
at 
org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:533)
at org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:192)
at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:167)
at 
org.apache.flink.runtime.instance.InstanceManager.unregisterAllTaskManagers(InstanceManager.java:234)
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:330)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at 
org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at 
org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at 
org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:118)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


2017-07-22 02:47:44,621 WARN com.netflix.spaas.runtime.FlinkJobManager - 
Discard message 
LeaderSessionMessage(7a247ad9-531b-4f27-877b-df41f9019431,Disconnect(0b300c04592b19750678259cd09fea95,java.lang.Exception:
 TaskManager akka://flink/user/taskmanager is disassociating)) because the 
expected leader session ID None did not equal the received leader session ID 
Some(7a247ad9-531b-4f27-877b-df41f9019431).
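
The WARN above is Flink's leader-session filtering at work: after leadership is 
revoked, the JobManager's expected leader session ID becomes None, so messages 
still stamped with the previous session ID are discarded. A simplified 
illustration of that check is below (not the actual Scala 
LeaderSessionMessageFilter; class and method names are hypothetical):

import java.util.Optional;
import java.util.UUID;

// Simplified illustration of leader-session filtering; the real logic lives in
// Flink's Scala LeaderSessionMessageFilter and differs in detail.
class LeaderSessionFilter {
    private volatile Optional<UUID> expectedLeaderSessionId = Optional.empty();

    void onLeadershipGranted(UUID sessionId) { expectedLeaderSessionId = Optional.of(sessionId); }
    void onLeadershipRevoked()               { expectedLeaderSessionId = Optional.empty(); }

    /** Returns true if the message should be handled, false if it must be discarded. */
    boolean accept(UUID receivedLeaderSessionId) {
        // While expected is empty (leadership revoked or not yet granted), every
        // session-stamped message is discarded -- exactly the WARN seen above.
        return expectedLeaderSessionId
                .map(expected -> expected.equals(receivedLeaderSessionId))
                .orElse(false);
    }
}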
2017-07-22 02:47:45,015 WARN 
netflix.spaas.shaded.org.apache.zookeeper.ClientCnxn - Session 
0x2579bebfd265054 for server 100.83.64.121/100.83.64.121:2181, unexpected 
error, closing socket connection and attempting reconnect
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:192)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
at 
netflix.spaas.shaded.org.apache.zookeeper.ClientCnxnSocketNIO.doIO(ClientCnxnSocketNIO.java:68)
at 
netflix.spaas.shaded.org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:366)
at 
netflix.spaas.shaded.org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081)
2017-07-22 02:47:44,557 ERROR org.apache.kafka.clients.producer.KafkaProducer - 
Interrupted while joining ioThread
java.lang.InterruptedException
at java.lang.Object.wait(Native Method)
at java.lang.Thread.join(Thread.java:1260)
at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:703)
at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:682)
at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:661)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.close(FlinkKafkaProducerBase.java:320)
at 
org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:127)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:431)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:332)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:666)
at java.lang.Thread.run(Thread.java:748)


2017-07-22 02:47:44,663 ERROR 
org.apache.flink.streaming.runtime.tasks.StreamTask - Error during disposal of 
stream operator.
org.apache.kafka.common.KafkaException: Failed to close kafka producer
at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:732)
at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:682)
at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:661)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.close(FlinkKafkaProducerBase.java:320)
at 
org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:127)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:431)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:332)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:666)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.InterruptedException
at java.lang.Object.wait(Native Method)
at java.lang.Thread.join(Thread.java:1260)
at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:703)
... 9 more


2017-07-22 02:47:45,079 WARN 
netflix.spaas.shaded.org.apache.zookeeper.ClientCnxn - Session 
0x35841491f044692 for server null, unexpected error, closing socket connection 
and attempting reconnect
java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at 
netflix.spaas.shaded.org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
at 
netflix.spaas.shaded.org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081)


2017-07-22 02:47:59,521 ERROR 
org.apache.flink.shaded.org.apache.curator.ConnectionState - Connection timed 
out for connection string 
(100.83.64.121:2181,100.83.104.81:2181,100.83.135.236:2181,100.83.146.196:2181,100.83.17.206:2181)
 and timeout (15000) / elapsed (15002)
org.apache.flink.shaded.org.apache.curator.CuratorConnectionLossException: 
KeeperErrorCode = ConnectionLoss
at 
org.apache.flink.shaded.org.apache.curator.ConnectionState.checkTimeouts(ConnectionState.java:197)
at 
org.apache.flink.shaded.org.apache.curator.ConnectionState.getZooKeeper(ConnectionState.java:87)
at 
org.apache.flink.shaded.org.apache.curator.CuratorZookeeperClient.getZooKeeper(CuratorZookeeperClient.java:115)
at 
org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl.performBackgroundOperation(CuratorFrameworkImpl.java:806)
at 
org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl.backgroundOperationsLoop(CuratorFrameworkImpl.java:792)
at 
org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl.access$300(CuratorFrameworkImpl.java:62)
at 
org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl$4.call(CuratorFrameworkImpl.java:257)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)


2017-07-22 02:48:24,523 ERROR 
org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl 
- Background operation retry gave up
netflix.spaas.shaded.org.apache.zookeeper.KeeperException$ConnectionLossException:
 KeeperErrorCode = ConnectionLoss
at 
netflix.spaas.shaded.org.apache.zookeeper.KeeperException.create(KeeperException.java:99)
at 
org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl.checkBackgroundRetry(CuratorFrameworkImpl.java:708)
at 
org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl.performBackgroundOperation(CuratorFrameworkImpl.java:826)
at 
org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl.backgroundOperationsLoop(CuratorFrameworkImpl.java:792)
at 
org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl.access$300(CuratorFrameworkImpl.java:62)
at 
org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl$4.call(CuratorFrameworkImpl.java:257)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
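
The two Curator errors above show the connection staying down past the client's 
retry budget, at which point background operations give up. A minimal sketch of 
where those connection-state transitions surface to application code is below 
(plain Curator; the timeouts are hypothetical, loosely matching the 15000 ms in 
the log):

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class ConnectionLossWatcher {
    public static void main(String[] args) {
        // Hypothetical timeouts, roughly matching the 15000 ms connection timeout above.
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "100.83.64.121:2181,100.83.104.81:2181",
                60000 /* session timeout */, 15000 /* connection timeout */,
                new ExponentialBackoffRetry(1000, 3));

        client.getConnectionStateListenable().addListener((c, newState) -> {
            if (newState == ConnectionState.SUSPENDED) {
                System.out.println("ZK connection suspended - leadership is in doubt");
            } else if (newState == ConnectionState.LOST) {
                // Retries exhausted; this is where Curator reports
                // "Background operation retry gave up" and leadership must be revoked.
                System.out.println("ZK connection lost - revoke leadership / fail over");
            } else if (newState == ConnectionState.RECONNECTED) {
                System.out.println("ZK reconnected - re-run leader election");
            }
        });
        client.start();
    }
}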

2017-07-22 02:49:34,592 ERROR 
org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager 
- Resource manager could not register at JobManager
akka.pattern.AskTimeoutException: Ask timed out on 
[ActorSelection[Anchor(akka://flink/), Path(/user/jobmanager)]] after [10000 ms]
at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334)
at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
at 
scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599)
at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597)
at 
akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:474)
at 
akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:425)
at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:429)
at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:381)
at java.lang.Thread.run(Thread.java:748)




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
