(adding back user)

Fair enough. Regarding the serialization exception, the hack I use is to have
an object with a transient lazy field, like so:


import com.fasterxml.jackson.databind.ObjectMapper

object Holder extends Serializable {
  // @transient lazy: not shipped with the closure, re-created per executor JVM
  @transient lazy val mapper = new ObjectMapper()
}

This way, the ObjectMapper will be instantiated at the destination and you
can share the instance.
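
For instance, here is a minimal sketch of how the holder could carry the
Scala-aware mapper from your first approach and be used inside the map. This
assumes the same myDStream from your snippet, and the ScalaObjectMapper
import path may differ depending on your jackson-module-scala version:

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper

object Holder extends Serializable {
  // Built lazily on first use in each executor JVM and never serialized
  // with the closure, so no NotSerializableException.
  @transient lazy val mapper: ObjectMapper with ScalaObjectMapper = {
    val m = new ObjectMapper() with ScalaObjectMapper
    m.registerModule(DefaultScalaModule)
    m
  }
}

// One shared mapper per executor instead of one per record:
val jsonStream = myDStream.map(x => Holder.mapper.readValue[Map[String, Any]](x))
jsonStream.count().print()

The first record on each executor pays the construction cost; every record
after that reuses the same instance.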




On Sat, Feb 14, 2015 at 1:52 PM, Akhil Das <ak...@sigmoidanalytics.com>
wrote:

> Thanks for the reply Enno. In my case the rate from the stream is not the
> bottleneck, as I'm able to consume all those records at a time (I have
> tested it). And regarding the ObjectMapper, if I take it outside of my map
> operation then it throws serialization exceptions (Caused by:
> java.io.NotSerializableException:
> com.fasterxml.jackson.module.scala.modifiers.SetTypeModifier).
>
> Thanks
> Best Regards
>
> On Sat, Feb 14, 2015 at 7:11 PM, Enno Shioji <eshi...@gmail.com> wrote:
>
>> If I were you I'd first parse some test JSONs in isolation (outside
>> Spark) to determine whether the bottleneck really is the parsing. There
>> are plenty of other places that could be affecting your performance, like
>> the rate at which you are able to read from your stream source, etc.
>>
>> Apart from that, I notice that you are instantiating the ObjectMapper
>> every time. This is quite expensive, and Jackson recommends sharing the
>> instance. However, if you have tried other parsers / mapPartitions without
>> success, this probably won't fix your problem either.
>>
>> On Sat, Feb 14, 2015 at 1:25 PM, Akhil Das <ak...@sigmoidanalytics.com>
>> wrote:
>>
>>> I'm getting low performance while parsing JSON data. My cluster setup is
>>> Spark 1.2.0 with 10 nodes, each having 15 GB of memory and 4 cores.
>>>
>>> I tried both scala.util.parsing.json.JSON and fasterxml's Jackson parser.
>>>
>>> This is basically what I do:
>>>
>>> *// Approach 1:*
>>> val jsonStream = myDStream.map(x => {
>>>   val mapper = new ObjectMapper() with ScalaObjectMapper
>>>   mapper.registerModule(DefaultScalaModule)
>>>   mapper.readValue[Map[String, Any]](x)
>>> })
>>>
>>> jsonStream.count().print()
>>>
>>>
>>> *// Approach 2:*
>>> val jsonStream2 = myDStream.map(
>>>   JSON.parseFull(_).get.asInstanceOf[scala.collection.immutable.Map[String, Any]])
>>>
>>> jsonStream2.count().print()
>>>
>>>
>>>
>>> It takes around 15-20 seconds to process/parse the 35k JSON documents
>>> (containing nested documents and arrays) which I put in the stream.
>>>
>>> Is there any better approach/parser to process it faster? I also tried it
>>> with mapPartitions, but it did not make any difference.
>>>
>>>
>>>
>>>
>>> Thanks
>>> Best Regards
>>>
>>
>>
>
