Thanks for the suggestion, but doing that gives me this exception: http://pastebin.com/ni80NqKn
Over this piece of code:

object Holder extends Serializable {
  @transient lazy val mapper = new ObjectMapper() with ScalaObjectMapper
  mapper.registerModule(DefaultScalaModule)
}

val jsonStream = myDStream.map(x => {
  Holder.mapper.readValue[Map[String, Any]](x)
})

Thanks
Best Regards

On Sat, Feb 14, 2015 at 7:32 PM, Enno Shioji <eshi...@gmail.com> wrote:

> (adding back user)
>
> Fair enough. Regarding the serialization exception, the hack I use is to
> have an object with a transient lazy field, like so:
>
> object Holder extends Serializable {
>   @transient lazy val mapper = new ObjectMapper()
> }
>
> This way, the ObjectMapper will be instantiated at the destination and
> you can share the instance.
>
> On Sat, Feb 14, 2015 at 1:52 PM, Akhil Das <ak...@sigmoidanalytics.com>
> wrote:
>
>> Thanks for the reply Enno. In my case the rate from the stream is not the
>> bottleneck, as I'm able to consume all those records at a time (I have
>> tested it). And regarding the ObjectMapper, if I take it outside of my map
>> operation then it throws serialization exceptions (Caused by:
>> java.io.NotSerializableException:
>> com.fasterxml.jackson.module.scala.modifiers.SetTypeModifier).
>>
>> Thanks
>> Best Regards
>>
>> On Sat, Feb 14, 2015 at 7:11 PM, Enno Shioji <eshi...@gmail.com> wrote:
>>
>>> If I were you, I'd first parse some test JSONs in isolation (outside
>>> Spark) to determine whether the bottleneck really is the parsing. There
>>> are plenty of other places that could be affecting your performance, like
>>> the rate at which you are able to read from your stream source, etc.
>>>
>>> Apart from that, I notice that you are instantiating the ObjectMapper
>>> every time. This is quite expensive, and Jackson recommends sharing the
>>> instance. However, if you already tried other parsers / mapPartitions
>>> without success, this probably won't fix your problem either.
>>>
>>> On Sat, Feb 14, 2015 at 1:25 PM, Akhil Das <ak...@sigmoidanalytics.com>
>>> wrote:
>>>
>>>> I'm getting low performance while parsing JSON data. My cluster setup
>>>> is Spark 1.2.0 with 10 nodes, each having 15 GB of memory and 4 cores.
>>>>
>>>> I tried both scala.util.parsing.json.JSON and fasterxml's Jackson
>>>> parser.
>>>>
>>>> This is what I basically do:
>>>>
>>>> // Approach 1:
>>>> val jsonStream = myDStream.map(x => {
>>>>   val mapper = new ObjectMapper() with ScalaObjectMapper
>>>>   mapper.registerModule(DefaultScalaModule)
>>>>   mapper.readValue[Map[String, Any]](x)
>>>> })
>>>>
>>>> jsonStream.count().print()
>>>>
>>>> // Approach 2:
>>>> val jsonStream2 =
>>>>   myDStream.map(JSON.parseFull(_).get.asInstanceOf[scala.collection.immutable.Map[String, Any]])
>>>>
>>>> jsonStream2.count().print()
>>>>
>>>> It takes around 15-20 seconds to process/parse the 35k JSON documents
>>>> (containing nested documents and arrays) that I put in the stream.
>>>>
>>>> Is there any better approach/parser to process it faster? I also tried
>>>> it with mapPartitions but it did not make any difference.
>>>>
>>>> Thanks
>>>> Best Regards
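
For reference, a common variation on the transient-lazy-val trick discussed above is to keep all of the mapper construction (including registerModule) inside the lazy val initializer, so nothing Jackson-related is ever part of the serialized closure; building one mapper per partition with mapPartitions is another option. Below is a minimal sketch of both, not a tested fix for the exception in the pastebin: it assumes myDStream is a DStream[String] as in the thread, and the ScalaObjectMapper import path assumes a 2.x-era jackson-module-scala where it lives under the experimental package.

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper

// Option A (sketch): fully encapsulated holder. The mapper and its
// registered module are built lazily on each executor JVM and are never
// serialized with the task closure.
object Holder extends Serializable {
  @transient lazy val mapper: ObjectMapper with ScalaObjectMapper = {
    val m = new ObjectMapper() with ScalaObjectMapper
    m.registerModule(DefaultScalaModule)
    m
  }
}

val jsonStream = myDStream.map { x =>
  Holder.mapper.readValue[Map[String, Any]](x)
}

// Option B (sketch): one mapper per partition, so the mapper is created
// where the data is processed and there is nothing to serialize at all.
val jsonStream2 = myDStream.mapPartitions { iter =>
  val mapper = new ObjectMapper() with ScalaObjectMapper
  mapper.registerModule(DefaultScalaModule)
  iter.map(x => mapper.readValue[Map[String, Any]](x))
}

Whether either variant resolves the exact exception linked above depends on what is actually being captured by the closure, which isn't visible from the thread alone.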