Re: SparkStreaming Low Performance

2015-02-15 Thread Akhil Das
Thanks Enno, let me have a look at the streaming parser API in Jackson.

Thanks
Best Regards


SparkStreaming Low Performance

2015-02-14 Thread Akhil Das
I'm getting low performance while parsing JSON data. My cluster runs Spark
1.2.0 on 10 nodes, each with 15 GB of memory and 4 cores.

I tried both scala.util.parsing.json.JSON and FasterXML's Jackson parser.

This is basically what I do:

// Imports (ScalaObjectMapper lives in the "experimental" package in the
// jackson-module-scala 2.x releases of this period)
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import scala.util.parsing.json.JSON

// Approach 1: Jackson, with a new ObjectMapper built for every record
val jsonStream = myDStream.map(x => {
  val mapper = new ObjectMapper() with ScalaObjectMapper
  mapper.registerModule(DefaultScalaModule)
  mapper.readValue[Map[String, Any]](x)
})

jsonStream.count().print()


// Approach 2: the parser from the Scala standard library
val jsonStream2 =
  myDStream.map(JSON.parseFull(_).get.asInstanceOf[scala.collection.immutable.Map[String, Any]])

jsonStream2.count().print()



It takes around 15-20 seconds to parse 35k JSON documents (containing
nested documents and arrays) that I put in the stream.

Is there a better approach or parser to process them faster? I also tried
mapPartitions, but it did not make any difference.




Thanks
Best Regards


Re: SparkStreaming Low Performance

2015-02-14 Thread Enno Shioji
I see. I'd really benchmark how the parsing performs outside Spark (in a
tight loop or something). If *that* is slow, you know it's the parsing. If
not, it's not the parsing.

Another thing you want to look at is CPU usage. If the actual parsing
really is the bottleneck, you should see very high CPU utilization. If not,
it's not the parsing per se but rather the ability to feed the messages to
the parsing library.
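
A rough sketch of such a tight-loop benchmark, using only the Scala standard library. The names ParseBenchmark and meanMillisPerDoc are made up for illustration, and the parse function passed in main is only a stand-in; to measure the real thing you would pass your own Jackson or JSON.parseFull call instead.

```scala
object ParseBenchmark {
  /** Runs `parse` over every document and returns the mean time per document in milliseconds. */
  def meanMillisPerDoc(docs: Seq[String], warmupRounds: Int = 3)(parse: String => Any): Double = {
    require(docs.nonEmpty, "need at least one document")
    // Warm-up passes so the JIT has a chance to compile the hot path before timing starts.
    for (_ <- 1 to warmupRounds; d <- docs) parse(d)

    var sink = 0 // accumulate results so the timed loop has an observable effect
    val start = System.nanoTime()
    for (d <- docs) sink += parse(d).##
    val elapsedNanos = System.nanoTime() - start
    if (sink == Int.MinValue) println(sink) // keeps `sink` alive; practically never prints

    elapsedNanos / 1e6 / docs.size
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical sample data; replace with records captured from the real stream.
    val docs = Seq.fill(8000)("""{"id": 1, "tags": ["a", "b"], "nested": {"k": "v"}}""")
    // Stand-in "parser" (counts quote characters); swap in the parser under test.
    val perDoc = meanMillisPerDoc(docs)(s => s.count(_ == '"'))
    println(f"$perDoc%.4f ms per document")
  }
}
```

Running this on a single core while watching CPU usage gives a per-document cost that can be compared against what the Spark job achieves.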



Re: SparkStreaming Low Performance

2015-02-14 Thread Akhil Das
Ah, my bad: it works without the serialization exception. There isn't much
of a performance difference, though.

Thanks
Best Regards


Re: SparkStreaming Low Performance

2015-02-14 Thread Enno Shioji
Huh, that would come to 6.5 ms per JSON document. That does feel like a lot,
but if your JSON documents are big enough, I guess you could get that sort of
processing time.

Jackson is more or less the most efficient JSON parser out there, so unless
the Scala API is somehow affecting it, I don't see any better way. If you
only need to read parts of the JSON, you could look into exploiting
Jackson's stream parsing API: http://wiki.fasterxml.com/JacksonStreamingApi
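
As an illustration of reading only part of a document with the streaming API, here is a minimal sketch that assumes jackson-core 2.x is on the classpath. The object name StreamingFieldReader and the method readTopLevelString are hypothetical; the field being extracted and the sample input are also made up. It pulls one top-level string field and skips over everything else without building a Map.

```scala
import com.fasterxml.jackson.core.{JsonFactory, JsonToken}

object StreamingFieldReader {
  // JsonFactory is thread-safe and meant to be reused across parses.
  private val factory = new JsonFactory()

  /** Returns the value of a top-level string field, or None if it is absent or not a string. */
  def readTopLevelString(json: String, field: String): Option[String] = {
    val parser = factory.createParser(json)
    try {
      var result: Option[String] = None
      var done = false
      if (parser.nextToken() == JsonToken.START_OBJECT) {
        // Each iteration consumes one "name: value" pair of the top-level object.
        while (!done && parser.nextToken() == JsonToken.FIELD_NAME) {
          val name = parser.getCurrentName
          val valueToken = parser.nextToken()
          if (name == field) {
            if (valueToken == JsonToken.VALUE_STRING) result = Some(parser.getText)
            done = true
          } else {
            // Skips a whole nested object or array; does nothing for scalar values.
            parser.skipChildren()
          }
        }
      }
      result
    } finally parser.close()
  }

  def main(args: Array[String]): Unit = {
    val doc = """{"meta": {"id": "inner", "xs": [1, 2, 3]}, "id": "42", "rest": true}"""
    println(readTopLevelString(doc, "id"))
  }
}
```

Whether this is faster in practice depends on how much of each document can be skipped, so it is worth timing against the full-binding version on real data.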

I guess the good news is you can throw machines at it. You could also look
into other serialization frameworks.
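
For a rough feel of what throwing machines at it buys, the numbers in this thread can be plugged into a back-of-the-envelope estimate. The sketch below assumes the per-document cost on the cluster matches the standalone run (52 seconds for 8k records) and that the work spreads perfectly over 10 nodes x 4 cores, which real jobs will not quite achieve. The helper names are made up for illustration.

```scala
object ThroughputEstimate {
  /** Mean cost of one document in milliseconds. */
  def perDocMillis(totalSeconds: Double, docs: Int): Double =
    totalSeconds * 1000.0 / docs

  /** Wall-clock seconds if the work were split evenly over `cores` with no overhead. */
  def idealWallClockSeconds(docs: Int, perDocMs: Double, cores: Int): Double =
    docs * perDocMs / 1000.0 / cores

  def main(args: Array[String]): Unit = {
    val perDoc = perDocMillis(52.0, 8000)                   // standalone run from this thread
    val cores = 10 * 4                                      // cluster described in the first mail
    val ideal = idealWallClockSeconds(35000, perDoc, cores) // 35k documents per batch
    println(f"per document: $perDoc%.2f ms")
    println(f"single core, 35k documents: ${35000 * perDoc / 1000.0}%.1f s")
    println(f"ideal on $cores cores: $ideal%.2f s")
  }
}
```

The ideal figure is a lower bound; scheduling, serialization and uneven partitions all add to it.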




Re: SparkStreaming Low Performance

2015-02-14 Thread Enno Shioji
(adding back user)

Fair enough. Regarding the serialization exception, the hack I use is to
have an object with a transient lazy field, like so:


object Holder extends Serializable {
  @transient lazy val mapper = new ObjectMapper()
}

This way, the ObjectMapper will be instantiated at the destination and you
can share the instance.
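
The effect of the lazy field can be checked without Spark or Jackson. In the sketch below, ExpensiveParser is a hypothetical stand-in for ObjectMapper that counts how often it is constructed, and ParserHolder mirrors the Holder object above. The point it illustrates: a member of a Scala object is reached through the singleton rather than captured in the closure, and a lazy val is built on first access, so each JVM builds the instance once and reuses it.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for an expensive, non-serializable parser such as ObjectMapper.
final class ExpensiveParser {
  ExpensiveParser.instances.incrementAndGet() // record every construction
  def parse(s: String): Int = s.length        // placeholder for real parsing work
}

object ExpensiveParser {
  val instances = new AtomicInteger(0)
}

// Same shape as the Holder object: the field is created lazily, on first use.
object ParserHolder extends Serializable {
  @transient lazy val parser: ExpensiveParser = new ExpensiveParser
}

object HolderDemo {
  def main(args: Array[String]): Unit = {
    val records = Seq("{}", "[1,2]", "null")
    // The function passed to map only refers to the singleton, as in myDStream.map above.
    val sizes = records.map(r => ParserHolder.parser.parse(r))
    println(s"parsed=${sizes.size} instances=${ExpensiveParser.instances.get}")
  }
}
```

In a Spark job the same reasoning would apply per executor JVM, though that part is not exercised by this local sketch.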




On Sat, Feb 14, 2015 at 1:52 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 Thanks for the reply Enno, in my case rate from the stream is not the
 bottleneck as i'm able to consume all those records at a time (have tested
 it). And regarding the ObjectMapper, if i take it outside of my map
 operation then it throws Serializable Exceptions (Caused by:
 java.io.NotSerializableException:
 com.fasterxml.jackson.module.scala.modifiers.SetTypeModifier).

 Thanks
 Best Regards

 On Sat, Feb 14, 2015 at 7:11 PM, Enno Shioji eshi...@gmail.com wrote:

 If I were you I'd first parse some test jsons in isolation (outside
 Spark) to determine if the bottleneck is really the parsing. There are
 plenty other places that could be affecting your performance, like the rate
 you are able to read from your stream source etc.

 Apart from that, I notice that you are instantiating the ObjectMapper
 every time. This is quite expensive and jackson recommends you to share the
 instance. However, if you tried other parsers / mapPartitions without
 success, this probably won't fix your problem either.






Re: SparkStreaming Low Performance

2015-02-14 Thread Akhil Das
Thanks for the suggestion, but doing that gives me this exception:

http://pastebin.com/ni80NqKn

Over this piece of code:

object Holder extends Serializable {
  @transient lazy val mapper = new ObjectMapper() with ScalaObjectMapper
  mapper.registerModule(DefaultScalaModule)
}

val jsonStream = myDStream.map(x => {
  Holder.mapper.readValue[Map[String, Any]](x)
})

Thanks
Best Regards


Re: SparkStreaming Low Performance

2015-02-14 Thread Akhil Das
Thanks again!
The problem is in the parser alone: I just ran the parser
(https://gist.github.com/akhld/3948a5d91d218eaf809d) without Spark, and it
took 52 seconds to process 8k JSON records. I'm not sure whether there's an
efficient way to do this in Spark. I know that using Spark SQL with a
SchemaRDD will be much faster, but I need this in Spark Streaming.

Thanks
Best Regards
