Hello all,

We run jobs on a standalone cluster with Flink 1.3.2 and we're facing a problem. The connection between a taskmanager and the jobmanager suddenly times out, and the taskmanager is "quarantined" by the jobmanager. Once a taskmanager is quarantined the jobs are of course restarted, but the timeout and quarantine then happen to several taskmanagers in succession.
When a taskmanager's connection to the jobmanager timed out, its connections to ZooKeeper and to the HDFS used for snapshots also timed out, so the problem doesn't seem to be in Flink itself. However, when a taskmanager running on the same machine as the jobmanager timed out, the jobmanager itself was fine at that time, so I don't think it is an OS problem either.

Could you give us some advice on how to investigate?

Thank you.

Taskmanager command line:

java -XX:+UseG1GC -Xms219136M -Xmx219136M -XX:MaxDirectMemorySize=8388607T -Dlog.file=/var/log/flink/flink-log-manager-taskmanager-0-flink-jp-2.log -Dlog4j.configuration=file:/opt/flink/flink-1.3.2/conf/log4j.properties -Dlogback.configurationFile=file:/opt/flink/flink-1.3.2/conf/logback.xml -classpath /opt/flink/flink-1.3.2/lib/flink-python_2.11-1.3.2.jar:/opt/flink/flink-1.3.2/lib/flink-shaded-hadoop2-uber-1.3.2.jar:/opt/flink/flink-1.3.2/lib/log4j-1.2.17.jar:/opt/flink/flink-1.3.2/lib/slf4j-log4j12-1.7.7.jar:/opt/flink/flink-1.3.2/lib/flink-dist_2.11-1.3.2.jar::: org.apache.flink.runtime.taskmanager.TaskManager --configDir /opt/flink/flink-1.3.2/conf

Taskmanager (on flink-jp-2) log:

2017-11-22 14:09:31,595 INFO org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - Heap backend snapshot (File Stream Factory @ hdfs://nameservice1/user/log-manager/flink/checkpoints-data/9469db324b834e9dcf5b46428b3ae011, synchronous part) in thread Thread[TriggerWindow(TumblingProcessingTimeWindows(60000), ReducingStateDescriptor{serializer=jp.geniee.reporter.executable.BuyerReporterV2Auction$$anon$12$$anon$7@d2619591, reduceFunction=org.apache.flink.streaming.api.scala.function.util.ScalaReduceFunction@72bca894}, ProcessingTimeTrigger(), WindowedStream.reduce(WindowedStream.java:300)) -> Map -> Map (9/30),5,Flink Task Threads] took 142 ms.
2017-11-22 14:12:10,028 WARN org.apache.hadoop.hdfs.DFSClient - DFSOutputStream ResponseProcessor exception for block BP-390359345-10.5.0.29-1476670682927:blk_1194079870_620518999
java.io.EOFException: Premature EOF: no length prefix available
    at org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:2207)
    at org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.readFields(PipelineAck.java:176)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:867)
2017-11-22 14:12:10,028 WARN org.apache.hadoop.hdfs.DFSClient - DFSOutputStream ResponseProcessor exception for block BP-390359345-10.5.0.29-1476670682927:blk_1194080159_621053744
java.io.EOFException: Premature EOF: no length prefix available
    at org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:2207)
    at org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.readFields(PipelineAck.java:176)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:867)
2017-11-22 14:12:10,028 WARN org.apache.hadoop.hdfs.DFSClient - DFSOutputStream ResponseProcessor exception for block BP-390359345-10.5.0.29-1476670682927:blk_1194080160_620520092
java.io.EOFException: Premature EOF: no length prefix available
    at org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:2207)
    at org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.readFields(PipelineAck.java:176)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:867)
2017-11-22 14:12:10,028 WARN org.apache.hadoop.hdfs.DFSClient - DFSOutputStream ResponseProcessor exception for block BP-390359345-10.5.0.29-1476670682927:blk_1194079071_620517393
java.io.EOFException: Premature EOF: no length prefix available
    at org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:2207)
    at org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.readFields(PipelineAck.java:176)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:867)
2017-11-22 14:12:10,041 WARN org.apache.hadoop.hdfs.DFSClient - Error Recovery for block BP-390359345-10.5.0.29-1476670682927:blk_1194079071_620517393 in pipeline 10.5.0.61:50010, 10.5.0.59:50010, 10.5.0.74:50010: bad datanode 10.5.0.61:50010
2017-11-22 14:12:10,039 WARN org.apache.hadoop.hdfs.DFSClient - Error Recovery for block BP-390359345-10.5.0.29-1476670682927:blk_1194080160_620520092 in pipeline 10.5.0.59:50010, 10.5.0.52:50010, 10.5.0.63:50010: bad datanode 10.5.0.59:50010
2017-11-22 14:12:10,038 WARN org.apache.hadoop.hdfs.DFSClient - Error Recovery for block BP-390359345-10.5.0.29-1476670682927:blk_1194080159_621053744 in pipeline 10.5.0.52:50010, 10.5.0.78:50010: bad datanode 10.5.0.52:50010
2017-11-22 14:12:10,029 INFO org.apache.zookeeper.ClientCnxn - Client session timed out, have not heard from server in 73797ms for sessionid 0x35f5cb4184700a4, closing socket connection and attempting reconnect
2017-11-22 14:12:10,057 WARN org.apache.hadoop.hdfs.DFSClient - Error Recovery for block BP-390359345-10.5.0.29-1476670682927:blk_1194079870_620518999 in pipeline 10.5.0.69:50010, 10.5.0.59:50010, 10.5.0.74:50010: bad datanode 10.5.0.69:50010
2017-11-22 14:12:10,113 WARN akka.remote.RemoteWatcher - Detected unreachable: [akka.tcp://flink@flink-jp-2:43139]
2017-11-22 14:12:10,142 INFO org.apache.flink.shaded.org.apache.curator.framework.state.ConnectionStateManager - State change: SUSPENDED
2017-11-22 14:12:10,142 WARN org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.
2017-11-22 14:12:10,157 INFO org.apache.flink.runtime.taskmanager.TaskManager - TaskManager akka://flink/user/taskmanager disconnects from JobManager akka.tcp://flink@flink-jp-2:43139/user/jobmanager: JobManager is no longer reachable
2017-11-22 14:12:10,158 INFO org.apache.flink.runtime.taskmanager.TaskManager - Cancelling all computations and discarding all cached data.

Jobmanager command line:

java -Xms8192m -Xmx8192m -Dlog.file=/var/log/flink/flink-log-manager-jobmanager-0-flink-jp-2.log -Dlog4j.configuration=file:/opt/flink/flink-1.3.2/conf/log4j.properties -Dlogback.configurationFile=file:/opt/flink/flink-1.3.2/conf/logback.xml -classpath /opt/flink/flink-1.3.2/lib/flink-python_2.11-1.3.2.jar:/opt/flink/flink-1.3.2/lib/flink-shaded-hadoop2-uber-1.3.2.jar:/opt/flink/flink-1.3.2/lib/log4j-1.2.17.jar:/opt/flink/flink-1.3.2/lib/slf4j-log4j12-1.7.7.jar:/opt/flink/flink-1.3.2/lib/flink-dist_2.11-1.3.2.jar::: org.apache.flink.runtime.jobmanager.JobManager --configDir /opt/flink/flink-1.3.2/conf --executionMode cluster --host flink-jp-2 --webui-port 8081

Jobmanager (on flink-jp-2) log:

2017-11-22 14:09:32,252 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 293 (125180549 bytes in 889 ms).
2017-11-22 14:12:02,705 WARN akka.remote.RemoteWatcher - Detected unreachable: [akka.tcp://flink@flink-jp-2:42609]
2017-11-22 14:12:02,705 INFO org.apache.flink.runtime.jobmanager.JobManager - Task manager akka.tcp://flink@flink-jp-2:42609/user/taskmanager terminated.
2017-11-22 14:12:02,705 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: lamp-auction-test -> Flat Map -> Map -> Sink: 2017-11-22-auc-log (30/30) (a853390bb17f6d58997ad994266d3df2) switched from RUNNING to FAILED.
java.lang.Exception: TaskManager was lost/killed: d51c4d252a8c1ff222b728ca50dbe55a @ flink-jp-2 (dataPort=37930)
    at org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:217)
    at org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:533)
    at org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:192)
    at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:167)
    at org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:212)
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$handleTaskManagerTerminated(JobManager.scala:1228)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:1131)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:49)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:125)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:44)
    at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:369)
    at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:501)
    at akka.actor.ActorCell.invoke(ActorCell.scala:486)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Best,
Tetsuya