Re: How to parse Json formatted Kafka message in spark streaming

2015-03-05 Thread Ted Yu
See the following thread about the 1.3.0 release:
http://search-hadoop.com/m/JW1q5hV8c4

Looks like the release is around the corner.

On Thu, Mar 5, 2015 at 3:26 PM, Cui Lin  wrote:

>   Hi, Ted,
>
>  Thanks for your reply. I noticed from the link below that partitions.size
> will not work for checking for an empty RDD in streams. It seems the problem
> is fixed in Spark 1.3, which is not yet available for download?
>
>  https://issues.apache.org/jira/browse/SPARK-5270
>  Best regards,
>
>  Cui Lin
>
>   From: Ted Yu 
> Date: Thursday, March 5, 2015 at 6:33 AM
> To: Akhil Das 
> Cc: Cui Lin , "user@spark.apache.org" <
> user@spark.apache.org>
> Subject: Re: How to parse Json formatted Kafka message in spark streaming
>
>   Cui:
> You can check messages.partitions.size to determine whether messages is
> an empty RDD.
>
>  Cheers
>
> On Thu, Mar 5, 2015 at 12:52 AM, Akhil Das 
> wrote:
>
>>  When you use KafkaUtils.createStream with StringDecoders, it will
>> return String objects inside your messages stream. To access the elements
>> from the json, you could do something like the following:
>>
>>
>> val mapStream = messages.map(x => {
>>   val mapper = new ObjectMapper() with ScalaObjectMapper
>>   mapper.registerModule(DefaultScalaModule)
>>   mapper.readValue[Map[String,Any]](x).get("time")
>> })
>>
>>
>>
>>  Thanks
>> Best Regards
>>
>> On Thu, Mar 5, 2015 at 11:13 AM, Cui Lin  wrote:
>>
>>>   Friends,
>>>
>>>   I'm trying to parse JSON-formatted Kafka messages and then send them
>>> back to Cassandra. I have two problems:
>>>
>>>    1. I got the exception below. How do I check for an empty RDD?
>>>
>>>  Exception in thread "main" java.lang.UnsupportedOperationException:
>>> empty collection
>>>  at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
>>>  at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
>>>  at scala.Option.getOrElse(Option.scala:120)
>>>  at org.apache.spark.rdd.RDD.reduce(RDD.scala:869)
>>>  at org.apache.spark.sql.json.JsonRDD$.inferSchema(JsonRDD.scala:57)
>>>  at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:232)
>>>  at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:204)
>>>
>>>  val messages = KafkaUtils.createStream[String, String, StringDecoder, 
>>> StringDecoder](…)
>>>
>>> messages.foreachRDD { rdd =>
>>>   val message:RDD[String] = rdd.map { y => y._2 }
>>>   sqlContext.jsonRDD(message).registerTempTable("tempTable")
>>>   sqlContext.sql("SELECT time,To FROM tempTable")
>>> .saveToCassandra(cassandra_keyspace, cassandra_table, 
>>> SomeColumns("key", "msg"))
>>> }
>>>
>>>
>>>  2. How do I get all the column names from the JSON messages? I have
>>> hundreds of columns in the JSON-formatted message.
>>>
>>>  Thanks for your help!
>>>
>>>
>>>
>>>
>>>  Best regards,
>>>
>>>  Cui Lin
>>>
>>
>>
>


Re: How to parse Json formatted Kafka message in spark streaming

2015-03-05 Thread Cui Lin
Hi, Ted,

Thanks for your reply. I noticed from the link below that partitions.size will 
not work for checking for an empty RDD in streams. It seems the problem is fixed 
in Spark 1.3, which is not yet available for download?

https://issues.apache.org/jira/browse/SPARK-5270
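
In the meantime, a minimal workaround sketch (assuming the messages stream and 
sqlContext from my original code below; this is not the eventual 1.3 API):

messages.foreachRDD { rdd =>
  // take(1) returns an empty array for an empty RDD, so skip those batches
  // entirely until RDD.isEmpty (SPARK-5270) is available.
  if (rdd.take(1).nonEmpty) {
    val message = rdd.map(_._2)
    sqlContext.jsonRDD(message).registerTempTable("tempTable")
    // ... run the SQL and save to Cassandra as before
  }
}
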
Best regards,

Cui Lin

From: Ted Yu <yuzhih...@gmail.com>
Date: Thursday, March 5, 2015 at 6:33 AM
To: Akhil Das <ak...@sigmoidanalytics.com>
Cc: Cui Lin <cui@hds.com>, "user@spark.apache.org" <user@spark.apache.org>
Subject: Re: How to parse Json formatted Kafka message in spark streaming
Subject: Re: How to parse Json formatted Kafka message in spark streaming

Cui:
You can check messages.partitions.size to determine whether messages is an 
empty RDD.

Cheers

On Thu, Mar 5, 2015 at 12:52 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
When you use KafkaUtils.createStream with StringDecoders, it will return String 
objects inside your messages stream. To access the elements from the json, you 
could do something like the following:


val mapStream = messages.map(x => {
  val mapper = new ObjectMapper() with ScalaObjectMapper
  mapper.registerModule(DefaultScalaModule)
  mapper.readValue[Map[String,Any]](x).get("time")
})



Thanks
Best Regards

On Thu, Mar 5, 2015 at 11:13 AM, Cui Lin <cui@hds.com> wrote:
Friends,

I'm trying to parse JSON-formatted Kafka messages and then send them back to 
Cassandra. I have two problems:

  1.  I got the exception below. How do I check for an empty RDD?

Exception in thread "main" java.lang.UnsupportedOperationException: empty 
collection
at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.reduce(RDD.scala:869)
at org.apache.spark.sql.json.JsonRDD$.inferSchema(JsonRDD.scala:57)
at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:232)
at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:204)


val messages = KafkaUtils.createStream[String, String, StringDecoder, 
StringDecoder](…)

messages.foreachRDD { rdd =>
  val message:RDD[String] = rdd.map { y => y._2 }
  sqlContext.jsonRDD(message).registerTempTable("tempTable")
  sqlContext.sql("SELECT time,To FROM tempTable")
.saveToCassandra(cassandra_keyspace, cassandra_table, SomeColumns("key", 
"msg"))
}

2. How do I get all the column names from the JSON messages? I have hundreds of 
columns in the JSON-formatted message.

Thanks for your help!




Best regards,

Cui Lin




Re: How to parse Json formatted Kafka message in spark streaming

2015-03-05 Thread Cui Lin
Hi, Helena,

I think your new version only fits JSON with a very limited number of columns. I 
couldn't find MonthlyCommits, but I assume it is a class with a small number of 
manually defined fields. In my case, I have hundreds of column names, so it is 
not feasible to define a class for these columns.

Is there any way to get the column names instead of hard-coding "time" here?

  mapper.readValue[Map[String,Any]](x).get("time")
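
One hedged sketch of an answer: the Jackson call above already yields a 
Map[String, Any], so the field names are just its keys and nothing needs to be 
hard-coded (assuming the same Jackson imports as Akhil's snippet, and that 
messages is the (key, value) DStream from KafkaUtils.createStream, as in my 
original code):

val fieldNames = messages.map { case (_, json) =>
  // For efficiency you would normally build the mapper once per partition
  // (e.g. via mapPartitions) rather than per record.
  val mapper = new ObjectMapper() with ScalaObjectMapper
  mapper.registerModule(DefaultScalaModule)
  mapper.readValue[Map[String, Any]](json).keys.toSeq  // all column names
}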

Best regards,

Cui Lin

From: Helena Edelson <helena.edel...@datastax.com>
Date: Thursday, March 5, 2015 at 7:02 AM
To: Ted Yu <yuzhih...@gmail.com>
Cc: Akhil Das <ak...@sigmoidanalytics.com>, Cui Lin <cui@hds.com>, 
"user@spark.apache.org" <user@spark.apache.org>
Subject: Re: How to parse Json formatted Kafka message in spark streaming
Subject: Re: How to parse Json formatted Kafka message in spark streaming

Great point :) Cui, here's a cleaner way than I had before, without using Spark 
SQL for the mapping:


KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  ssc, kafka.kafkaParams, Map("github" -> 5), StorageLevel.MEMORY_ONLY)
  .map{ case (k,v) => JsonParser.parse(v).extract[MonthlyCommits]}
  .saveToCassandra("githubstats","monthly_commits")

HELENA EDELSON
Senior Software Engineer, DSE Analytics

On Mar 5, 2015, at 9:33 AM, Ted Yu <yuzhih...@gmail.com> wrote:

Cui:
You can check messages.partitions.size to determine whether messages is an 
empty RDD.

Cheers

On Thu, Mar 5, 2015 at 12:52 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
When you use KafkaUtils.createStream with StringDecoders, it will return String 
objects inside your messages stream. To access the elements from the json, you 
could do something like the following:


val mapStream = messages.map(x => {
  val mapper = new ObjectMapper() with ScalaObjectMapper
  mapper.registerModule(DefaultScalaModule)
  mapper.readValue[Map[String,Any]](x).get("time")
})



Thanks
Best Regards

On Thu, Mar 5, 2015 at 11:13 AM, Cui Lin <cui@hds.com> wrote:
Friends,

I'm trying to parse JSON-formatted Kafka messages and then send them back to 
Cassandra. I have two problems:

  1.  I got the exception below. How do I check for an empty RDD?

Exception in thread "main" java.lang.UnsupportedOperationException: empty 
collection
at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.reduce(RDD.scala:869)
at org.apache.spark.sql.json.JsonRDD$.inferSchema(JsonRDD.scala:57)
at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:232)
at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:204)


val messages = KafkaUtils.createStream[String, String, StringDecoder, 
StringDecoder](…)

messages.foreachRDD { rdd =>
  val message:RDD[String] = rdd.map { y => y._2 }
  sqlContext.jsonRDD(message).registerTempTable("tempTable")
  sqlContext.sql("SELECT time,To FROM tempTable")
.saveToCassandra(cassandra_keyspace, cassandra_table, SomeColumns("key", 
"msg"))
}

2. How do I get all the column names from the JSON messages? I have hundreds of 
columns in the JSON-formatted message.

Thanks for your help!




Best regards,

Cui Lin





Re: How to parse Json formatted Kafka message in spark streaming

2015-03-05 Thread Helena Edelson
Great point :) Cui, here's a cleaner way than I had before, without using Spark 
SQL for the mapping:

KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  ssc, kafka.kafkaParams, Map("github" -> 5), StorageLevel.MEMORY_ONLY)
  .map{ case (k,v) => JsonParser.parse(v).extract[MonthlyCommits]}
  .saveToCassandra("githubstats","monthly_commits")


HELENA EDELSON
Senior Software Engineer, DSE Analytics

On Mar 5, 2015, at 9:33 AM, Ted Yu  wrote:

> Cui:
> You can check messages.partitions.size to determine whether messages is an 
> empty RDD.
> 
> Cheers
> 
> On Thu, Mar 5, 2015 at 12:52 AM, Akhil Das  wrote:
> When you use KafkaUtils.createStream with StringDecoders, it will return 
> String objects inside your messages stream. To access the elements from the 
> json, you could do something like the following:
> 
> 
> val mapStream = messages.map(x => {
>   val mapper = new ObjectMapper() with ScalaObjectMapper
>   mapper.registerModule(DefaultScalaModule)
>   mapper.readValue[Map[String,Any]](x).get("time")
> })
> 
>   
> 
> Thanks
> Best Regards
> 
> On Thu, Mar 5, 2015 at 11:13 AM, Cui Lin  wrote:
> Friends,
> 
> I'm trying to parse JSON-formatted Kafka messages and then send them back to 
> Cassandra. I have two problems:
> 1. I got the exception below. How do I check for an empty RDD?
> Exception in thread "main" java.lang.UnsupportedOperationException: empty 
> collection
> at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
> at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.rdd.RDD.reduce(RDD.scala:869)
> at org.apache.spark.sql.json.JsonRDD$.inferSchema(JsonRDD.scala:57)
> at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:232)
> at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:204)
> 
> val messages = KafkaUtils.createStream[String, String, StringDecoder, 
> StringDecoder](…)
> messages.foreachRDD { rdd =>
>   val message:RDD[String] = rdd.map { y => y._2 }
>   sqlContext.jsonRDD(message).registerTempTable("tempTable")
>   sqlContext.sql("SELECT time,To FROM tempTable")
> .saveToCassandra(cassandra_keyspace, cassandra_table, SomeColumns("key", 
> "msg"))
> }
> 
> 2. How do I get all the column names from the JSON messages? I have hundreds 
> of columns in the JSON-formatted message.
> 
> Thanks for your help!
> 
> 
> 
> 
> Best regards,
> 
> Cui Lin
> 
> 



Re: How to parse Json formatted Kafka message in spark streaming

2015-03-05 Thread Ted Yu
Cui:
You can check messages.partitions.size to determine whether messages is an
empty RDD.
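
A minimal sketch of that check inside the streaming loop (note: a batch can 
have partitions yet still be empty, which is what SPARK-5270, discussed later 
in this thread, addresses):

messages.foreachRDD { rdd =>
  // Skip batches with no partitions before calling jsonRDD, whose schema
  // inference runs a reduce that throws on an empty collection.
  if (rdd.partitions.size > 0) {
    // ... process rdd as before
  }
}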

Cheers

On Thu, Mar 5, 2015 at 12:52 AM, Akhil Das 
wrote:

> When you use KafkaUtils.createStream with StringDecoders, it will return
> String objects inside your messages stream. To access the elements from the
> json, you could do something like the following:
>
>
> val mapStream = messages.map(x => {
>   val mapper = new ObjectMapper() with ScalaObjectMapper
>   mapper.registerModule(DefaultScalaModule)
>   mapper.readValue[Map[String,Any]](x).get("time")
> })
>
>
>
> Thanks
> Best Regards
>
> On Thu, Mar 5, 2015 at 11:13 AM, Cui Lin  wrote:
>
>>   Friends,
>>
>>   I'm trying to parse JSON-formatted Kafka messages and then send them back
>> to Cassandra. I have two problems:
>>
>>    1. I got the exception below. How do I check for an empty RDD?
>>
>>  Exception in thread "main" java.lang.UnsupportedOperationException:
>> empty collection
>>  at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
>>  at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
>>  at scala.Option.getOrElse(Option.scala:120)
>>  at org.apache.spark.rdd.RDD.reduce(RDD.scala:869)
>>  at org.apache.spark.sql.json.JsonRDD$.inferSchema(JsonRDD.scala:57)
>>  at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:232)
>>  at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:204)
>>
>>  val messages = KafkaUtils.createStream[String, String, StringDecoder, 
>> StringDecoder](…)
>>
>> messages.foreachRDD { rdd =>
>>   val message:RDD[String] = rdd.map { y => y._2 }
>>   sqlContext.jsonRDD(message).registerTempTable("tempTable")
>>   sqlContext.sql("SELECT time,To FROM tempTable")
>> .saveToCassandra(cassandra_keyspace, cassandra_table, SomeColumns("key", 
>> "msg"))
>> }
>>
>>
>>  2. How do I get all the column names from the JSON messages? I have
>> hundreds of columns in the JSON-formatted message.
>>
>>  Thanks for your help!
>>
>>
>>
>>
>>  Best regards,
>>
>>  Cui Lin
>>
>
>


Re: How to parse Json formatted Kafka message in spark streaming

2015-03-05 Thread Helena Edelson
Hi Cui,

What version of Spark are you using? There was a bug ticket that may be related 
to this, fixed in core/src/main/scala/org/apache/spark/rdd/RDD.scala and merged 
into versions 1.3.0 and 1.2.1. If you are using 1.1.1, that may be the reason, 
but it's a stretch: https://issues.apache.org/jira/browse/SPARK-4968

Did you verify that you have data streaming from Kafka?
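
A quick sketch of one way to check (assuming the messages DStream from the 
code below):

// Print a sample of each batch and its size to the driver log.
messages.print()          // first 10 elements of every batch
messages.count().print()  // per-batch record counts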

Helena
https://twitter.com/helenaedelson

On Mar 5, 2015, at 12:43 AM, Cui Lin  wrote:

> Friends,
> 
> I'm trying to parse JSON-formatted Kafka messages and then send them back to 
> Cassandra. I have two problems:
> 1. I got the exception below. How do I check for an empty RDD?
> Exception in thread "main" java.lang.UnsupportedOperationException: empty 
> collection
> at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
> at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.rdd.RDD.reduce(RDD.scala:869)
> at org.apache.spark.sql.json.JsonRDD$.inferSchema(JsonRDD.scala:57)
> at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:232)
> at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:204)
> 
> val messages = KafkaUtils.createStream[String, String, StringDecoder, 
> StringDecoder](…)
> messages.foreachRDD { rdd =>
>   val message:RDD[String] = rdd.map { y => y._2 }
>   sqlContext.jsonRDD(message).registerTempTable("tempTable")
>   sqlContext.sql("SELECT time,To FROM tempTable")
> .saveToCassandra(cassandra_keyspace, cassandra_table, SomeColumns("key", 
> "msg"))
> }
> 
> 2. How do I get all the column names from the JSON messages? I have hundreds 
> of columns in the JSON-formatted message.
> 
> Thanks for your help!
> 
> 
> 
> 
> Best regards,
> 
> Cui Lin



Re: How to parse Json formatted Kafka message in spark streaming

2015-03-05 Thread Akhil Das
When you use KafkaUtils.createStream with StringDecoders, it will return
String objects inside your messages stream. To access the elements from the
json, you could do something like the following:


import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper

// Assumes `messages` holds the JSON value strings; with createStream's
// (key, value) pairs, map to the value first (e.g. messages.map(_._2)).
val mapStream = messages.map(x => {
  val mapper = new ObjectMapper() with ScalaObjectMapper
  mapper.registerModule(DefaultScalaModule)
  mapper.readValue[Map[String,Any]](x).get("time")
})



Thanks
Best Regards

On Thu, Mar 5, 2015 at 11:13 AM, Cui Lin  wrote:

>   Friends,
>
>   I'm trying to parse JSON-formatted Kafka messages and then send them back to
> Cassandra. I have two problems:
>
>    1. I got the exception below. How do I check for an empty RDD?
>
>  Exception in thread "main" java.lang.UnsupportedOperationException:
> empty collection
>  at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
>  at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
>  at scala.Option.getOrElse(Option.scala:120)
>  at org.apache.spark.rdd.RDD.reduce(RDD.scala:869)
>  at org.apache.spark.sql.json.JsonRDD$.inferSchema(JsonRDD.scala:57)
>  at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:232)
>  at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:204)
>
>  val messages = KafkaUtils.createStream[String, String, StringDecoder, 
> StringDecoder](…)
>
> messages.foreachRDD { rdd =>
>   val message:RDD[String] = rdd.map { y => y._2 }
>   sqlContext.jsonRDD(message).registerTempTable("tempTable")
>   sqlContext.sql("SELECT time,To FROM tempTable")
> .saveToCassandra(cassandra_keyspace, cassandra_table, SomeColumns("key", 
> "msg"))
> }
>
>
>  2. How do I get all the column names from the JSON messages? I have hundreds
> of columns in the JSON-formatted message.
>
>  Thanks for your help!
>
>
>
>
>  Best regards,
>
>  Cui Lin
>


How to parse Json formatted Kafka message in spark streaming

2015-03-04 Thread Cui Lin
Friends,

I'm trying to parse JSON-formatted Kafka messages and then send them back to 
Cassandra. I have two problems:

  1.  I got the exception below. How do I check for an empty RDD?

Exception in thread "main" java.lang.UnsupportedOperationException: empty 
collection
at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.reduce(RDD.scala:869)
at org.apache.spark.sql.json.JsonRDD$.inferSchema(JsonRDD.scala:57)
at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:232)
at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:204)


val messages = KafkaUtils.createStream[String, String, StringDecoder,
  StringDecoder](…)

messages.foreachRDD { rdd =>
  val message: RDD[String] = rdd.map { y => y._2 }
  sqlContext.jsonRDD(message).registerTempTable("tempTable")
  sqlContext.sql("SELECT time, To FROM tempTable")
    .saveToCassandra(cassandra_keyspace, cassandra_table, SomeColumns("key", "msg"))
}

2. How do I get all the column names from the JSON messages? I have hundreds of 
columns in the JSON-formatted message.
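
A hedged sketch of one approach, inside the foreachRDD block above: jsonRDD 
infers a schema from the data, and the column names can be read off it instead 
of being hard-coded:

// Inside messages.foreachRDD { rdd => ... }, after building `message`:
val table = sqlContext.jsonRDD(message)
val columnNames = table.schema.fields.map(_.name)  // all inferred column names
println(columnNames.mkString(", "))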

Thanks for your help!




Best regards,

Cui Lin