Same code of yours works for me as well.

On Fri, Jun 26, 2015 at 8:02 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:
Is it that it's not supported with Avro? Unlikely.

On Fri, Jun 26, 2015 at 8:01 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:

My imports:

import org.apache.avro.generic.GenericData
import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.AvroKey
import org.apache.avro.Schema
import org.apache.hadoop.io.NullWritable
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.Text

def readGenericRecords(sc: SparkContext, inputDir: String, startDate: Date, endDate: Date) = {
  val path = getInputPaths(inputDir, startDate, endDate)
  val hadoopConfiguration = new Configuration(sc.hadoopConfiguration)
  hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", "67108864")
  sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable, AvroKeyInputFormat[GenericRecord]](path + "/*.avro")
}

I need to read Avro datasets, and I am using strings instead of the constants from the InputFormat class.

When I click on any Hadoop dependency from Eclipse, I see they point to Hadoop 2.2.x jars.

On Fri, Jun 26, 2015 at 7:44 AM, Silvio Fiorito <silvio.fior...@granturing.com> wrote:

Make sure you're importing the right namespace for Hadoop v2.0. This is what I tried:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, TextInputFormat}

val hadoopConf = new org.apache.hadoop.conf.Configuration()
hadoopConf.setLong(FileInputFormat.SPLIT_MAXSIZE, 2048)

val input = sc.newAPIHadoopFile(
  "README.md",
  classOf[TextInputFormat],
  classOf[LongWritable],
  classOf[Text],
  hadoopConf).map(_._2.toString())

println(input.partitions.size)

input.
  flatMap(_.split(" ")).
  filter(_.length > 0).
  map((_, 1)).
  reduceByKey(_ + _).
  coalesce(1).
  sortBy(_._2, false).
  take(10).
  foreach(println)
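One detail worth noting about readGenericRecords above: the Configuration carrying the split.maxsize setting is created but never passed to newAPIHadoopFile, so that call presumably falls back to sc.hadoopConfiguration and the split-size setting never reaches it. A minimal sketch of one way to make it apply, assuming it is acceptable to set the property globally and using the same imports, sc and path as in the snippet above:

// Sketch only: set the split size on the SparkContext's own Hadoop configuration,
// which the newAPIHadoopFile overload that takes only a path uses by default.
sc.hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", "67108864")

val records = sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable,
  AvroKeyInputFormat[GenericRecord]](path + "/*.avro")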
From: "ÐΞ€ρ@Ҝ (๏̯͡๏)"
Date: Friday, June 26, 2015 at 10:18 AM
To: Silvio Fiorito
Cc: user
Subject: Re:

All of these throw a compilation error at newAPIHadoopFile:

1)

val hadoopConfiguration = new Configuration()
hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", "67108864")
sc.newAPIHadoopFile[AvroKey, NullWritable, AvroKeyInputFormat](path + "/*.avro", classOf[AvroKey], classOf[NullWritable], classOf[AvroKeyInputFormat], hadoopConfiguration)

2)

val hadoopConfiguration = new Configuration()
hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", "67108864")
sc.newAPIHadoopFile[AvroKey, NullWritable, AvroKeyInputFormat](path + "/*.avro", classOf[AvroKey[GenericRecord]], classOf[NullWritable], classOf[AvroKeyInputFormat[GenericRecord]], hadoopConfiguration)

3)

val hadoopConfiguration = new Configuration()
hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", "67108864")
sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable, AvroKeyInputFormat[GenericRecord]](path + "/*.avro", classOf[AvroKey[GenericRecord]], classOf[NullWritable], classOf[AvroKeyInputFormat[GenericRecord]], hadoopConfiguration)

Error:

[ERROR] /Users/dvasthimal/ebay/projects/ep/ep-spark/src/main/scala/com/ebay/ep/poc/spark/reporting/process/util/DataUtil.scala:37: error: overloaded method value newAPIHadoopFile with alternatives:
[INFO]   (path: String, fClass: Class[org.apache.avro.mapreduce.AvroKeyInputFormat[org.apache.avro.generic.GenericRecord]], kClass: Class[org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord]], vClass: Class[org.apache.hadoop.io.NullWritable], conf: org.apache.hadoop.conf.Configuration)org.apache.spark.rdd.RDD[(org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord], org.apache.hadoop.io.NullWritable)] <and>
[INFO]   (path: String)(implicit km: scala.reflect.ClassTag[org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord]], implicit vm: scala.reflect.ClassTag[org.apache.hadoop.io.NullWritable], implicit fm: scala.reflect.ClassTag[org.apache.avro.mapreduce.AvroKeyInputFormat[org.apache.avro.generic.GenericRecord]])org.apache.spark.rdd.RDD[(org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord], org.apache.hadoop.io.NullWritable)]
[INFO]  cannot be applied to (String, Class[org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord]], Class[org.apache.hadoop.io.NullWritable], Class[org.apache.avro.mapreduce.AvroKeyInputFormat[org.apache.avro.generic.GenericRecord]], org.apache.hadoop.conf.Configuration)
[INFO]     sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable, AvroKeyInputFormat[GenericRecord]](path + "/*.avro", classOf[AvroKey[GenericRecord]], classOf[NullWritable], classOf[AvroKeyInputFormat[GenericRecord]], hadoopConfiguration)
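Reading the first alternative in that error, the explicit overload expects the arguments in the order path, fClass, kClass, vClass, conf, so the input-format class comes before the key and value classes, while the three calls above pass the key and value classes first. A minimal sketch of attempt 3 with the class arguments reordered to match the reported signature (sc, path, hadoopConfiguration and the imports are the ones used above):

val hadoopConfiguration = new Configuration()
hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", "67108864")

// Argument order per the first alternative in the error message:
// path, input-format class, key class, value class, configuration.
sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable, AvroKeyInputFormat[GenericRecord]](
  path + "/*.avro",
  classOf[AvroKeyInputFormat[GenericRecord]],
  classOf[AvroKey[GenericRecord]],
  classOf[NullWritable],
  hadoopConfiguration)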
On Thu, Jun 25, 2015 at 4:14 PM, Silvio Fiorito <silvio.fior...@granturing.com> wrote:

Ok, in that case I think you can set the max split size in the Hadoop config object, using the FileInputFormat.SPLIT_MAXSIZE config parameter.

Again, I haven't done this myself, but looking through the Spark codebase here:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L1053
and the HDFS FileInputFormat implementation, that seems like a good option to try.

You should be able to call conf.setLong(FileInputFormat.SPLIT_MAXSIZE, max).

I hope that helps!

From: "ÐΞ€ρ@Ҝ (๏̯͡๏)"
Date: Thursday, June 25, 2015 at 5:49 PM
To: Silvio Fiorito
Cc: user
Subject: Re:

I use

sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable, AvroKeyInputFormat[GenericRecord]](path + "/*.avro")

https://spark.apache.org/docs/1.3.1/api/java/org/apache/spark/SparkContext.html#newAPIHadoopFile(java.lang.String, java.lang.Class, java.lang.Class, java.lang.Class, org.apache.hadoop.conf.Configuration)

does not seem to have that partition option.

On Thu, Jun 25, 2015 at 12:24 PM, Silvio Fiorito <silvio.fior...@granturing.com> wrote:

Hi Deepak,

Have you tried specifying the minimum partitions when you load the file? I haven't tried that myself against HDFS before, so I'm not sure if it will affect data locality. Ideally not; it should still maintain data locality, just with more partitions. Once your job runs, you can check in the Spark tasks web UI to ensure they're all Node local.

val details = sc.textFile("hdfs://….", 500)

If you're using something other than text files you can also specify minimum partitions when using sc.hadoopFile.

Thanks,
Silvio
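For reference, a minimal sketch of that suggestion, assuming a SparkContext sc and a placeholder HDFS path: the old-API sc.hadoopFile (and sc.textFile, which builds on it) accept a minPartitions argument, while the new-API newAPIHadoopFile does not, which is why the split.maxsize setting discussed above is the alternative there.

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat  // old (mapred) API input format

// Plain text files: ask for at least 500 partitions.
val details = sc.textFile("hdfs://namenode/path/to/input", 500)

// Other old-API input formats: minPartitions is the second argument.
val records = sc.hadoopFile[LongWritable, Text, TextInputFormat](
  "hdfs://namenode/path/to/input", 500)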
From: "ÐΞ€ρ@Ҝ (๏̯͡๏)"
Date: Thursday, June 25, 2015 at 3:10 PM
To: Akhil Das
Cc: user
Subject: Re:

How can I increase the number of tasks from 174 to 500 without running repartition?

The input size is 512.0 MB (hadoop) / 4159106. Can this be reduced to 64 MB so as to increase the number of tasks, similar to the split size that increases the number of mappers in Hadoop M/R?

On Thu, Jun 25, 2015 at 12:06 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:

Look in the tuning section <https://spark.apache.org/docs/latest/tuning.html>; you also need to figure out what's taking time and where your bottleneck is, etc. If everything is tuned properly, then you will need to throw more cores at it :)

Thanks
Best Regards

On Thu, Jun 25, 2015 at 12:19 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:

It's taking an hour, and on Hadoop it takes 1h 30m. Is there a way to make it run faster?

On Wed, Jun 24, 2015 at 11:39 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:

Cool. :)

On 24 Jun 2015 23:44, "ÐΞ€ρ@Ҝ (๏̯͡๏)" <deepuj...@gmail.com> wrote:

It's running now.

On Wed, Jun 24, 2015 at 10:45 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:

Now running with

--num-executors 9973 --driver-memory 14g --driver-java-options "-XX:MaxPermSize=512M -Xmx4096M -Xms4096M" --executor-memory 14g --executor-cores 1

On Wed, Jun 24, 2015 at 10:34 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:

There are multiple of these:

1)
15/06/24 09:53:37 ERROR executor.Executor: Exception in task 443.0 in stage 3.0 (TID 1767)
java.lang.OutOfMemoryError: GC overhead limit exceeded
    at sun.reflect.GeneratedSerializationConstructorAccessor1327.newInstance(Unknown Source)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
    at org.objenesis.instantiator.sun.SunReflectionFactoryInstantiator.newInstance(SunReflectionFactoryInstantiator.java:56)
    at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1065)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:228)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:217)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:134)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:17)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
    at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:42)
    at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
    at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:138)
    at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125)
15/06/24 09:53:37 ERROR actor.ActorSystemImpl: exception on LARS' timer thread

2)
15/06/24 09:53:37 ERROR actor.ActorSystemImpl: exception on LARS' timer thread
java.lang.OutOfMemoryError: GC overhead limit exceeded
    at akka.dispatch.AbstractNodeQueue.<init>(AbstractNodeQueue.java:22)
    at akka.actor.LightArrayRevolverScheduler$TaskQueue.<init>(Scheduler.scala:443)
    at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:409)
    at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375)
    at java.lang.Thread.run(Thread.java:745)

3)
# java.lang.OutOfMemoryError: GC overhead limit exceeded
# -XX:OnOutOfMemoryError="kill %p"
#   Executing /bin/sh -c "kill 20674"...
[ERROR] [06/24/2015 09:53:37.590] [Executor task launch worker-5] [akka.tcp://sparkdri...@phxdpehdc9dn2137.stratus.phx.ebay.com:47708/] swallowing exception during message send (akka.remote.RemoteTransportExceptionNoStackTrace)

On Wed, Jun 24, 2015 at 10:31 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:

I see this:

java.lang.OutOfMemoryError: GC overhead limit exceeded
    at java.util.Arrays.copyOfRange(Arrays.java:2694)
    at java.lang.String.<init>(String.java:203)
    at java.lang.StringBuilder.toString(StringBuilder.java:405)
    at java.io.UnixFileSystem.resolve(UnixFileSystem.java:108)
    at java.io.File.<init>(File.java:367)
    at org.apache.spark.storage.DiskBlockManager.getFile(DiskBlockManager.scala:81)
    at org.apache.spark.storage.DiskBlockManager.getFile(DiskBlockManager.scala:84)
    at org.apache.spark.shuffle.IndexShuffleBlockManager.getIndexFile(IndexShuffleBlockManager.scala:60)
    at org.apache.spark.shuffle.IndexShuffleBlockManager.getBlockData(IndexShuffleBlockManager.scala:107)
    at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:304)
    at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
    at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
    at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
    at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:124)
    at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:97)
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:91)
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)

On Wed, Jun 24, 2015 at 7:16 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:

Can you look a bit more in the error logs? It could be getting killed because of OOM, etc. One thing you can try is to set the spark.shuffle.blockTransferService to nio from netty.

Thanks
Best Regards
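For reference, a sketch of how that setting could be applied, either on the SparkConf before the SparkContext is created or as a spark-submit --conf flag. The app name below is a placeholder; spark.shuffle.blockTransferService defaults to netty in Spark 1.3.

import org.apache.spark.{SparkConf, SparkContext}

// Switch the shuffle block transfer service from netty (the default) to nio.
val conf = new SparkConf()
  .setAppName("my-job")  // placeholder app name
  .set("spark.shuffle.blockTransferService", "nio")
val sc = new SparkContext(conf)

Or at submit time: spark-submit --conf spark.shuffle.blockTransferService=nio ...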
On Wed, Jun 24, 2015 at 5:46 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:

I have a Spark job that has 7 stages. The first 3 stages complete and the fourth stage begins (it joins two RDDs). This stage has multiple task failures, all with the below exception.

Multiple tasks (100s of them) get the same exception on different hosts. How can all the hosts suddenly stop responding when a few moments ago 3 stages ran successfully? If I re-run them, the three stages will again run successfully. I cannot think of it being a cluster issue.

Any suggestions?

Spark Version : 1.3.1

Exception:

org.apache.spark.shuffle.FetchFailedException: Failed to connect to HOST
    at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
    at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
    at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125)
    at org.apache.sp

--
Deepak