Thanks again!
It is the parser itself: I just ran the parser
<https://gist.github.com/akhld/3948a5d91d218eaf809d> without Spark, and it
took 52 seconds to process 8k JSON records. I'm not sure whether there is an
efficient way to do this in Spark. I know that Spark SQL with a SchemaRDD
would be much faster, but I need this in Spark Streaming.
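
For anyone who wants to repeat the measurement outside Spark, here is a minimal
sketch of a tight-loop timing harness. It uses only the standard library and
takes the parser as a function, so any parser can be plugged in. The names
ParseBenchmark and bestPassMillis, the sample record, and the placeholder
parser are all made up for illustration; they are not from the gist above.

```scala
object ParseBenchmark {
  // Runs `parse` over every record, `rounds` times, and returns the fastest
  // wall-clock time (in milliseconds) observed for one full pass.
  // Early rounds double as JIT warm-up, which is why the best pass is reported.
  def bestPassMillis(records: Seq[String], rounds: Int)(parse: String => Any): Double = {
    require(rounds > 0, "rounds must be positive")
    var best = Double.MaxValue
    var sink = 0 // fold results into a value so the parse calls are not optimized away
    for (_ <- 1 to rounds) {
      val start = System.nanoTime()
      records.foreach { r => sink ^= parse(r).hashCode() }
      val elapsedMs = (System.nanoTime() - start) / 1e6
      if (elapsedMs < best) best = elapsedMs
    }
    if (sink == Int.MinValue) println(sink) // keeps `sink` observable
    best
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical sample data; replace with real records taken from the stream.
    val records = Seq.fill(8000)("""{"id": 1, "tags": ["a", "b"], "nested": {"k": "v"}}""")
    // Placeholder parser (assumption); swap in the real parse call being measured.
    val placeholder: String => Any = _.length
    val ms = bestPassMillis(records, rounds = 5)(placeholder)
    println(f"best pass: $ms%.1f ms for ${records.size} records")
  }
}
```

Comparing a pass that builds a new parser per record against one that reuses a
single instance should show how much of the time is setup rather than parsing.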

Thanks
Best Regards

On Sat, Feb 14, 2015 at 8:04 PM, Enno Shioji <eshi...@gmail.com> wrote:

> I see. I'd really benchmark how the parsing performs outside Spark (in a
> tight loop or something). If *that* is slow, you know it's the parsing. If
> not, it's not the parsing.
>
> Another thing you want to look at is CPU usage. If the actual parsing
> really is the bottleneck, you should see very high CPU utilization. If not,
> it's not the parsing per se but rather the ability to feed the messages to
> the parsing library.
>
>
> On Sat, Feb 14, 2015 at 2:30 PM, Akhil Das <ak...@sigmoidanalytics.com>
> wrote:
>
>> Ah, my bad, it works without the serialization exception. There is not
>> much of a performance difference, though.
>>
>> Thanks
>> Best Regards
>>
>> On Sat, Feb 14, 2015 at 7:45 PM, Akhil Das <ak...@sigmoidanalytics.com>
>> wrote:
>>
>>> Thanks for the suggestion, but doing that gives me this exception:
>>>
>>> http://pastebin.com/ni80NqKn
>>>
>>> Over this piece of code:
>>>
>>>     object Holder extends Serializable {
>>>       @transient lazy val mapper = new ObjectMapper() with ScalaObjectMapper
>>>       mapper.registerModule(DefaultScalaModule)
>>>     }
>>>
>>>     val jsonStream = myDStream.map(x=> {
>>>        Holder.mapper.readValue[Map[String,Any]](x)
>>>     })
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Sat, Feb 14, 2015 at 7:32 PM, Enno Shioji <eshi...@gmail.com> wrote:
>>>
>>>> (adding back user)
>>>>
>>>> Fair enough. Regarding the serialization exception, the hack I use is to
>>>> have an object with a transient lazy field, like so:
>>>>
>>>>
>>>> object Holder extends Serializable {
>>>>   @transient lazy val mapper = new ObjectMapper()
>>>> }
>>>>
>>>> This way, the ObjectMapper will be instantiated at the destination and
>>>> you can share the instance.
>>>>
>>>>
>>>> On Sat, Feb 14, 2015 at 1:52 PM, Akhil Das <ak...@sigmoidanalytics.com>
>>>> wrote:
>>>>
>>>>> Thanks for the reply, Enno. In my case the rate from the stream is not the
>>>>> bottleneck, as I'm able to consume all those records at once (I have tested
>>>>> it). As for the ObjectMapper, if I take it outside of my map
>>>>> operation it throws a serialization exception (Caused by:
>>>>> java.io.NotSerializableException:
>>>>> com.fasterxml.jackson.module.scala.modifiers.SetTypeModifier).
>>>>>
>>>>> Thanks
>>>>> Best Regards
>>>>>
>>>>> On Sat, Feb 14, 2015 at 7:11 PM, Enno Shioji <eshi...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> If I were you I'd first parse some test JSON documents in isolation
>>>>>> (outside Spark) to determine if the bottleneck is really the parsing.
>>>>>> There are plenty of other places that could be affecting your
>>>>>> performance, like the rate you are able to read from your stream source.
>>>>>>
>>>>>> Apart from that, I notice that you are instantiating the ObjectMapper
>>>>>> every time. This is quite expensive, and Jackson recommends sharing the
>>>>>> instance. However, if you tried other parsers / mapPartitions without
>>>>>> success, this probably won't fix your problem either.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Sat, Feb 14, 2015 at 1:25 PM, Akhil Das <
>>>>>> ak...@sigmoidanalytics.com> wrote:
>>>>>>
>>>>>>> I'm getting low performance while parsing JSON data. My cluster
>>>>>>> runs Spark 1.2.0 on 10 nodes, each with 15 GB of memory
>>>>>>> and 4 cores.
>>>>>>>
>>>>>>> I tried both scala.util.parsing.json.JSON and FasterXML's
>>>>>>> Jackson parser.
>>>>>>>
>>>>>>> This is basically what I do:
>>>>>>>
>>>>>>> // Approach 1:
>>>>>>> val jsonStream = myDStream.map(x => {
>>>>>>>   val mapper = new ObjectMapper() with ScalaObjectMapper
>>>>>>>   mapper.registerModule(DefaultScalaModule)
>>>>>>>   mapper.readValue[Map[String, Any]](x)
>>>>>>> })
>>>>>>>
>>>>>>> jsonStream.count().print()
>>>>>>>
>>>>>>>
>>>>>>> // Approach 2:
>>>>>>> val jsonStream2 = myDStream.map(
>>>>>>>   JSON.parseFull(_).get.asInstanceOf[scala.collection.immutable.Map[String, Any]])
>>>>>>>
>>>>>>> jsonStream2.count().print()
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> It takes around 15-20 seconds to process/parse 35k JSON documents
>>>>>>> (containing nested documents and arrays) that I put in the stream.
>>>>>>>
>>>>>>> Is there a better approach/parser to process them faster? I also
>>>>>>> tried mapPartitions, but it did not make any difference.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Thanks
>>>>>>> Best Regards
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
