Cui: You can check rdd.partitions.size on each batch RDD (inside foreachRDD) to determine whether it is an empty RDD.
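For example, a minimal sketch of the guard (a plain Seq stands in for the batch RDD here, so the snippet is self-contained; with a real RDD the check would be rdd.partitions.size == 0, or more generally rdd.take(1).isEmpty, or rdd.isEmpty() in Spark 1.3+ — processBatch is a hypothetical helper, not a Spark API):

```scala
// Guard each micro-batch before handing it to jsonRDD: schema inference
// calls reduce, which throws UnsupportedOperationException on an empty
// collection. Returns true if the batch was processed, false if skipped.
def processBatch(batch: Seq[String])(handle: Seq[String] => Unit): Boolean =
  if (batch.nonEmpty) { // in Spark: rdd.take(1).nonEmpty / !rdd.isEmpty()
    handle(batch)       // e.g. sqlContext.jsonRDD(...) and the Cassandra save
    true
  } else {
    false               // skip empty batches instead of letting reduce throw
  }
```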
Cheers

On Thu, Mar 5, 2015 at 12:52 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:

> When you use KafkaUtils.createStream with StringDecoders, it will return
> String objects inside your messages stream. To access the elements from the
> JSON, you could do something like the following:
>
>     val mapStream = messages.map { x =>
>       val mapper = new ObjectMapper() with ScalaObjectMapper
>       mapper.registerModule(DefaultScalaModule)
>       mapper.readValue[Map[String, Any]](x).get("time")
>     }
>
> Thanks
> Best Regards
>
> On Thu, Mar 5, 2015 at 11:13 AM, Cui Lin <cui....@hds.com> wrote:
>
>> Friends,
>>
>> I'm trying to parse JSON-formatted Kafka messages and then send them back
>> to Cassandra. I have two problems:
>>
>> 1. I got the exception below. How do I check for an empty RDD?
>>
>>     Exception in thread "main" java.lang.UnsupportedOperationException: empty collection
>>         at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
>>         at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
>>         at scala.Option.getOrElse(Option.scala:120)
>>         at org.apache.spark.rdd.RDD.reduce(RDD.scala:869)
>>         at org.apache.spark.sql.json.JsonRDD$.inferSchema(JsonRDD.scala:57)
>>         at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:232)
>>         at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:204)
>>
>>     val messages = KafkaUtils.createStream[String, String, StringDecoder,
>>       StringDecoder](…)
>>
>>     messages.foreachRDD { rdd =>
>>       val message: RDD[String] = rdd.map { y => y._2 }
>>       sqlContext.jsonRDD(message).registerTempTable("tempTable")
>>       sqlContext.sql("SELECT time, To FROM tempTable")
>>         .saveToCassandra(cassandra_keyspace, cassandra_table,
>>           SomeColumns("key", "msg"))
>>     }
>>
>> 2. How do I get all column names from the JSON messages? I have hundreds
>>    of columns in each JSON-formatted message.
>>
>> Thanks for your help!
>>
>> Best regards,
>>
>> Cui Lin
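On the second question: once each message is parsed into a Map[String, Any] as in Akhil's snippet, the full column list is just the union of keys across all messages. In Spark that would be something like parsed.flatMap(_.keys).distinct().collect(), or the inferred schema's field names after jsonRDD (the 1.x SchemaRDD exposes a schema, a StructType, whose fieldNames should give the same list). A plain-Scala sketch of the key-union step (allColumns is a hypothetical helper, and a Seq stands in for the RDD so the snippet is self-contained):

```scala
// Union of column names across parsed JSON messages. With a real RDD of
// Map[String, Any] this is the same logic as:
//   parsed.flatMap(_.keys).distinct().collect()
def allColumns(rows: Seq[Map[String, Any]]): Seq[String] =
  rows.flatMap(_.keys).distinct
```

This also handles messages whose key sets differ, which a single sample message would miss.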