This sounds like a memory issue. Did you enable GC output? When this happens, are your executors doing full GCs? How long do the full GCs take?

Yong
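To answer the GC questions, one option is to pass verbose GC flags to every executor JVM through `spark.executor.extraJavaOptions`. The three flags below are the ones the Spark tuning guide suggests for GC logging. This is a minimal sketch that assumes Java 7/8 HotSpot JVMs, which were common with Spark 1.4. It also assumes the application is launched with `spark-submit --master yarn-client`, so no master is hard-coded. The object name `GcLoggingSketch` is hypothetical.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical driver object; assumed to be submitted with spark-submit --master yarn-client.
object GcLoggingSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("gc-logging-sketch")
      // HotSpot GC logging flags for Java 7/8 JVMs (newer JVMs use -Xlog:gc* instead).
      // Applied to every executor JVM launched for this application.
      .set("spark.executor.extraJavaOptions",
        "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps")
    val sc = new SparkContext(conf)
    try {
      // Run the job that fails on the large input here, then look for
      // full-GC frequency and pause times in the executor logs.
    } finally {
      sc.stop()
    }
  }
}
```

The GC lines are written to each executor's stdout, so on YARN they end up in the container logs (for example via `yarn logs -applicationId <appId>`, where `<appId>` is a placeholder). The same key can instead be added to spark-defaults.conf alongside the other spark-conf entries.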
From: qhz...@apache.org
Date: Wed, 16 Sep 2015 13:52:25 +0000
Subject: Re: application failed on large dataset
To: java8...@hotmail.com; user@spark.apache.org

Hi,
I have switched 'spark.shuffle.blockTransferService' to 'nio', but the problem still exists. However, the stack trace is a little different:

PART one:
15/09/16 06:20:32 ERROR executor.Executor: Exception in task 1.2 in stage 15.0 (TID 5341)
java.io.IOException: Failed without being ACK'd
    at org.apache.spark.network.nio.ConnectionManager$MessageStatus.failWithoutAck(ConnectionManager.scala:72)
    at org.apache.spark.network.nio.ConnectionManager$$anonfun$removeConnection$3.apply(ConnectionManager.scala:533)
    at org.apache.spark.network.nio.ConnectionManager$$anonfun$removeConnection$3.apply(ConnectionManager.scala:531)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.network.nio.ConnectionManager.removeConnection(ConnectionManager.scala:531)
    at org.apache.spark.network.nio.ConnectionManager$$anonfun$addListeners$3.apply(ConnectionManager.scala:510)
    at org.apache.spark.network.nio.ConnectionManager$$anonfun$addListeners$3.apply(ConnectionManager.scala:510)
    at org.apache.spark.network.nio.Connection.callOnCloseCallback(Connection.scala:162)
    at org.apache.spark.network.nio.Connection.close(Connection.scala:130)
    at org.apache.spark.network.nio.ConnectionManager$$anonfun$stop$1.apply(ConnectionManager.scala:1000)
    at org.apache.spark.network.nio.ConnectionManager$$anonfun$stop$1.apply(ConnectionManager.scala:1000)
    at scala.collection.mutable.HashMap$$anon$2$$anonfun$foreach$3.apply(HashMap.scala:107)
    at scala.collection.mutable.HashMap$$anon$2$$anonfun$foreach$3.apply(HashMap.scala:107)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
    at scala.collection.mutable.HashMap$$anon$2.foreach(HashMap.scala:107)
    at org.apache.spark.network.nio.ConnectionManager.stop(ConnectionManager.scala:1000)
    at org.apache.spark.network.nio.NioBlockTransferService.close(NioBlockTransferService.scala:78)
    at org.apache.spark.storage.BlockManager.stop(BlockManager.scala:1228)
    at org.apache.spark.SparkEnv.stop(SparkEnv.scala:100)
    at org.apache.spark.executor.Executor.stop(Executor.scala:144)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(CoarseGrainedExecutorBackend.scala:113)
    at org.apache.spark.rpc.akka.AkkaRpcEnv.org$apache$spark$rpc$akka$AkkaRpcEnv$$processMessage(AkkaRpcEnv.scala:177)
    at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$4.apply$mcV$sp(AkkaRpcEnv.scala:126)
    at org.apache.spark.rpc.akka.AkkaRpcEnv.org$apache$spark$rpc$akka$AkkaRpcEnv$$safelyCall(AkkaRpcEnv.scala:197)
    at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1.applyOrElse(AkkaRpcEnv.scala:125)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:59)
    at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
    at org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
    at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1.aroundReceive(AkkaRpcEnv.scala:92)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

PART two:
15/09/16 06:14:36 INFO nio.ConnectionManager: Removing SendingConnection to ConnectionManagerId()
15/09/16 06:14:36 INFO nio.ConnectionManager: Removing ReceivingConnection to ConnectionManagerId()
15/09/16 06:14:36 ERROR nio.ConnectionManager: Corresponding SendingConnection to ConnectionManagerId() not found
15/09/16 06:14:36 INFO nio.ConnectionManager: Key not valid ? sun.nio.ch.SelectionKeyImpl@3011c7c9
15/09/16 06:14:36 INFO nio.ConnectionManager: key already cancelled ? sun.nio.ch.selectionkeyi...@3011c7c9
java.nio.channels.CancelledKeyException
    at org.apache.spark.network.nio.ConnectionManager.run(ConnectionManager.scala:461)
    at org.apache.spark.network.nio.ConnectionManager$$anon$7.run(ConnectionManager.scala:193)

On Wed, 16 Sep 2015 at 8:17 PM, java8964 <java8...@hotmail.com> wrote:

Can you try "nio" instead of "netty"? Set "spark.shuffle.blockTransferService" to "nio" and give it a try.
Yong

From: z.qian...@gmail.com
Date: Wed, 16 Sep 2015 03:21:02 +0000
Subject: Re: application failed on large dataset
To: java8...@hotmail.com; user@spark.apache.org

Hi,
After checking the YARN logs, all the error stacks look like the one below:

15/09/15 19:58:23 ERROR shuffle.OneForOneBlockFetcher: Failed while starting block fetches
java.io.IOException: Connection reset by peer
    at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
    at sun.nio.ch.IOUtil.read(IOUtil.java:192)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
    at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:313)
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
    at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:242)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
    at java.lang.Thread.run(Thread.java:745)

It seems that an error occurs while trying to fetch the block, and after several retries the executor just dies with this error. As for your question, I did not see any executor restart during the job.

PS: the operator I am using during that stage is rdd.glom().mapPartitions().

On Tue, 15 Sep 2015 at 11:44 PM, java8964 <java8...@hotmail.com> wrote:

When you see this error, does any executor die for whatever reason? Did you check whether any executor restarts during your job? It is hard to help you with just the stack trace.
You need to tell us the whole picture of what happens while your jobs are running.

Yong

From: qhz...@apache.org
Date: Tue, 15 Sep 2015 15:02:28 +0000
Subject: Re: application failed on large dataset
To: user@spark.apache.org

Has anyone run into the same problem?

On Mon, 14 Sep 2015 at 9:07 PM, 周千昊 <qhz...@apache.org> wrote:

Hi community,
I am facing a strange problem: all executors stop responding, and then all of them fail with ExecutorLostFailure. When I look into the YARN logs, they are full of exceptions like this:

15/09/14 04:35:33 ERROR shuffle.RetryingBlockFetcher: Exception while beginning fetch of 1 outstanding blocks (after 3 retries)
java.io.IOException: Failed to connect to host/ip:port
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:193)
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
    at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:88)
    at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
    at org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
    at org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.ConnectException: Connection refused: host/ip:port
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
    at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:208)
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:287)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
    ... 1 more

The strange thing is that if I reduce the input size, the problem disappears. I found a similar issue in the mail archive (http://mail-archives.us.apache.org/mod_mbox/spark-user/201502.mbox/%3CCAOHP_tHRtuxDfWF0qmYDauPDhZ1=MAm5thdTfgAhXDN=7kq...@mail.gmail.com%3E), but I didn't see a solution there. Could anyone help with this?

My env is:
HDP 2.2.6
Spark 1.4.1
mode: yarn-client
spark-conf:
spark.driver.extraJavaOptions -Dhdp.version=2.2.6.0-2800
spark.yarn.am.extraJavaOptions -Dhdp.version=2.2.6.0-2800
spark.executor.memory 6g
spark.storage.memoryFraction 0.3
spark.dynamicAllocation.enabled true
spark.shuffle.service.enabled true

--
Best Regards
ZhouQianhao
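The 16 Sep reply mentions that the failing stage uses rdd.glom().mapPartitions(). `glom()` coalesces all elements of a partition into a single `Array`, so each task has to hold an entire partition in memory at once. That might fit Yong's memory hypothesis as the input grows, but this is an assumption and the thread does not confirm it. If the real per-partition logic can be written as a single pass, `mapPartitions` can consume the partition iterator directly instead.

The sketch below runs both forms on a local master and returns both totals so they can be compared. It makes two assumptions: the object name `GlomVsMapPartitions` is hypothetical, and a toy per-partition sum stands in for the real logic.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical example object; the per-partition sum is a stand-in for the real logic.
object GlomVsMapPartitions {
  // Returns (total via glom, total via streaming mapPartitions).
  def run(): (Long, Long) = {
    val sc = new SparkContext(new SparkConf().setAppName("glom-sketch").setMaster("local[2]"))
    try {
      val rdd = sc.parallelize(1L to 1000000L, 4)

      // glom(): each partition becomes ONE Array[Long], so the whole
      // partition is materialized in memory before the sum runs.
      val viaGlom = rdd.glom()
        .mapPartitions(arrays => arrays.map(_.sum))
        .collect()
        .sum

      // Streaming form: fold over the partition iterator; only the
      // running total is kept, not the partition's elements.
      val viaIterator = rdd
        .mapPartitions(it => Iterator.single(it.foldLeft(0L)(_ + _)))
        .collect()
        .sum

      (viaGlom, viaIterator)
    } finally sc.stop()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

This only helps if the downstream code does not genuinely need random access to the whole partition. If it does, a smaller partition size (more partitions) is the usual lever instead.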