Re: SparkStreaming Low Performance
Thanks Enno, let me have a look at Stream Parser version of Jackson. Thanks Best Regards On Sat, Feb 14, 2015 at 9:30 PM, Enno Shioji eshi...@gmail.com wrote: Huh, that would come to 6.5ms per one JSON. That does feel like a lot but if your JSON file is big enough, I guess you could get that sort of processing time. Jackson is more or less the most efficient JSON parser out there, so unless the Scala API is somehow affecting it, I don't see any better way. If you only need to read parts of the JSON, you could look into exploiting Jackson's stream parsing API http://wiki.fasterxml.com/JacksonStreamingApi. I guess the good news is you can throw machines at it. You could also look into other serialization frameworks. ᐧ On Sat, Feb 14, 2015 at 2:49 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Thanks again! Its with the parser only, just tried the parser https://gist.github.com/akhld/3948a5d91d218eaf809d without Spark. And it took me 52 Sec to process 8k json records. Not sure if there's an efficient way to do this in Spark, i know if i use sparkSQL with schemaRDD and all it will be much faster, but i need that in SparkStreaming. Thanks Best Regards On Sat, Feb 14, 2015 at 8:04 PM, Enno Shioji eshi...@gmail.com wrote: I see. I'd really benchmark how the parsing performs outside Spark (in a tight loop or something). If *that* is slow, you know it's the parsing. If not, it's not the parsing. Another thing you want to look at is CPU usage. If the actual parsing really is the bottleneck, you should see very high CPU utilization. If not, it's not the parsing per se but rather the ability to feed the messages to the parsing library. ᐧ On Sat, Feb 14, 2015 at 2:30 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Ah my bad, it works without serializable exception. But not much performance difference is there though. Thanks Best Regards On Sat, Feb 14, 2015 at 7:45 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Thanks for the suggestion, but doing that gives me this exception: http://pastebin.com/ni80NqKn Over this piece of code: object Holder extends Serializable { @transient lazy val mapper = new ObjectMapper() with ScalaObjectMapper mapper.registerModule(DefaultScalaModule) } val jsonStream = myDStream.map(x= { Holder.mapper.readValue[Map[String,Any]](x) }) Thanks Best Regards On Sat, Feb 14, 2015 at 7:32 PM, Enno Shioji eshi...@gmail.com wrote: (adding back user) Fair enough. Regarding serialization exception, the hack I use is to have a object with a transient lazy field, like so: object Holder extends Serializable { @transient lazy val mapper = new ObjectMapper() } This way, the ObjectMapper will be instantiated at the destination and you can share the instance. ᐧ On Sat, Feb 14, 2015 at 1:52 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Thanks for the reply Enno, in my case rate from the stream is not the bottleneck as i'm able to consume all those records at a time (have tested it). And regarding the ObjectMapper, if i take it outside of my map operation then it throws Serializable Exceptions (Caused by: java.io.NotSerializableException: com.fasterxml.jackson.module.scala.modifiers.SetTypeModifier). Thanks Best Regards On Sat, Feb 14, 2015 at 7:11 PM, Enno Shioji eshi...@gmail.com wrote: If I were you I'd first parse some test jsons in isolation (outside Spark) to determine if the bottleneck is really the parsing. There are plenty other places that could be affecting your performance, like the rate you are able to read from your stream source etc. Apart from that, I notice that you are instantiating the ObjectMapper every time. This is quite expensive and jackson recommends you to share the instance. However, if you tried other parsers / mapPartitions without success, this probably won't fix your problem either. On Sat, Feb 14, 2015 at 1:25 PM, Akhil Das ak...@sigmoidanalytics.com wrote: I'm getting a low performance while parsing json data. My cluster setup is 1.2.0 version of spark with 10 Nodes each having 15Gb of memory and 4 cores. I tried both scala.util.parsing.json.JSON and and fasterxml's Jackson parser. This is what i basically do: *//Approach 1:* val jsonStream = myDStream.map(x= { val mapper = new ObjectMapper() with ScalaObjectMapper mapper.registerModule(DefaultScalaModule) mapper.readValue[Map[String,Any]](x) }) jsonStream.count().print() *//Approach 2:* val jsonStream2 = myDStream.map(JSON.parseFull(_).get.asInstanceOf[scala.collection.immutable.Map[String, Any]]) jsonStream2.count().print() It takes around 15-20 Seconds to process/parse 35k json documents (contains nested documents and arrays) which i put in the stream. Is there any better approach/parser to process it faster? i also tried it with mapPartitions but it did not make any difference. Thanks
SparkStreaming Low Performance
I'm getting a low performance while parsing json data. My cluster setup is 1.2.0 version of spark with 10 Nodes each having 15Gb of memory and 4 cores. I tried both scala.util.parsing.json.JSON and and fasterxml's Jackson parser. This is what i basically do: *//Approach 1:* val jsonStream = myDStream.map(x= { val mapper = new ObjectMapper() with ScalaObjectMapper mapper.registerModule(DefaultScalaModule) mapper.readValue[Map[String,Any]](x) }) jsonStream.count().print() *//Approach 2:* val jsonStream2 = myDStream.map(JSON.parseFull(_).get.asInstanceOf[scala.collection.immutable.Map[String, Any]]) jsonStream2.count().print() It takes around 15-20 Seconds to process/parse 35k json documents (contains nested documents and arrays) which i put in the stream. Is there any better approach/parser to process it faster? i also tried it with mapPartitions but it did not make any difference. Thanks Best Regards
Re: SparkStreaming Low Performance
I see. I'd really benchmark how the parsing performs outside Spark (in a tight loop or something). If *that* is slow, you know it's the parsing. If not, it's not the parsing. Another thing you want to look at is CPU usage. If the actual parsing really is the bottleneck, you should see very high CPU utilization. If not, it's not the parsing per se but rather the ability to feed the messages to the parsing library. ᐧ On Sat, Feb 14, 2015 at 2:30 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Ah my bad, it works without serializable exception. But not much performance difference is there though. Thanks Best Regards On Sat, Feb 14, 2015 at 7:45 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Thanks for the suggestion, but doing that gives me this exception: http://pastebin.com/ni80NqKn Over this piece of code: object Holder extends Serializable { @transient lazy val mapper = new ObjectMapper() with ScalaObjectMapper mapper.registerModule(DefaultScalaModule) } val jsonStream = myDStream.map(x= { Holder.mapper.readValue[Map[String,Any]](x) }) Thanks Best Regards On Sat, Feb 14, 2015 at 7:32 PM, Enno Shioji eshi...@gmail.com wrote: (adding back user) Fair enough. Regarding serialization exception, the hack I use is to have a object with a transient lazy field, like so: object Holder extends Serializable { @transient lazy val mapper = new ObjectMapper() } This way, the ObjectMapper will be instantiated at the destination and you can share the instance. ᐧ On Sat, Feb 14, 2015 at 1:52 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Thanks for the reply Enno, in my case rate from the stream is not the bottleneck as i'm able to consume all those records at a time (have tested it). And regarding the ObjectMapper, if i take it outside of my map operation then it throws Serializable Exceptions (Caused by: java.io.NotSerializableException: com.fasterxml.jackson.module.scala.modifiers.SetTypeModifier). Thanks Best Regards On Sat, Feb 14, 2015 at 7:11 PM, Enno Shioji eshi...@gmail.com wrote: If I were you I'd first parse some test jsons in isolation (outside Spark) to determine if the bottleneck is really the parsing. There are plenty other places that could be affecting your performance, like the rate you are able to read from your stream source etc. Apart from that, I notice that you are instantiating the ObjectMapper every time. This is quite expensive and jackson recommends you to share the instance. However, if you tried other parsers / mapPartitions without success, this probably won't fix your problem either. On Sat, Feb 14, 2015 at 1:25 PM, Akhil Das ak...@sigmoidanalytics.com wrote: I'm getting a low performance while parsing json data. My cluster setup is 1.2.0 version of spark with 10 Nodes each having 15Gb of memory and 4 cores. I tried both scala.util.parsing.json.JSON and and fasterxml's Jackson parser. This is what i basically do: *//Approach 1:* val jsonStream = myDStream.map(x= { val mapper = new ObjectMapper() with ScalaObjectMapper mapper.registerModule(DefaultScalaModule) mapper.readValue[Map[String,Any]](x) }) jsonStream.count().print() *//Approach 2:* val jsonStream2 = myDStream.map(JSON.parseFull(_).get.asInstanceOf[scala.collection.immutable.Map[String, Any]]) jsonStream2.count().print() It takes around 15-20 Seconds to process/parse 35k json documents (contains nested documents and arrays) which i put in the stream. Is there any better approach/parser to process it faster? i also tried it with mapPartitions but it did not make any difference. Thanks Best Regards
Re: SparkStreaming Low Performance
Ah my bad, it works without serializable exception. But not much performance difference is there though. Thanks Best Regards On Sat, Feb 14, 2015 at 7:45 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Thanks for the suggestion, but doing that gives me this exception: http://pastebin.com/ni80NqKn Over this piece of code: object Holder extends Serializable { @transient lazy val mapper = new ObjectMapper() with ScalaObjectMapper mapper.registerModule(DefaultScalaModule) } val jsonStream = myDStream.map(x= { Holder.mapper.readValue[Map[String,Any]](x) }) Thanks Best Regards On Sat, Feb 14, 2015 at 7:32 PM, Enno Shioji eshi...@gmail.com wrote: (adding back user) Fair enough. Regarding serialization exception, the hack I use is to have a object with a transient lazy field, like so: object Holder extends Serializable { @transient lazy val mapper = new ObjectMapper() } This way, the ObjectMapper will be instantiated at the destination and you can share the instance. ᐧ On Sat, Feb 14, 2015 at 1:52 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Thanks for the reply Enno, in my case rate from the stream is not the bottleneck as i'm able to consume all those records at a time (have tested it). And regarding the ObjectMapper, if i take it outside of my map operation then it throws Serializable Exceptions (Caused by: java.io.NotSerializableException: com.fasterxml.jackson.module.scala.modifiers.SetTypeModifier). Thanks Best Regards On Sat, Feb 14, 2015 at 7:11 PM, Enno Shioji eshi...@gmail.com wrote: If I were you I'd first parse some test jsons in isolation (outside Spark) to determine if the bottleneck is really the parsing. There are plenty other places that could be affecting your performance, like the rate you are able to read from your stream source etc. Apart from that, I notice that you are instantiating the ObjectMapper every time. This is quite expensive and jackson recommends you to share the instance. However, if you tried other parsers / mapPartitions without success, this probably won't fix your problem either. On Sat, Feb 14, 2015 at 1:25 PM, Akhil Das ak...@sigmoidanalytics.com wrote: I'm getting a low performance while parsing json data. My cluster setup is 1.2.0 version of spark with 10 Nodes each having 15Gb of memory and 4 cores. I tried both scala.util.parsing.json.JSON and and fasterxml's Jackson parser. This is what i basically do: *//Approach 1:* val jsonStream = myDStream.map(x= { val mapper = new ObjectMapper() with ScalaObjectMapper mapper.registerModule(DefaultScalaModule) mapper.readValue[Map[String,Any]](x) }) jsonStream.count().print() *//Approach 2:* val jsonStream2 = myDStream.map(JSON.parseFull(_).get.asInstanceOf[scala.collection.immutable.Map[String, Any]]) jsonStream2.count().print() It takes around 15-20 Seconds to process/parse 35k json documents (contains nested documents and arrays) which i put in the stream. Is there any better approach/parser to process it faster? i also tried it with mapPartitions but it did not make any difference. Thanks Best Regards
Re: SparkStreaming Low Performance
Huh, that would come to 6.5ms per one JSON. That does feel like a lot but if your JSON file is big enough, I guess you could get that sort of processing time. Jackson is more or less the most efficient JSON parser out there, so unless the Scala API is somehow affecting it, I don't see any better way. If you only need to read parts of the JSON, you could look into exploiting Jackson's stream parsing API http://wiki.fasterxml.com/JacksonStreamingApi . I guess the good news is you can throw machines at it. You could also look into other serialization frameworks. ᐧ On Sat, Feb 14, 2015 at 2:49 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Thanks again! Its with the parser only, just tried the parser https://gist.github.com/akhld/3948a5d91d218eaf809d without Spark. And it took me 52 Sec to process 8k json records. Not sure if there's an efficient way to do this in Spark, i know if i use sparkSQL with schemaRDD and all it will be much faster, but i need that in SparkStreaming. Thanks Best Regards On Sat, Feb 14, 2015 at 8:04 PM, Enno Shioji eshi...@gmail.com wrote: I see. I'd really benchmark how the parsing performs outside Spark (in a tight loop or something). If *that* is slow, you know it's the parsing. If not, it's not the parsing. Another thing you want to look at is CPU usage. If the actual parsing really is the bottleneck, you should see very high CPU utilization. If not, it's not the parsing per se but rather the ability to feed the messages to the parsing library. ᐧ On Sat, Feb 14, 2015 at 2:30 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Ah my bad, it works without serializable exception. But not much performance difference is there though. Thanks Best Regards On Sat, Feb 14, 2015 at 7:45 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Thanks for the suggestion, but doing that gives me this exception: http://pastebin.com/ni80NqKn Over this piece of code: object Holder extends Serializable { @transient lazy val mapper = new ObjectMapper() with ScalaObjectMapper mapper.registerModule(DefaultScalaModule) } val jsonStream = myDStream.map(x= { Holder.mapper.readValue[Map[String,Any]](x) }) Thanks Best Regards On Sat, Feb 14, 2015 at 7:32 PM, Enno Shioji eshi...@gmail.com wrote: (adding back user) Fair enough. Regarding serialization exception, the hack I use is to have a object with a transient lazy field, like so: object Holder extends Serializable { @transient lazy val mapper = new ObjectMapper() } This way, the ObjectMapper will be instantiated at the destination and you can share the instance. ᐧ On Sat, Feb 14, 2015 at 1:52 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Thanks for the reply Enno, in my case rate from the stream is not the bottleneck as i'm able to consume all those records at a time (have tested it). And regarding the ObjectMapper, if i take it outside of my map operation then it throws Serializable Exceptions (Caused by: java.io.NotSerializableException: com.fasterxml.jackson.module.scala.modifiers.SetTypeModifier). Thanks Best Regards On Sat, Feb 14, 2015 at 7:11 PM, Enno Shioji eshi...@gmail.com wrote: If I were you I'd first parse some test jsons in isolation (outside Spark) to determine if the bottleneck is really the parsing. There are plenty other places that could be affecting your performance, like the rate you are able to read from your stream source etc. Apart from that, I notice that you are instantiating the ObjectMapper every time. This is quite expensive and jackson recommends you to share the instance. However, if you tried other parsers / mapPartitions without success, this probably won't fix your problem either. On Sat, Feb 14, 2015 at 1:25 PM, Akhil Das ak...@sigmoidanalytics.com wrote: I'm getting a low performance while parsing json data. My cluster setup is 1.2.0 version of spark with 10 Nodes each having 15Gb of memory and 4 cores. I tried both scala.util.parsing.json.JSON and and fasterxml's Jackson parser. This is what i basically do: *//Approach 1:* val jsonStream = myDStream.map(x= { val mapper = new ObjectMapper() with ScalaObjectMapper mapper.registerModule(DefaultScalaModule) mapper.readValue[Map[String,Any]](x) }) jsonStream.count().print() *//Approach 2:* val jsonStream2 = myDStream.map(JSON.parseFull(_).get.asInstanceOf[scala.collection.immutable.Map[String, Any]]) jsonStream2.count().print() It takes around 15-20 Seconds to process/parse 35k json documents (contains nested documents and arrays) which i put in the stream. Is there any better approach/parser to process it faster? i also tried it with mapPartitions but it did not make any difference. Thanks Best Regards
Re: SparkStreaming Low Performance
(adding back user) Fair enough. Regarding serialization exception, the hack I use is to have a object with a transient lazy field, like so: object Holder extends Serializable { @transient lazy val mapper = new ObjectMapper() } This way, the ObjectMapper will be instantiated at the destination and you can share the instance. ᐧ On Sat, Feb 14, 2015 at 1:52 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Thanks for the reply Enno, in my case rate from the stream is not the bottleneck as i'm able to consume all those records at a time (have tested it). And regarding the ObjectMapper, if i take it outside of my map operation then it throws Serializable Exceptions (Caused by: java.io.NotSerializableException: com.fasterxml.jackson.module.scala.modifiers.SetTypeModifier). Thanks Best Regards On Sat, Feb 14, 2015 at 7:11 PM, Enno Shioji eshi...@gmail.com wrote: If I were you I'd first parse some test jsons in isolation (outside Spark) to determine if the bottleneck is really the parsing. There are plenty other places that could be affecting your performance, like the rate you are able to read from your stream source etc. Apart from that, I notice that you are instantiating the ObjectMapper every time. This is quite expensive and jackson recommends you to share the instance. However, if you tried other parsers / mapPartitions without success, this probably won't fix your problem either. On Sat, Feb 14, 2015 at 1:25 PM, Akhil Das ak...@sigmoidanalytics.com wrote: I'm getting a low performance while parsing json data. My cluster setup is 1.2.0 version of spark with 10 Nodes each having 15Gb of memory and 4 cores. I tried both scala.util.parsing.json.JSON and and fasterxml's Jackson parser. This is what i basically do: *//Approach 1:* val jsonStream = myDStream.map(x= { val mapper = new ObjectMapper() with ScalaObjectMapper mapper.registerModule(DefaultScalaModule) mapper.readValue[Map[String,Any]](x) }) jsonStream.count().print() *//Approach 2:* val jsonStream2 = myDStream.map(JSON.parseFull(_).get.asInstanceOf[scala.collection.immutable.Map[String, Any]]) jsonStream2.count().print() It takes around 15-20 Seconds to process/parse 35k json documents (contains nested documents and arrays) which i put in the stream. Is there any better approach/parser to process it faster? i also tried it with mapPartitions but it did not make any difference. Thanks Best Regards
Re: SparkStreaming Low Performance
Thanks for the suggestion, but doing that gives me this exception: http://pastebin.com/ni80NqKn Over this piece of code: object Holder extends Serializable { @transient lazy val mapper = new ObjectMapper() with ScalaObjectMapper mapper.registerModule(DefaultScalaModule) } val jsonStream = myDStream.map(x= { Holder.mapper.readValue[Map[String,Any]](x) }) Thanks Best Regards On Sat, Feb 14, 2015 at 7:32 PM, Enno Shioji eshi...@gmail.com wrote: (adding back user) Fair enough. Regarding serialization exception, the hack I use is to have a object with a transient lazy field, like so: object Holder extends Serializable { @transient lazy val mapper = new ObjectMapper() } This way, the ObjectMapper will be instantiated at the destination and you can share the instance. ᐧ On Sat, Feb 14, 2015 at 1:52 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Thanks for the reply Enno, in my case rate from the stream is not the bottleneck as i'm able to consume all those records at a time (have tested it). And regarding the ObjectMapper, if i take it outside of my map operation then it throws Serializable Exceptions (Caused by: java.io.NotSerializableException: com.fasterxml.jackson.module.scala.modifiers.SetTypeModifier). Thanks Best Regards On Sat, Feb 14, 2015 at 7:11 PM, Enno Shioji eshi...@gmail.com wrote: If I were you I'd first parse some test jsons in isolation (outside Spark) to determine if the bottleneck is really the parsing. There are plenty other places that could be affecting your performance, like the rate you are able to read from your stream source etc. Apart from that, I notice that you are instantiating the ObjectMapper every time. This is quite expensive and jackson recommends you to share the instance. However, if you tried other parsers / mapPartitions without success, this probably won't fix your problem either. On Sat, Feb 14, 2015 at 1:25 PM, Akhil Das ak...@sigmoidanalytics.com wrote: I'm getting a low performance while parsing json data. My cluster setup is 1.2.0 version of spark with 10 Nodes each having 15Gb of memory and 4 cores. I tried both scala.util.parsing.json.JSON and and fasterxml's Jackson parser. This is what i basically do: *//Approach 1:* val jsonStream = myDStream.map(x= { val mapper = new ObjectMapper() with ScalaObjectMapper mapper.registerModule(DefaultScalaModule) mapper.readValue[Map[String,Any]](x) }) jsonStream.count().print() *//Approach 2:* val jsonStream2 = myDStream.map(JSON.parseFull(_).get.asInstanceOf[scala.collection.immutable.Map[String, Any]]) jsonStream2.count().print() It takes around 15-20 Seconds to process/parse 35k json documents (contains nested documents and arrays) which i put in the stream. Is there any better approach/parser to process it faster? i also tried it with mapPartitions but it did not make any difference. Thanks Best Regards
Re: SparkStreaming Low Performance
Thanks again! Its with the parser only, just tried the parser https://gist.github.com/akhld/3948a5d91d218eaf809d without Spark. And it took me 52 Sec to process 8k json records. Not sure if there's an efficient way to do this in Spark, i know if i use sparkSQL with schemaRDD and all it will be much faster, but i need that in SparkStreaming. Thanks Best Regards On Sat, Feb 14, 2015 at 8:04 PM, Enno Shioji eshi...@gmail.com wrote: I see. I'd really benchmark how the parsing performs outside Spark (in a tight loop or something). If *that* is slow, you know it's the parsing. If not, it's not the parsing. Another thing you want to look at is CPU usage. If the actual parsing really is the bottleneck, you should see very high CPU utilization. If not, it's not the parsing per se but rather the ability to feed the messages to the parsing library. ᐧ On Sat, Feb 14, 2015 at 2:30 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Ah my bad, it works without serializable exception. But not much performance difference is there though. Thanks Best Regards On Sat, Feb 14, 2015 at 7:45 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Thanks for the suggestion, but doing that gives me this exception: http://pastebin.com/ni80NqKn Over this piece of code: object Holder extends Serializable { @transient lazy val mapper = new ObjectMapper() with ScalaObjectMapper mapper.registerModule(DefaultScalaModule) } val jsonStream = myDStream.map(x= { Holder.mapper.readValue[Map[String,Any]](x) }) Thanks Best Regards On Sat, Feb 14, 2015 at 7:32 PM, Enno Shioji eshi...@gmail.com wrote: (adding back user) Fair enough. Regarding serialization exception, the hack I use is to have a object with a transient lazy field, like so: object Holder extends Serializable { @transient lazy val mapper = new ObjectMapper() } This way, the ObjectMapper will be instantiated at the destination and you can share the instance. ᐧ On Sat, Feb 14, 2015 at 1:52 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Thanks for the reply Enno, in my case rate from the stream is not the bottleneck as i'm able to consume all those records at a time (have tested it). And regarding the ObjectMapper, if i take it outside of my map operation then it throws Serializable Exceptions (Caused by: java.io.NotSerializableException: com.fasterxml.jackson.module.scala.modifiers.SetTypeModifier). Thanks Best Regards On Sat, Feb 14, 2015 at 7:11 PM, Enno Shioji eshi...@gmail.com wrote: If I were you I'd first parse some test jsons in isolation (outside Spark) to determine if the bottleneck is really the parsing. There are plenty other places that could be affecting your performance, like the rate you are able to read from your stream source etc. Apart from that, I notice that you are instantiating the ObjectMapper every time. This is quite expensive and jackson recommends you to share the instance. However, if you tried other parsers / mapPartitions without success, this probably won't fix your problem either. On Sat, Feb 14, 2015 at 1:25 PM, Akhil Das ak...@sigmoidanalytics.com wrote: I'm getting a low performance while parsing json data. My cluster setup is 1.2.0 version of spark with 10 Nodes each having 15Gb of memory and 4 cores. I tried both scala.util.parsing.json.JSON and and fasterxml's Jackson parser. This is what i basically do: *//Approach 1:* val jsonStream = myDStream.map(x= { val mapper = new ObjectMapper() with ScalaObjectMapper mapper.registerModule(DefaultScalaModule) mapper.readValue[Map[String,Any]](x) }) jsonStream.count().print() *//Approach 2:* val jsonStream2 = myDStream.map(JSON.parseFull(_).get.asInstanceOf[scala.collection.immutable.Map[String, Any]]) jsonStream2.count().print() It takes around 15-20 Seconds to process/parse 35k json documents (contains nested documents and arrays) which i put in the stream. Is there any better approach/parser to process it faster? i also tried it with mapPartitions but it did not make any difference. Thanks Best Regards