[ https://issues.apache.org/jira/browse/FLINK-4632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15715048#comment-15715048 ]
刘喆 commented on FLINK-4632:
---------------------------

Thank you, Ufuk Celebi. It depends on the restart strategy. Here I use the automatic restart strategy: after a restart the job works well for a while, and then it restarts again. Because it is running on a busy YARN cluster, the network may sometimes be under stress. It seems that Flink is not very robust on this YARN cluster; at the same time, some Spark Streaming jobs on the same cluster work well.
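For context, the automatic restart behaviour mentioned above can be made explicit and bounded with a fixed-delay restart strategy. The following is only a minimal sketch against the Flink 1.1.x DataStream API; the attempt count, the delay, and the placeholder source/sink are illustrative assumptions, not values taken from the reported job:

{code:java}
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RestartStrategyExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Bound the automatic restarts: at most 10 attempts, 30 seconds between attempts.
        // Both numbers are placeholders chosen for illustration.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, 30000L));

        // Placeholder source and sink; in the reported setup this would be the Kafka 0.8 consumer.
        env.fromElements("a", "b", "c").print();

        env.execute("job with explicit restart strategy");
    }
}
{code}

The same strategy can also be configured cluster-wide in flink-conf.yaml via restart-strategy: fixed-delay together with restart-strategy.fixed-delay.attempts and restart-strategy.fixed-delay.delay.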
> when yarn nodemanager lost, flink hung
> ---------------------------------------
>
>                 Key: FLINK-4632
>                 URL: https://issues.apache.org/jira/browse/FLINK-4632
>             Project: Flink
>          Issue Type: Bug
>          Components: Distributed Coordination, Streaming
>    Affects Versions: 1.2.0, 1.1.2
>         Environment: cdh5.5.1, jdk1.7, flink 1.1.2 / 1.2-snapshot, kafka 0.8.2
>            Reporter: 刘喆
>
> When running Flink streaming on YARN, using Kafka as the source, it runs well at the start. But after a long run, for example 8 hours and 60,000,000+ messages processed, it hangs: no messages are consumed, one TaskManager is CANCELING, and the exception shows:
> org.apache.flink.runtime.io.network.netty.exception.LocalTransportException: connection timeout
>     at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.exceptionCaught(PartitionRequestClientHandler.java:152)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:275)
>     at io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:253)
>     at io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:275)
>     at io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:253)
>     at io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:275)
>     at io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:253)
>     at io.netty.channel.ChannelHandlerAdapter.exceptionCaught(ChannelHandlerAdapter.java:79)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:275)
>     at io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:253)
>     at io.netty.channel.DefaultChannelPipeline.fireExceptionCaught(DefaultChannelPipeline.java:835)
>     at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.handleReadException(AbstractNioByteChannel.java:87)
>     at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:162)
>     at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>     at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>     at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>     at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>     at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: 连接超时 (Connection timed out)
>     at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
>     at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
>     at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
>     at sun.nio.ch.IOUtil.read(IOUtil.java:192)
>     at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
>     at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:311)
>     at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
>     at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:241)
>     at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
>     ... 6 more
> After applying https://issues.apache.org/jira/browse/FLINK-4625 it shows:
> java.lang.Exception: TaskManager was lost/killed: ResourceID{resourceId='container_1471620986643_744852_01_001400'} @ 38.slave.adh (dataPort=45349)
>     at org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:162)
>     at org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:533)
>     at org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:138)
>     at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:167)
>     at org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:224)
>     at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$handleTaskManagerTerminated(JobManager.scala:1054)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:458)
>     at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>     at org.apache.flink.runtime.clusterframework.ContaineredJobManager$$anonfun$handleContainerMessage$1.applyOrElse(ContaineredJobManager.scala:100)
>     at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
>     at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
>     at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>     at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>     at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>     at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>     at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:118)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)