SparkStreaming Low Performance

2015-02-14 Thread Akhil Das
I'm getting low performance while parsing JSON data. My cluster runs Spark
1.2.0 on 10 nodes, each with 15 GB of memory and 4 cores.

I tried both scala.util.parsing.json.JSON and FasterXML's Jackson parser.

This is what I basically do:

// Approach 1:
val jsonStream = myDStream.map(x => {
  val mapper = new ObjectMapper() with ScalaObjectMapper
  mapper.registerModule(DefaultScalaModule)
  mapper.readValue[Map[String, Any]](x)
})

jsonStream.count().print()


// Approach 2:
val jsonStream2 = myDStream.map(
  JSON.parseFull(_).get.asInstanceOf[scala.collection.immutable.Map[String, Any]])

jsonStream2.count().print()



It takes around 15-20 seconds to process/parse the 35k JSON documents
(containing nested documents and arrays) that I put into the stream.

Is there a better approach/parser to process them faster? I also tried
mapPartitions, but it made no difference.
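
For reference, here is a minimal sketch of the mapPartitions variant (assuming
the same myDStream of JSON strings as above); the point is that only one
ObjectMapper is built per partition rather than per record:

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper

// Approach 3 (sketch): one mapper per partition instead of one per record.
val jsonStream3 = myDStream.mapPartitions { iter =>
  val mapper = new ObjectMapper() with ScalaObjectMapper
  mapper.registerModule(DefaultScalaModule)
  iter.map(x => mapper.readValue[Map[String, Any]](x))
}

jsonStream3.count().print()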




Thanks
Best Regards


Re: SparkStreaming Low Performance

2015-02-14 Thread Enno Shioji
(adding back user)

Fair enough. Regarding the serialization exception, the hack I use is to have
an object with a transient lazy field, like so:


object Holder extends Serializable {
  @transient lazy val mapper = new ObjectMapper()
}

This way, the ObjectMapper will be instantiated at the destination and you
can share the instance.
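
Adapted to the Jackson Scala setup from the snippet above, a minimal sketch of
this pattern might look like the following; the module registration sits
inside the lazy initializer so it also runs wherever the mapper is actually
created:

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper

object Holder extends Serializable {
  // Created lazily on each JVM that first touches it, never serialized.
  @transient lazy val mapper: ObjectMapper with ScalaObjectMapper = {
    val m = new ObjectMapper() with ScalaObjectMapper
    m.registerModule(DefaultScalaModule)
    m
  }
}

val jsonStream = myDStream.map(x => Holder.mapper.readValue[Map[String, Any]](x))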




On Sat, Feb 14, 2015 at 1:52 PM, Akhil Das wrote:

> Thanks for the reply Enno. In my case the rate from the stream is not the
> bottleneck, as I'm able to consume all those records at a time (I have
> tested it). And regarding the ObjectMapper, if I take it outside of my map
> operation it throws a serialization exception (Caused by:
> java.io.NotSerializableException:
> com.fasterxml.jackson.module.scala.modifiers.SetTypeModifier).
>
> Thanks
> Best Regards
>
> On Sat, Feb 14, 2015 at 7:11 PM, Enno Shioji wrote:
>
>> If I were you, I'd first parse some test JSONs in isolation (outside
>> Spark) to determine whether the bottleneck really is the parsing. There are
>> plenty of other places that could be affecting your performance, like the
>> rate at which you are able to read from your stream source, etc.
>>
>> Apart from that, I notice that you are instantiating the ObjectMapper
>> every time. This is quite expensive, and Jackson recommends sharing the
>> instance. However, if you tried other parsers / mapPartitions without
>> success, this probably won't fix your problem either.


Re: SparkStreaming Low Performance

2015-02-14 Thread Akhil Das
Thanks for the suggestion, but doing that gives me this exception:

http://pastebin.com/ni80NqKn

Over this piece of code:

object Holder extends Serializable {
  @transient lazy val mapper = new ObjectMapper() with ScalaObjectMapper
  mapper.registerModule(DefaultScalaModule)
}

val jsonStream = myDStream.map(x => {
  Holder.mapper.readValue[Map[String, Any]](x)
})

Thanks
Best Regards



Re: SparkStreaming Low Performance

2015-02-14 Thread Akhil Das
Ah, my bad, it works without the serialization exception now. There isn't much
of a performance difference though.

Thanks
Best Regards



Re: SparkStreaming Low Performance

2015-02-14 Thread Enno Shioji
I see. I'd really benchmark how the parsing performs outside Spark (in a
tight loop or something). If *that* is slow, you know it's the parsing. If
not, it's not the parsing.

Another thing you want to look at is CPU usage. If the actual parsing
really is the bottleneck, you should see very high CPU utilization. If not,
it's not the parsing per se but rather the ability to feed the messages to
the parsing library.
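
A minimal sketch of such a tight-loop check, assuming a sample of the JSON
documents is available locally as a Seq[String]:

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper

// Parse the same sample documents in a plain loop, outside Spark, and time it.
def benchmarkParsing(samples: Seq[String]): Unit = {
  val mapper = new ObjectMapper() with ScalaObjectMapper
  mapper.registerModule(DefaultScalaModule)

  val start = System.nanoTime()
  samples.foreach(s => mapper.readValue[Map[String, Any]](s))
  val elapsedMs = (System.nanoTime() - start) / 1e6
  println(f"Parsed ${samples.size} documents in $elapsedMs%.1f ms")
}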





Re: SparkStreaming Low Performance

2015-02-14 Thread Akhil Das
Thanks again!
It's the parser itself: I just tried the parser without Spark, and it took
52 seconds to process 8k JSON records. I'm not sure if there's a more
efficient way to do this in Spark. I know that if I use Spark SQL with
SchemaRDD and all, it will be much faster, but I need this in Spark Streaming.
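
For what it's worth, here is a rough sketch of how the Spark SQL JSON path
could still be applied per micro-batch from a streaming job; whether it is
actually faster for these nested documents would need measuring:

import org.apache.spark.sql.SQLContext

// Rough sketch: hand each micro-batch of JSON strings to Spark SQL.
myDStream.foreachRDD { rdd =>
  if (rdd.take(1).nonEmpty) {
    val sqlContext = new SQLContext(rdd.sparkContext) // or reuse a shared one
    val schemaRdd = sqlContext.jsonRDD(rdd)           // infers a schema for this batch
    schemaRdd.registerTempTable("docs")
    sqlContext.sql("SELECT COUNT(*) FROM docs").collect().foreach(println)
  }
}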

Thanks
Best Regards



Re: SparkStreaming Low Performance

2015-02-14 Thread Enno Shioji
Huh, that comes to about 6.5 ms per JSON document. That does feel like a lot,
but if your JSON documents are big enough, I guess you could get that sort of
processing time.

Jackson is more or less the most efficient JSON parser out there, so unless
the Scala API is somehow affecting it, I don't see any better way. If you only
need to read parts of the JSON, you could look into exploiting Jackson's
streaming parsing API.

I guess the good news is you can throw machines at it. You could also look
into other serialization frameworks.
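
As a rough illustration of that streaming style (only a sketch; the field name
"id" below is made up), pulling one field out of a document without
materializing a full Map:

import com.fasterxml.jackson.core.{JsonFactory, JsonToken}

// Sketch: scan tokens and return the first field named "id" (a made-up
// example field) without building a full object for the document.
val factory = new JsonFactory()

def extractId(json: String): Option[String] = {
  val parser = factory.createParser(json)
  try {
    var result: Option[String] = None
    while (result.isEmpty && parser.nextToken() != null) {
      if (parser.getCurrentToken == JsonToken.FIELD_NAME && parser.getCurrentName == "id") {
        parser.nextToken() // advance to the field's value
        result = Some(parser.getText)
      }
    }
    result
  } finally {
    parser.close()
  }
}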





Re: SparkStreaming Low Performance

2015-02-15 Thread Akhil Das
Thanks Enno, let me have a look at Jackson's streaming parser API.

Thanks
Best Regards
