<dependency>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro</artifactId>
  <version>1.7.7</version>
  <scope>provided</scope>
</dependency>
<dependency>
  <groupId>com.databricks</groupId>
  <artifactId>spark-avro_2.10</artifactId>
  <version>1.0.0</version>
</dependency>
<dependency>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-mapred</artifactId>
  <version>1.7.7</version>
  <classifier>hadoop2</classifier>
  <scope>provided</scope>
</dependency>

On Fri, Jun 26, 2015 at 8:02 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:

> Same code of yours works for me as well.
>
> On Fri, Jun 26, 2015 at 8:02 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:
>
>> Is it that it's not supported with Avro? Unlikely.
>>
>> On Fri, Jun 26, 2015 at 8:01 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:
>>
>>> My imports:
>>>
>>> import org.apache.avro.generic.GenericData
>>> import org.apache.avro.generic.GenericRecord
>>> import org.apache.avro.mapred.AvroKey
>>> import org.apache.avro.Schema
>>> import org.apache.hadoop.io.NullWritable
>>> import org.apache.avro.mapreduce.AvroKeyInputFormat
>>> import org.apache.hadoop.conf.Configuration
>>> import org.apache.hadoop.fs.FileSystem
>>> import org.apache.hadoop.fs.Path
>>> import org.apache.hadoop.io.Text
>>>
>>> def readGenericRecords(sc: SparkContext, inputDir: String, startDate: Date, endDate: Date) = {
>>>   val path = getInputPaths(inputDir, startDate, endDate)
>>>   val hadoopConfiguration = new Configuration(sc.hadoopConfiguration)
>>>   hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", "67108864")
>>>   sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable, AvroKeyInputFormat[GenericRecord]](path + "/*.avro")
>>> }
>>>
>>> I need to read Avro datasets, and I am using strings instead of constants from the InputFormat class.
>>>
>>> When I click on any Hadoop dependency in Eclipse, I see they point to Hadoop 2.2.x jars.
>>>
>>> On Fri, Jun 26, 2015 at 7:44 AM, Silvio Fiorito <silvio.fior...@granturing.com> wrote:
>>>
>>>> Make sure you’re importing the right namespace for Hadoop v2.0. This is what I tried:
>>>>
>>>> import org.apache.hadoop.io.{LongWritable, Text}
>>>> import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, TextInputFormat}
>>>>
>>>> val hadoopConf = new org.apache.hadoop.conf.Configuration()
>>>> hadoopConf.setLong(FileInputFormat.SPLIT_MAXSIZE, 2048)
>>>>
>>>> val input = sc.newAPIHadoopFile(
>>>>   "README.md",
>>>>   classOf[TextInputFormat],
>>>>   classOf[LongWritable],
>>>>   classOf[Text],
>>>>   hadoopConf).map(_._2.toString())
>>>>
>>>> println(input.partitions.size)
>>>>
>>>> input.
>>>>   flatMap(_.split(" ")).
>>>>   filter(_.length > 0).
>>>>   map((_, 1)).
>>>>   reduceByKey(_ + _).
>>>>   coalesce(1).
>>>>   sortBy(_._2, false).
>>>>   take(10).
>>>>   foreach(println)
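A side note on the readGenericRecords snippet above: it builds a customized Configuration (with the 64 MB split size) but then calls the overload of newAPIHadoopFile that never receives it, so the setting is silently ignored. A minimal corrected sketch, assuming Spark 1.3's explicit-class overload and the same imports and scope as that snippet (the name `records` is just for illustration):

  // Sketch: pass the customized Configuration explicitly. In this overload
  // the InputFormat class comes first, then the key and value classes.
  val records = sc.newAPIHadoopFile(
    path + "/*.avro",
    classOf[AvroKeyInputFormat[GenericRecord]],
    classOf[AvroKey[GenericRecord]],
    classOf[NullWritable],
    hadoopConfiguration)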
>>>>
>>>> From: "ÐΞ€ρ@Ҝ (๏̯͡๏)"
>>>> Date: Friday, June 26, 2015 at 10:18 AM
>>>> To: Silvio Fiorito
>>>> Cc: user
>>>> Subject: Re:
>>>>
>>>> All of these throw a compilation error at newAPIHadoopFile:
>>>>
>>>> 1)
>>>>
>>>> val hadoopConfiguration = new Configuration()
>>>> hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", "67108864")
>>>> sc.newAPIHadoopFile[AvroKey, NullWritable, AvroKeyInputFormat](path + "/*.avro",
>>>>   classOf[AvroKey], classOf[NullWritable], classOf[AvroKeyInputFormat], hadoopConfiguration)
>>>>
>>>> 2)
>>>>
>>>> val hadoopConfiguration = new Configuration()
>>>> hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", "67108864")
>>>> sc.newAPIHadoopFile[AvroKey, NullWritable, AvroKeyInputFormat](path + "/*.avro",
>>>>   classOf[AvroKey[GenericRecord]], classOf[NullWritable],
>>>>   classOf[AvroKeyInputFormat[GenericRecord]], hadoopConfiguration)
>>>>
>>>> 3)
>>>>
>>>> val hadoopConfiguration = new Configuration()
>>>> hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", "67108864")
>>>> sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable, AvroKeyInputFormat[GenericRecord]](path + "/*.avro",
>>>>   classOf[AvroKey[GenericRecord]], classOf[NullWritable],
>>>>   classOf[AvroKeyInputFormat[GenericRecord]], hadoopConfiguration)
>>>>
>>>> Error:
>>>>
>>>> [ERROR] /Users/dvasthimal/ebay/projects/ep/ep-spark/src/main/scala/com/ebay/ep/poc/spark/reporting/process/util/DataUtil.scala:37: error: overloaded method value newAPIHadoopFile with alternatives:
>>>> [INFO]   (path: String, fClass: Class[org.apache.avro.mapreduce.AvroKeyInputFormat[org.apache.avro.generic.GenericRecord]], kClass: Class[org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord]], vClass: Class[org.apache.hadoop.io.NullWritable], conf: org.apache.hadoop.conf.Configuration)org.apache.spark.rdd.RDD[(org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord], org.apache.hadoop.io.NullWritable)] <and>
>>>> [INFO]   (path: String)(implicit km: scala.reflect.ClassTag[org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord]], implicit vm: scala.reflect.ClassTag[org.apache.hadoop.io.NullWritable], implicit fm: scala.reflect.ClassTag[org.apache.avro.mapreduce.AvroKeyInputFormat[org.apache.avro.generic.GenericRecord]])org.apache.spark.rdd.RDD[(org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord], org.apache.hadoop.io.NullWritable)]
>>>> [INFO]  cannot be applied to (String, Class[org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord]], Class[org.apache.hadoop.io.NullWritable], Class[org.apache.avro.mapreduce.AvroKeyInputFormat[org.apache.avro.generic.GenericRecord]], org.apache.hadoop.conf.Configuration)
>>>> [INFO]     sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable, AvroKeyInputFormat[GenericRecord]](path + "/*.avro", classOf[AvroKey[GenericRecord]], classOf[NullWritable], classOf[AvroKeyInputFormat[GenericRecord]], hadoopConfiguration)
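The overload error above comes down to argument order: all three attempts pass classOf[AvroKey...] before the InputFormat class, while the five-argument alternative listed in the error message is (path, fClass, kClass, vClass, conf), with the InputFormat class first. An alternative sketch that keeps the type-parameter style instead, assuming (as the Spark source linked below suggests) that the single-argument overload falls back to sc.hadoopConfiguration:

  // Sketch: set the split size on the SparkContext's own Hadoop configuration,
  // which the single-argument overload of newAPIHadoopFile picks up implicitly.
  sc.hadoopConfiguration.set(
    "mapreduce.input.fileinputformat.split.maxsize", "67108864")

  val rdd = sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable,
    AvroKeyInputFormat[GenericRecord]](path + "/*.avro")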
>>>> On Thu, Jun 25, 2015 at 4:14 PM, Silvio Fiorito <silvio.fior...@granturing.com> wrote:
>>>>
>>>>> Ok, in that case I think you can set the max split size in the Hadoop config object, using the FileInputFormat.SPLIT_MAXSIZE config parameter.
>>>>>
>>>>> Again, I haven’t done this myself, but looking through the Spark codebase here:
>>>>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L1053
>>>>>
>>>>> and the HDFS FileInputFormat implementation, that seems like a good option to try.
>>>>>
>>>>> You should be able to call conf.setLong(FileInputFormat.SPLIT_MAXSIZE, max).
>>>>>
>>>>> I hope that helps!
>>>>>
>>>>> From: "ÐΞ€ρ@Ҝ (๏̯͡๏)"
>>>>> Date: Thursday, June 25, 2015 at 5:49 PM
>>>>> To: Silvio Fiorito
>>>>> Cc: user
>>>>> Subject: Re:
>>>>>
>>>>> I use
>>>>>
>>>>> sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable, AvroKeyInputFormat[GenericRecord]](path + "/*.avro")
>>>>>
>>>>> https://spark.apache.org/docs/1.3.1/api/java/org/apache/spark/SparkContext.html#newAPIHadoopFile(java.lang.String, java.lang.Class, java.lang.Class, java.lang.Class, org.apache.hadoop.conf.Configuration)
>>>>>
>>>>> It does not seem to have that partitions option.
>>>>>
>>>>> On Thu, Jun 25, 2015 at 12:24 PM, Silvio Fiorito <silvio.fior...@granturing.com> wrote:
>>>>>
>>>>>> Hi Deepak,
>>>>>>
>>>>>> Have you tried specifying the minimum partitions when you load the file? I haven’t tried that myself against HDFS before, so I’m not sure if it will affect data locality. Ideally not; it should still maintain data locality, just with more partitions. Once your job runs, you can check in the Spark tasks web UI to ensure they’re all node-local.
>>>>>>
>>>>>> val details = sc.textFile("hdfs://…", 500)
>>>>>>
>>>>>> If you’re using something other than a text file, you can also specify minimum partitions when using sc.hadoopFile.
>>>>>>
>>>>>> Thanks,
>>>>>> Silvio
>>>>>>
>>>>>> From: "ÐΞ€ρ@Ҝ (๏̯͡๏)"
>>>>>> Date: Thursday, June 25, 2015 at 3:10 PM
>>>>>> To: Akhil Das
>>>>>> Cc: user
>>>>>> Subject: Re:
>>>>>>
>>>>>> How can I increase the number of tasks from 174 to 500 without running repartition?
>>>>>>
>>>>>> The input size is 512.0 MB (hadoop) / 4159106. Can this be reduced to 64 MB so as to increase the number of tasks, similar to the split size that controls the number of mappers in Hadoop M/R?
>>>>>>
>>>>>> On Thu, Jun 25, 2015 at 12:06 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>>>>>>
>>>>>>> Look in the tuning section <https://spark.apache.org/docs/latest/tuning.html>; you also need to figure out what’s taking time and where your bottleneck is, etc. If everything is tuned properly, then you will need to throw more cores at it :)
>>>>>>>
>>>>>>> Thanks
>>>>>>> Best Regards
>>>>>>>
>>>>>>> On Thu, Jun 25, 2015 at 12:19 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:
>>>>>>>
>>>>>>>> It’s taking an hour (on Hadoop it takes 1h 30m); is there a way to make it run faster?
>>>>>>>>
>>>>>>>> On Wed, Jun 24, 2015 at 11:39 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>>>>>>>>
>>>>>>>>> Cool. :)
>>>>>>>>>
>>>>>>>>> On 24 Jun 2015 23:44, "ÐΞ€ρ@Ҝ (๏̯͡๏)" <deepuj...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> It’s running now.
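For reference, the two knobs discussed above for raising the task count, pulled together as a sketch (the hdfs://host/path URI is a placeholder, and the 64 MB figure is the one quoted in the messages):

  // 1) For text input: ask for a minimum number of partitions directly.
  val details = sc.textFile("hdfs://host/path", 500)

  // 2) For any FileInputFormat-based input: cap the split size so that
  //    more splits, and therefore more tasks, are created.
  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat

  val conf = new Configuration(sc.hadoopConfiguration)
  conf.setLong(FileInputFormat.SPLIT_MAXSIZE, 64L * 1024 * 1024) // 64 MB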
>>>>>>>>>>
>>>>>>>>>> On Wed, Jun 24, 2015 at 10:45 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Now running with
>>>>>>>>>>>
>>>>>>>>>>> *--num-executors 9973 --driver-memory 14g --driver-java-options "-XX:MaxPermSize=512M -Xmx4096M -Xms4096M" --executor-memory 14g --executor-cores 1*
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Jun 24, 2015 at 10:34 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> There are multiple of these:
>>>>>>>>>>>>
>>>>>>>>>>>> 1)
>>>>>>>>>>>> 15/06/24 09:53:37 ERROR executor.Executor: Exception in task 443.0 in stage 3.0 (TID 1767)
>>>>>>>>>>>> java.lang.OutOfMemoryError: GC overhead limit exceeded
>>>>>>>>>>>> at sun.reflect.GeneratedSerializationConstructorAccessor1327.newInstance(Unknown Source)
>>>>>>>>>>>> at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
>>>>>>>>>>>> at org.objenesis.instantiator.sun.SunReflectionFactoryInstantiator.newInstance(SunReflectionFactoryInstantiator.java:56)
>>>>>>>>>>>> at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1065)
>>>>>>>>>>>> at com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:228)
>>>>>>>>>>>> at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:217)
>>>>>>>>>>>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>>>>>>>>>>> at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:134)
>>>>>>>>>>>> at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:17)
>>>>>>>>>>>> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
>>>>>>>>>>>> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
>>>>>>>>>>>> at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>>>>>>>>>>>> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
>>>>>>>>>>>> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
>>>>>>>>>>>> at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>>>>>>>>>>>> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
>>>>>>>>>>>> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
>>>>>>>>>>>> at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>>>>>>>>>>>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>>>>>>>>>>> at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:42)
>>>>>>>>>>>> at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33)
>>>>>>>>>>>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>>>>>>>>>>> at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:138)
>>>>>>>>>>>> at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
>>>>>>>>>>>> at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>>>>>>>>>>> at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>>>>>>>>>>>> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>>>>>>>>>> at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>>>>>>>>>>>> at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>>>>>>>>>>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>>>>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>>>>>> at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125)
>>>>>>>>>>>> 15/06/24 09:53:37 ERROR actor.ActorSystemImpl: exception on LARS? timer thread
>>>>>>>>>>>>
>>>>>>>>>>>> 2)
>>>>>>>>>>>> 15/06/24 09:53:37 ERROR actor.ActorSystemImpl: exception on LARS? timer thread
>>>>>>>>>>>> java.lang.OutOfMemoryError: GC overhead limit exceeded
>>>>>>>>>>>> at akka.dispatch.AbstractNodeQueue.<init>(AbstractNodeQueue.java:22)
>>>>>>>>>>>> at akka.actor.LightArrayRevolverScheduler$TaskQueue.<init>(Scheduler.scala:443)
>>>>>>>>>>>> at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:409)
>>>>>>>>>>>> at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375)
>>>>>>>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>>>>>>>
>>>>>>>>>>>> 3)
>>>>>>>>>>>> # java.lang.OutOfMemoryError: GC overhead limit exceeded
>>>>>>>>>>>> # -XX:OnOutOfMemoryError="kill %p"
>>>>>>>>>>>> #   Executing /bin/sh -c "kill 20674"...
>>>>>>>>>>>> [ERROR] [06/24/2015 09:53:37.590] [Executor task launch worker-5] [akka.tcp://sparkdri...@phxdpehdc9dn2137.stratus.phx.ebay.com:47708/] swallowing exception during message send (akka.remote.RemoteTransportExceptionNoStackTrace)
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Jun 24, 2015 at 10:31 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> I see this:
>>>>>>>>>>>>>
>>>>>>>>>>>>> java.lang.OutOfMemoryError: GC overhead limit exceeded
>>>>>>>>>>>>> at java.util.Arrays.copyOfRange(Arrays.java:2694)
>>>>>>>>>>>>> at java.lang.String.<init>(String.java:203)
>>>>>>>>>>>>> at java.lang.StringBuilder.toString(StringBuilder.java:405)
>>>>>>>>>>>>> at java.io.UnixFileSystem.resolve(UnixFileSystem.java:108)
>>>>>>>>>>>>> at java.io.File.<init>(File.java:367)
>>>>>>>>>>>>> at org.apache.spark.storage.DiskBlockManager.getFile(DiskBlockManager.scala:81)
>>>>>>>>>>>>> at org.apache.spark.storage.DiskBlockManager.getFile(DiskBlockManager.scala:84)
>>>>>>>>>>>>> at org.apache.spark.shuffle.IndexShuffleBlockManager.getIndexFile(IndexShuffleBlockManager.scala:60)
>>>>>>>>>>>>> at org.apache.spark.shuffle.IndexShuffleBlockManager.getBlockData(IndexShuffleBlockManager.scala:107)
>>>>>>>>>>>>> at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:304)
>>>>>>>>>>>>> at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>>>>>>>>>>>>> at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>>>>>>>>>>>>> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>>>>>>>>>>>> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>>>>>>>>>>>> at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>>>>>>>>>>>> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>>>>>>>>>>>>> at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>>>>>>>>>>>>> at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>>>>>>>>>>>>> at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
>>>>>>>>>>>>> at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:124)
>>>>>>>>>>>>> at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:97)
>>>>>>>>>>>>> at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:91)
>>>>>>>>>>>>> at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44)
>>>>>>>>>>>>> at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>>>>>>>>>>>>> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>>>>>>>>>>>>> at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>>>>>>>>>>>>> at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>>>>>>>>>>>>> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>>>>>>>>>>>>> at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>>>>>>>>>>>>> at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
>>>>>>>>>>>>> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
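The reply below suggests switching the shuffle transfer service. As a sketch, that setting would be applied before the SparkContext is created (spark.shuffle.blockTransferService is the name given in that reply, valid for Spark 1.x):

  import org.apache.spark.{SparkConf, SparkContext}

  // Sketch: switch shuffle block transfers from the default netty to nio.
  val sparkConf = new SparkConf()
    .set("spark.shuffle.blockTransferService", "nio")
  val sc = new SparkContext(sparkConf)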
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Jun 24, 2015 at 7:16 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Can you look a bit more in the error logs? It could be getting killed because of OOM etc. One thing you can try is to set the spark.shuffle.blockTransferService to nio from netty.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Jun 24, 2015 at 5:46 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I have a Spark job that has 7 stages. The first 3 stages complete and the fourth stage begins (it joins two RDDs). This stage has multiple task failures, all with the exception below.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Multiple tasks (100s of them) get the same exception, on different hosts. How can all the hosts suddenly stop responding when a few moments ago 3 stages ran successfully? If I re-run, the three stages will again run successfully. I cannot think of it being a cluster issue.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Any suggestions?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Spark Version : 1.3.1
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Exception:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> org.apache.spark.shuffle.FetchFailedException: Failed to connect to HOST
>>>>>>>>>>>>>>> at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
>>>>>>>>>>>>>>> at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
>>>>>>>>>>>>>>> at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
>>>>>>>>>>>>>>> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>>>>>>>>>>>>> at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>>>>>>>>>>>>>>> at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>>>>>>>>>>>>>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>>>>>>>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>>>>>>>>> at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125)
>>>>>>>>>>>>>>> at org.apache.sp
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>> Deepak

--
Deepak