My imports:
import org.apache.avro.generic.GenericData
import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.AvroKey
import org.apache.avro.Schema
import org.apache.hadoop.io.NullWritable
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.Text
import org.apache.spark.SparkContext
def readGenericRecords(sc: SparkContext, inputDir: String, startDate: Date, endDate: Date) = {
  val path = getInputPaths(inputDir, startDate, endDate)
  val hadoopConfiguration = new Configuration(sc.hadoopConfiguration)
  hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", "67108864")
  // Note: hadoopConfiguration is not passed to the call below, so the split
  // size set above is not actually applied to the read.
  sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable,
    AvroKeyInputFormat[GenericRecord]](path + "/*.avro")
}
I need to read Avro datasets, and I am using strings instead of the constants from
the InputFormat class. When I click on any Hadoop dependency in Eclipse, I see
they point to Hadoop 2.2.x jars.
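Based on the overload listed in the compile error further down in the quoted
messages, the Class arguments appear to go in the order fClass, kClass, vClass,
so a call that also passes the configuration would presumably look like this
(an untested sketch, reusing the imports and values from the function above):

sc.newAPIHadoopFile(
  path + "/*.avro",
  classOf[AvroKeyInputFormat[GenericRecord]],
  classOf[AvroKey[GenericRecord]],
  classOf[NullWritable],
  hadoopConfiguration)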
On Fri, Jun 26, 2015 at 7:44 AM, Silvio Fiorito <
[email protected]> wrote:
> Make sure you’re importing the right namespace for Hadoop v2.0. This is
> what I tried:
>
> import org.apache.hadoop.io.{LongWritable, Text}
> import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, TextInputFormat}
>
> val hadoopConf = new org.apache.hadoop.conf.Configuration()
> hadoopConf.setLong(FileInputFormat.SPLIT_MAXSIZE, 2048)
>
> val input = sc.newAPIHadoopFile(
>   "README.md",
>   classOf[TextInputFormat],
>   classOf[LongWritable],
>   classOf[Text],
>   hadoopConf).map(_._2.toString())
>
> println(input.partitions.size)
>
> input.
>   flatMap(_.split(" ")).
>   filter(_.length > 0).
>   map((_, 1)).
>   reduceByKey(_ + _).
>   coalesce(1).
>   sortBy(_._2, false).
>   take(10).
>   foreach(println)
>
>
> From: "ÐΞ€ρ@Ҝ (๏̯͡๏)"
> Date: Friday, June 26, 2015 at 10:18 AM
> To: Silvio Fiorito
> Cc: user
> Subject: Re:
>
> All of these throw a compilation error at newAPIHadoopFile:
>
> 1)
>
> val hadoopConfiguration = new Configuration()
> hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", "67108864")
>
> sc.newAPIHadoopFile[AvroKey, NullWritable, AvroKeyInputFormat](
>   path + "/*.avro", classOf[AvroKey], classOf[NullWritable],
>   classOf[AvroKeyInputFormat], hadoopConfiguration)
>
> 2)
>
> val hadoopConfiguration = new Configuration()
> hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", "67108864")
>
> sc.newAPIHadoopFile[AvroKey, NullWritable, AvroKeyInputFormat](
>   path + "/*.avro", classOf[AvroKey[GenericRecord]], classOf[NullWritable],
>   classOf[AvroKeyInputFormat[GenericRecord]], hadoopConfiguration)
>
> 3)
>
> val hadoopConfiguration = new Configuration()
> hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", "67108864")
>
> sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable,
>   AvroKeyInputFormat[GenericRecord]](path + "/*.avro",
>   classOf[AvroKey[GenericRecord]], classOf[NullWritable],
>   classOf[AvroKeyInputFormat[GenericRecord]], hadoopConfiguration)
>
> Error:
>
> [ERROR]
> /Users/dvasthimal/ebay/projects/ep/ep-spark/src/main/scala/com/ebay/ep/poc/spark/reporting/process/util/DataUtil.scala:37:
> error: overloaded method value newAPIHadoopFile with alternatives:
>
> [INFO] (path: String,fClass:
> Class[org.apache.avro.mapreduce.AvroKeyInputFormat[org.apache.avro.generic.GenericRecord]],kClass:
> Class[org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord]],vClass:
> Class[org.apache.hadoop.io.NullWritable],conf:
> org.apache.hadoop.conf.Configuration)org.apache.spark.rdd.RDD[(org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord],
> org.apache.hadoop.io.NullWritable)] <and>
>
> [INFO] (path: String)(implicit km:
> scala.reflect.ClassTag[org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord]],
> implicit vm: scala.reflect.ClassTag[org.apache.hadoop.io.NullWritable],
> implicit fm:
> scala.reflect.ClassTag[org.apache.avro.mapreduce.AvroKeyInputFormat[org.apache.avro.generic.GenericRecord]])org.apache.spark.rdd.RDD[(org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord],
> org.apache.hadoop.io.NullWritable)]
>
> [INFO] cannot be applied to (String,
> Class[org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord]],
> Class[org.apache.hadoop.io.NullWritable],
> Class[org.apache.avro.mapreduce.AvroKeyInputFormat[org.apache.avro.generic.GenericRecord]],
> org.apache.hadoop.conf.Configuration)
>
> [INFO] sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable,
> AvroKeyInputFormat[GenericRecord]](path + "/*.avro",
> classOf[AvroKey[GenericRecord]], classOf[NullWritable],
> classOf[AvroKeyInputFormat[GenericRecord]], hadoopConfiguration)
>
>
>
> On Thu, Jun 25, 2015 at 4:14 PM, Silvio Fiorito <
> [email protected]> wrote:
>
>> Ok, in that case I think you can set the max split size in the Hadoop
>> config object, using the FileInputFormat.SPLIT_MAXSIZE config parameter.
>>
>> Again, I haven’t done this myself, but looking through the Spark
>> codebase here:
>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L1053
>>
>> And at the HDFS FileInputFormat implementation, it seems like a good
>> option to try.
>>
>> You should be able to call conf.setLong(FileInputFormat.SPLIT_MAXSIZE,
>> max).
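>>
>> For example, something along these lines (a sketch I haven't run myself;
>> 64 MB is just an arbitrary value, and FileInputFormat here is the
>> mapreduce-API one):
>>
>> import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
>>
>> val conf = new org.apache.hadoop.conf.Configuration(sc.hadoopConfiguration)
>> conf.setLong(FileInputFormat.SPLIT_MAXSIZE, 64L * 1024 * 1024) // 64 MB max split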
>>
>> I hope that helps!
>>
>> From: "ÐΞ€ρ@Ҝ (๏̯͡๏)"
>> Date: Thursday, June 25, 2015 at 5:49 PM
>> To: Silvio Fiorito
>> Cc: user
>> Subject: Re:
>>
>> I use
>>
>> sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable,
>> AvroKeyInputFormat[GenericRecord]](path + "/*.avro")
>>
>>
>>
>> https://spark.apache.org/docs/1.3.1/api/java/org/apache/spark/SparkContext.html#newAPIHadoopFile(java.lang.String,
>> java.lang.Class, java.lang.Class, java.lang.Class,
>> org.apache.hadoop.conf.Configuration)
>>
>> It does not seem to have that partition option.
>>
>> On Thu, Jun 25, 2015 at 12:24 PM, Silvio Fiorito <
>> [email protected]> wrote:
>>
>>> Hi Deepak,
>>>
>>> Have you tried specifying the minimum partitions when you load the
>>> file? I haven’t tried that myself against HDFS before, so I’m not sure if
>>> it will affect data locality. Ideally it shouldn't; it should still maintain data
>>> locality, just with more partitions. Once your job runs, you can check the tasks
>>> in the Spark web UI to ensure they're all NODE_LOCAL.
>>>
>>> val details = sc.textFile("hdfs://...", 500)
>>>
>>> If you're using something other than a text file, you can also specify the
>>> minimum partitions when using sc.hadoopFile, as in the sketch below.
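>>>
>>> For example, with the old-API text input format (just a sketch to show the
>>> minPartitions parameter; the path is a placeholder, not your dataset):
>>>
>>> import org.apache.hadoop.io.{LongWritable, Text}
>>> import org.apache.hadoop.mapred.TextInputFormat
>>>
>>> val lines = sc.hadoopFile[LongWritable, Text, TextInputFormat]("hdfs://...", 500)
>>>   .map(_._2.toString)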
>>>
>>> Thanks,
>>> Silvio
>>>
>>> From: "ÐΞ€ρ@Ҝ (๏̯͡๏)"
>>> Date: Thursday, June 25, 2015 at 3:10 PM
>>> To: Akhil Das
>>> Cc: user
>>> Subject: Re:
>>>
>>> How can I increase the number of tasks from 174 to 500 without
>>> running repartition?
>>>
>>> The input size is 512.0 MB (hadoop) / 4159106. Can this be reduced to
>>> 64 MB so as to increase the number of tasks, similar to how the split size
>>> increases the number of mappers in Hadoop M/R?
>>>
>>> On Thu, Jun 25, 2015 at 12:06 AM, Akhil Das <[email protected]>
>>> wrote:
>>>
>>>> Look at the tuning section
>>>> <https://spark.apache.org/docs/latest/tuning.html>; you also need to
>>>> figure out what's taking time and where your bottleneck is, etc. If everything
>>>> is tuned properly, then you will need to throw more cores at it :)
>>>>
>>>> Thanks
>>>> Best Regards
>>>>
>>>> On Thu, Jun 25, 2015 at 12:19 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <[email protected]>
>>>> wrote:
>>>>
>>>>> It's taking an hour, and on Hadoop it takes 1h 30m. Is there a way to
>>>>> make it run faster?
>>>>>
>>>>> On Wed, Jun 24, 2015 at 11:39 AM, Akhil Das <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> Cool. :)
>>>>>> On 24 Jun 2015 23:44, "ÐΞ€ρ@Ҝ (๏̯͡๏)" <[email protected]> wrote:
>>>>>>
>>>>>>> It's running now.
>>>>>>>
>>>>>>> On Wed, Jun 24, 2015 at 10:45 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <[email protected]
>>>>>>> > wrote:
>>>>>>>
>>>>>>>> Now running with
>>>>>>>>
>>>>>>>> *--num-executors 9973 --driver-memory 14g --driver-java-options
>>>>>>>> "-XX:MaxPermSize=512M -Xmx4096M -Xms4096M" --executor-memory 14g
>>>>>>>> --executor-cores 1*
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Jun 24, 2015 at 10:34 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <
>>>>>>>> [email protected]> wrote:
>>>>>>>>
>>>>>>>>> There are multiple errors like these:
>>>>>>>>>
>>>>>>>>> 1)
>>>>>>>>> 15/06/24 09:53:37 ERROR executor.Executor: Exception in task 443.0
>>>>>>>>> in stage 3.0 (TID 1767)
>>>>>>>>> java.lang.OutOfMemoryError: GC overhead limit exceeded
>>>>>>>>> at
>>>>>>>>> sun.reflect.GeneratedSerializationConstructorAccessor1327.newInstance(Unknown
>>>>>>>>> Source)
>>>>>>>>> at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
>>>>>>>>> at
>>>>>>>>> org.objenesis.instantiator.sun.SunReflectionFactoryInstantiator.newInstance(SunReflectionFactoryInstantiator.java:56)
>>>>>>>>> at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1065)
>>>>>>>>> at
>>>>>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:228)
>>>>>>>>> at
>>>>>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:217)
>>>>>>>>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>>>>>>>> at
>>>>>>>>> com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:134)
>>>>>>>>> at
>>>>>>>>> com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:17)
>>>>>>>>> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
>>>>>>>>> at
>>>>>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
>>>>>>>>> at
>>>>>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>>>>>>>>> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
>>>>>>>>> at
>>>>>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
>>>>>>>>> at
>>>>>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>>>>>>>>> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
>>>>>>>>> at
>>>>>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
>>>>>>>>> at
>>>>>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>>>>>>>>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>>>>>>>> at
>>>>>>>>> com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:42)
>>>>>>>>> at
>>>>>>>>> com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33)
>>>>>>>>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>>>>>>>> at
>>>>>>>>> org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:138)
>>>>>>>>> at
>>>>>>>>> org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
>>>>>>>>> at
>>>>>>>>> org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>>>>>>>> at
>>>>>>>>> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>>>>>>>>> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>>>>>>> at
>>>>>>>>> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>>>>>>>>> at
>>>>>>>>> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>>>>>>>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>>> at
>>>>>>>>> org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125)
>>>>>>>>> 15/06/24 09:53:37 ERROR actor.ActorSystemImpl: exception on
>>>>>>>>> LARS? timer thread
>>>>>>>>>
>>>>>>>>> 2)
>>>>>>>>> 15/06/24 09:53:37 ERROR actor.ActorSystemImpl: exception on LARS?
>>>>>>>>> timer thread
>>>>>>>>> java.lang.OutOfMemoryError: GC overhead limit exceeded
>>>>>>>>> at
>>>>>>>>> akka.dispatch.AbstractNodeQueue.<init>(AbstractNodeQueue.java:22)
>>>>>>>>> at
>>>>>>>>> akka.actor.LightArrayRevolverScheduler$TaskQueue.<init>(Scheduler.scala:443)
>>>>>>>>> at
>>>>>>>>> akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:409)
>>>>>>>>> at
>>>>>>>>> akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375)
>>>>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>>>> 3)
>>>>>>>>> # java.lang.OutOfMemoryError: GC overhead limit exceeded
>>>>>>>>> # -XX:OnOutOfMemoryError="kill %p"
>>>>>>>>> # Executing /bin/sh -c "kill 20674"...
>>>>>>>>> [ERROR] [06/24/2015 09:53:37.590] [Executor task launch worker-5]
>>>>>>>>> [akka.tcp://
>>>>>>>>> [email protected]:47708/]
>>>>>>>>> swallowing exception during message send
>>>>>>>>> (akka.remote.RemoteTransportExceptionNoStackTrace)
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, Jun 24, 2015 at 10:31 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <
>>>>>>>>> [email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> I see this
>>>>>>>>>>
>>>>>>>>>> java.lang.OutOfMemoryError: GC overhead limit exceeded
>>>>>>>>>> at java.util.Arrays.copyOfRange(Arrays.java:2694)
>>>>>>>>>> at java.lang.String.<init>(String.java:203)
>>>>>>>>>> at java.lang.StringBuilder.toString(StringBuilder.java:405)
>>>>>>>>>> at java.io.UnixFileSystem.resolve(UnixFileSystem.java:108)
>>>>>>>>>> at java.io.File.<init>(File.java:367)
>>>>>>>>>> at
>>>>>>>>>> org.apache.spark.storage.DiskBlockManager.getFile(DiskBlockManager.scala:81)
>>>>>>>>>> at
>>>>>>>>>> org.apache.spark.storage.DiskBlockManager.getFile(DiskBlockManager.scala:84)
>>>>>>>>>> at
>>>>>>>>>> org.apache.spark.shuffle.IndexShuffleBlockManager.getIndexFile(IndexShuffleBlockManager.scala:60)
>>>>>>>>>> at
>>>>>>>>>> org.apache.spark.shuffle.IndexShuffleBlockManager.getBlockData(IndexShuffleBlockManager.scala:107)
>>>>>>>>>> at
>>>>>>>>>> org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:304)
>>>>>>>>>> at
>>>>>>>>>> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>>>>>>>>>> at
>>>>>>>>>> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>>>>>>>>>> at
>>>>>>>>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>>>>>>>>> at
>>>>>>>>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>>>>>>>>> at
>>>>>>>>>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>>>>>>>>> at
>>>>>>>>>> scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>>>>>>>>>> at
>>>>>>>>>> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>>>>>>>>>> at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>>>>>>>>>> at
>>>>>>>>>> org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
>>>>>>>>>> at
>>>>>>>>>> org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:124)
>>>>>>>>>> at
>>>>>>>>>> org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:97)
>>>>>>>>>> at
>>>>>>>>>> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:91)
>>>>>>>>>> at
>>>>>>>>>> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44)
>>>>>>>>>> at
>>>>>>>>>> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>>>>>>>>>> at
>>>>>>>>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>>>>>>>>>> at
>>>>>>>>>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>>>>>>>>>> at
>>>>>>>>>> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>>>>>>>>>> at
>>>>>>>>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>>>>>>>>>> at
>>>>>>>>>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>>>>>>>>>> at
>>>>>>>>>> io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
>>>>>>>>>> at
>>>>>>>>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>>>>>>>>>>
>>>>>>>>>> On Wed, Jun 24, 2015 at 7:16 AM, Akhil Das <
>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Can you look a bit more in the error logs? It could be getting
>>>>>>>>>>> killed because of OOM etc. One thing you can try is to set the
>>>>>>>>>>> spark.shuffle.blockTransferService to nio from netty.
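>>>>>>>>>>>
>>>>>>>>>>> For example (a sketch; you could equally pass it with spark-submit's
>>>>>>>>>>> --conf flag):
>>>>>>>>>>>
>>>>>>>>>>> import org.apache.spark.SparkConf
>>>>>>>>>>>
>>>>>>>>>>> val sparkConf = new SparkConf()
>>>>>>>>>>>   .set("spark.shuffle.blockTransferService", "nio") // switch from the netty default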
>>>>>>>>>>>
>>>>>>>>>>> Thanks
>>>>>>>>>>> Best Regards
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Jun 24, 2015 at 5:46 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I have a Spark job that has 7 stages. The first 3 stages
>>>>>>>>>>>> complete and the fourth stage begins (it joins two RDDs). This stage
>>>>>>>>>>>> has multiple task failures, all with the below exception.
>>>>>>>>>>>>
>>>>>>>>>>>> Multiple tasks (100s of them) get the same exception with
>>>>>>>>>>>> different hosts. How can all the hosts suddenly stop responding
>>>>>>>>>>>> when a few moments ago 3 stages ran successfully? If I re-run, the
>>>>>>>>>>>> three stages will again run successfully. I cannot think of it
>>>>>>>>>>>> being a cluster issue.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Any suggestions?
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Spark Version : 1.3.1
>>>>>>>>>>>>
>>>>>>>>>>>> Exception:
>>>>>>>>>>>>
>>>>>>>>>>>> org.apache.spark.shuffle.FetchFailedException: Failed to connect
>>>>>>>>>>>> to HOST
>>>>>>>>>>>> at
>>>>>>>>>>>> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
>>>>>>>>>>>> at
>>>>>>>>>>>> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
>>>>>>>>>>>> at
>>>>>>>>>>>> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
>>>>>>>>>>>> at
>>>>>>>>>>>> scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>>>>>>>>>> at
>>>>>>>>>>>> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>>>>>>>>>>>> at
>>>>>>>>>>>> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>>>>>>>>>>>> at
>>>>>>>>>>>> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>>>>>> at
>>>>>>>>>>>> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>>>>>> at
>>>>>>>>>>>> org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125)
>>>>>>>>>>>> at org.apache.sp
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> Deepak
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Deepak
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Deepak
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Deepak
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Deepak
>>>>>>>
>>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Deepak
>>>>>
>>>>>
>>>>
>>>
>>>
>>> --
>>> Deepak
>>>
>>>
>>
>>
>> --
>> Deepak
>>
>>
>
>
> --
> Deepak
>
>
--
Deepak