Hello all,

We run jobs on a standalone cluster with Flink 1.3.2 and we're facing
a problem. Suddenly the connection between a taskmanager and the
jobmanager times out, and the taskmanager is "quarantined" by the
jobmanager.
Once a taskmanager is quarantined, the jobs are of course restarted,
but the timeout and quarantine then happen to taskmanagers one after
another.

When a taskmanager's connection to the jobmanager timed out, its
connections to ZooKeeper and to HDFS (used for snapshots) also timed
out, so the problem doesn't seem to be in Flink itself.
However, even when a taskmanager running on the same machine as the
jobmanager timed out, the jobmanager was fine at that time, so I don't
think it is an OS problem either.

Could you give us some advice on how to investigate? Thank you.



Taskmanager command line:

java -XX:+UseG1GC -Xms219136M -Xmx219136M
-XX:MaxDirectMemorySize=8388607T
-Dlog.file=/var/log/flink/flink-log-manager-taskmanager-0-flink-jp-2.log
-Dlog4j.configuration=file:/opt/flink/flink-1.3.2/conf/log4j.properties
-Dlogback.configurationFile=file:/opt/flink/flink-1.3.2/conf/logback.xml
-classpath 
/opt/flink/flink-1.3.2/lib/flink-python_2.11-1.3.2.jar:/opt/flink/flink-1.3.2/lib/flink-shaded-hadoop2-uber-1.3.2.jar:/opt/flink/flink-1.3.2/lib/log4j-1.2.17.jar:/opt/flink/flink-1.3.2/lib/slf4j-log4j12-1.7.7.jar:/opt/flink/flink-1.3.2/lib/flink-dist_2.11-1.3.2.jar:::
org.apache.flink.runtime.taskmanager.TaskManager --configDir
/opt/flink/flink-1.3.2/conf


Taskmanager (on flink-jp-2) log:

2017-11-22 14:09:31,595 INFO
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend     - Heap
backend snapshot (File Stream Factory @
hdfs://nameservice1/user/log-manager/flink/checkpoints-data/9469db324b834e9dcf5b46428b3ae011,
synchronous part) in thread
Thread[TriggerWindow(TumblingProcessingTimeWindows(60000),
ReducingStateDescriptor{serializer=jp.geniee.reporter.executable.BuyerReporterV2Auction$$anon$12$$anon$7@d2619591,
reduceFunction=org.apache.flink.streaming.api.scala.function.util.ScalaReduceFunction@72bca894},
ProcessingTimeTrigger(),
WindowedStream.reduce(WindowedStream.java:300)) -> Map -> Map
(9/30),5,Flink Task Threads] took 142 ms.
2017-11-22 14:12:10,028 WARN  org.apache.hadoop.hdfs.DFSClient
                     - DFSOutputStream ResponseProcessor exception
for block BP-390359345-10.5.0.29-1476670682927:blk_1194079870_620518999
java.io.EOFException: Premature EOF: no length prefix available
        at 
org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:2207)
        at 
org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.readFields(PipelineAck.java:176)
        at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:867)
2017-11-22 14:12:10,028 WARN  org.apache.hadoop.hdfs.DFSClient
                     - DFSOutputStream ResponseProcessor exception
for block BP-390359345-10.5.0.29-1476670682927:blk_1194080159_621053744
java.io.EOFException: Premature EOF: no length prefix available
        at 
org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:2207)
        at 
org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.readFields(PipelineAck.java:176)
        at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:867)
2017-11-22 14:12:10,028 WARN  org.apache.hadoop.hdfs.DFSClient
                     - DFSOutputStream ResponseProcessor exception
for block BP-390359345-10.5.0.29-1476670682927:blk_1194080160_620520092
java.io.EOFException: Premature EOF: no length prefix available
        at 
org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:2207)
        at 
org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.readFields(PipelineAck.java:176)
        at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:867)
2017-11-22 14:12:10,028 WARN  org.apache.hadoop.hdfs.DFSClient
                     - DFSOutputStream ResponseProcessor exception
for block BP-390359345-10.5.0.29-1476670682927:blk_1194079071_620517393
java.io.EOFException: Premature EOF: no length prefix available
        at 
org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:2207)
        at 
org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.readFields(PipelineAck.java:176)
        at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:867)
2017-11-22 14:12:10,041 WARN  org.apache.hadoop.hdfs.DFSClient
                     - Error Recovery for block
BP-390359345-10.5.0.29-1476670682927:blk_1194079071_620517393 in
pipeline 10.5.0.61:50010, 10.5.0.59:50010, 10.5.0.74:50010: bad
datanode 10.5.0.61:50010
2017-11-22 14:12:10,039 WARN  org.apache.hadoop.hdfs.DFSClient
                     - Error Recovery for block
BP-390359345-10.5.0.29-1476670682927:blk_1194080160_620520092 in
pipeline 10.5.0.59:50010, 10.5.0.52:50010, 10.5.0.63:50010: bad
datanode 10.5.0.59:50010
2017-11-22 14:12:10,038 WARN  org.apache.hadoop.hdfs.DFSClient
                     - Error Recovery for block
BP-390359345-10.5.0.29-1476670682927:blk_1194080159_621053744 in
pipeline 10.5.0.52:50010, 10.5.0.78:50010: bad datanode
10.5.0.52:50010
2017-11-22 14:12:10,029 INFO  org.apache.zookeeper.ClientCnxn
                     - Client session timed out, have not heard from
server in 73797ms for sessionid 0x35f5cb4184700a4, closing socket
connection and attempting reconnect
2017-11-22 14:12:10,057 WARN  org.apache.hadoop.hdfs.DFSClient
                     - Error Recovery for block
BP-390359345-10.5.0.29-1476670682927:blk_1194079870_620518999 in
pipeline 10.5.0.69:50010, 10.5.0.59:50010, 10.5.0.74:50010: bad
datanode 10.5.0.69:50010
2017-11-22 14:12:10,113 WARN  akka.remote.RemoteWatcher
                     - Detected unreachable:
[akka.tcp://flink@flink-jp-2:43139]
2017-11-22 14:12:10,142 INFO
org.apache.flink.shaded.org.apache.curator.framework.state.ConnectionStateManager
 - State change: SUSPENDED
2017-11-22 14:12:10,142 WARN
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService
 - Connection to ZooKeeper suspended. Can no longer retrieve the
leader from ZooKeeper.
2017-11-22 14:12:10,157 INFO
org.apache.flink.runtime.taskmanager.TaskManager              -
TaskManager akka://flink/user/taskmanager disconnects from JobManager
akka.tcp://flink@flink-jp-2:43139/user/jobmanager: JobManager is no
longer reachable
2017-11-22 14:12:10,158 INFO
org.apache.flink.runtime.taskmanager.TaskManager              -
Cancelling all computations and discarding all cached data.
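
Note that the taskmanager log above has no entries between
14:09:31,595 and 14:12:10,028. As a starting point for investigation,
here is a minimal Python sketch for listing such silent periods in a
log file. It assumes the log4j timestamp format shown above; the
function name find_log_gaps and the 30-second threshold are
hypothetical choices for illustration, not anything provided by Flink.

```python
import re
from datetime import datetime

# Matches the log4j timestamp prefix used in the logs above,
# e.g. "2017-11-22 14:09:31,595".
TS = re.compile(r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3})")


def find_log_gaps(lines, threshold_seconds=30.0):
    """Return (previous, current, gap_seconds) for every pair of
    consecutive timestamped lines that are further apart than the
    threshold. The threshold default is an arbitrary illustration."""
    gaps = []
    prev = None
    for line in lines:
        m = TS.match(line)
        if not m:
            continue  # stack-trace or wrapped continuation line
        cur = datetime.strptime(m.group(1), "%Y-%m-%d %H:%M:%S,%f")
        if prev is not None:
            gap = (cur - prev).total_seconds()
            if gap > threshold_seconds:
                gaps.append((prev, cur, gap))
        prev = cur
    return gaps
```

Running it over the lines of a log file (for example
find_log_gaps(open(path)), where path is whichever log is being
checked) returns one tuple per silent period, which can then be
compared against the times at which the ZooKeeper and HDFS timeouts
were reported.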



Jobmanager command line:

java -Xms8192m -Xmx8192m
-Dlog.file=/var/log/flink/flink-log-manager-jobmanager-0-flink-jp-2.log
-Dlog4j.configuration=file:/opt/flink/flink-1.3.2/conf/log4j.properties
-Dlogback.configurationFile=file:/opt/flink/flink-1.3.2/conf/logback.xml
-classpath 
/opt/flink/flink-1.3.2/lib/flink-python_2.11-1.3.2.jar:/opt/flink/flink-1.3.2/lib/flink-shaded-hadoop2-uber-1.3.2.jar:/opt/flink/flink-1.3.2/lib/log4j-1.2.17.jar:/opt/flink/flink-1.3.2/lib/slf4j-log4j12-1.7.7.jar:/opt/flink/flink-1.3.2/lib/flink-dist_2.11-1.3.2.jar:::
org.apache.flink.runtime.jobmanager.JobManager --configDir
/opt/flink/flink-1.3.2/conf --executionMode cluster --host flink-jp-2
--webui-port 8081


Jobmanager (on flink-jp-2) log:

2017-11-22 14:09:32,252 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     -
Completed checkpoint 293 (125180549 bytes in 889 ms).
2017-11-22 14:12:02,705 WARN  akka.remote.RemoteWatcher
                     - Detected unreachable:
[akka.tcp://flink@flink-jp-2:42609]
2017-11-22 14:12:02,705 INFO
org.apache.flink.runtime.jobmanager.JobManager                - Task
manager akka.tcp://flink@flink-jp-2:42609/user/taskmanager terminated.
2017-11-22 14:12:02,705 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph        -
Source: lamp-auction-test -> Flat Map -> Map -> Sink:
2017-11-22-auc-log (30/30) (a853390bb17f6d58997ad994266d3df2) switched
from RUNNING to FAILED.
java.lang.Exception: TaskManager was lost/killed:
d51c4d252a8c1ff222b728ca50dbe55a @ flink-jp-2 (dataPort=37930)
        at 
org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:217)
        at 
org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:533)
        at 
org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:192)
        at 
org.apache.flink.runtime.instance.Instance.markDead(Instance.java:167)
        at 
org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:212)
        at 
org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$handleTaskManagerTerminated(JobManager.scala:1228)
        at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:1131)
        at 
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
        at 
org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:49)
        at 
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
        at 
org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
        at 
org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
        at 
org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
        at 
org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:125)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at 
akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:44)
        at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:369)
        at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:501)
        at akka.actor.ActorCell.invoke(ActorCell.scala:486)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
        at akka.dispatch.Mailbox.run(Mailbox.scala:220)
        at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)



Best,
Tetsuya
