Re: Having lots of FetchFailedException in join
However, Executors were dying when using Netty as well, so it is possible that the OOM was occurring then too. It is also possible only one of your Executors OOMs (due to a particularly large task) and the others display various exceptions while trying to fetch the shuffle blocks from the failed executor. I cannot explain the local FileNotFoundExcepions occurring on machines that were not throwing fatal errors, though -- typically I have only seen that happen when a fatal error (e.g., OOM) was thrown on an Executor, causing it to begin the termination process which involves deleting its own shuffle files. It may then throw the FNF if other Executors request those files before it has completed its shutdown (and will throw a ConnectionFailed once it's completed terminating). On Thu, Mar 5, 2015 at 12:19 AM, Shao, Saisai saisai.s...@intel.com wrote: I’ve no idea why Netty didn’t meet OOM issue, one possibility is that Netty uses direct memory to save each block, whereas NIO uses on-heap memory, so Netty occupies less on heap memory than NIO. *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com] *Sent:* Thursday, March 5, 2015 4:14 PM *To:* Shao, Saisai *Cc:* Cheng, Hao; user *Subject:* Re: Having lots of FetchFailedException in join Thanks. I was about to submit a ticket for this :) Also there's a ticket for sort-merge based groupbykey https://issues.apache.org/jira/browse/SPARK-3461 BTW, any idea why run with netty didn't output OOM error messages? It's very confusing in troubleshooting. Jianshi On Thu, Mar 5, 2015 at 4:01 PM, Shao, Saisai saisai.s...@intel.com wrote: I think there’s a lot of JIRA trying to solve this problem ( https://issues.apache.org/jira/browse/SPARK-5763). Basically sort merge join is a good choice. Thanks Jerry *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com] *Sent:* Thursday, March 5, 2015 3:55 PM *To:* Shao, Saisai *Cc:* Cheng, Hao; user *Subject:* Re: Having lots of FetchFailedException in join There're some skew. 64 6164 0 SUCCESS PROCESS_LOCAL 200 / 2015/03/04 23:45:47 1.1 min 6 s 198.6 MB 21.1 GB 240.8 MB 59 6159 0 SUCCESS PROCESS_LOCAL 30 / 2015/03/04 23:45:47 44 s 5 s 200.7 MB 4.8 GB 154.0 MB But I expect this kind of skewness to be quite common. Jianshi On Thu, Mar 5, 2015 at 3:48 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: I see. I'm using core's join. The data might have some skewness (checking). I understand shuffle can spill data to disk but when consuming it, say in cogroup or groupByKey, it still needs to read the whole group elements, right? I guess OOM happened there when reading very large groups. Jianshi On Thu, Mar 5, 2015 at 3:38 PM, Shao, Saisai saisai.s...@intel.com wrote: I think what you could do is to monitor through web UI to see if there’s any skew or other symptoms in shuffle write and read. For GC you could use the below configuration as you mentioned. From Spark core side, all the shuffle related operations can spill the data into disk and no need to read the whole partition into memory. But if you uses SparkSQL, it depends on how SparkSQL uses this operators. CC @hao if he has some thoughts on it. Thanks Jerry *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com] *Sent:* Thursday, March 5, 2015 3:28 PM *To:* Shao, Saisai *Cc:* user *Subject:* Re: Having lots of FetchFailedException in join Hi Saisai, What's your suggested settings on monitoring shuffle? I've enabled -XX:+PrintGCDetails -XX:+PrintGCTimeStamps for GC logging. I found SPARK-3461 (Support external groupByKey using repartitionAndSortWithinPartitions) want to make groupByKey using external storage. It's still open status. Does that mean now groupByKey/cogroup/join(implemented as cogroup + flatmap) will still read the group as a whole during consuming? How can I deal with the key skewness in joins? Is there a skew-join implementation? Jianshi On Thu, Mar 5, 2015 at 2:44 PM, Shao, Saisai saisai.s...@intel.com wrote: Hi Jianshi, From my understanding, it may not be the problem of NIO or Netty, looking at your stack trace, the OOM is occurred in EAOM(ExternalAppendOnlyMap), theoretically EAOM can spill the data into disk if memory is not enough, but there might some issues when join key is skewed or key number is smaller, so you will meet OOM. Maybe you could monitor each stage or task’s shuffle and GC status also system status to identify the problem. Thanks Jerry *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com] *Sent:* Thursday, March 5, 2015 2:32 PM *To:* Aaron Davidson *Cc:* user *Subject:* Re: Having lots of FetchFailedException in join One really interesting is that when I'm using the netty-based spark.shuffle.blockTransferService, there's no OOM error messages (java.lang.OutOfMemoryError: Java heap space
Re: Having lots of FetchFailedException in join
Thanks. I was about to submit a ticket for this :) Also there's a ticket for sort-merge based groupbykey https://issues.apache.org/jira/browse/SPARK-3461 BTW, any idea why run with netty didn't output OOM error messages? It's very confusing in troubleshooting. Jianshi On Thu, Mar 5, 2015 at 4:01 PM, Shao, Saisai saisai.s...@intel.com wrote: I think there’s a lot of JIRA trying to solve this problem ( https://issues.apache.org/jira/browse/SPARK-5763). Basically sort merge join is a good choice. Thanks Jerry *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com] *Sent:* Thursday, March 5, 2015 3:55 PM *To:* Shao, Saisai *Cc:* Cheng, Hao; user *Subject:* Re: Having lots of FetchFailedException in join There're some skew. 64 6164 0 SUCCESS PROCESS_LOCAL 200 / 2015/03/04 23:45:47 1.1 min 6 s 198.6 MB 21.1 GB 240.8 MB 59 6159 0 SUCCESS PROCESS_LOCAL 30 / 2015/03/04 23:45:47 44 s 5 s 200.7 MB 4.8 GB 154.0 MB But I expect this kind of skewness to be quite common. Jianshi On Thu, Mar 5, 2015 at 3:48 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: I see. I'm using core's join. The data might have some skewness (checking). I understand shuffle can spill data to disk but when consuming it, say in cogroup or groupByKey, it still needs to read the whole group elements, right? I guess OOM happened there when reading very large groups. Jianshi On Thu, Mar 5, 2015 at 3:38 PM, Shao, Saisai saisai.s...@intel.com wrote: I think what you could do is to monitor through web UI to see if there’s any skew or other symptoms in shuffle write and read. For GC you could use the below configuration as you mentioned. From Spark core side, all the shuffle related operations can spill the data into disk and no need to read the whole partition into memory. But if you uses SparkSQL, it depends on how SparkSQL uses this operators. CC @hao if he has some thoughts on it. Thanks Jerry *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com] *Sent:* Thursday, March 5, 2015 3:28 PM *To:* Shao, Saisai *Cc:* user *Subject:* Re: Having lots of FetchFailedException in join Hi Saisai, What's your suggested settings on monitoring shuffle? I've enabled -XX:+PrintGCDetails -XX:+PrintGCTimeStamps for GC logging. I found SPARK-3461 (Support external groupByKey using repartitionAndSortWithinPartitions) want to make groupByKey using external storage. It's still open status. Does that mean now groupByKey/cogroup/join(implemented as cogroup + flatmap) will still read the group as a whole during consuming? How can I deal with the key skewness in joins? Is there a skew-join implementation? Jianshi On Thu, Mar 5, 2015 at 2:44 PM, Shao, Saisai saisai.s...@intel.com wrote: Hi Jianshi, From my understanding, it may not be the problem of NIO or Netty, looking at your stack trace, the OOM is occurred in EAOM(ExternalAppendOnlyMap), theoretically EAOM can spill the data into disk if memory is not enough, but there might some issues when join key is skewed or key number is smaller, so you will meet OOM. Maybe you could monitor each stage or task’s shuffle and GC status also system status to identify the problem. Thanks Jerry *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com] *Sent:* Thursday, March 5, 2015 2:32 PM *To:* Aaron Davidson *Cc:* user *Subject:* Re: Having lots of FetchFailedException in join One really interesting is that when I'm using the netty-based spark.shuffle.blockTransferService, there's no OOM error messages (java.lang.OutOfMemoryError: Java heap space). Any idea why it's not here? I'm using Spark 1.2.1. Jianshi On Thu, Mar 5, 2015 at 1:56 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: I changed spark.shuffle.blockTransferService to nio and now I'm getting OOM errors, I'm doing a big join operation. 15/03/04 19:04:07 ERROR Executor: Exception in task 107.0 in stage 2.0 (TID 6207) java.lang.OutOfMemoryError: Java heap space at org.apache.spark.util.collection.CompactBuffer.growToSize(CompactBuffer.scala:142) at org.apache.spark.util.collection.CompactBuffer.$plus$eq(CompactBuffer.scala:74) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:179) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:178) at org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:122) at org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:121) at org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:138) at org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32
Re: Having lots of FetchFailedException in join
One really interesting is that when I'm using the netty-based spark.shuffle.blockTransferService, there's no OOM error messages (java.lang.OutOfMemoryError: Java heap space). Any idea why it's not here? I'm using Spark 1.2.1. Jianshi On Thu, Mar 5, 2015 at 1:56 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: I changed spark.shuffle.blockTransferService to nio and now I'm getting OOM errors, I'm doing a big join operation. 15/03/04 19:04:07 ERROR Executor: Exception in task 107.0 in stage 2.0 (TID 6207) java.lang.OutOfMemoryError: Java heap space at org.apache.spark.util.collection.CompactBuffer.growToSize(CompactBuffer.scala:142) at org.apache.spark.util.collection.CompactBuffer.$plus$eq(CompactBuffer.scala:74) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:179) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:178) at org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:122) at org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:121) at org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:138) at org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32) at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:56) Is join/cogroup still memory bound? Jianshi On Wed, Mar 4, 2015 at 2:11 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: Hmm... ok, previous errors are still block fetch errors. 15/03/03 10:22:40 ERROR RetryingBlockFetcher: Exception while beginning fetch of 11 outstanding blocks java.io.IOException: Failed to connect to host-/:55597 at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191) at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156) at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78) at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140) at org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120) at org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:87) at org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:149) at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:289) at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:53) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) at
Re: Having lots of FetchFailedException in join
There're some skew. 6461640SUCCESSPROCESS_LOCAL200 / 2015/03/04 23:45:471.1 min6 s198.6 MB21.1 GB240.8 MB5961590SUCCESSPROCESS_LOCAL30 / 2015/03/04 23:45:4744 s5 s200.7 MB4.8 GB154.0 MB But I expect this kind of skewness to be quite common. Jianshi On Thu, Mar 5, 2015 at 3:48 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: I see. I'm using core's join. The data might have some skewness (checking). I understand shuffle can spill data to disk but when consuming it, say in cogroup or groupByKey, it still needs to read the whole group elements, right? I guess OOM happened there when reading very large groups. Jianshi On Thu, Mar 5, 2015 at 3:38 PM, Shao, Saisai saisai.s...@intel.com wrote: I think what you could do is to monitor through web UI to see if there’s any skew or other symptoms in shuffle write and read. For GC you could use the below configuration as you mentioned. From Spark core side, all the shuffle related operations can spill the data into disk and no need to read the whole partition into memory. But if you uses SparkSQL, it depends on how SparkSQL uses this operators. CC @hao if he has some thoughts on it. Thanks Jerry *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com] *Sent:* Thursday, March 5, 2015 3:28 PM *To:* Shao, Saisai *Cc:* user *Subject:* Re: Having lots of FetchFailedException in join Hi Saisai, What's your suggested settings on monitoring shuffle? I've enabled -XX:+PrintGCDetails -XX:+PrintGCTimeStamps for GC logging. I found SPARK-3461 (Support external groupByKey using repartitionAndSortWithinPartitions) want to make groupByKey using external storage. It's still open status. Does that mean now groupByKey/cogroup/join(implemented as cogroup + flatmap) will still read the group as a whole during consuming? How can I deal with the key skewness in joins? Is there a skew-join implementation? Jianshi On Thu, Mar 5, 2015 at 2:44 PM, Shao, Saisai saisai.s...@intel.com wrote: Hi Jianshi, From my understanding, it may not be the problem of NIO or Netty, looking at your stack trace, the OOM is occurred in EAOM(ExternalAppendOnlyMap), theoretically EAOM can spill the data into disk if memory is not enough, but there might some issues when join key is skewed or key number is smaller, so you will meet OOM. Maybe you could monitor each stage or task’s shuffle and GC status also system status to identify the problem. Thanks Jerry *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com] *Sent:* Thursday, March 5, 2015 2:32 PM *To:* Aaron Davidson *Cc:* user *Subject:* Re: Having lots of FetchFailedException in join One really interesting is that when I'm using the netty-based spark.shuffle.blockTransferService, there's no OOM error messages (java.lang.OutOfMemoryError: Java heap space). Any idea why it's not here? I'm using Spark 1.2.1. Jianshi On Thu, Mar 5, 2015 at 1:56 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: I changed spark.shuffle.blockTransferService to nio and now I'm getting OOM errors, I'm doing a big join operation. 15/03/04 19:04:07 ERROR Executor: Exception in task 107.0 in stage 2.0 (TID 6207) java.lang.OutOfMemoryError: Java heap space at org.apache.spark.util.collection.CompactBuffer.growToSize(CompactBuffer.scala:142) at org.apache.spark.util.collection.CompactBuffer.$plus$eq(CompactBuffer.scala:74) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:179) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:178) at org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:122) at org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:121) at org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:138) at org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32) at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator
RE: Having lots of FetchFailedException in join
Yes, if one key has too many values, there still has a chance to meet the OOM. Thanks Jerry From: Jianshi Huang [mailto:jianshi.hu...@gmail.com] Sent: Thursday, March 5, 2015 3:49 PM To: Shao, Saisai Cc: Cheng, Hao; user Subject: Re: Having lots of FetchFailedException in join I see. I'm using core's join. The data might have some skewness (checking). I understand shuffle can spill data to disk but when consuming it, say in cogroup or groupByKey, it still needs to read the whole group elements, right? I guess OOM happened there when reading very large groups. Jianshi On Thu, Mar 5, 2015 at 3:38 PM, Shao, Saisai saisai.s...@intel.commailto:saisai.s...@intel.com wrote: I think what you could do is to monitor through web UI to see if there’s any skew or other symptoms in shuffle write and read. For GC you could use the below configuration as you mentioned. From Spark core side, all the shuffle related operations can spill the data into disk and no need to read the whole partition into memory. But if you uses SparkSQL, it depends on how SparkSQL uses this operators. CC @hao if he has some thoughts on it. Thanks Jerry From: Jianshi Huang [mailto:jianshi.hu...@gmail.commailto:jianshi.hu...@gmail.com] Sent: Thursday, March 5, 2015 3:28 PM To: Shao, Saisai Cc: user Subject: Re: Having lots of FetchFailedException in join Hi Saisai, What's your suggested settings on monitoring shuffle? I've enabled -XX:+PrintGCDetails -XX:+PrintGCTimeStamps for GC logging. I found SPARK-3461 (Support external groupByKey using repartitionAndSortWithinPartitions) want to make groupByKey using external storage. It's still open status. Does that mean now groupByKey/cogroup/join(implemented as cogroup + flatmap) will still read the group as a whole during consuming? How can I deal with the key skewness in joins? Is there a skew-join implementation? Jianshi On Thu, Mar 5, 2015 at 2:44 PM, Shao, Saisai saisai.s...@intel.commailto:saisai.s...@intel.com wrote: Hi Jianshi, From my understanding, it may not be the problem of NIO or Netty, looking at your stack trace, the OOM is occurred in EAOM(ExternalAppendOnlyMap), theoretically EAOM can spill the data into disk if memory is not enough, but there might some issues when join key is skewed or key number is smaller, so you will meet OOM. Maybe you could monitor each stage or task’s shuffle and GC status also system status to identify the problem. Thanks Jerry From: Jianshi Huang [mailto:jianshi.hu...@gmail.commailto:jianshi.hu...@gmail.com] Sent: Thursday, March 5, 2015 2:32 PM To: Aaron Davidson Cc: user Subject: Re: Having lots of FetchFailedException in join One really interesting is that when I'm using the netty-based spark.shuffle.blockTransferService, there's no OOM error messages (java.lang.OutOfMemoryError: Java heap space). Any idea why it's not here? I'm using Spark 1.2.1. Jianshi On Thu, Mar 5, 2015 at 1:56 PM, Jianshi Huang jianshi.hu...@gmail.commailto:jianshi.hu...@gmail.com wrote: I changed spark.shuffle.blockTransferService to nio and now I'm getting OOM errors, I'm doing a big join operation. 15/03/04 19:04:07 ERROR Executor: Exception in task 107.0 in stage 2.0 (TID 6207) java.lang.OutOfMemoryError: Java heap space at org.apache.spark.util.collection.CompactBuffer.growToSize(CompactBuffer.scala:142) at org.apache.spark.util.collection.CompactBuffer.$plus$eq(CompactBuffer.scala:74) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:179) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:178) at org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:122) at org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:121) at org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:138) at org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32) at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247
RE: Having lots of FetchFailedException in join
Hi Jianshi, From my understanding, it may not be the problem of NIO or Netty, looking at your stack trace, the OOM is occurred in EAOM(ExternalAppendOnlyMap), theoretically EAOM can spill the data into disk if memory is not enough, but there might some issues when join key is skewed or key number is smaller, so you will meet OOM. Maybe you could monitor each stage or task’s shuffle and GC status also system status to identify the problem. Thanks Jerry From: Jianshi Huang [mailto:jianshi.hu...@gmail.com] Sent: Thursday, March 5, 2015 2:32 PM To: Aaron Davidson Cc: user Subject: Re: Having lots of FetchFailedException in join One really interesting is that when I'm using the netty-based spark.shuffle.blockTransferService, there's no OOM error messages (java.lang.OutOfMemoryError: Java heap space). Any idea why it's not here? I'm using Spark 1.2.1. Jianshi On Thu, Mar 5, 2015 at 1:56 PM, Jianshi Huang jianshi.hu...@gmail.commailto:jianshi.hu...@gmail.com wrote: I changed spark.shuffle.blockTransferService to nio and now I'm getting OOM errors, I'm doing a big join operation. 15/03/04 19:04:07 ERROR Executor: Exception in task 107.0 in stage 2.0 (TID 6207) java.lang.OutOfMemoryError: Java heap space at org.apache.spark.util.collection.CompactBuffer.growToSize(CompactBuffer.scala:142) at org.apache.spark.util.collection.CompactBuffer.$plus$eq(CompactBuffer.scala:74) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:179) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:178) at org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:122) at org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:121) at org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:138) at org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32) at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:56) Is join/cogroup still memory bound? Jianshi On Wed, Mar 4, 2015 at 2:11 PM, Jianshi Huang jianshi.hu...@gmail.commailto:jianshi.hu...@gmail.com wrote: Hmm... ok, previous errors are still block fetch errors. 15/03/03 10:22:40 ERROR RetryingBlockFetcher: Exception while beginning fetch of 11 outstanding blocks java.io.IOException: Failed to connect to host-/:55597 at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191) at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156) at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78) at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140) at org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120) at org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:87
Re: Having lots of FetchFailedException in join
Hi Saisai, What's your suggested settings on monitoring shuffle? I've enabled -XX:+PrintGCDetails -XX:+PrintGCTimeStamps for GC logging. I found SPARK-3461 (Support external groupByKey using repartitionAndSortWithinPartitions) want to make groupByKey using external storage. It's still open status. Does that mean now groupByKey/cogroup/join(implemented as cogroup + flatmap) will still read the group as a whole during consuming? How can I deal with the key skewness in joins? Is there a skew-join implementation? Jianshi On Thu, Mar 5, 2015 at 2:44 PM, Shao, Saisai saisai.s...@intel.com wrote: Hi Jianshi, From my understanding, it may not be the problem of NIO or Netty, looking at your stack trace, the OOM is occurred in EAOM(ExternalAppendOnlyMap), theoretically EAOM can spill the data into disk if memory is not enough, but there might some issues when join key is skewed or key number is smaller, so you will meet OOM. Maybe you could monitor each stage or task’s shuffle and GC status also system status to identify the problem. Thanks Jerry *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com] *Sent:* Thursday, March 5, 2015 2:32 PM *To:* Aaron Davidson *Cc:* user *Subject:* Re: Having lots of FetchFailedException in join One really interesting is that when I'm using the netty-based spark.shuffle.blockTransferService, there's no OOM error messages (java.lang.OutOfMemoryError: Java heap space). Any idea why it's not here? I'm using Spark 1.2.1. Jianshi On Thu, Mar 5, 2015 at 1:56 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: I changed spark.shuffle.blockTransferService to nio and now I'm getting OOM errors, I'm doing a big join operation. 15/03/04 19:04:07 ERROR Executor: Exception in task 107.0 in stage 2.0 (TID 6207) java.lang.OutOfMemoryError: Java heap space at org.apache.spark.util.collection.CompactBuffer.growToSize(CompactBuffer.scala:142) at org.apache.spark.util.collection.CompactBuffer.$plus$eq(CompactBuffer.scala:74) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:179) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:178) at org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:122) at org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:121) at org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:138) at org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32) at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:56) Is join/cogroup still memory bound? Jianshi On Wed, Mar 4, 2015 at 2:11 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: Hmm... ok, previous errors are still block fetch errors. 15/03/03 10:22:40 ERROR RetryingBlockFetcher: Exception while beginning fetch of 11 outstanding blocks java.io.IOException: Failed to connect to host-/:55597
RE: Having lots of FetchFailedException in join
I think what you could do is to monitor through web UI to see if there’s any skew or other symptoms in shuffle write and read. For GC you could use the below configuration as you mentioned. From Spark core side, all the shuffle related operations can spill the data into disk and no need to read the whole partition into memory. But if you uses SparkSQL, it depends on how SparkSQL uses this operators. CC @hao if he has some thoughts on it. Thanks Jerry From: Jianshi Huang [mailto:jianshi.hu...@gmail.com] Sent: Thursday, March 5, 2015 3:28 PM To: Shao, Saisai Cc: user Subject: Re: Having lots of FetchFailedException in join Hi Saisai, What's your suggested settings on monitoring shuffle? I've enabled -XX:+PrintGCDetails -XX:+PrintGCTimeStamps for GC logging. I found SPARK-3461 (Support external groupByKey using repartitionAndSortWithinPartitions) want to make groupByKey using external storage. It's still open status. Does that mean now groupByKey/cogroup/join(implemented as cogroup + flatmap) will still read the group as a whole during consuming? How can I deal with the key skewness in joins? Is there a skew-join implementation? Jianshi On Thu, Mar 5, 2015 at 2:44 PM, Shao, Saisai saisai.s...@intel.commailto:saisai.s...@intel.com wrote: Hi Jianshi, From my understanding, it may not be the problem of NIO or Netty, looking at your stack trace, the OOM is occurred in EAOM(ExternalAppendOnlyMap), theoretically EAOM can spill the data into disk if memory is not enough, but there might some issues when join key is skewed or key number is smaller, so you will meet OOM. Maybe you could monitor each stage or task’s shuffle and GC status also system status to identify the problem. Thanks Jerry From: Jianshi Huang [mailto:jianshi.hu...@gmail.commailto:jianshi.hu...@gmail.com] Sent: Thursday, March 5, 2015 2:32 PM To: Aaron Davidson Cc: user Subject: Re: Having lots of FetchFailedException in join One really interesting is that when I'm using the netty-based spark.shuffle.blockTransferService, there's no OOM error messages (java.lang.OutOfMemoryError: Java heap space). Any idea why it's not here? I'm using Spark 1.2.1. Jianshi On Thu, Mar 5, 2015 at 1:56 PM, Jianshi Huang jianshi.hu...@gmail.commailto:jianshi.hu...@gmail.com wrote: I changed spark.shuffle.blockTransferService to nio and now I'm getting OOM errors, I'm doing a big join operation. 15/03/04 19:04:07 ERROR Executor: Exception in task 107.0 in stage 2.0 (TID 6207) java.lang.OutOfMemoryError: Java heap space at org.apache.spark.util.collection.CompactBuffer.growToSize(CompactBuffer.scala:142) at org.apache.spark.util.collection.CompactBuffer.$plus$eq(CompactBuffer.scala:74) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:179) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:178) at org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:122) at org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:121) at org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:138) at org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32) at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31
Re: Having lots of FetchFailedException in join
I see. I'm using core's join. The data might have some skewness (checking). I understand shuffle can spill data to disk but when consuming it, say in cogroup or groupByKey, it still needs to read the whole group elements, right? I guess OOM happened there when reading very large groups. Jianshi On Thu, Mar 5, 2015 at 3:38 PM, Shao, Saisai saisai.s...@intel.com wrote: I think what you could do is to monitor through web UI to see if there’s any skew or other symptoms in shuffle write and read. For GC you could use the below configuration as you mentioned. From Spark core side, all the shuffle related operations can spill the data into disk and no need to read the whole partition into memory. But if you uses SparkSQL, it depends on how SparkSQL uses this operators. CC @hao if he has some thoughts on it. Thanks Jerry *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com] *Sent:* Thursday, March 5, 2015 3:28 PM *To:* Shao, Saisai *Cc:* user *Subject:* Re: Having lots of FetchFailedException in join Hi Saisai, What's your suggested settings on monitoring shuffle? I've enabled -XX:+PrintGCDetails -XX:+PrintGCTimeStamps for GC logging. I found SPARK-3461 (Support external groupByKey using repartitionAndSortWithinPartitions) want to make groupByKey using external storage. It's still open status. Does that mean now groupByKey/cogroup/join(implemented as cogroup + flatmap) will still read the group as a whole during consuming? How can I deal with the key skewness in joins? Is there a skew-join implementation? Jianshi On Thu, Mar 5, 2015 at 2:44 PM, Shao, Saisai saisai.s...@intel.com wrote: Hi Jianshi, From my understanding, it may not be the problem of NIO or Netty, looking at your stack trace, the OOM is occurred in EAOM(ExternalAppendOnlyMap), theoretically EAOM can spill the data into disk if memory is not enough, but there might some issues when join key is skewed or key number is smaller, so you will meet OOM. Maybe you could monitor each stage or task’s shuffle and GC status also system status to identify the problem. Thanks Jerry *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com] *Sent:* Thursday, March 5, 2015 2:32 PM *To:* Aaron Davidson *Cc:* user *Subject:* Re: Having lots of FetchFailedException in join One really interesting is that when I'm using the netty-based spark.shuffle.blockTransferService, there's no OOM error messages (java.lang.OutOfMemoryError: Java heap space). Any idea why it's not here? I'm using Spark 1.2.1. Jianshi On Thu, Mar 5, 2015 at 1:56 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: I changed spark.shuffle.blockTransferService to nio and now I'm getting OOM errors, I'm doing a big join operation. 15/03/04 19:04:07 ERROR Executor: Exception in task 107.0 in stage 2.0 (TID 6207) java.lang.OutOfMemoryError: Java heap space at org.apache.spark.util.collection.CompactBuffer.growToSize(CompactBuffer.scala:142) at org.apache.spark.util.collection.CompactBuffer.$plus$eq(CompactBuffer.scala:74) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:179) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:178) at org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:122) at org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:121) at org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:138) at org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32) at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31
Re: Having lots of FetchFailedException in join
I changed spark.shuffle.blockTransferService to nio and now I'm getting OOM errors, I'm doing a big join operation. 15/03/04 19:04:07 ERROR Executor: Exception in task 107.0 in stage 2.0 (TID 6207) java.lang.OutOfMemoryError: Java heap space at org.apache.spark.util.collection.CompactBuffer.growToSize(CompactBuffer.scala:142) at org.apache.spark.util.collection.CompactBuffer.$plus$eq(CompactBuffer.scala:74) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:179) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:178) at org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:122) at org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:121) at org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:138) at org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32) at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:56) Is join/cogroup still memory bound? Jianshi On Wed, Mar 4, 2015 at 2:11 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: Hmm... ok, previous errors are still block fetch errors. 15/03/03 10:22:40 ERROR RetryingBlockFetcher: Exception while beginning fetch of 11 outstanding blocks java.io.IOException: Failed to connect to host-/:55597 at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191) at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156) at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78) at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140) at org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120) at org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:87) at org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:149) at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:289) at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:53) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160) at
Re: Having lots of FetchFailedException in join
Failed to connect implies that the executor at that host died, please check its logs as well. On Tue, Mar 3, 2015 at 11:03 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: Sorry that I forgot the subject. And in the driver, I got many FetchFailedException. The error messages are 15/03/03 10:34:32 WARN TaskSetManager: Lost task 31.0 in stage 2.2 (TID 7943, ): FetchFailed(BlockManagerId(86, , 43070), shuffleId=0, mapId=24, reduceId=1220, message= org.apache.spark.shuffle.FetchFailedException: Failed to connect to /:43070 at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67) at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83) at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83) Jianshi On Wed, Mar 4, 2015 at 2:55 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: Hi, I got this error message: 15/03/03 10:22:41 ERROR OneForOneBlockFetcher: Failed while starting block fetches java.lang.RuntimeException: java.io.FileNotFoundException: /hadoop01/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-fc3b/02/shuffle_0_1458_0.index (No such file or directory) at java.io.FileInputStream.open(Native Method) at java.io.FileInputStream.init(FileInputStream.java:146) at org.apache.spark.shuffle.IndexShuffleBlockManager.getBlockData(IndexShuffleBlockManager.scala:109) at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:305) at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57) at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108) at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57) And then for the same index file and executor, I got the following errors multiple times 15/03/03 10:22:41 ERROR ShuffleBlockFetcherIterator: Failed to get block(s) from host-:39534 java.lang.RuntimeException: java.io.FileNotFoundException: /hadoop01/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-fc3b/02/shuffle_0_1458_0.index (No such file or directory) 15/03/03 10:22:41 ERROR RetryingBlockFetcher: Failed to fetch block shuffle_0_13_1228, and will not retry (0 retries) java.lang.RuntimeException: java.io.FileNotFoundException: /hadoop01/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-fc3b/02/shuffle_0_1458_0.index (No such file or directory) ... Caused by: java.net.ConnectException: Connection refused: host- What's the problem? BTW, I'm using Spark 1.2.1-SNAPSHOT I built around Dec. 20. Is there any bug fixes related to shuffle block fetching or index files after that? Thanks, -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: Having lots of FetchFailedException in join
Sorry that I forgot the subject. And in the driver, I got many FetchFailedException. The error messages are 15/03/03 10:34:32 WARN TaskSetManager: Lost task 31.0 in stage 2.2 (TID 7943, ): FetchFailed(BlockManagerId(86, , 43070), shuffleId=0, mapId=24, reduceId=1220, message= org.apache.spark.shuffle.FetchFailedException: Failed to connect to /:43070 at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67) at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83) at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83) Jianshi On Wed, Mar 4, 2015 at 2:55 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: Hi, I got this error message: 15/03/03 10:22:41 ERROR OneForOneBlockFetcher: Failed while starting block fetches java.lang.RuntimeException: java.io.FileNotFoundException: /hadoop01/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-fc3b/02/shuffle_0_1458_0.index (No such file or directory) at java.io.FileInputStream.open(Native Method) at java.io.FileInputStream.init(FileInputStream.java:146) at org.apache.spark.shuffle.IndexShuffleBlockManager.getBlockData(IndexShuffleBlockManager.scala:109) at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:305) at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57) at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108) at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57) And then for the same index file and executor, I got the following errors multiple times 15/03/03 10:22:41 ERROR ShuffleBlockFetcherIterator: Failed to get block(s) from host-:39534 java.lang.RuntimeException: java.io.FileNotFoundException: /hadoop01/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-fc3b/02/shuffle_0_1458_0.index (No such file or directory) 15/03/03 10:22:41 ERROR RetryingBlockFetcher: Failed to fetch block shuffle_0_13_1228, and will not retry (0 retries) java.lang.RuntimeException: java.io.FileNotFoundException: /hadoop01/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-fc3b/02/shuffle_0_1458_0.index (No such file or directory) ... Caused by: java.net.ConnectException: Connection refused: host- What's the problem? BTW, I'm using Spark 1.2.1-SNAPSHOT I built around Dec. 20. Is there any bug fixes related to shuffle block fetching or index files after that? Thanks, -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: Having lots of FetchFailedException in join
Hmm... ok, previous errors are still block fetch errors. 15/03/03 10:22:40 ERROR RetryingBlockFetcher: Exception while beginning fetch of 11 outstanding blocks java.io.IOException: Failed to connect to host-/:55597 at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191) at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156) at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78) at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140) at org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120) at org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:87) at org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:149) at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:289) at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:53) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:197) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:724) Caused by: java.net.ConnectException: Connection refused: lvshdc5dn0518.lvs.paypal.com/10.196.244.48:55597 at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:735) at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:208) at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:287) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528) And I checked executor on container host-, everything is good. Jianshi On Wed, Mar 4, 2015 at 12:28 PM, Aaron Davidson ilike...@gmail.com wrote: Drat! That doesn't help. Could you scan from the top to see if there were any fatal errors preceding these? Sometimes a OOM will cause this type of issue further down. On Tue, Mar 3, 2015 at 8:16 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: The failed executor has the following error messages. Any hints? 15/03/03 10:22:41 ERROR TransportRequestHandler: Error while invoking RpcHandler#receive() on RPC id 5711039715419258699
Re: Having lots of FetchFailedException in join
Drat! That doesn't help. Could you scan from the top to see if there were any fatal errors preceding these? Sometimes a OOM will cause this type of issue further down. On Tue, Mar 3, 2015 at 8:16 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: The failed executor has the following error messages. Any hints? 15/03/03 10:22:41 ERROR TransportRequestHandler: Error while invoking RpcHandler#receive() on RPC id 5711039715419258699 java.io.FileNotFoundException: /hadoop01/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-fc3b/02/shuffle_0_1458_0.index (No such file or directory) at java.io.FileInputStream.open(Native Method) at java.io.FileInputStream.init(FileInputStream.java:146) at org.apache.spark.shuffle.IndexShuffleBlockManager.getBlockData(IndexShuffleBlockManager.scala:109) at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:305) at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57) at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108) at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57) at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:124) at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:97) at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:91) at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44) at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787) at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116) at java.lang.Thread.run(Thread.java:724) 15/03/03 10:22:41 ERROR TransportRequestHandler: Error while invoking RpcHandler#receive() on RPC id 7941985280808455530 java.io.FileNotFoundException: /hadoop01/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-fc3b/02/shuffle_0_1458_0.index (No such file or directory) at java.io.FileInputStream.open(Native Method) at java.io.FileInputStream.init(FileInputStream.java:146) at org.apache.spark.shuffle.IndexShuffleBlockManager.getBlockData(IndexShuffleBlockManager.scala:109) at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:305) at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57) at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at
Re: Having lots of FetchFailedException in join
The failed executor has the following error messages. Any hints? 15/03/03 10:22:41 ERROR TransportRequestHandler: Error while invoking RpcHandler#receive() on RPC id 5711039715419258699 java.io.FileNotFoundException: /hadoop01/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-fc3b/02/shuffle_0_1458_0.index (No such file or directory) at java.io.FileInputStream.open(Native Method) at java.io.FileInputStream.init(FileInputStream.java:146) at org.apache.spark.shuffle.IndexShuffleBlockManager.getBlockData(IndexShuffleBlockManager.scala:109) at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:305) at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57) at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108) at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57) at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:124) at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:97) at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:91) at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44) at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787) at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116) at java.lang.Thread.run(Thread.java:724) 15/03/03 10:22:41 ERROR TransportRequestHandler: Error while invoking RpcHandler#receive() on RPC id 7941985280808455530 java.io.FileNotFoundException: /hadoop01/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-fc3b/02/shuffle_0_1458_0.index (No such file or directory) at java.io.FileInputStream.open(Native Method) at java.io.FileInputStream.init(FileInputStream.java:146) at org.apache.spark.shuffle.IndexShuffleBlockManager.getBlockData(IndexShuffleBlockManager.scala:109) at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:305) at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57) at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at