Friends, I'm trying to parse JSON-formatted Kafka messages and then write them to Cassandra. I have two problems:
1. I get the exception below. How do I check for an empty RDD?

    Exception in thread "main" java.lang.UnsupportedOperationException: empty collection
        at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
        at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.reduce(RDD.scala:869)
        at org.apache.spark.sql.json.JsonRDD$.inferSchema(JsonRDD.scala:57)
        at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:232)
        at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:204)

My code:

    import kafka.serializer.StringDecoder
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.kafka.KafkaUtils
    import com.datastax.spark.connector._ // saveToCassandra, SomeColumns

    val messages = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](…)

    messages.foreachRDD { rdd =>
      // Kafka records are (key, value) pairs; the JSON text is the value
      val message: RDD[String] = rdd.map { y => y._2 }
      sqlContext.jsonRDD(message).registerTempTable("tempTable")
      sqlContext.sql("SELECT time, To FROM tempTable")
        .saveToCassandra(cassandra_keyspace, cassandra_table, SomeColumns("key", "msg"))
    }

2. How do I get all column names from the JSON messages? Each message has hundreds of columns.

Thanks for your help!

Best regards,
Cui Lin
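A minimal sketch covering both questions, assuming Spark 1.3 to 1.6, where `SQLContext.jsonRDD` returns a `DataFrame` (the stack trace looks like Spark 1.2, where it returns a `SchemaRDD` instead). The exception comes from `jsonRDD`'s schema inference, which reduces over the records and fails when a micro-batch has none, so the sketch checks for data first. The object name `KafkaJsonBatch` and the method `process` are hypothetical names for illustration.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SQLContext}

// Hypothetical helper; assumes Spark 1.3-1.6 (jsonRDD returns a DataFrame).
object KafkaJsonBatch {
  // Returns None for an empty micro-batch instead of letting jsonRDD's
  // schema inference throw "UnsupportedOperationException: empty collection".
  def process(sqlContext: SQLContext, rdd: RDD[(String, String)]): Option[DataFrame] = {
    // Kafka records are (key, value) pairs; the JSON text is the value.
    val message: RDD[String] = rdd.map(_._2)

    // take(1) fetches at most one record, so it is much cheaper than count().
    // RDD.isEmpty() (Spark 1.3+) does essentially the same thing.
    if (message.take(1).isEmpty) {
      None
    } else {
      val df = sqlContext.jsonRDD(message)

      // Question 2: the top-level column names inferred for this batch.
      // Nested JSON objects appear as struct columns; df.printSchema()
      // prints the full tree.
      val columnNames: Array[String] = df.columns
      println(columnNames.mkString(", "))

      df.registerTempTable("tempTable")
      Some(sqlContext.sql("SELECT time, To FROM tempTable"))
    }
  }
}
```

Inside `messages.foreachRDD { rdd => ... }`, call `KafkaJsonBatch.process(sqlContext, rdd)` and write to Cassandra only when it returns `Some(result)`. Two caveats: the schema is inferred per batch, so a batch whose messages all lack `To` will fail the `SELECT`; and on Spark 1.2 there is no `columns`, but `schema.fields.map(_.name)` should give the same list. If the field set is known up front, passing an explicit schema through the `jsonRDD(json, schema)` overload should skip inference entirely, which also avoids the empty-collection error.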