Cui:
`messages` is a DStream, not an RDD, so the check belongs inside foreachRDD:
there you can test whether the batch's RDD is empty (e.g. rdd.take(1).isEmpty)
before calling jsonRDD, which is what throws on an empty collection.
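
A minimal sketch of that guard, based on the snippet further down in this thread (untested here, since it assumes a running StreamingContext, the `messages` stream from KafkaUtils.createStream, and a `sqlContext` in scope):

```scala
// Guard against empty batches before schema inference.
// jsonRDD calls reduce() to infer the schema, which throws
// "empty collection" when the batch RDD has no elements.
messages.foreachRDD { rdd =>
  val message = rdd.map(_._2)          // Kafka (key, value) pairs -> values
  if (message.take(1).nonEmpty) {      // cheap emptiness check on the batch
    sqlContext.jsonRDD(message).registerTempTable("tempTable")
    // ... run the SQL and saveToCassandra as in the original snippet
  }
}
```

Checking partitions.size alone is not enough, because an RDD can have partitions that are all empty; take(1) actually looks for an element.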

Cheers

On Thu, Mar 5, 2015 at 12:52 AM, Akhil Das <ak...@sigmoidanalytics.com>
wrote:

> When you use KafkaUtils.createStream with StringDecoders, it will return
> String objects inside your messages stream. To access the elements from the
> json, you could do something like the following:
>
>
>    // imports needed:
>    // import com.fasterxml.jackson.databind.ObjectMapper
>    // import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper}
>
>    val mapStream = messages.map { x =>
>      val mapper = new ObjectMapper() with ScalaObjectMapper
>      mapper.registerModule(DefaultScalaModule)
>      mapper.readValue[Map[String, Any]](x).get("time")
>    }
>
>
>
> Thanks
> Best Regards
>
> On Thu, Mar 5, 2015 at 11:13 AM, Cui Lin <cui....@hds.com> wrote:
>
>>   Friends,
>>
>>   I'm trying to parse json-formatted Kafka messages and then send them
>> back to Cassandra. I have two problems:
>>
>>    1. I got the exception below. How to check an empty RDD?
>>
>>  Exception in thread "main" java.lang.UnsupportedOperationException:
>> empty collection
>>  at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
>>  at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
>>  at scala.Option.getOrElse(Option.scala:120)
>>  at org.apache.spark.rdd.RDD.reduce(RDD.scala:869)
>>  at org.apache.spark.sql.json.JsonRDD$.inferSchema(JsonRDD.scala:57)
>>  at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:232)
>>  at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:204)
>>
>>  val messages = KafkaUtils.createStream[String, String, StringDecoder, 
>> StringDecoder](…)
>>
>> messages.foreachRDD { rdd =>
>>   val message:RDD[String] = rdd.map { y => y._2 }
>>   sqlContext.jsonRDD(message).registerTempTable("tempTable")
>>   sqlContext.sql("SELECT time,To FROM tempTable")
>>     .saveToCassandra(cassandra_keyspace, cassandra_table, SomeColumns("key", 
>> "msg"))
>> }
>>
>>
>>  2. How can I get all column names from the json messages? I have
>> hundreds of columns in the json-formatted message.
>>
>>  Thanks for your help!
>>
>>
>>
>>
>>  Best regards,
>>
>>  Cui Lin
>>
>
>
