Thanks again! It's the parser only: I just tried the parser <https://gist.github.com/akhld/3948a5d91d218eaf809d> without Spark, and it took 52 seconds to process 8k JSON records. I'm not sure if there's a more efficient way to do this in Spark. I know that if I use Spark SQL with SchemaRDD it would be much faster, but I need this in Spark Streaming.
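A minimal sketch of such a tight-loop benchmark, assuming newline-delimited JSON in a local file (the file name is illustrative, and the exact ScalaObjectMapper package varies by jackson-module-scala version):

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
// In the jackson-module-scala 2.x releases of this era the trait lives here;
// in newer releases it is com.fasterxml.jackson.module.scala.ScalaObjectMapper.
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper

object ParseBench {
  def main(args: Array[String]): Unit = {
    // One shared mapper for the whole run, as Jackson recommends.
    val mapper = new ObjectMapper() with ScalaObjectMapper
    mapper.registerModule(DefaultScalaModule)

    // Hypothetical input: one JSON document per line.
    val lines = scala.io.Source.fromFile("records.json").getLines().toArray

    val start = System.nanoTime()
    lines.foreach(line => mapper.readValue[Map[String, Any]](line))
    val elapsedMs = (System.nanoTime() - start) / 1000000
    println(s"Parsed ${lines.length} records in $elapsedMs ms")
  }
}

If this loop is fast but the streaming job is slow, the bottleneck is in feeding records to the parser rather than in the parsing itself.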
Thanks
Best Regards

On Sat, Feb 14, 2015 at 8:04 PM, Enno Shioji <eshi...@gmail.com> wrote:

> I see. I'd really benchmark how the parsing performs outside Spark (in a
> tight loop or something). If *that* is slow, you know it's the parsing. If
> not, it's not the parsing.
>
> Another thing you want to look at is CPU usage. If the actual parsing
> really is the bottleneck, you should see very high CPU utilization. If
> not, it's not the parsing per se but rather the ability to feed the
> messages to the parsing library.
>
> On Sat, Feb 14, 2015 at 2:30 PM, Akhil Das <ak...@sigmoidanalytics.com>
> wrote:
>
>> Ah, my bad, it works without the serialization exception. But there is
>> not much of a performance difference either way.
>>
>> Thanks
>> Best Regards
>>
>> On Sat, Feb 14, 2015 at 7:45 PM, Akhil Das <ak...@sigmoidanalytics.com>
>> wrote:
>>
>>> Thanks for the suggestion, but doing that gives me this exception:
>>>
>>> http://pastebin.com/ni80NqKn
>>>
>>> Over this piece of code:
>>>
>>> object Holder extends Serializable {
>>>   @transient lazy val mapper = new ObjectMapper() with ScalaObjectMapper
>>>   mapper.registerModule(DefaultScalaModule)
>>> }
>>>
>>> val jsonStream = myDStream.map(x => {
>>>   Holder.mapper.readValue[Map[String, Any]](x)
>>> })
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Sat, Feb 14, 2015 at 7:32 PM, Enno Shioji <eshi...@gmail.com> wrote:
>>>
>>>> (adding back user)
>>>>
>>>> Fair enough. Regarding the serialization exception, the hack I use is
>>>> to have an object with a transient lazy field, like so:
>>>>
>>>> object Holder extends Serializable {
>>>>   @transient lazy val mapper = new ObjectMapper()
>>>> }
>>>>
>>>> This way, the ObjectMapper will be instantiated at the destination and
>>>> you can share the instance.
>>>>
>>>> On Sat, Feb 14, 2015 at 1:52 PM, Akhil Das <ak...@sigmoidanalytics.com>
>>>> wrote:
>>>>
>>>>> Thanks for the reply, Enno. In my case the rate from the stream is
>>>>> not the bottleneck, as I'm able to consume all those records at a
>>>>> time (I have tested it). And regarding the ObjectMapper, if I take it
>>>>> outside of my map operation it throws serialization exceptions
>>>>> (Caused by: java.io.NotSerializableException:
>>>>> com.fasterxml.jackson.module.scala.modifiers.SetTypeModifier).
>>>>>
>>>>> Thanks
>>>>> Best Regards
>>>>>
>>>>> On Sat, Feb 14, 2015 at 7:11 PM, Enno Shioji <eshi...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> If I were you, I'd first parse some test JSONs in isolation (outside
>>>>>> Spark) to determine whether the bottleneck is really the parsing.
>>>>>> There are plenty of other places that could be affecting your
>>>>>> performance, like the rate at which you are able to read from your
>>>>>> stream source, etc.
>>>>>>
>>>>>> Apart from that, I notice that you are instantiating the
>>>>>> ObjectMapper every time. This is quite expensive, and Jackson
>>>>>> recommends sharing the instance. However, if you tried other parsers
>>>>>> / mapPartitions without success, this probably won't fix your
>>>>>> problem either.
>>>>>>
>>>>>> On Sat, Feb 14, 2015 at 1:25 PM, Akhil Das
>>>>>> <ak...@sigmoidanalytics.com> wrote:
>>>>>>
>>>>>>> I'm getting low performance while parsing JSON data. My cluster
>>>>>>> setup is Spark 1.2.0 with 10 nodes, each having 15 GB of memory and
>>>>>>> 4 cores.
>>>>>>>
>>>>>>> I tried both scala.util.parsing.json.JSON and fasterxml's Jackson
>>>>>>> parser.
>>>>>>>
>>>>>>> This is what I basically do:
>>>>>>>
>>>>>>> *// Approach 1:*
>>>>>>> val jsonStream = myDStream.map(x => {
>>>>>>>   val mapper = new ObjectMapper() with ScalaObjectMapper
>>>>>>>   mapper.registerModule(DefaultScalaModule)
>>>>>>>   mapper.readValue[Map[String, Any]](x)
>>>>>>> })
>>>>>>>
>>>>>>> jsonStream.count().print()
>>>>>>>
>>>>>>> *// Approach 2:*
>>>>>>> val jsonStream2 = myDStream.map(
>>>>>>>   JSON.parseFull(_).get
>>>>>>>     .asInstanceOf[scala.collection.immutable.Map[String, Any]])
>>>>>>>
>>>>>>> jsonStream2.count().print()
>>>>>>>
>>>>>>> It takes around 15-20 seconds to process/parse the 35k JSON
>>>>>>> documents (containing nested documents and arrays) that I put into
>>>>>>> the stream.
>>>>>>>
>>>>>>> Is there any better approach/parser to process them faster? I also
>>>>>>> tried it with mapPartitions, but it did not make any difference.
>>>>>>>
>>>>>>> Thanks
>>>>>>> Best Regards
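For reference, a minimal sketch combining Enno's transient-lazy Holder hack with mapPartitions, so the shared mapper is looked up once per partition rather than once per record (imports and names are illustrative; myDStream is the stream from the thread, and the ScalaObjectMapper package may differ by version):

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper

object Holder extends Serializable {
  // @transient keeps the mapper out of the serialized closure; lazy means
  // each executor instantiates its own copy on first use.
  @transient lazy val mapper: ObjectMapper with ScalaObjectMapper = {
    val m = new ObjectMapper() with ScalaObjectMapper
    m.registerModule(DefaultScalaModule)
    m
  }
}

val jsonStream3 = myDStream.mapPartitions { iter =>
  val mapper = Holder.mapper // one lookup per partition
  iter.map(x => mapper.readValue[Map[String, Any]](x))
}

jsonStream3.count().print()

As the thread notes, this mainly removes per-record mapper construction cost; it won't help if the parsing itself is CPU-bound or if the real limit is the rate at which records reach the executors.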