<dependency>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro</artifactId>
  <version>1.7.7</version>
  <scope>provided</scope>
</dependency>
<dependency>
  <groupId>com.databricks</groupId>
  <artifactId>spark-avro_2.10</artifactId>
  <version>1.0.0</version>
</dependency>
<dependency>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-mapred</artifactId>
  <version>1.7.7</version>
  <classifier>hadoop2</classifier>
  <scope>provided</scope>
</dependency>
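
For reference, since spark-avro_2.10 is also on the classpath above, the same data can be read through the DataFrame API instead of newAPIHadoopFile. A minimal, untested sketch, assuming Spark 1.3's SQLContext.load(path, source) entry point and a hypothetical input directory /data/events:

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)  // sc is the existing SparkContext
// "com.databricks.spark.avro" is the data source name provided by spark-avro;
// /data/events is a made-up path used only for illustration
val records = sqlContext.load("/data/events", "com.databricks.spark.avro")
records.printSchema()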

On Fri, Jun 26, 2015 at 8:02 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:

> The same code of yours works for me as well.
>
> On Fri, Jun 26, 2015 at 8:02 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>
> wrote:
>
>> Could it be that it's not supported with Avro? That seems unlikely.
>>
>> On Fri, Jun 26, 2015 at 8:01 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>
>> wrote:
>>
>>> My imports:
>>>
>>> import org.apache.avro.generic.GenericData
>>> import org.apache.avro.generic.GenericRecord
>>> import org.apache.avro.mapred.AvroKey
>>> import org.apache.avro.Schema
>>> import org.apache.hadoop.io.NullWritable
>>> import org.apache.avro.mapreduce.AvroKeyInputFormat
>>> import org.apache.hadoop.conf.Configuration
>>> import org.apache.hadoop.fs.FileSystem
>>> import org.apache.hadoop.fs.Path
>>> import org.apache.hadoop.io.Text
>>>
>>>   def readGenericRecords(sc: SparkContext, inputDir: String, startDate: Date, endDate: Date) = {
>>>     val path = getInputPaths(inputDir, startDate, endDate)
>>>     val hadoopConfiguration = new Configuration(sc.hadoopConfiguration)
>>>     hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", "67108864")
>>>     sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable, AvroKeyInputFormat[GenericRecord]](path + "/*.avro")
>>>   }
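
One thing worth noting in the method quoted above: the hadoopConfiguration that sets mapreduce.input.fileinputformat.split.maxsize is built but never passed to newAPIHadoopFile, and the path-only overload appears to fall back to the SparkContext's own Hadoop configuration (that is what the default conf parameter in Spark 1.3's SparkContext suggests). A minimal sketch of one way around that, assuming sc, path and the imports from the snippet above:

// Set the split limit on the SparkContext's Hadoop configuration,
// which the path-only newAPIHadoopFile overload uses by default.
sc.hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", "67108864")

val records = sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable,
  AvroKeyInputFormat[GenericRecord]](path + "/*.avro")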
>>>
>>> I need to read Avro datasets, and I am using the string property name instead
>>> of the constant from the InputFormat class.
>>>
>>> When I click on any Hadoop dependency in Eclipse, I see that they point to
>>> Hadoop 2.2.x jars.
>>>
>>>
>>>
>>> On Fri, Jun 26, 2015 at 7:44 AM, Silvio Fiorito <
>>> silvio.fior...@granturing.com> wrote:
>>>
>>>>   Make sure you’re importing the right namespace for Hadoop v2.0. This
>>>> is what I tried:
>>>>
>>>>   import org.apache.hadoop.io.{LongWritable, Text}
>>>> import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat,
>>>> TextInputFormat}
>>>>
>>>>  val hadoopConf = new org.apache.hadoop.conf.Configuration()
>>>> hadoopConf.setLong(FileInputFormat.SPLIT_MAXSIZE, 2048)
>>>>
>>>>  val input = sc.newAPIHadoopFile(
>>>>   "README.md",
>>>>   classOf[TextInputFormat],
>>>>   classOf[LongWritable],
>>>>   classOf[Text],
>>>>   hadoopConf).map(_._2.toString())
>>>>
>>>>  println(input.partitions.size)
>>>>
>>>>  input.
>>>>   flatMap(_.split(" ")).
>>>>   filter(_.length > 0).
>>>>   map((_, 1)).
>>>>   reduceByKey(_ + _).
>>>>   coalesce(1).
>>>>   sortBy(_._2, false).
>>>>   take(10).
>>>>   foreach(println)
>>>>
>>>>
>>>>   From: "ÐΞ€ρ@Ҝ (๏̯͡๏)"
>>>> Date: Friday, June 26, 2015 at 10:18 AM
>>>> To: Silvio Fiorito
>>>> Cc: user
>>>> Subject: Re:
>>>>
>>>>   All of these throw a compilation error at newAPIHadoopFile:
>>>>
>>>> 1)
>>>>
>>>> val hadoopConfiguration = new Configuration()
>>>> hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", "67108864")
>>>> sc.newAPIHadoopFile[AvroKey, NullWritable, AvroKeyInputFormat](path + "/*.avro",
>>>>   classOf[AvroKey], classOf[NullWritable], classOf[AvroKeyInputFormat],
>>>>   hadoopConfiguration)
>>>>
>>>> 2)
>>>>
>>>> val hadoopConfiguration = new Configuration()
>>>> hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", "67108864")
>>>> sc.newAPIHadoopFile[AvroKey, NullWritable, AvroKeyInputFormat](path + "/*.avro",
>>>>   classOf[AvroKey[GenericRecord]], classOf[NullWritable],
>>>>   classOf[AvroKeyInputFormat[GenericRecord]], hadoopConfiguration)
>>>>
>>>> 3)
>>>>
>>>> val hadoopConfiguration = new Configuration()
>>>> hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", "67108864")
>>>> sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable,
>>>>   AvroKeyInputFormat[GenericRecord]](path + "/*.avro",
>>>>   classOf[AvroKey[GenericRecord]], classOf[NullWritable],
>>>>   classOf[AvroKeyInputFormat[GenericRecord]], hadoopConfiguration)
>>>>
>>>> Error:
>>>>
>>>> [ERROR]
>>>> /Users/dvasthimal/ebay/projects/ep/ep-spark/src/main/scala/com/ebay/ep/poc/spark/reporting/process/util/DataUtil.scala:37:
>>>> error: overloaded method value newAPIHadoopFile with alternatives:
>>>>
>>>> [INFO]   (path: String,fClass:
>>>> Class[org.apache.avro.mapreduce.AvroKeyInputFormat[org.apache.avro.generic.GenericRecord]],kClass:
>>>> Class[org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord]],vClass:
>>>> Class[org.apache.hadoop.io.NullWritable],conf:
>>>> org.apache.hadoop.conf.Configuration)org.apache.spark.rdd.RDD[(org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord],
>>>> org.apache.hadoop.io.NullWritable)] <and>
>>>>
>>>> [INFO]   (path: String)(implicit km:
>>>> scala.reflect.ClassTag[org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord]],
>>>> implicit vm: scala.reflect.ClassTag[org.apache.hadoop.io.NullWritable],
>>>> implicit fm:
>>>> scala.reflect.ClassTag[org.apache.avro.mapreduce.AvroKeyInputFormat[org.apache.avro.generic.GenericRecord]])org.apache.spark.rdd.RDD[(org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord],
>>>> org.apache.hadoop.io.NullWritable)]
>>>>
>>>> [INFO]  cannot be applied to (String,
>>>> Class[org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord]],
>>>> Class[org.apache.hadoop.io.NullWritable],
>>>> Class[org.apache.avro.mapreduce.AvroKeyInputFormat[org.apache.avro.generic.GenericRecord]],
>>>> org.apache.hadoop.conf.Configuration)
>>>>
>>>> [INFO]     sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable,
>>>> AvroKeyInputFormat[GenericRecord]](path + "/*.avro",
>>>> classOf[AvroKey[GenericRecord]], classOf[NullWritable],
>>>> classOf[AvroKeyInputFormat[GenericRecord]], hadoopConfiguration)
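
Per the two alternatives listed in the error above, the explicit-class overload expects the InputFormat class immediately after the path, i.e. (path, fClass, kClass, vClass, conf), while all three attempts pass the classes as (kClass, vClass, fClass). Reordering attempt 3 accordingly should resolve the overload; an untested sketch (the explicit type parameters can also be dropped once the classes are passed):

sc.newAPIHadoopFile(
  path + "/*.avro",
  classOf[AvroKeyInputFormat[GenericRecord]],  // fClass comes first
  classOf[AvroKey[GenericRecord]],             // then kClass
  classOf[NullWritable],                       // then vClass
  hadoopConfiguration)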
>>>>
>>>>
>>>>
>>>> On Thu, Jun 25, 2015 at 4:14 PM, Silvio Fiorito <
>>>> silvio.fior...@granturing.com> wrote:
>>>>
>>>>>  Ok, in that case I think you can set the max split size in the
>>>>> Hadoop config object, using the FileInputFormat.SPLIT_MAXSIZE config
>>>>> parameter.
>>>>>
>>>>>  Again, I haven’t done this myself, but looking through the Spark
>>>>> codebase here:
>>>>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L1053
>>>>>
>>>>>  And the HDFS FileInputFormat implementation, that seems like a good
>>>>> option to try.
>>>>>
>>>>>  You should be able to call
>>>>> conf.setLong(FileInputFormat.SPLIT_MAXSIZE, max).
>>>>>
>>>>>  I hope that helps!
>>>>>
>>>>>   From: "ÐΞ€ρ@Ҝ (๏̯͡๏)"
>>>>> Date: Thursday, June 25, 2015 at 5:49 PM
>>>>> To: Silvio Fiorito
>>>>> Cc: user
>>>>> Subject: Re:
>>>>>
>>>>>   I use
>>>>>
>>>>> sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable,
>>>>> AvroKeyInputFormat[GenericRecord]](path + "/*.avro")
>>>>>
>>>>>
>>>>>
>>>>> https://spark.apache.org/docs/1.3.1/api/java/org/apache/spark/SparkContext.html#newAPIHadoopFile(java.lang.String,
>>>>> java.lang.Class, java.lang.Class, java.lang.Class,
>>>>> org.apache.hadoop.conf.Configuration)
>>>>>
>>>>> That overload does not seem to have a minimum-partitions option.
>>>>>
>>>>> On Thu, Jun 25, 2015 at 12:24 PM, Silvio Fiorito <
>>>>> silvio.fior...@granturing.com> wrote:
>>>>>
>>>>>>  Hi Deepak,
>>>>>>
>>>>>>  Have you tried specifying the minimum number of partitions when you load
>>>>>> the file? I haven’t tried that myself against HDFS before, so I’m not sure
>>>>>> whether it will affect data locality. Ideally it won’t: it should still
>>>>>> maintain data locality, just with more partitions. Once your job runs, you
>>>>>> can check in the Spark tasks web UI to make sure the tasks are all node-local.
>>>>>>
>>>>>>  val details = sc.textFile(“hdfs://….”, 500)
>>>>>>
>>>>>>  If you’re using something other than a text file, you can also specify
>>>>>> the minimum partitions when using sc.hadoopFile.
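
For the Avro case in this thread, the old "mapred" Avro input format can be used with sc.hadoopFile, which does take a minimum-partitions hint. A hedged sketch, assuming the AvroWrapper/AvroInputFormat classes from avro-mapred are acceptable and using 500 only as an illustrative partition target:

import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.{AvroInputFormat, AvroWrapper}
import org.apache.hadoop.io.NullWritable

// old-API (mapred) input format, so the minPartitions argument is available
val avroRecords = sc.hadoopFile[AvroWrapper[GenericRecord], NullWritable,
  AvroInputFormat[GenericRecord]](path + "/*.avro", 500)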
>>>>>>
>>>>>>  Thanks,
>>>>>> Silvio
>>>>>>
>>>>>>   From: "ÐΞ€ρ@Ҝ (๏̯͡๏)"
>>>>>> Date: Thursday, June 25, 2015 at 3:10 PM
>>>>>> To: Akhil Das
>>>>>> Cc: user
>>>>>> Subject: Re:
>>>>>>
>>>>>>   How can I increase the number of tasks from 174 to 500 without
>>>>>> running repartition?
>>>>>>
>>>>>>  The input size is 512.0 MB (hadoop) / 4159106. Can this be reduced to
>>>>>> 64 MB so as to increase the number of tasks, similar to the split size that
>>>>>> increases the number of mappers in Hadoop M/R?
>>>>>>
>>>>>> On Thu, Jun 25, 2015 at 12:06 AM, Akhil Das <
>>>>>> ak...@sigmoidanalytics.com> wrote:
>>>>>>
>>>>>>>  Take a look at the tuning section
>>>>>>> <https://spark.apache.org/docs/latest/tuning.html>; you also need to
>>>>>>> figure out what's taking time and where your bottleneck is. If everything
>>>>>>> is tuned properly, then you will need to throw more cores at it :)
>>>>>>>
>>>>>>>  Thanks
>>>>>>> Best Regards
>>>>>>>
>>>>>>> On Thu, Jun 25, 2015 at 12:19 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com
>>>>>>> > wrote:
>>>>>>>
>>>>>>>> It's taking an hour, while on Hadoop it takes 1h 30m. Is there a way to
>>>>>>>> make it run faster?
>>>>>>>>
>>>>>>>> On Wed, Jun 24, 2015 at 11:39 AM, Akhil Das <
>>>>>>>> ak...@sigmoidanalytics.com> wrote:
>>>>>>>>
>>>>>>>>> Cool. :)
>>>>>>>>>  On 24 Jun 2015 23:44, "ÐΞ€ρ@Ҝ (๏̯͡๏)" <deepuj...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> It's running now.
>>>>>>>>>>
>>>>>>>>>> On Wed, Jun 24, 2015 at 10:45 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <
>>>>>>>>>> deepuj...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Now running with
>>>>>>>>>>>
>>>>>>>>>>>  *--num-executors 9973 --driver-memory 14g
>>>>>>>>>>> --driver-java-options "-XX:MaxPermSize=512M -Xmx4096M -Xms4096M"
>>>>>>>>>>> --executor-memory 14g --executor-cores 1*
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Jun 24, 2015 at 10:34 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <
>>>>>>>>>>> deepuj...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>>  There are multiple of these
>>>>>>>>>>>>
>>>>>>>>>>>>  1)
>>>>>>>>>>>> 15/06/24 09:53:37 ERROR executor.Executor: Exception in task
>>>>>>>>>>>> 443.0 in stage 3.0 (TID 1767)
>>>>>>>>>>>>  java.lang.OutOfMemoryError: GC overhead limit exceeded
>>>>>>>>>>>>  at
>>>>>>>>>>>> sun.reflect.GeneratedSerializationConstructorAccessor1327.newInstance(Unknown
>>>>>>>>>>>> Source)
>>>>>>>>>>>> at
>>>>>>>>>>>> java.lang.reflect.Constructor.newInstance(Constructor.java:526)
>>>>>>>>>>>> at
>>>>>>>>>>>> org.objenesis.instantiator.sun.SunReflectionFactoryInstantiator.newInstance(SunReflectionFactoryInstantiator.java:56)
>>>>>>>>>>>> at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1065)
>>>>>>>>>>>> at
>>>>>>>>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:228)
>>>>>>>>>>>> at
>>>>>>>>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:217)
>>>>>>>>>>>> at
>>>>>>>>>>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>>>>>>>>>>> at
>>>>>>>>>>>> com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:134)
>>>>>>>>>>>> at
>>>>>>>>>>>> com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:17)
>>>>>>>>>>>> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
>>>>>>>>>>>> at
>>>>>>>>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
>>>>>>>>>>>> at
>>>>>>>>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>>>>>>>>>>>> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
>>>>>>>>>>>> at
>>>>>>>>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
>>>>>>>>>>>> at
>>>>>>>>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>>>>>>>>>>>> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
>>>>>>>>>>>> at
>>>>>>>>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
>>>>>>>>>>>> at
>>>>>>>>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>>>>>>>>>>>> at
>>>>>>>>>>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>>>>>>>>>>> at
>>>>>>>>>>>> com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:42)
>>>>>>>>>>>> at
>>>>>>>>>>>> com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33)
>>>>>>>>>>>> at
>>>>>>>>>>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>>>>>>>>>>> at
>>>>>>>>>>>> org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:138)
>>>>>>>>>>>> at
>>>>>>>>>>>> org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
>>>>>>>>>>>> at
>>>>>>>>>>>> org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>>>>>>>>>>> at
>>>>>>>>>>>> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>>>>>>>>>>>>  at
>>>>>>>>>>>> scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>>>>>>>>>> at
>>>>>>>>>>>> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>>>>>>>>>>>> at
>>>>>>>>>>>> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>>>>>>>>>>>> at
>>>>>>>>>>>> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>>>>>> at
>>>>>>>>>>>> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>>>>>> at
>>>>>>>>>>>> org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125)
>>>>>>>>>>>>  15/06/24 09:53:37 ERROR actor.ActorSystemImpl: exception on
>>>>>>>>>>>> LARS’ timer thread
>>>>>>>>>>>>
>>>>>>>>>>>>  2)
>>>>>>>>>>>> 15/06/24 09:53:37 ERROR actor.ActorSystemImpl: exception on
>>>>>>>>>>>> LARS’ timer thread
>>>>>>>>>>>>  java.lang.OutOfMemoryError: GC overhead limit exceeded
>>>>>>>>>>>>  at
>>>>>>>>>>>> akka.dispatch.AbstractNodeQueue.<init>(AbstractNodeQueue.java:22)
>>>>>>>>>>>> at
>>>>>>>>>>>> akka.actor.LightArrayRevolverScheduler$TaskQueue.<init>(Scheduler.scala:443)
>>>>>>>>>>>> at
>>>>>>>>>>>> akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:409)
>>>>>>>>>>>> at
>>>>>>>>>>>> akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375)
>>>>>>>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>>>>>>> 3)
>>>>>>>>>>>> # java.lang.OutOfMemoryError: GC overhead limit exceeded
>>>>>>>>>>>> # -XX:OnOutOfMemoryError="kill %p"
>>>>>>>>>>>> #   Executing /bin/sh -c "kill 20674"...
>>>>>>>>>>>> [ERROR] [06/24/2015 09:53:37.590] [Executor task launch
>>>>>>>>>>>> worker-5] [akka.tcp://
>>>>>>>>>>>> sparkdri...@phxdpehdc9dn2137.stratus.phx.ebay.com:47708/]
>>>>>>>>>>>> swallowing exception during message send
>>>>>>>>>>>> (akka.remote.RemoteTransportExceptionNoStackTrace)
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Jun 24, 2015 at 10:31 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <
>>>>>>>>>>>> deepuj...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> I see this
>>>>>>>>>>>>>
>>>>>>>>>>>>>  java.lang.OutOfMemoryError: GC overhead limit exceeded
>>>>>>>>>>>>> at java.util.Arrays.copyOfRange(Arrays.java:2694)
>>>>>>>>>>>>> at java.lang.String.<init>(String.java:203)
>>>>>>>>>>>>> at java.lang.StringBuilder.toString(StringBuilder.java:405)
>>>>>>>>>>>>> at java.io.UnixFileSystem.resolve(UnixFileSystem.java:108)
>>>>>>>>>>>>> at java.io.File.<init>(File.java:367)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> org.apache.spark.storage.DiskBlockManager.getFile(DiskBlockManager.scala:81)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> org.apache.spark.storage.DiskBlockManager.getFile(DiskBlockManager.scala:84)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> org.apache.spark.shuffle.IndexShuffleBlockManager.getIndexFile(IndexShuffleBlockManager.scala:60)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> org.apache.spark.shuffle.IndexShuffleBlockManager.getBlockData(IndexShuffleBlockManager.scala:107)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:304)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:124)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:97)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:91)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Jun 24, 2015 at 7:16 AM, Akhil Das <
>>>>>>>>>>>>> ak...@sigmoidanalytics.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>>  Can you look a bit deeper into the error logs? It could be
>>>>>>>>>>>>>> getting killed because of an OOM, etc. One thing you can try is to
>>>>>>>>>>>>>> set spark.shuffle.blockTransferService to nio instead of netty.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>  Thanks
>>>>>>>>>>>>>> Best Regards
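
If trying that suggestion, the property can be set on the SparkConf (or with --conf at submit time). A minimal sketch for a Spark 1.x application; the application name is made up for illustration, and the setting was removed in later Spark versions:

import org.apache.spark.{SparkConf, SparkContext}

// Switch the shuffle block transfer service from the default (netty) to nio.
val sparkConf = new SparkConf()
  .setAppName("avro-report")  // hypothetical application name
  .set("spark.shuffle.blockTransferService", "nio")
val sc = new SparkContext(sparkConf)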
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Jun 24, 2015 at 5:46 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <
>>>>>>>>>>>>>> deepuj...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>  I have a Spark job that has 7 stages. The first 3 stages
>>>>>>>>>>>>>>> complete and the fourth stage begins (it joins two RDDs). This stage
>>>>>>>>>>>>>>> has multiple task failures, all with the exception below.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>  Hundreds of tasks get the same exception, on different hosts. How
>>>>>>>>>>>>>>> can all the hosts suddenly stop responding when, a few moments ago,
>>>>>>>>>>>>>>> 3 stages ran successfully? If I re-run the job, the three stages will
>>>>>>>>>>>>>>> again run successfully. I cannot think of it being a cluster issue.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>  Any suggestions?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>  Spark Version : 1.3.1
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>  Exception:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> org.apache.spark.shuffle.FetchFailedException: Failed to 
>>>>>>>>>>>>>>> connect to HOST
>>>>>>>>>>>>>>>         at 
>>>>>>>>>>>>>>> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
>>>>>>>>>>>>>>>         at 
>>>>>>>>>>>>>>> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
>>>>>>>>>>>>>>>         at 
>>>>>>>>>>>>>>> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
>>>>>>>>>>>>>>>         at 
>>>>>>>>>>>>>>> scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>>>>>>>>>>>>>         at 
>>>>>>>>>>>>>>> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>>>>>>>>>>>>>>>         at 
>>>>>>>>>>>>>>> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>>>>>>>>>>>>>>>         at 
>>>>>>>>>>>>>>> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>>>>>>>>>         at 
>>>>>>>>>>>>>>> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>>>>>>>>>         at 
>>>>>>>>>>>>>>> org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125)
>>>>>>>>>>>>>>>         at org.apache.sp
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>  --
>>>>>>>>>>>>>>>  Deepak
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>   --
>>>>>>>>>>>>>  Deepak
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>   --
>>>>>>>>>>>>  Deepak
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>   --
>>>>>>>>>>>  Deepak
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>  --
>>>>>>>>>>  Deepak
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>   --
>>>>>>>>  Deepak
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>>  --
>>>>>>  Deepak
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>>  --
>>>>>  Deepak
>>>>>
>>>>>
>>>>
>>>>
>>>>  --
>>>>  Deepak
>>>>
>>>>
>>>
>>>
>>> --
>>> Deepak
>>>
>>>
>>
>>
>> --
>> Deepak
>>
>>
>
>
> --
> Deepak
>
>


-- 
Deepak