My imports:

import org.apache.avro.generic.GenericData
import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.AvroKey
import org.apache.avro.Schema
import org.apache.hadoop.io.NullWritable
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.Text

def readGenericRecords(sc: SparkContext, inputDir: String, startDate: Date, endDate: Date) = {
  val path = getInputPaths(inputDir, startDate, endDate)
  val hadoopConfiguration = new Configuration(sc.hadoopConfiguration)
  hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", "67108864")
  sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable, AvroKeyInputFormat[GenericRecord]](path + "/*.avro")
}

I need to read Avro datasets, and I am using raw strings instead of the constants from the InputFormat class. When I click on any Hadoop dependency in Eclipse, I see they point to Hadoop 2.2.x jars.
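A minimal sketch of the split-size setup using the constant instead of the raw string (assumes Hadoop 2.x on the classpath; note that the Configuration also has to be handed to newAPIHadoopFile explicitly, which the single-argument call above never does):

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat

// FileInputFormat.SPLIT_MAXSIZE is the constant behind
// "mapreduce.input.fileinputformat.split.maxsize".
val hadoopConfiguration = new Configuration(sc.hadoopConfiguration)
hadoopConfiguration.setLong(FileInputFormat.SPLIT_MAXSIZE, 64L * 1024 * 1024) // 67108864 bytes

The five-argument overload that accepts this Configuration appears in the compiler error quoted further down.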

On Fri, Jun 26, 2015 at 7:44 AM, Silvio Fiorito <silvio.fior...@granturing.com> wrote:

> Make sure you're importing the right namespace for Hadoop v2.0. This is
> what I tried:
>
> import org.apache.hadoop.io.{LongWritable, Text}
> import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, TextInputFormat}
>
> val hadoopConf = new org.apache.hadoop.conf.Configuration()
> hadoopConf.setLong(FileInputFormat.SPLIT_MAXSIZE, 2048)
>
> val input = sc.newAPIHadoopFile(
>   "README.md",
>   classOf[TextInputFormat],
>   classOf[LongWritable],
>   classOf[Text],
>   hadoopConf).map(_._2.toString())
>
> println(input.partitions.size)
>
> input.
>   flatMap(_.split(" ")).
>   filter(_.length > 0).
>   map((_, 1)).
>   reduceByKey(_ + _).
>   coalesce(1).
>   sortBy(_._2, false).
>   take(10).
>   foreach(println)
>
> From: "ÐΞ€ρ@Ҝ (๏̯͡๏)"
> Date: Friday, June 26, 2015 at 10:18 AM
> To: Silvio Fiorito
> Cc: user
> Subject: Re:
>
> All of these throw a compilation error at newAPIHadoopFile:
>
> 1)
>
> val hadoopConfiguration = new Configuration()
> hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", "67108864")
> sc.newAPIHadoopFile[AvroKey, NullWritable, AvroKeyInputFormat](path + "/*.avro",
>   classOf[AvroKey], classOf[NullWritable], classOf[AvroKeyInputFormat],
>   hadoopConfiguration)
>
> 2)
>
> val hadoopConfiguration = new Configuration()
> hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", "67108864")
> sc.newAPIHadoopFile[AvroKey, NullWritable, AvroKeyInputFormat](path + "/*.avro",
>   classOf[AvroKey[GenericRecord]], classOf[NullWritable],
>   classOf[AvroKeyInputFormat[GenericRecord]], hadoopConfiguration)
>
> 3)
>
> val hadoopConfiguration = new Configuration()
> hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", "67108864")
> sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable,
>   AvroKeyInputFormat[GenericRecord]](path + "/*.avro",
>   classOf[AvroKey[GenericRecord]], classOf[NullWritable],
>   classOf[AvroKeyInputFormat[GenericRecord]], hadoopConfiguration)
>
> Error:
>
> [ERROR] /Users/dvasthimal/ebay/projects/ep/ep-spark/src/main/scala/com/ebay/ep/poc/spark/reporting/process/util/DataUtil.scala:37: error: overloaded method value newAPIHadoopFile with alternatives:
> [INFO] (path: String, fClass: Class[org.apache.avro.mapreduce.AvroKeyInputFormat[org.apache.avro.generic.GenericRecord]], kClass: Class[org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord]], vClass: Class[org.apache.hadoop.io.NullWritable], conf: org.apache.hadoop.conf.Configuration)org.apache.spark.rdd.RDD[(org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord], org.apache.hadoop.io.NullWritable)] <and>
> [INFO] (path: String)(implicit km: scala.reflect.ClassTag[org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord]], implicit vm: scala.reflect.ClassTag[org.apache.hadoop.io.NullWritable], implicit fm: scala.reflect.ClassTag[org.apache.avro.mapreduce.AvroKeyInputFormat[org.apache.avro.generic.GenericRecord]])org.apache.spark.rdd.RDD[(org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord], org.apache.hadoop.io.NullWritable)]
> [INFO] cannot be applied to (String, Class[org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord]], Class[org.apache.hadoop.io.NullWritable], Class[org.apache.avro.mapreduce.AvroKeyInputFormat[org.apache.avro.generic.GenericRecord]], org.apache.hadoop.conf.Configuration)
> [INFO] sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable, AvroKeyInputFormat[GenericRecord]](path + "/*.avro", classOf[AvroKey[GenericRecord]], classOf[NullWritable], classOf[AvroKeyInputFormat[GenericRecord]], hadoopConfiguration)
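>
> The mismatch above is just argument order: per the first alternative, the
> five-argument overload takes the InputFormat class first, then the key and
> value classes, then the Configuration. A sketch of variant 3 reordered to
> match that signature (untested):
>
> sc.newAPIHadoopFile(
>   path + "/*.avro",
>   classOf[AvroKeyInputFormat[GenericRecord]],
>   classOf[AvroKey[GenericRecord]],
>   classOf[NullWritable],
>   hadoopConfiguration)
>
> With the class arguments in this order the type parameters can be inferred,
> so the explicit [AvroKey[GenericRecord], NullWritable, ...] list becomes
> optional.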
>
> On Thu, Jun 25, 2015 at 4:14 PM, Silvio Fiorito <silvio.fior...@granturing.com> wrote:
>
>> Ok, in that case I think you can set the max split size in the Hadoop
>> config object, using the FileInputFormat.SPLIT_MAXSIZE config parameter.
>>
>> Again, I haven't done this myself, but looking through the Spark codebase here:
>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L1053
>> and the HDFS FileInputFormat implementation, that seems like a good option to try.
>>
>> You should be able to call conf.setLong(FileInputFormat.SPLIT_MAXSIZE, max).
>>
>> I hope that helps!
>>
>> From: "ÐΞ€ρ@Ҝ (๏̯͡๏)"
>> Date: Thursday, June 25, 2015 at 5:49 PM
>> To: Silvio Fiorito
>> Cc: user
>> Subject: Re:
>>
>> I use
>>
>> sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable,
>>   AvroKeyInputFormat[GenericRecord]](path + "/*.avro")
>>
>> and the overload documented at
>> https://spark.apache.org/docs/1.3.1/api/java/org/apache/spark/SparkContext.html#newAPIHadoopFile(java.lang.String, java.lang.Class, java.lang.Class, java.lang.Class, org.apache.hadoop.conf.Configuration)
>> does not seem to have that partition option.
>>
>> On Thu, Jun 25, 2015 at 12:24 PM, Silvio Fiorito <silvio.fior...@granturing.com> wrote:
>>
>>> Hi Deepak,
>>>
>>> Have you tried specifying the minimum partitions when you load the file?
>>> I haven't tried that myself against HDFS before, so I'm not sure if it
>>> will affect data locality. Ideally it shouldn't; it should still maintain
>>> data locality, just with more partitions. Once your job runs, you can
>>> check the Spark tasks web UI to ensure they're all node-local.
>>>
>>> val details = sc.textFile("hdfs://….", 500)
>>>
>>> If you're using something other than a text file, you can also specify
>>> minimum partitions when using sc.hadoopFile.
>>>
>>> Thanks,
>>> Silvio
>>>
>>> From: "ÐΞ€ρ@Ҝ (๏̯͡๏)"
>>> Date: Thursday, June 25, 2015 at 3:10 PM
>>> To: Akhil Das
>>> Cc: user
>>> Subject: Re:
>>>
>>> How can I increase the number of tasks from 174 to 500 without running
>>> repartition?
>>>
>>> The input size is 512.0 MB (hadoop) / 4159106. Can this be reduced to
>>> 64 MB so as to increase the number of tasks? Similar to the split size
>>> that drives the number of mappers in Hadoop M/R.
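>>>
>>> To make that concrete, a sketch combining the two knobs suggested in
>>> this thread (untested; hadoopConfiguration and the FileInputFormat
>>> import as defined at the top of the thread):
>>>
>>> // Old-API loaders take a minimum partition count directly:
>>> val details = sc.textFile("hdfs://<dir>", 500)
>>>
>>> // New-API loaders take it from the Hadoop conf: a smaller split cap
>>> // means fewer bytes per task, hence more tasks for the same input.
>>> hadoopConfiguration.setLong(FileInputFormat.SPLIT_MAXSIZE, 64L * 1024 * 1024)
>>> // Afterwards, rdd.partitions.size shows the resulting task count.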
>>>
>>> On Thu, Jun 25, 2015 at 12:06 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>>>
>>>> Look at the tuning section
>>>> <https://spark.apache.org/docs/latest/tuning.html>; you also need to
>>>> figure out what's taking time and where your bottleneck is. If
>>>> everything is tuned properly, then you will need to throw more cores
>>>> at it :)
>>>>
>>>> Thanks
>>>> Best Regards
>>>>
>>>> On Thu, Jun 25, 2015 at 12:19 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:
>>>>
>>>>> It's taking an hour, and on Hadoop it takes 1h 30m. Is there a way to
>>>>> make it run faster?
>>>>>
>>>>> On Wed, Jun 24, 2015 at 11:39 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>>>>>
>>>>>> Cool. :)
>>>>>> On 24 Jun 2015 23:44, "ÐΞ€ρ@Ҝ (๏̯͡๏)" <deepuj...@gmail.com> wrote:
>>>>>>
>>>>>>> It's running now.
>>>>>>>
>>>>>>> On Wed, Jun 24, 2015 at 10:45 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Now running with
>>>>>>>>
>>>>>>>> --num-executors 9973 --driver-memory 14g --driver-java-options
>>>>>>>> "-XX:MaxPermSize=512M -Xmx4096M -Xms4096M" --executor-memory 14g
>>>>>>>> --executor-cores 1
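>>>>>>>>
>>>>>>>> (Expressed as a SparkConf, a sketch of the same settings;
>>>>>>>> spark.executor.instances is the YARN-side key behind
>>>>>>>> --num-executors:)
>>>>>>>>
>>>>>>>> import org.apache.spark.SparkConf
>>>>>>>>
>>>>>>>> val conf = new SparkConf()
>>>>>>>>   .set("spark.executor.instances", "9973")
>>>>>>>>   .set("spark.driver.memory", "14g")
>>>>>>>>   .set("spark.driver.extraJavaOptions", "-XX:MaxPermSize=512M -Xmx4096M -Xms4096M")
>>>>>>>>   .set("spark.executor.memory", "14g")
>>>>>>>>   .set("spark.executor.cores", "1")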
>>>>>>>>
>>>>>>>> On Wed, Jun 24, 2015 at 10:34 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> There are multiple of these:
>>>>>>>>>
>>>>>>>>> 1)
>>>>>>>>> 15/06/24 09:53:37 ERROR executor.Executor: Exception in task 443.0 in stage 3.0 (TID 1767)
>>>>>>>>> java.lang.OutOfMemoryError: GC overhead limit exceeded
>>>>>>>>> at sun.reflect.GeneratedSerializationConstructorAccessor1327.newInstance(Unknown Source)
>>>>>>>>> at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
>>>>>>>>> at org.objenesis.instantiator.sun.SunReflectionFactoryInstantiator.newInstance(SunReflectionFactoryInstantiator.java:56)
>>>>>>>>> at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1065)
>>>>>>>>> at com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:228)
>>>>>>>>> at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:217)
>>>>>>>>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>>>>>>>> at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:134)
>>>>>>>>> at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:17)
>>>>>>>>> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
>>>>>>>>> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
>>>>>>>>> at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>>>>>>>>> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
>>>>>>>>> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
>>>>>>>>> at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>>>>>>>>> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
>>>>>>>>> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
>>>>>>>>> at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>>>>>>>>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>>>>>>>> at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:42)
>>>>>>>>> at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33)
>>>>>>>>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>>>>>>>> at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:138)
>>>>>>>>> at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
>>>>>>>>> at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>>>>>>>> at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>>>>>>>>> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>>>>>>> at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>>>>>>>>> at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>>>>>>>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>>> at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125)
>>>>>>>>> 15/06/24 09:53:37 ERROR actor.ActorSystemImpl: exception on LARS' timer thread
>>>>>>>>>
>>>>>>>>> 2)
>>>>>>>>> 15/06/24 09:53:37 ERROR actor.ActorSystemImpl: exception on LARS' timer thread
>>>>>>>>> java.lang.OutOfMemoryError: GC overhead limit exceeded
>>>>>>>>> at akka.dispatch.AbstractNodeQueue.<init>(AbstractNodeQueue.java:22)
>>>>>>>>> at akka.actor.LightArrayRevolverScheduler$TaskQueue.<init>(Scheduler.scala:443)
>>>>>>>>> at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:409)
>>>>>>>>> at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375)
>>>>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>>>>
>>>>>>>>> 3)
>>>>>>>>> # java.lang.OutOfMemoryError: GC overhead limit exceeded
>>>>>>>>> # -XX:OnOutOfMemoryError="kill %p"
>>>>>>>>> #   Executing /bin/sh -c "kill 20674"...
>>>>>>>>> [ERROR] [06/24/2015 09:53:37.590] [Executor task launch worker-5]
>>>>>>>>> [akka.tcp://sparkdri...@phxdpehdc9dn2137.stratus.phx.ebay.com:47708/]
>>>>>>>>> swallowing exception during message send (akka.remote.RemoteTransportExceptionNoStackTrace)
>>>>>>>>>
>>>>>>>>> On Wed, Jun 24, 2015 at 10:31 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> I see this:
>>>>>>>>>>
>>>>>>>>>> java.lang.OutOfMemoryError: GC overhead limit exceeded
>>>>>>>>>> at java.util.Arrays.copyOfRange(Arrays.java:2694)
>>>>>>>>>> at java.lang.String.<init>(String.java:203)
>>>>>>>>>> at java.lang.StringBuilder.toString(StringBuilder.java:405)
>>>>>>>>>> at java.io.UnixFileSystem.resolve(UnixFileSystem.java:108)
>>>>>>>>>> at java.io.File.<init>(File.java:367)
>>>>>>>>>> at org.apache.spark.storage.DiskBlockManager.getFile(DiskBlockManager.scala:81)
>>>>>>>>>> at org.apache.spark.storage.DiskBlockManager.getFile(DiskBlockManager.scala:84)
>>>>>>>>>> at org.apache.spark.shuffle.IndexShuffleBlockManager.getIndexFile(IndexShuffleBlockManager.scala:60)
>>>>>>>>>> at org.apache.spark.shuffle.IndexShuffleBlockManager.getBlockData(IndexShuffleBlockManager.scala:107)
>>>>>>>>>> at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:304)
>>>>>>>>>> at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>>>>>>>>>> at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>>>>>>>>>> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>>>>>>>>> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>>>>>>>>> at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>>>>>>>>> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>>>>>>>>>> at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>>>>>>>>>> at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>>>>>>>>>> at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
>>>>>>>>>> at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:124)
>>>>>>>>>> at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:97)
>>>>>>>>>> at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:91)
>>>>>>>>>> at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44)
>>>>>>>>>> at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>>>>>>>>>> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>>>>>>>>>> at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>>>>>>>>>> at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>>>>>>>>>> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>>>>>>>>>> at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>>>>>>>>>> at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
>>>>>>>>>> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>>>>>>>>>>
>>>>>>>>>> On Wed, Jun 24, 2015 at 7:16 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Can you look a bit more in the error logs? It could be getting
>>>>>>>>>>> killed because of OOM etc. One thing you can try is to set
>>>>>>>>>>> spark.shuffle.blockTransferService to nio instead of netty.
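>>>>>>>>>>>
>>>>>>>>>>> For example, a sketch of that setting (a Spark 1.x option; the
>>>>>>>>>>> default is netty):
>>>>>>>>>>>
>>>>>>>>>>> import org.apache.spark.SparkConf
>>>>>>>>>>>
>>>>>>>>>>> val conf = new SparkConf()
>>>>>>>>>>>   .set("spark.shuffle.blockTransferService", "nio")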
>>>>>>>>>>>
>>>>>>>>>>> Thanks
>>>>>>>>>>> Best Regards
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Jun 24, 2015 at 5:46 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I have a Spark job that has 7 stages. The first 3 stages
>>>>>>>>>>>> complete and the fourth stage begins (a join of two RDDs). This
>>>>>>>>>>>> stage has multiple task failures, all with the exception below.
>>>>>>>>>>>>
>>>>>>>>>>>> Multiple tasks (100s of them) get the same exception on
>>>>>>>>>>>> different hosts. How can all the hosts suddenly stop responding
>>>>>>>>>>>> when a few moments ago 3 stages ran successfully? If I re-run,
>>>>>>>>>>>> the three stages will again run successfully. I cannot think of
>>>>>>>>>>>> it being a cluster issue.
>>>>>>>>>>>>
>>>>>>>>>>>> Any suggestions?
>>>>>>>>>>>>
>>>>>>>>>>>> Spark version: 1.3.1
>>>>>>>>>>>>
>>>>>>>>>>>> Exception:
>>>>>>>>>>>>
>>>>>>>>>>>> org.apache.spark.shuffle.FetchFailedException: Failed to connect to HOST
>>>>>>>>>>>> at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
>>>>>>>>>>>> at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
>>>>>>>>>>>> at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
>>>>>>>>>>>> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>>>>>>>>>> at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>>>>>>>>>>>> at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>>>>>>>>>>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>>>>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>>>>>> at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125)
>>>>>>>>>>>> at org.apache.sp
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> Deepak

--
Deepak