(adding back user) Fair enough. Regarding the serialization exception, the hack I use is to have an object with a transient lazy field, like so:
object Holder extends Serializable {
  @transient lazy val mapper = new ObjectMapper()
}

This way, the ObjectMapper will be instantiated at the destination and you can share the instance.

On Sat, Feb 14, 2015 at 1:52 PM, Akhil Das <ak...@sigmoidanalytics.com> wrote:

> Thanks for the reply Enno, in my case the rate from the stream is not the
> bottleneck as I'm able to consume all those records at a time (have tested
> it). And regarding the ObjectMapper, if I take it outside of my map
> operation then it throws serialization exceptions (Caused by:
> java.io.NotSerializableException:
> com.fasterxml.jackson.module.scala.modifiers.SetTypeModifier).
>
> Thanks
> Best Regards
>
> On Sat, Feb 14, 2015 at 7:11 PM, Enno Shioji <eshi...@gmail.com> wrote:
>
>> If I were you I'd first parse some test JSONs in isolation (outside
>> Spark) to determine if the bottleneck is really the parsing. There are
>> plenty of other places that could be affecting your performance, like the
>> rate you are able to read from your stream source etc.
>>
>> Apart from that, I notice that you are instantiating the ObjectMapper
>> every time. This is quite expensive, and Jackson recommends sharing the
>> instance. However, if you tried other parsers / mapPartitions without
>> success, this probably won't fix your problem either.
>>
>> On Sat, Feb 14, 2015 at 1:25 PM, Akhil Das <ak...@sigmoidanalytics.com>
>> wrote:
>>
>>> I'm getting low performance while parsing JSON data. My cluster setup
>>> is Spark 1.2.0 with 10 nodes, each having 15 GB of memory and 4 cores.
>>>
>>> I tried both scala.util.parsing.json.JSON and FasterXML's Jackson
>>> parser.
>>>
>>> This is what I basically do:
>>>
>>> // Approach 1:
>>> val jsonStream = myDStream.map(x => {
>>>   val mapper = new ObjectMapper() with ScalaObjectMapper
>>>   mapper.registerModule(DefaultScalaModule)
>>>   mapper.readValue[Map[String, Any]](x)
>>> })
>>>
>>> jsonStream.count().print()
>>>
>>> // Approach 2:
>>> val jsonStream2 =
>>>   myDStream.map(JSON.parseFull(_).get.asInstanceOf[scala.collection.immutable.Map[String, Any]])
>>>
>>> jsonStream2.count().print()
>>>
>>> It takes around 15-20 seconds to process/parse 35k JSON documents
>>> (containing nested documents and arrays) which I put in the stream.
>>>
>>> Is there any better approach/parser to process it faster? I also tried
>>> it with mapPartitions but it did not make any difference.
>>>
>>> Thanks
>>> Best Regards
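
A minimal sketch of how the two snippets above could be combined, i.e. the transient-lazy Holder applied to Approach 1 so each executor JVM reuses a single ObjectMapper instead of building one per record. The ScalaObjectMapper mix-in, DefaultScalaModule registration, and myDStream are taken from the thread; the import path for ScalaObjectMapper is an assumption and depends on the jackson-module-scala version:

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
// In newer jackson-module-scala releases this trait lives at
// com.fasterxml.jackson.module.scala.ScalaObjectMapper instead.
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper

// Serializable holder: only the (empty) object is shipped to executors;
// the @transient lazy val means the mapper is built once per JVM on first use.
object Holder extends Serializable {
  @transient lazy val mapper: ObjectMapper with ScalaObjectMapper = {
    val m = new ObjectMapper() with ScalaObjectMapper
    m.registerModule(DefaultScalaModule)
    m
  }
}

// Approach 1 from the thread, but reusing the shared mapper rather than
// constructing a new ObjectMapper for every record.
val jsonStream = myDStream.map { x =>
  Holder.mapper.readValue[Map[String, Any]](x)
}

jsonStream.count().print()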