Huh, that would come to 6.5 ms per JSON document. That does feel like a lot,
but if your JSON documents are big enough, I guess you could get that sort of
processing time.

Jackson is more or less the most efficient JSON parser out there, so unless
the Scala API is somehow slowing it down, I don't see a better way. If you
only need to read parts of the JSON, you could look into exploiting
Jackson's streaming API <http://wiki.fasterxml.com/JacksonStreamingApi>.
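
For example, here is a rough sketch of pulling a single field out with the
streaming JsonParser instead of binding the whole document (extractField and
the field name are hypothetical, it ignores nesting and assumes a scalar
value, and is only meant to show the shape of the API):

import com.fasterxml.jackson.core.{JsonFactory, JsonToken}

// Scan the token stream and return the first value of the given field,
// without materializing the whole document as a Map.
def extractField(json: String, field: String): Option[String] = {
  val parser = new JsonFactory().createParser(json)
  try {
    var result: Option[String] = None
    while (result.isEmpty && parser.nextToken() != null) {
      if (parser.getCurrentToken == JsonToken.FIELD_NAME &&
          parser.getCurrentName == field) {
        parser.nextToken() // advance to the field's value
        result = Option(parser.getText)
      }
    }
    result
  } finally {
    parser.close()
  }
}

That way you avoid building a full Map[String, Any] per record when you only
care about one or two fields.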

I guess the good news is you can throw machines at it. You could also look
into other serialization frameworks.
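
For instance, if you stay with JSON, a library like json4s (which can run on
top of Jackson) gives you a plain Scala Map without the ScalaObjectMapper
mixin and is easy to benchmark against. A rough sketch, assuming json4s is on
the classpath (the exact API can differ between versions):

import org.json4s.jackson.JsonMethods.parse

val jsonStream = myDStream.map { x =>
  // parse returns a JValue; .values converts it to ordinary Scala collections
  parse(x).values.asInstanceOf[Map[String, Any]]
}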




On Sat, Feb 14, 2015 at 2:49 PM, Akhil Das <ak...@sigmoidanalytics.com>
wrote:

> Thanks again!
> It's the parser itself; I just tried the parser
> <https://gist.github.com/akhld/3948a5d91d218eaf809d> without Spark, and
> it took 52 seconds to process 8k JSON records. I'm not sure if there's a more
> efficient way to do this in Spark; I know that if I use Spark SQL with SchemaRDD
> it will be much faster, but I need this in Spark Streaming.
>
> Thanks
> Best Regards
>
> On Sat, Feb 14, 2015 at 8:04 PM, Enno Shioji <eshi...@gmail.com> wrote:
>
>> I see. I'd really benchmark how the parsing performs outside Spark (in a
>> tight loop or something). If *that* is slow, you know it's the parsing. If
>> not, it's not the parsing.
>>
>> Another thing you want to look at is CPU usage. If the actual parsing
>> really is the bottleneck, you should see very high CPU utilization. If not,
>> it's not the parsing per se but rather the ability to feed the messages to
>> the parsing library.
>>
>>
>>
>> On Sat, Feb 14, 2015 at 2:30 PM, Akhil Das <ak...@sigmoidanalytics.com>
>> wrote:
>>
>>> Ah, my bad, it works without the serialization exception. There isn't
>>> much of a performance difference, though.
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Sat, Feb 14, 2015 at 7:45 PM, Akhil Das <ak...@sigmoidanalytics.com>
>>> wrote:
>>>
>>>> Thanks for the suggestion, but doing that gives me this exception:
>>>>
>>>> http://pastebin.com/ni80NqKn
>>>>
>>>> For this piece of code:
>>>>
>>>> object Holder extends Serializable {
>>>>   @transient lazy val mapper = new ObjectMapper() with ScalaObjectMapper
>>>>   mapper.registerModule(DefaultScalaModule)
>>>> }
>>>>
>>>> val jsonStream = myDStream.map(x => {
>>>>   Holder.mapper.readValue[Map[String, Any]](x)
>>>> })
>>>>
>>>> Thanks
>>>> Best Regards
>>>>
>>>> On Sat, Feb 14, 2015 at 7:32 PM, Enno Shioji <eshi...@gmail.com> wrote:
>>>>
>>>>> (adding back user)
>>>>>
>>>>> Fair enough. Regarding the serialization exception, the hack I use is to
>>>>> have an object with a transient lazy field, like so:
>>>>>
>>>>>
>>>>> object Holder extends Serializable {
>>>>>   @transient lazy val mapper = new ObjectMapper()
>>>>> }
>>>>>
>>>>> This way, the ObjectMapper will be instantiated at the destination and
>>>>> you can share the instance.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Sat, Feb 14, 2015 at 1:52 PM, Akhil Das <ak...@sigmoidanalytics.com>
>>>>> wrote:
>>>>>
>>>>>> Thanks for the reply, Enno. In my case the rate from the stream is not the
>>>>>> bottleneck, as I'm able to consume all those records at once (I have tested
>>>>>> it). And regarding the ObjectMapper, if I take it outside of my map
>>>>>> operation, it throws a serialization exception (Caused by:
>>>>>> java.io.NotSerializableException:
>>>>>> com.fasterxml.jackson.module.scala.modifiers.SetTypeModifier).
>>>>>>
>>>>>> Thanks
>>>>>> Best Regards
>>>>>>
>>>>>> On Sat, Feb 14, 2015 at 7:11 PM, Enno Shioji <eshi...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> If I were you, I'd first parse some test JSONs in isolation (outside
>>>>>>> Spark) to determine whether the bottleneck really is the parsing. There are
>>>>>>> plenty of other places that could be affecting your performance, like the
>>>>>>> rate at which you are able to read from your stream source, etc.
>>>>>>>
>>>>>>> Apart from that, I notice that you are instantiating the ObjectMapper
>>>>>>> every time. This is quite expensive, and Jackson recommends sharing the
>>>>>>> instance. However, if you have tried other parsers / mapPartitions without
>>>>>>> success, this probably won't fix your problem either.
>>>>>>>
>>>>>>>
>>>>>>> On Sat, Feb 14, 2015 at 1:25 PM, Akhil Das <
>>>>>>> ak...@sigmoidanalytics.com> wrote:
>>>>>>>
>>>>>>>> I'm getting low performance while parsing JSON data. My cluster
>>>>>>>> setup is Spark 1.2.0 with 10 nodes, each having 15 GB of memory
>>>>>>>> and 4 cores.
>>>>>>>>
>>>>>>>> I tried both scala.util.parsing.json.JSON and FasterXML's
>>>>>>>> Jackson parser.
>>>>>>>>
>>>>>>>> This is basically what I do:
>>>>>>>>
>>>>>>>> *//Approach 1:*
>>>>>>>> val jsonStream = myDStream.map(x => {
>>>>>>>>   val mapper = new ObjectMapper() with ScalaObjectMapper
>>>>>>>>   mapper.registerModule(DefaultScalaModule)
>>>>>>>>   mapper.readValue[Map[String, Any]](x)
>>>>>>>> })
>>>>>>>>
>>>>>>>> jsonStream.count().print()
>>>>>>>>
>>>>>>>>
>>>>>>>> *//Approach 2:*
>>>>>>>> val jsonStream2 = myDStream.map(
>>>>>>>>   JSON.parseFull(_).get.asInstanceOf[scala.collection.immutable.Map[String, Any]])
>>>>>>>>
>>>>>>>> jsonStream2.count().print()
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> It takes around 15-20 seconds to process/parse the 35k JSON documents
>>>>>>>> (containing nested documents and arrays) that I put into the stream.
>>>>>>>>
>>>>>>>> Is there any better approach/parser to process them faster? I also
>>>>>>>> tried it with mapPartitions, but it did not make any difference.
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>> Best Regards
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
