Re: application failed on large dataset

2015-09-18 Thread
Hi,
 The issue turned out to be a memory issue. Thanks for the guidance.

周千昊 <qhz...@apache.org> wrote on Thu, Sep 17, 2015 at 12:39 PM:

> Indeed, the operation in this stage is quite memory-consuming.
> We are trying to enable the PrintGCDetails option to see what is going on.
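>
> For reference, a minimal way to turn this on (only an illustrative flag set,
> added through spark-defaults.conf or --conf; adjust to your setup) would be:
>
>   spark.executor.extraJavaOptions -XX:+PrintGCDetails -XX:+PrintGCTimeStamps
>
> The GC lines then show up in each executor's stdout inside the YARN container
> logs.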
>
> java8964 <java8...@hotmail.com> wrote on Wed, Sep 16, 2015 at 11:47 PM:
>
>> This sounds like a memory issue.
>>
>> Have you enabled the GC output? When this is happening, are your executors
>> doing full GCs? How long do the full GCs take?
>>
>> Yong
>>
>> --
>> From: qhz...@apache.org
>> Date: Wed, 16 Sep 2015 13:52:25 +
>>
>> Subject: Re: application failed on large dataset
>> To: java8...@hotmail.com; user@spark.apache.org
>>
>> Hi,
>>  I have switched 'spark.shuffle.blockTransferService' to 'nio', but the
>> problem still exists. However, the stack trace is a little bit different:
>> PART one:
>> 15/09/16 06:20:32 ERROR executor.Executor: Exception in task 1.2 in stage
>> 15.0 (TID 5341)
>> java.io.IOException: Failed without being ACK'd
>> at
>> org.apache.spark.network.nio.ConnectionManager$MessageStatus.failWithoutAck(ConnectionManager.scala:72)
>> at
>> org.apache.spark.network.nio.ConnectionManager$$anonfun$removeConnection$3.apply(ConnectionManager.scala:533)
>> at
>> org.apache.spark.network.nio.ConnectionManager$$anonfun$removeConnection$3.apply(ConnectionManager.scala:531)
>> at scala.collection.immutable.List.foreach(List.scala:318)
>> at
>> org.apache.spark.network.nio.ConnectionManager.removeConnection(ConnectionManager.scala:531)
>> at
>> org.apache.spark.network.nio.ConnectionManager$$anonfun$addListeners$3.apply(ConnectionManager.scala:510)
>> at
>> org.apache.spark.network.nio.ConnectionManager$$anonfun$addListeners$3.apply(ConnectionManager.scala:510)
>> at
>> org.apache.spark.network.nio.Connection.callOnCloseCallback(Connection.scala:162)
>> at
>> org.apache.spark.network.nio.Connection.close(Connection.scala:130)
>> at
>> org.apache.spark.network.nio.ConnectionManager$$anonfun$stop$1.apply(ConnectionManager.scala:1000)
>> at
>> org.apache.spark.network.nio.ConnectionManager$$anonfun$stop$1.apply(ConnectionManager.scala:1000)
>> at
>> scala.collection.mutable.HashMap$$anon$2$$anonfun$foreach$3.apply(HashMap.scala:107)
>> at
>> scala.collection.mutable.HashMap$$anon$2$$anonfun$foreach$3.apply(HashMap.scala:107)
>> at
>> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
>> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
>> at
>> scala.collection.mutable.HashMap$$anon$2.foreach(HashMap.scala:107)
>> at
>> org.apache.spark.network.nio.ConnectionManager.stop(ConnectionManager.scala:1000)
>> at
>> org.apache.spark.network.nio.NioBlockTransferService.close(NioBlockTransferService.scala:78)
>> at
>> org.apache.spark.storage.BlockManager.stop(BlockManager.scala:1228)
>> at org.apache.spark.SparkEnv.stop(SparkEnv.scala:100)
>> at org.apache.spark.executor.Executor.stop(Executor.scala:144)
>> at
>> org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(CoarseGrainedExecutorBackend.scala:113)
>> at org.apache.spark.rpc.akka.AkkaRpcEnv.org
>> $apache$spark$rpc$akka$AkkaRpcEnv$$processMessage(AkkaRpcEnv.scala:177)
>> at
>> org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$4.apply$mcV$sp(AkkaRpcEnv.scala:126)
>> at org.apache.spark.rpc.akka.AkkaRpcEnv.org
>> $apache$spark$rpc$akka$AkkaRpcEnv$$safelyCall(AkkaRpcEnv.scala:197)
>> at
>> org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1.applyOrElse(AkkaRpcEnv.scala:125)
>> at
>> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>> at
>> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>> at
>> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>> at
>> org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:59)
>> at
>> org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
>> at
>> scala.PartialFunction$class.applyOrElse(PartialFunction

Re: application failed on large dataset

2015-09-16 Thread
 nio.ConnectionManager: key already cancelled ?
sun.nio.ch.SelectionKeyImpl@3011c7c9
java.nio.channels.CancelledKeyException
at
org.apache.spark.network.nio.ConnectionManager.run(ConnectionManager.scala:461)
at
org.apache.spark.network.nio.ConnectionManager$$anon$7.run(ConnectionManager.scala:193)

java8964 <java8...@hotmail.com> wrote on Wed, Sep 16, 2015 at 8:17 PM:

> Can you try "nio" instead of "netty"?
>
> Set "spark.shuffle.blockTransferService" to "nio" and give it a try.
>
> Yong
>
> --
> From: z.qian...@gmail.com
> Date: Wed, 16 Sep 2015 03:21:02 +
>
> Subject: Re: application failed on large dataset
> To: java8...@hotmail.com; user@spark.apache.org
>
>
> Hi,
>   After checking the YARN logs, all the error stacks look like the one below:
>
> 15/09/15 19:58:23 ERROR shuffle.OneForOneBlockFetcher: Failed while
> starting block fetches
> java.io.IOException: Connection reset by peer
> at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
> at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
> at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
> at sun.nio.ch.IOUtil.read(IOUtil.java:192)
> at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
> at
> io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:313)
> at
> io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
> at
> io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:242)
> at
> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
> at
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
> at java.lang.Thread.run(Thread.java:745)
>
> It seems that some error occurs when trying to fetch the block, and
> after several retries the executor just dies with this error.
> As for your question, I did not see any executor restart during
> the job.
> PS: the operator I am using during that stage is
> rdd.glom().mapPartitions().
>
>
> java8964 <java8...@hotmail.com> wrote on Tue, Sep 15, 2015 at 11:44 PM:
>
> When you saw this error, did any executor die due to some other error?
>
> Did you check whether any executor restarted during your job?
>
> It is hard to help you with just the stack trace. You need to give us the
> whole picture of what happens while your job is running.
>
> Yong
>
> --
> From: qhz...@apache.org
> Date: Tue, 15 Sep 2015 15:02:28 +
> Subject: Re: application failed on large dataset
> To: user@spark.apache.org
>
>
> Has anyone met the same problem?
> 周千昊 <qhz...@apache.org> wrote on Mon, Sep 14, 2015 at 9:07 PM:
>
> Hi, community,
>   I am facing a strange problem:
>   all executors stop responding, and then all of them fail with
> ExecutorLostFailure.
>   When I look into the YARN logs, they are full of exceptions like this:
>
> 15/09/14 04:35:33 ERROR shuffle.RetryingBlockFetcher: Exception while
> beginning fetch of 1 outstanding blocks (after 3 retries)
> java.io.IOException: Failed to connect to host/ip:port
> at
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:193)
> at
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
> at
> org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:88)
> at
> org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
> at
> org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
> at
> org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
> at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
> at java.util.concurrent.FutureTask.run(FutureTask.java:262)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.n

Re: application failed on large dataset

2015-09-16 Thread
ch.Mailbox.processMailbox(Mailbox.scala:238)
> at akka.dispatch.Mailbox.run(Mailbox.scala:220)
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
> at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> PART two:
> 15/09/16 06:14:36 INFO nio.ConnectionManager: Removing SendingConnection
> to ConnectionManagerId()
> 15/09/16 06:14:36 INFO nio.ConnectionManager: Removing ReceivingConnection
> to ConnectionManagerId()
> 15/09/16 06:14:36 ERROR nio.ConnectionManager: Corresponding
> SendingConnection to ConnectionManagerId() not found
> 15/09/16 06:14:36 INFO nio.ConnectionManager: Key not valid ?
> sun.nio.ch.SelectionKeyImpl@3011c7c9
> 15/09/16 06:14:36 INFO nio.ConnectionManager: key already cancelled ?
> sun.nio.ch.SelectionKeyImpl@3011c7c9
> java.nio.channels.CancelledKeyException
> at
> org.apache.spark.network.nio.ConnectionManager.run(ConnectionManager.scala:461)
> at
> org.apache.spark.network.nio.ConnectionManager$$anon$7.run(ConnectionManager.scala:193)
>
> java8964 <java8...@hotmail.com> wrote on Wed, Sep 16, 2015 at 8:17 PM:
>
> Can you try "nio" instead of "netty"?
>
> Set "spark.shuffle.blockTransferService" to "nio" and give it a try.
>
> Yong
>
> --
> From: z.qian...@gmail.com
> Date: Wed, 16 Sep 2015 03:21:02 +
>
> Subject: Re: application failed on large dataset
> To: java8...@hotmail.com; user@spark.apache.org
>
>
> Hi,
>   After checking the YARN logs, all the error stacks look like the one below:
>
> 15/09/15 19:58:23 ERROR shuffle.OneForOneBlockFetcher: Failed while
> starting block fetches
> java.io.IOException: Connection reset by peer
> at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
> at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
> at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
> at sun.nio.ch.IOUtil.read(IOUtil.java:192)
> at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
> at
> io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:313)
> at
> io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
> at
> io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:242)
> at
> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
> at
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
> at java.lang.Thread.run(Thread.java:745)
>
> It seems that some error occurs when trying to fetch the block, and
> after several retries the executor just dies with this error.
> As for your question, I did not see any executor restart during
> the job.
> PS: the operator I am using during that stage is
> rdd.glom().mapPartitions().
>
>
> java8964 <java8...@hotmail.com> wrote on Tue, Sep 15, 2015 at 11:44 PM:
>
> When you saw this error, did any executor die due to some other error?
>
> Did you check whether any executor restarted during your job?
>
> It is hard to help you with just the stack trace. You need to give us the
> whole picture of what happens while your job is running.
>
> Yong
>
> --
> From: qhz...@apache.org
> Date: Tue, 15 Sep 2015 15:02:28 +
> Subject: Re: application failed on large dataset
> To: user@spark.apache.org
>
>
> Has anyone met the same problem?
> 周千昊 <qhz...@apache.org> wrote on Mon, Sep 14, 2015 at 9:07 PM:
>
> Hi, community,
>   I am facing a strange problem:
>   all executors stop responding, and then all of them fail with
> ExecutorLostFailure.
>   When I look into the YARN logs, they are full of exceptions like this:
>
> 15/09/14 04:35:33 ERROR shuffle.RetryingBlockFetcher: Exception while
> beginning fetch of 1 outstanding blocks (after 3 retries)
> java.io.IOException: Failed to connect to host/ip:port
> at
> org.ap

Re: application failed on large dataset

2015-09-15 Thread
Hi,
  After checking the YARN logs, all the error stacks look like the one below:

15/09/15 19:58:23 ERROR shuffle.OneForOneBlockFetcher: Failed while
starting block fetches
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:192)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
at
io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:313)
at
io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
at
io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:242)
at
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
at
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at java.lang.Thread.run(Thread.java:745)

It seems that some error occurs when trying to fetch the block, and
after several retries the executor just dies with this error.
As for your question, I did not see any executor restart during
the job.
PS: the operator I am using during that stage is
rdd.glom().mapPartitions().


java8964 <java8...@hotmail.com> wrote on Tue, Sep 15, 2015 at 11:44 PM:

> When you saw this error, did any executor die due to some other error?
>
> Did you check whether any executor restarted during your job?
>
> It is hard to help you with just the stack trace. You need to give us the
> whole picture of what happens while your job is running.
>
> Yong
>
> --
> From: qhz...@apache.org
> Date: Tue, 15 Sep 2015 15:02:28 +
> Subject: Re: application failed on large dataset
> To: user@spark.apache.org
>
>
> Has anyone met the same problem?
> 周千昊 <qhz...@apache.org> wrote on Mon, Sep 14, 2015 at 9:07 PM:
>
> Hi, community,
>   I am facing a strange problem:
>   all executors stop responding, and then all of them fail with
> ExecutorLostFailure.
>   When I look into the YARN logs, they are full of exceptions like this:
>
> 15/09/14 04:35:33 ERROR shuffle.RetryingBlockFetcher: Exception while
> beginning fetch of 1 outstanding blocks (after 3 retries)
> java.io.IOException: Failed to connect to host/ip:port
> at
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:193)
> at
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
> at
> org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:88)
> at
> org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
> at
> org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
> at
> org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
> at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
> at java.util.concurrent.FutureTask.run(FutureTask.java:262)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.net.ConnectException: Connection refused: host/ip:port
> at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> at
> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
> at
> io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:208)
> at
> io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:287)
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
> at
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
> ... 1 more
>
>

Re: application failed on large dataset

2015-09-15 Thread
Has anyone met the same problem?
周千昊 <qhz...@apache.org> wrote on Mon, Sep 14, 2015 at 9:07 PM:

> Hi, community,
>   I am facing a strange problem:
>   all executors stop responding, and then all of them fail with
> ExecutorLostFailure.
>   When I look into the YARN logs, they are full of exceptions like this:
>
> 15/09/14 04:35:33 ERROR shuffle.RetryingBlockFetcher: Exception while
> beginning fetch of 1 outstanding blocks (after 3 retries)
> java.io.IOException: Failed to connect to host/ip:port
> at
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:193)
> at
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
> at
> org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:88)
> at
> org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
> at
> org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
> at
> org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
> at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
> at java.util.concurrent.FutureTask.run(FutureTask.java:262)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.net.ConnectException: Connection refused: host/ip:port
> at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> at
> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
> at
> io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:208)
> at
> io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:287)
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
> at
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
> ... 1 more
>
>
>   The strange thing is that if I reduce the input size, the problem
> just disappears. I have found a similar issue in the mail archive (
> http://mail-archives.us.apache.org/mod_mbox/spark-user/201502.mbox/%3CCAOHP_tHRtuxDfWF0qmYDauPDhZ1=MAm5thdTfgAhXDN=7kq...@mail.gmail.com%3E),
> but I didn't see a solution there. So I am wondering if anyone could help
> with this.
>
>   My env is:
>   hdp 2.2.6
>   spark(1.4.1)
>   mode: yarn-client
>   spark-conf:
>   spark.driver.extraJavaOptions -Dhdp.version=2.2.6.0-2800
>   spark.yarn.am.extraJavaOptions -Dhdp.version=2.2.6.0-2800
>   spark.executor.memory 6g
>   spark.storage.memoryFraction 0.3
>   spark.dynamicAllocation.enabled true
>   spark.shuffle.service.enabled true
>
>


application failed on large dataset

2015-09-14 Thread
Hi, community,
  I am facing a strange problem:
  all executors stop responding, and then all of them fail with
ExecutorLostFailure.
  When I look into the YARN logs, they are full of exceptions like this:

15/09/14 04:35:33 ERROR shuffle.RetryingBlockFetcher: Exception while
beginning fetch of 1 outstanding blocks (after 3 retries)
java.io.IOException: Failed to connect to host/ip:port
at
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:193)
at
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
at
org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:88)
at
org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
at
org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
at
org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.ConnectException: Connection refused: host/ip:port
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
at
io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:208)
at
io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:287)
at
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
... 1 more


  The strange thing is that if I reduce the input size, the problem
just disappears. I have found a similar issue in the mail archive (
http://mail-archives.us.apache.org/mod_mbox/spark-user/201502.mbox/%3CCAOHP_tHRtuxDfWF0qmYDauPDhZ1=MAm5thdTfgAhXDN=7kq...@mail.gmail.com%3E),
but I didn't see a solution there. So I am wondering if anyone could help
with this.

  My env is:
  hdp 2.2.6
  spark(1.4.1)
  mode: yarn-client
  spark-conf:
  spark.driver.extraJavaOptions -Dhdp.version=2.2.6.0-2800
  spark.yarn.am.extraJavaOptions -Dhdp.version=2.2.6.0-2800
  spark.executor.memory 6g
  spark.storage.memoryFraction 0.3
  spark.dynamicAllocation.enabled true
  spark.shuffle.service.enabled true


Re: about mr-style merge sort

2015-09-10 Thread
Hi all,
 Can anyone give some tips on this issue?

周千昊 <qhz...@apache.org> wrote on Tue, Sep 8, 2015 at 4:46 PM:

> Hi, community
>  I have an application that I am trying to migrate from MR to Spark.
>  It does some calculations on data from Hive and writes HFiles which will
> be bulk loaded into an HBase table; details as follows:
>
>  Rdd input = getSourceInputFromHive()
>  Rdd<Tuple2<byte[], byte[]>> mapSideResult =
> input.glom().mapPartitions(/*some calculation*/)
>  // PS: the result in each partition has already been sorted according
> to the lexicographical order during the calculation
>  mapSideResult.reduceByKey(/*some
> aggregations*/).sortByKey(/**/).map(/*transform Tuple2<byte[], byte[]> to
> Tuple2<ImmutableBytesWritable, KeyValue>*/).saveAsNewAPIHadoopFile(/*write
> to hfile*/)
>
>   *Here is the problem: in MR, on the reducer side, the mapper
> output has already been sorted, so it is a merge sort, which makes
> writing the HFile sequential and fast.*
> *  However, in Spark the output of the reduceByKey phase has been
> shuffled, so I have to sort the RDD in order to write the HFile, which makes it
> 2x slower running on Spark than on MR.*
> *  I am wondering whether there is anything I can leverage that has the
> same effect as in MR. I happened to see a JIRA
> ticket https://issues.apache.org/jira/browse/SPARK-2926
> <https://issues.apache.org/jira/browse/SPARK-2926>. Is it related to what I
> am looking for?*
>
-- 
Best Regard
ZhouQianhao


Re: about mr-style merge sort

2015-09-10 Thread
Hi Shao & Pandey,
  Thanks for the tips. I will try to work around this.

Saisai Shao <sai.sai.s...@gmail.com> wrote on Fri, Sep 11, 2015 at 1:23 PM:

> Hi Qianhao,
>
> I think you could sort the data yourself if you want to achieve the same
> result as MR, e.g. rdd.reduceByKey(...).mapPartitions(// sort within each
> partition). Do not call sortByKey again, since it will introduce another
> shuffle (that's the reason why it is slower than MR).
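>
> A rough sketch of that idea in the Java API (assuming each partition fits in
> memory and using a hand-written unsigned lexicographic comparator; the
> external-sort part is left out, and the class and method names are only
> illustrative):
>
> import java.util.ArrayList;
> import java.util.Collections;
> import java.util.Comparator;
> import java.util.Iterator;
> import java.util.List;
> import org.apache.spark.api.java.JavaPairRDD;
> import org.apache.spark.api.java.JavaRDD;
> import org.apache.spark.api.java.function.FlatMapFunction;
> import scala.Tuple2;
>
> public class SortWithinPartitions {
>   public static JavaRDD<Tuple2<byte[], byte[]>> sort(JavaPairRDD<byte[], byte[]> aggregated) {
>     // sort each partition locally instead of calling sortByKey (no extra shuffle)
>     return aggregated.mapPartitions(
>       new FlatMapFunction<Iterator<Tuple2<byte[], byte[]>>, Tuple2<byte[], byte[]>>() {
>         @Override
>         public Iterable<Tuple2<byte[], byte[]>> call(Iterator<Tuple2<byte[], byte[]>> it) {
>           List<Tuple2<byte[], byte[]>> buf = new ArrayList<Tuple2<byte[], byte[]>>();
>           while (it.hasNext()) buf.add(it.next());   // assumes the partition fits in memory
>           Collections.sort(buf, new Comparator<Tuple2<byte[], byte[]>>() {
>             public int compare(Tuple2<byte[], byte[]> a, Tuple2<byte[], byte[]> b) {
>               byte[] x = a._1(), y = b._1();
>               int n = Math.min(x.length, y.length);
>               for (int i = 0; i < n; i++) {
>                 int c = (x[i] & 0xff) - (y[i] & 0xff);   // unsigned byte comparison
>                 if (c != 0) return c;
>               }
>               return x.length - y.length;
>             }
>           });
>           return buf;
>         }
>       });
>   }
> }
>
> Note this only removes the extra sortByKey shuffle; whether it is sufficient
> for HFile output still depends on how the keys are partitioned across
> partitions.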
>
> The problem and difficulty is that you have to implement the external sort
> yourself, since memory may not be enough to hold the whole partition.
>
> Spark's shuffle is different from MR's in that it has no key-ordering
> restriction. So scenarios like the one you mentioned are not so easy to
> address. SPARK-2926 tries to solve a scenario like yours, but it is not
> merged yet, so you have to find a workaround at the application level.
>
> Thanks
> Jerry
>
>
>
> On Fri, Sep 11, 2015 at 10:42 AM, Raghavendra Pandey <
> raghavendra.pan...@gmail.com> wrote:
>
>> In MR jobs, the output is sorted only within each reducer. That can be better
>> emulated by sorting each partition of the RDD rather than doing a total sort
>> of the RDD.
>> In RDD.mapPartitions you can sort the data within one partition and try that.
>> On Sep 11, 2015 7:36 AM, "周千昊" <z.qian...@gmail.com> wrote:
>>
>>> Hi all,
>>>  Can anyone give some tips on this issue?
>>>
>>> 周千昊 <qhz...@apache.org> wrote on Tue, Sep 8, 2015 at 4:46 PM:
>>>
>>>> Hi, community
>>>>  I have an application that I am trying to migrate from MR to Spark.
>>>>  It does some calculations on data from Hive and writes HFiles which
>>>> will be bulk loaded into an HBase table; details as follows:
>>>>
>>>>  Rdd input = getSourceInputFromHive()
>>>>  Rdd<Tuple2<byte[], byte[]>> mapSideResult =
>>>> input.glom().mapPartitions(/*some calculation*/)
>>>>  // PS: the result in each partition has already been sorted
>>>> according to the lexicographical order during the calculation
>>>>  mapSideResult.reduceByKey(/*some
>>>> aggregations*/).sortByKey(/**/).map(/*transform Tuple2<byte[], byte[]> to
>>>> Tuple2<ImmutableBytesWritable, KeyValue>*/).saveAsNewAPIHadoopFile(/*write
>>>> to hfile*/)
>>>>
>>>>   *Here is the problem: in MR, on the reducer side, the mapper
>>>> output has already been sorted, so it is a merge sort, which makes
>>>> writing the HFile sequential and fast.*
>>>> *  However, in Spark the output of the reduceByKey phase has been
>>>> shuffled, so I have to sort the RDD in order to write the HFile, which makes it
>>>> 2x slower running on Spark than on MR.*
>>>> *  I am wondering whether there is anything I can leverage that has the
>>>> same effect as in MR. I happened to see a JIRA
>>>> ticket https://issues.apache.org/jira/browse/SPARK-2926
>>>> <https://issues.apache.org/jira/browse/SPARK-2926>. Is it related to what I
>>>> am looking for?*
>>>>
>>> --
>>> Best Regard
>>> ZhouQianhao
>>>
>>
> --
Best Regard
ZhouQianhao


about mr-style merge sort

2015-09-08 Thread
Hi, community
 I have an application that I am trying to migrate from MR to Spark.
 It does some calculations on data from Hive and writes HFiles which will
be bulk loaded into an HBase table; details as follows:

 Rdd input = getSourceInputFromHive()
 Rdd<Tuple2<byte[], byte[]>> mapSideResult =
input.glom().mapPartitions(/*some calculation*/)
 // PS: the result in each partition has already been sorted according
to the lexicographical order during the calculation
 mapSideResult.reduceByKey(/*some
aggregations*/).sortByKey(/**/).map(/*transform Tuple2<byte[], byte[]> to
Tuple2<ImmutableBytesWritable, KeyValue>*/).saveAsNewAPIHadoopFile(/*write
to hfile*/)

  *Here is the problem: in MR, on the reducer side, the mapper
output has already been sorted, so it is a merge sort, which makes
writing the HFile sequential and fast.*
*  However, in Spark the output of the reduceByKey phase has been shuffled,
so I have to sort the RDD in order to write the HFile, which makes it 2x slower
running on Spark than on MR.*
*  I am wondering whether there is anything I can leverage that has the
same effect as in MR. I happened to see a JIRA
ticket https://issues.apache.org/jira/browse/SPARK-2926. Is it related to what I
am looking for?*


serialization issue

2015-08-13 Thread
Hi,
I am using Spark 1.4 and have run into an issue.
I am trying to use the aggregate function:
JavaRDD<String> rdd = /* some rdd */;
HashMap<Long, TypeA> zeroValue = new HashMap<Long, TypeA>();
// add initial key-value pair for zeroValue
rdd.aggregate(zeroValue,
   new Function2<HashMap<Long, TypeA>,
String,
HashMap<Long, TypeA>>(){ /* implementation */ },
   new Function2<HashMap<Long, TypeA>,
HashMap<Long, TypeA>,
HashMap<Long, TypeA>>(){ /* implementation */ })

Here is the stack trace when I run the application:

Caused by: java.lang.ClassNotFoundException: TypeA
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:274)
at
org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:66)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at java.util.HashMap.readObject(HashMap.java:1180)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:69)
at
org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:89)
at org.apache.spark.util.Utils$.clone(Utils.scala:1458)
at org.apache.spark.rdd.RDD$$anonfun$aggregate$1.apply(RDD.scala:1049)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:109)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
at org.apache.spark.rdd.RDD.aggregate(RDD.scala:1047)
at
org.apache.spark.api.java.JavaRDDLike$class.aggregate(JavaRDDLike.scala:413)
at
org.apache.spark.api.java.AbstractJavaRDDLike.aggregate(JavaRDDLike.scala:47)
 *However, I have checked that TypeA is in the jar file, which is on the
classpath.*
*And when I use an empty HashMap as the zeroValue, the exception goes
away.*
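
That last observation points to a workaround: keep the zeroValue an empty map and
add the initial entry inside the sequence function instead, so nothing of type
TypeA has to be deserialized when Spark clones the zero value (the Utils.clone
frame in the stack trace above). A rough sketch, with TypeA standing in for the
user-defined class (assumed here to have a no-arg constructor) and the real
aggregation logic elided:

import java.util.HashMap;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function2;

public class AggregateWorkaround {
  public static HashMap<Long, TypeA> run(JavaRDD<String> rdd) {
    // empty zero value: no TypeA instance is serialized up front
    HashMap<Long, TypeA> zeroValue = new HashMap<Long, TypeA>();
    return rdd.aggregate(zeroValue,
        new Function2<HashMap<Long, TypeA>, String, HashMap<Long, TypeA>>() {
          @Override
          public HashMap<Long, TypeA> call(HashMap<Long, TypeA> acc, String record) {
            if (acc.isEmpty()) {
              acc.put(0L, new TypeA());   // add the initial key-value pair lazily, inside the task
            }
            // ... update acc from record ...
            return acc;
          }
        },
        new Function2<HashMap<Long, TypeA>, HashMap<Long, TypeA>, HashMap<Long, TypeA>>() {
          @Override
          public HashMap<Long, TypeA> call(HashMap<Long, TypeA> a, HashMap<Long, TypeA> b) {
            a.putAll(b);                  // ... merge the partial maps as needed ...
            return a;
          }
        });
  }
}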