Spark Lost executor && shuffle.FetchFailedException

2015-09-21 Thread biyan900116
Hi All:

When I write data to a Hive dynamic partition table, many errors and warnings like the following appear...

Could the cause be that the shuffle output is too large?

=
15/09/21 14:53:09 ERROR cluster.YarnClusterScheduler: Lost executor 402 on dn03.datanode.com: remote Rpc client disassociated

=
15/09/21 14:53:27 WARN scheduler.TaskSetManager: Lost task 107.0 in stage 7.0 (TID 27601, dn01.datanode.com): FetchFailed(BlockManagerId(513, dn02.datanode.com, 34869), shuffleId=1, mapId=90, reduceId=107, message=
org.apache.spark.shuffle.FetchFailedException: Failed to connect to dn02.datanode.com/XX.XX.XX.17:34869
    at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
    at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:84)
    at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:84)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:216)
    at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:61)
    at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:90)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
    at org.apache.spark.scheduler.Task.run(Task.scala:70)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:724)
Caused by: java.io.IOException: Failed to connect to dn02.datanode.com/XX.XX.XX.17:34869
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:193)
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
    at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:88)
    at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
    at org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
    at org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
    at java.util.concurrent.FutureTask.run(FutureTask.java:166)
    ... 3 more
Caused by: java.net.ConnectException: Connection refused: dn02.datanode.com/XX.XX.XX.17:34869
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:708)
    at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:208)
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:287)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
    ... 1 more

)
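
[Editor's note] For reference, below is a minimal sketch of configuration knobs that are sometimes adjusted when YARN executors are lost and shuffle fetches then fail. It is not a confirmed fix for the error above; every value is a placeholder, and spark.shuffle.service.enabled only makes sense if the external shuffle service is actually deployed on the NodeManagers.

import org.apache.spark.{SparkConf, SparkContext}

// A sketch only (Spark 1.x era settings); every value is an illustrative placeholder.
object ShuffleTuningSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("hive-dynamic-partition-write")
      // Extra off-heap headroom so YARN is less likely to kill the executor.
      .set("spark.yarn.executor.memoryOverhead", "1024")   // MB
      // Tolerate slow or briefly unreachable shuffle servers instead of failing fast.
      .set("spark.network.timeout", "300s")
      .set("spark.shuffle.io.maxRetries", "10")
      .set("spark.shuffle.io.retryWait", "10s")
      // Serve shuffle files from the NodeManager so they survive executor loss
      // (only valid if the external shuffle service is deployed on each node).
      .set("spark.shuffle.service.enabled", "true")

    val sc = new SparkContext(conf)
    // ... the existing write to the Hive dynamic partition table would go here ...
    sc.stop()
  }
}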



Re: Spark aggregateByKey Issues

2015-09-15 Thread biyan900116
Hi Alexis:

Of course, it is very useful to me, especially the part about the operations to perform after the sort is done.

I still have one question:
How do I choose a decent number of partitions, if it does not need to be equal to the number of keys?

> On September 15, 2015, at 3:41 PM, Alexis Gillain wrote:
> 
> Sorry, I made a typo in my previous message: you can't sortByKey on (yourKey, date) and have all the records of your keys in the same partition.
> 
> So you can sortByKey(yourKey) with a decent number of partitions (it doesn't have to be the number of keys). After that you have the records of a key grouped in a partition, but not sorted by date.
> 
> Then you use mapPartitions to copy each partition into a List; you can then sort it by (yourKey, date) and use this list to compute whatever you want. The main issue is that a partition must fit in memory.
> 
> Hope this helps.
> 
> 2015-09-15 13:50 GMT+08:00:
> Hi Alexis:
> 
> Thank you for your reply.
> 
> My case is that each operation on a record needs to depend on a value that is set while processing the previous record.
> 
> So your advice is that I can use “sortByKey”, and “sortByKey” will put all records with the same key in one partition. Do I need to pass the “numPartitions” parameter, or will it still do that even if I don't?
> 
> If that works and I then add “aggregate” to handle my case, I think the combOp function in the parameter list of the aggregate API will never be executed. Is my understanding wrong?
>   
> 
>> On September 15, 2015, at 12:47 PM, Alexis Gillain wrote:
>> 
>> I'm not sure what you want to do.
>> 
>> You should try to sort the RDD by (yourKey, date); it ensures that all the keys are in the same partition.
>> 
>> Your problem after that is that you want to aggregate only on yourKey, and if you change the key of the sorted RDD you lose the partitioning.
>> 
>> Depending on the size of the result, you can use an aggregate building a map of results by yourKey, or use mapPartitions to output an RDD (in this case set the number of partitions high enough to allow each partition to fit in memory).
>> 
>> see 
>> http://apache-spark-user-list.1001560.n3.nabble.com/Folding-an-RDD-in-order-td16577.html
>>  
>> 
>> 
>> 2015-09-15 11:25 GMT+08:00 毕岩:
>> Hi:
>> 
>> 
>> 
>> There is a case that needs a reduce-like operation, as follows:
>> 
>> 
>> 
>> I need to reduce a large dataset made up of billions of records, each a key-value pair.
>> 
>> The requirements are the following:
>> 
>> First, group by key; the records with the same key need to be in order of a field called “date” in the value.
>> 
>> Second, within the records of the same key, the operation on each record needs to depend on the result of processing the previous one, and the first record is the original state.
>> 
>> 
>> 
>> Some attempts:
>> 
>> 1. groupByKey + map: because of the bad performance, the CPU goes to 100%, so executor heartbeats time out and “Lost executor” errors are thrown, or the application hangs…
>> 
>> 
>> 
>> 2. aggregateByKey:
>> 
>> def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
>> 
>> About aggregateByKey: are all the records with the same key in the same partition? Is the zeroValue applied only to the first of all records with the same key, or once in each partition? If it is the former, the combOp function does nothing!
>> 
>> I tried to pass the number of keys as the second “numPartitions” parameter, but the number of keys is so large that all the tasks were killed.
>> 
>> 
>> 
>> What should I do in this case?
>> 
>> I'm asking for advice online...
>> 
>> Thank you.
>> 
>> 
>> 
>> 
>> -- 
>> Alexis GILLAIN
> 
> 
> 
> 
> -- 
> Alexis GILLAIN
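
[Editor's note] A rough sketch of the sortByKey-then-mapPartitions approach Alexis describes above. The record shape (key, (date, payload)), the step function, and all names here are made-up placeholders; the real constraint is that each sorted partition must fit in memory, so numPartitions is chosen for memory headroom rather than to match the number of keys.

import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

object SortWithinKeySketch {
  // Hypothetical record shape: (key, (date, payload)).
  type Record = (String, (Long, Double))

  // Hypothetical per-record update: depends on the state left by the previous record.
  def step(state: Double, payload: Double): Double = state + payload

  def processByKey(records: RDD[Record], numPartitions: Int): RDD[(String, Double)] = {
    records
      // Sort by key only; the range partitioner keeps all records of a key together.
      .sortByKey(ascending = true, numPartitions = numPartitions)
      .mapPartitions { iter =>
        // Copy the partition into memory (it must fit), group by key,
        // order each key's records by date, then apply the sequential update.
        iter.toList
          .groupBy(_._1)
          .iterator
          .map { case (key, rows) =>
            val ordered = rows.map(_._2).sortBy(_._1)   // sort by date
            val finalState = ordered.foldLeft(0.0) {
              case (state, (_, payload)) => step(state, payload)
            }
            (key, finalState)
          }
      }
  }
}

In this sketch the partition count is purely a memory knob; the ordering within a key is done locally inside mapPartitions after the shuffle.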



Re: Spark aggregateByKey Issues

2015-09-14 Thread biyan900116
Hi Alexis:

Thank you for your reply.

My case is that each operation on a record needs to depend on a value that is set while processing the previous record.

So your advice is that I can use “sortByKey”, and “sortByKey” will put all records with the same key in one partition. Do I need to pass the “numPartitions” parameter, or will it still do that even if I don't?

If that works and I then add “aggregate” to handle my case, I think the combOp function in the parameter list of the aggregate API will never be executed. Is my understanding wrong?
  

> On September 15, 2015, at 12:47 PM, Alexis Gillain wrote:
> 
> I'm not sure what you want to do.
> 
> You should try to sort the RDD by (yourKey, date); it ensures that all the keys are in the same partition.
> 
> Your problem after that is that you want to aggregate only on yourKey, and if you change the key of the sorted RDD you lose the partitioning.
> 
> Depending on the size of the result, you can use an aggregate building a map of results by yourKey, or use mapPartitions to output an RDD (in this case set the number of partitions high enough to allow each partition to fit in memory).
> 
> see 
> http://apache-spark-user-list.1001560.n3.nabble.com/Folding-an-RDD-in-order-td16577.html
>  
> 
> 
> 2015-09-15 11:25 GMT+08:00 毕岩:
> Hi:
> 
> 
> 
> There is a case that needs a reduce-like operation, as follows:
> 
> 
> 
> I need to reduce a large dataset made up of billions of records, each a key-value pair.
> 
> The requirements are the following:
> 
> First, group by key; the records with the same key need to be in order of a field called “date” in the value.
> 
> Second, within the records of the same key, the operation on each record needs to depend on the result of processing the previous one, and the first record is the original state.
> 
> 
> 
> Some attempts:
> 
> 1. groupByKey + map: because of the bad performance, the CPU goes to 100%, so executor heartbeats time out and “Lost executor” errors are thrown, or the application hangs…
> 
> 
> 
> 2. aggregateByKey:
> 
> def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
> 
> About aggregateByKey: are all the records with the same key in the same partition? Is the zeroValue applied only to the first of all records with the same key, or once in each partition? If it is the former, the combOp function does nothing!
> 
> I tried to pass the number of keys as the second “numPartitions” parameter, but the number of keys is so large that all the tasks were killed.
> 
> 
> 
> What should I do in this case?
> 
> I'm asking for advice online...
> 
> Thank you.
> 
> 
> 
> 
> -- 
> Alexis GILLAIN
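
[Editor's note] Regarding the zeroValue question above: aggregateByKey applies the zeroValue per key within each partition; seqOp folds values inside a partition and combOp then merges the per-partition results, so combOp only ends up doing nothing when all values of a key happen to sit in a single partition. A small self-contained sketch (toy data, local master, all names invented) that makes this visible:

import org.apache.spark.{SparkConf, SparkContext}

object AggregateByKeyDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("aggregateByKey-demo").setMaster("local[2]"))

    // Toy data: key "a" is deliberately spread across two partitions.
    val pairs = sc.parallelize(Seq(("a", 1), ("a", 2), ("a", 3), ("b", 10)), numSlices = 2)

    // zeroValue is a (sum, count) accumulator created per key *in each partition*;
    // seqOp folds values into it, combOp merges accumulators across partitions.
    val sumAndCount = pairs.aggregateByKey((0, 0))(
      (acc, v) => (acc._1 + v, acc._2 + 1),        // seqOp: within one partition
      (a, b) => (a._1 + b._1, a._2 + b._2)         // combOp: across partitions
    )

    sumAndCount.collect().foreach(println)
    // Expected output (order may vary): (a,(6,3)) and (b,(10,1)).

    sc.stop()
  }
}

Because key "a" spans both partitions, combOp really does run here; passing the number of distinct keys as numPartitions is not required for correctness, only a partition count that keeps each partition at a manageable size.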