Re: Having lots of FetchFailedException in join

2015-03-05 Thread Aaron Davidson
However, Executors were dying when using Netty as well, so it is possible
that the OOM was occurring then too. It is also possible only one of your
Executors OOMs (due to a particularly large task) and the others display
various exceptions while trying to fetch the shuffle blocks from the failed
executor.

I cannot explain the local FileNotFoundExceptions occurring on machines that
were not throwing fatal errors, though -- typically I have only seen that
happen when a fatal error (e.g., OOM) was thrown on an Executor, causing it
to begin the termination process which involves deleting its own shuffle
files. It may then throw the FNF if other Executors request those files
before it has completed its shutdown (and will throw a ConnectionFailed
once it's completed terminating).
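
For anyone hitting this pattern, a hedged mitigation sketch: Spark 1.2 can serve shuffle files through the node-level external shuffle service (which also needs the spark_shuffle aux-service configured on the YARN NodeManagers), and the Netty fetch path can retry before reporting FetchFailed. The values below are illustrative, not a recommendation from this thread:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      // Serve shuffle files from the node-level shuffle service instead of the executor,
      // so other executors can still fetch them after an executor OOMs and exits.
      .set("spark.shuffle.service.enabled", "true")
      // Retry remote fetches a few times before reporting FetchFailed, in case the
      // remote executor is still in the middle of terminating (illustrative values).
      .set("spark.shuffle.io.maxRetries", "6")
      .set("spark.shuffle.io.retryWait", "10")   // seconds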

On Thu, Mar 5, 2015 at 12:19 AM, Shao, Saisai saisai.s...@intel.com wrote:

  I’ve no idea why Netty didn’t hit the OOM issue. One possibility is that
 Netty uses direct (off-heap) memory to hold each block, whereas NIO uses
 on-heap memory, so Netty occupies less heap memory than NIO.
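
For comparison, a small sketch of how the transfer service is selected and how the off-heap side can be bounded; spark.shuffle.blockTransferService is the property discussed in this thread, while the MaxDirectMemorySize value is purely illustrative:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      // "netty" (the 1.2 default) buffers shuffle blocks in direct, off-heap memory;
      // "nio" keeps them on the heap, which may be why heap OOMs surfaced with nio.
      .set("spark.shuffle.blockTransferService", "netty")
      // Cap executor direct memory so off-heap growth is bounded and visible
      // (illustrative value, tune per executor size).
      .set("spark.executor.extraJavaOptions", "-XX:MaxDirectMemorySize=2g")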

Re: Having lots of FetchFailedException in join

2015-03-05 Thread Jianshi Huang
Thanks. I was about to submit a ticket for this :)

Also there's a ticket for sort-merge based groupByKey:
https://issues.apache.org/jira/browse/SPARK-3461

BTW, any idea why the run with Netty didn't output OOM error messages? It's
very confusing for troubleshooting.


Jianshi

On Thu, Mar 5, 2015 at 4:01 PM, Shao, Saisai saisai.s...@intel.com wrote:

  I think there are a lot of JIRAs trying to solve this problem (
 https://issues.apache.org/jira/browse/SPARK-5763). Basically sort-merge
 join is a good choice.



 Thanks

 Jerry
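
To get a feel for what sort-merge join buys at the RDD level, here is a minimal sketch (concrete String/Long/String types for brevity; it illustrates the technique, not the SPARK-5763 implementation itself): both inputs are hash-partitioned and sorted within partitions, then merged partition by partition so that only the current key's values are buffered rather than a hash map over the whole partition.

    import org.apache.spark.HashPartitioner
    import org.apache.spark.SparkContext._   // pair/ordered RDD implicits on Spark 1.2
    import org.apache.spark.rdd.RDD
    import scala.collection.mutable.ArrayBuffer

    def sortMergeJoin(left: RDD[(String, Long)],
                      right: RDD[(String, String)],
                      numPartitions: Int): RDD[(String, (Long, String))] = {
      val part = new HashPartitioner(numPartitions)
      val l = left.repartitionAndSortWithinPartitions(part)
      val r = right.repartitionAndSortWithinPartitions(part)
      l.zipPartitions(r) { (lIt, rIt) =>
        val lb = lIt.buffered
        val rb = rIt.buffered
        new Iterator[(String, (Long, String))] {
          private var pending: Iterator[(String, (Long, String))] = Iterator.empty
          private def fill(): Unit = {
            while (!pending.hasNext && lb.hasNext && rb.hasNext) {
              val cmp = lb.head._1.compareTo(rb.head._1)
              if (cmp < 0) lb.next()            // key present only on the left: skip
              else if (cmp > 0) rb.next()       // key present only on the right: skip
              else {
                val k = lb.head._1
                // Buffer only this key's values from each side, not the whole partition.
                val lv = ArrayBuffer.empty[Long]
                while (lb.hasNext && lb.head._1 == k) lv += lb.next()._2
                val rv = ArrayBuffer.empty[String]
                while (rb.hasNext && rb.head._1 == k) rv += rb.next()._2
                pending = for (a <- lv.iterator; b <- rv) yield (k, (a, b))
              }
            }
          }
          override def hasNext: Boolean = { fill(); pending.hasNext }
          override def next(): (String, (Long, String)) = { fill(); pending.next() }
        }
      }
    }

A single extremely hot key still has to fit in that per-key buffer, so this eases general memory pressure but does not by itself fix key skew.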

Re: Having lots of FetchFailedException in join

2015-03-04 Thread Jianshi Huang
One really interesting thing is that when I'm using the
netty-based spark.shuffle.blockTransferService, there are no OOM error
messages (java.lang.OutOfMemoryError: Java heap space).

Any idea why it's not here?

I'm using Spark 1.2.1.

Jianshi
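
One hedged way to make executor OOMs impossible to miss, whichever transfer service is in use, is to have the executor JVMs write a heap dump when they blow up (standard HotSpot flags; the dump path below is illustrative):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.executor.extraJavaOptions",
           // Dump the heap on OOM so the failure is obvious in the container logs
           // and the dominant data structures can be inspected afterwards.
           "-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/executor-oom.hprof " +
           "-XX:+PrintGCDetails -XX:+PrintGCTimeStamps")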

Re: Having lots of FetchFailedException in join

2015-03-04 Thread Jianshi Huang
There's some skew.

Index  ID    Attempt  Status   Locality Level  Executor ID / Host  Launch Time          Duration  GC Time  Shuffle Read  Shuffle Spill (Memory)  Shuffle Spill (Disk)
64     6164  0        SUCCESS  PROCESS_LOCAL   200 /               2015/03/04 23:45:47  1.1 min   6 s      198.6 MB      21.1 GB                 240.8 MB
59     6159  0        SUCCESS  PROCESS_LOCAL   30 /                2015/03/04 23:45:47  44 s      5 s      200.7 MB      4.8 GB                  154.0 MB
But I expect this kind of skewness to be quite common.

Jianshi
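
A quick way to put numbers on that skew before the join is to count records per key and inspect the heaviest keys. A sketch, where the function name, key type, and payload type are illustrative stand-ins for one side of the join:

    import org.apache.spark.SparkContext._   // pair-RDD implicits on Spark 1.2
    import org.apache.spark.rdd.RDD

    // Print the 20 heaviest join keys of one input.
    def showHeavyKeys(events: RDD[(String, Array[Byte])]): Unit =
      events.map { case (k, _) => (k, 1L) }
            .reduceByKey(_ + _)
            .top(20)(Ordering.by(_._2))
            .foreach { case (k, n) => println(s"$k -> $n records") }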


RE: Having lots of FetchFailedException in join

2015-03-04 Thread Shao, Saisai
Yes, if one key has too many values, there is still a chance of hitting the OOM.

Thanks
Jerry
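
When the downstream computation is really an aggregation, one hedged workaround for the "all values of one key in memory" problem is to replace groupByKey/cogroup with aggregateByKey or reduceByKey, which combine values map-side and never materialize the per-key list. A sketch with an illustrative Long payload:

    import org.apache.spark.SparkContext._   // pair-RDD implicits on Spark 1.2
    import org.apache.spark.rdd.RDD

    // (count, sum) per key without ever buffering a key's values.
    def countAndSum(events: RDD[(String, Long)]): RDD[(String, (Long, Long))] =
      events.aggregateByKey((0L, 0L))(
        (acc, v) => (acc._1 + 1, acc._2 + v),      // fold one value into a partition-local accumulator
        (a, b)   => (a._1 + b._1, a._2 + b._2))    // merge accumulators across partitions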


RE: Having lots of FetchFailedException in join

2015-03-04 Thread Shao, Saisai
Hi Jianshi,

From my understanding, it may not be a problem with NIO or Netty. Looking at 
your stack trace, the OOM occurred in EAOM (ExternalAppendOnlyMap). In theory 
EAOM can spill data to disk when memory is not enough, but there may be issues 
when the join key is skewed or the number of distinct keys is small, so you 
can still hit OOM.

Maybe you could monitor each stage's or task's shuffle and GC status, as well 
as system status, to identify the problem.

Thanks
Jerry
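
For reference, the spill behaviour described above is governed by a couple of settings in 1.2; a hedged sketch (0.3 is only an illustrative bump over the 0.2 default):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      // Allow shuffle-side structures such as ExternalAppendOnlyMap to spill to disk (default: true).
      .set("spark.shuffle.spill", "true")
      // Fraction of the heap those structures may use before spilling (default: 0.2).
      .set("spark.shuffle.memoryFraction", "0.3")

Even with spilling enabled, all values of a single key are brought back together when that key is read, which is where the skewed-key caveat above bites.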


Re: Having lots of FetchFailedException in join

2015-03-04 Thread Jianshi Huang
Hi Saisai,

What are your suggested settings for monitoring shuffle? I've
enabled -XX:+PrintGCDetails -XX:+PrintGCTimeStamps for GC logging.

I found SPARK-3461 (Support external groupByKey using
repartitionAndSortWithinPartitions), which wants to make groupByKey use
external storage. It's still in open status. Does that mean that, for now,
groupByKey/cogroup/join (implemented as cogroup + flatMap) will still read
each group as a whole when consuming it?


How can I deal with the key skewness in joins? Is there a skew-join
implementation?



Jianshi
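
There is no built-in skew join for plain RDDs in 1.2 as far as I know; the usual hand-rolled workaround is key salting, where each hot key of the large side is spread over several sub-keys and the other side is replicated to match. A sketch with illustrative types and names (in practice you would salt only the known hot keys rather than every record):

    import org.apache.spark.SparkContext._   // pair-RDD implicits on Spark 1.2
    import org.apache.spark.rdd.RDD
    import scala.util.Random

    def saltedJoin(big: RDD[(String, Long)],
                   small: RDD[(String, String)],
                   factor: Int): RDD[(String, (Long, String))] = {
      // Each record on the big side gets a random salt, so one hot key is split
      // across up to `factor` reduce tasks instead of landing in a single one.
      val saltedBig = big.map { case (k, v) => ((k, Random.nextInt(factor)), v) }
      // The small side is replicated once per possible salt so every match survives.
      val saltedSmall = small.flatMap { case (k, v) =>
        (0 until factor).map(salt => ((k, salt), v))
      }
      saltedBig.join(saltedSmall).map { case ((k, _), pair) => (k, pair) }
    }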



RE: Having lots of FetchFailedException in join

2015-03-04 Thread Shao, Saisai
I think what you could do is monitor through the web UI to see if there's any 
skew or other symptoms in shuffle write and read. For GC you could use the 
configuration you mentioned below.

On the Spark core side, all the shuffle-related operations can spill data to 
disk and do not need to read the whole partition into memory. But if you use 
Spark SQL, it depends on how Spark SQL uses these operators.

CC @hao if he has some thoughts on it.

Thanks
Jerry
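
A small sketch of keeping that monitoring data around even after executors die, so stage and task metrics can be inspected post-mortem in the history server (the event-log directory is illustrative):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      // Persist web-UI event data so the history server can replay it later.
      .set("spark.eventLog.enabled", "true")
      .set("spark.eventLog.dir", "hdfs:///spark-events")   // illustrative location
      // GC logging on executors, as already used in this thread.
      .set("spark.executor.extraJavaOptions", "-XX:+PrintGCDetails -XX:+PrintGCTimeStamps")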


Re: Having lots of FetchFailedException in join

2015-03-04 Thread Jianshi Huang
I see. I'm using core's join. The data might have some skewness (checking).

I understand shuffle can spill data to disk, but when consuming it, say in
cogroup or groupByKey, it still needs to read all the elements of a group,
right? I guess the OOM happened there when reading very large groups.

Jianshi
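
Until SPARK-3461 lands, one hedged way to consume groups without building an in-memory map of the whole partition is to sort within partitions and walk the keys as sorted runs, so only one key's group is touched at a time. A sketch with illustrative types that simply counts per key; any per-group computation that can consume values as a stream fits the same shape:

    import org.apache.spark.HashPartitioner
    import org.apache.spark.SparkContext._   // pair/ordered RDD implicits on Spark 1.2
    import org.apache.spark.rdd.RDD

    def perKeyCounts(events: RDD[(String, Long)], partitions: Int): RDD[(String, Long)] =
      events
        .repartitionAndSortWithinPartitions(new HashPartitioner(partitions))
        .mapPartitions { it =>
          val buf = it.buffered
          new Iterator[(String, Long)] {
            def hasNext: Boolean = buf.hasNext
            def next(): (String, Long) = {
              val k = buf.head._1
              var n = 0L
              // Consume exactly one key's sorted run; nothing else is buffered.
              while (buf.hasNext && buf.head._1 == k) { buf.next(); n += 1 }
              (k, n)
            }
          }
        }

A single very hot key still streams through one task, which is where a salting scheme like the sketch earlier would come in.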

Re: Having lots of FetchFailedException in join

2015-03-04 Thread Jianshi Huang
I changed spark.shuffle.blockTransferService to nio and now I'm getting OOM
errors; I'm doing a big join operation.


15/03/04 19:04:07 ERROR Executor: Exception in task 107.0 in stage 2.0 (TID
6207)
java.lang.OutOfMemoryError: Java heap space
at
org.apache.spark.util.collection.CompactBuffer.growToSize(CompactBuffer.scala:142)
at
org.apache.spark.util.collection.CompactBuffer.$plus$eq(CompactBuffer.scala:74)
at
org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:179)
at
org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:178)
at
org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:122)
at
org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:121)
at
org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:138)
at
org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
at
org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130)
at
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160)
at
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
at
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at
org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at
org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:56)

Is join/cogroup still memory bound?


Jianshi
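
The per-task memory here scales with how much co-grouped data one reduce task holds, so one simple knob is the partition count passed to join/cogroup. A hedged sketch (left/right and the count 2048 are illustrative; a single very hot key still lands in one task):

    import org.apache.spark.SparkContext._   // pair-RDD implicits on Spark 1.2
    import org.apache.spark.rdd.RDD

    // More reduce partitions mean each task's ExternalAppendOnlyMap sees less data
    // before it has to spill; this does not help one extremely hot key.
    def widerJoin(left: RDD[(String, Long)],
                  right: RDD[(String, String)]): RDD[(String, (Long, String))] =
      left.join(right, 2048)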



Re: Having lots of FetchFailedException in join

2015-03-03 Thread Aaron Davidson
"Failed to connect" implies that the executor at that host died; please
check its logs as well.


Re: Having lots of FetchFailedException in join

2015-03-03 Thread Jianshi Huang
Sorry that I forgot the subject.

And in the driver, I got many FetchFailedException. The error messages are

15/03/03 10:34:32 WARN TaskSetManager: Lost task 31.0 in stage 2.2 (TID
7943, ): FetchFailed(BlockManagerId(86, , 43070), shuffleId=0,
mapId=24, reduceId=1220, message=
org.apache.spark.shuffle.FetchFailedException: Failed to connect to
/:43070
at
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
at
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
at
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)


Jianshi

On Wed, Mar 4, 2015 at 2:55 AM, Jianshi Huang jianshi.hu...@gmail.com
wrote:

 Hi,

 I got this error message:

 15/03/03 10:22:41 ERROR OneForOneBlockFetcher: Failed while starting block
 fetches
 java.lang.RuntimeException: java.io.FileNotFoundException:
 /hadoop01/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-fc3b/02/shuffle_0_1458_0.index
 (No such file or directory)
 at java.io.FileInputStream.open(Native Method)
 at java.io.FileInputStream.<init>(FileInputStream.java:146)
 at
 org.apache.spark.shuffle.IndexShuffleBlockManager.getBlockData(IndexShuffleBlockManager.scala:109)
 at
 org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:305)
 at
 org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
 at
 org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
 at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at
 scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
 at
 scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
 at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
 at
 org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)


 And then for the same index file and executor, I got the following errors
 multiple times

 15/03/03 10:22:41 ERROR ShuffleBlockFetcherIterator: Failed to get
 block(s) from host-:39534
 java.lang.RuntimeException: java.io.FileNotFoundException:
 /hadoop01/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-fc3b/02/shuffle_0_1458_0.index
 (No such file or directory)

 15/03/03 10:22:41 ERROR RetryingBlockFetcher: Failed to fetch block
 shuffle_0_13_1228, and will not retry (0 retries)
 java.lang.RuntimeException: java.io.FileNotFoundException:
 /hadoop01/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-fc3b/02/shuffle_0_1458_0.index
 (No such file or directory)

 ...
 Caused by: java.net.ConnectException: Connection refused: host-


 What's the problem?

 BTW, I'm using Spark 1.2.1-SNAPSHOT I built around Dec. 20. Is there any
 bug fixes related to shuffle block fetching or index files after that?


 Thanks,
 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github & Blog: http://huangjs.github.com/




-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/


Re: Having lots of FetchFailedException in join

2015-03-03 Thread Jianshi Huang
Hmm... ok, previous errors are still block fetch errors.

15/03/03 10:22:40 ERROR RetryingBlockFetcher: Exception while beginning
fetch of 11 outstanding blocks
java.io.IOException: Failed to connect to host-/:55597
at
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191)
at
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
at
org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78)
at
org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
at
org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
at
org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:87)
at
org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:149)
at
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:289)
at
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:53)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at
org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125)
at
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160)
at
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
at
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at
org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at
org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:197)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:724)
Caused by: java.net.ConnectException: Connection refused:
lvshdc5dn0518.lvs.paypal.com/10.196.244.48:55597
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:735)
at
io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:208)
at
io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:287)
at
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)

And I checked the executor on container host-; everything there looks fine.
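
For context, core's join is implemented on top of cogroup, which is why the fetch
shows up inside CoGroupedRDD.compute / ExternalAppendOnlyMap.insertAll above. A
minimal sketch (hypothetical left/right RDDs and partition count) of the usual
first mitigation: raise the join's partition count so each reduce task reads a
smaller slice of the shuffle. It won't fix a dead executor, and a single hot key
still lands in one partition (salting that key would be the next step), but it
shrinks the per-task memory pressure that tends to trigger the OOMs in the first
place.

import org.apache.spark.SparkContext._   // pair-RDD functions (needed on 1.2)
import org.apache.spark.rdd.RDD

// Hedged sketch: same logical join as left.join(right), just with an explicit,
// larger partition count passed through to the underlying cogroup.
def joinWithMorePartitions(
    left: RDD[(String, Long)],
    right: RDD[(String, Long)],
    numPartitions: Int = 2000): RDD[(String, (Long, Long))] =
  left.join(right, numPartitions)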

Jianshi


On Wed, Mar 4, 2015 at 12:28 PM, Aaron Davidson ilike...@gmail.com wrote:

 Drat! That doesn't help. Could you scan from the top to see if there were
 any fatal errors preceding these? Sometimes an OOM will cause this type of
 issue further down.

 On Tue, Mar 3, 2015 at 8:16 PM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

 The failed executor has the following error messages. Any hints?

 15/03/03 10:22:41 ERROR TransportRequestHandler: Error while invoking
 RpcHandler#receive() on RPC id 5711039715419258699
 

Re: Having lots of FetchFailedException in join

2015-03-03 Thread Aaron Davidson
Drat! That doesn't help. Could you scan from the top to see if there were
any fatal errors preceding these? Sometimes an OOM will cause this type of
issue further down.

On Tue, Mar 3, 2015 at 8:16 PM, Jianshi Huang jianshi.hu...@gmail.com
wrote:

 The failed executor has the following error messages. Any hints?

 15/03/03 10:22:41 ERROR TransportRequestHandler: Error while invoking
 RpcHandler#receive() on RPC id 5711039715419258699
 java.io.FileNotFoundException:
 /hadoop01/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-fc3b/02/shuffle_0_1458_0.index
 (No such file or directory)
 at java.io.FileInputStream.open(Native Method)
 at java.io.FileInputStream.<init>(FileInputStream.java:146)
 at
 org.apache.spark.shuffle.IndexShuffleBlockManager.getBlockData(IndexShuffleBlockManager.scala:109)
 at
 org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:305)
 at
 org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
 at
 org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
 at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at
 scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
 at
 scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
 at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
 at
 org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
 at
 org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:124)
 at
 org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:97)
 at
 org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:91)
 at
 org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44)
 at
 io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
 at
 io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
 at
 io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
 at
 io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
 at
 io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
 at
 io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
 at
 io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
 at
 io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
 at
 io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
 at
 io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
 at
 io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
 at
 io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
 at
 io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
 at
 io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
 at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
 at
 io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
 at java.lang.Thread.run(Thread.java:724)
 15/03/03 10:22:41 ERROR TransportRequestHandler: Error while invoking
 RpcHandler#receive() on RPC id 7941985280808455530
 java.io.FileNotFoundException:
 /hadoop01/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-fc3b/02/shuffle_0_1458_0.index
 (No such file or directory)
 at java.io.FileInputStream.open(Native Method)
 at java.io.FileInputStream.<init>(FileInputStream.java:146)
 at
 org.apache.spark.shuffle.IndexShuffleBlockManager.getBlockData(IndexShuffleBlockManager.scala:109)
 at
 org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:305)
 at
 org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
 at
 org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
 at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at
 

Re: Having lots of FetchFailedException in join

2015-03-03 Thread Jianshi Huang
The failed executor has the following error messages. Any hints?

15/03/03 10:22:41 ERROR TransportRequestHandler: Error while invoking
RpcHandler#receive() on RPC id 5711039715419258699
java.io.FileNotFoundException:
/hadoop01/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-fc3b/02/shuffle_0_1458_0.index
(No such file or directory)
at java.io.FileInputStream.open(Native Method)
at java.io.FileInputStream.<init>(FileInputStream.java:146)
at
org.apache.spark.shuffle.IndexShuffleBlockManager.getBlockData(IndexShuffleBlockManager.scala:109)
at
org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:305)
at
org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
at
org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at
scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at
scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at
org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
at
org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:124)
at
org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:97)
at
org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:91)
at
org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44)
at
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at
io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
at
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
at
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
at java.lang.Thread.run(Thread.java:724)
15/03/03 10:22:41 ERROR TransportRequestHandler: Error while invoking
RpcHandler#receive() on RPC id 7941985280808455530
java.io.FileNotFoundException:
/hadoop01/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-fc3b/02/shuffle_0_1458_0.index
(No such file or directory)
at java.io.FileInputStream.open(Native Method)
at java.io.FileInputStream.<init>(FileInputStream.java:146)
at
org.apache.spark.shuffle.IndexShuffleBlockManager.getBlockData(IndexShuffleBlockManager.scala:109)
at
org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:305)
at
org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
at
org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at
scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at
scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at