Re: How to parse Json formatted Kafka message in spark streaming
See the following thread for the 1.3.0 release: http://search-hadoop.com/m/JW1q5hV8c4

Looks like the release is around the corner.

On Thu, Mar 5, 2015 at 3:26 PM, Cui Lin wrote:
> Hi Ted,
>
> Thanks for your reply. I noticed from the link below that partitions.size
> will not work for checking an empty RDD in streams. It seems the problem
> is fixed in Spark 1.3, which is not yet available for download?
>
> https://issues.apache.org/jira/browse/SPARK-5270
>
> Best regards,
> Cui Lin
Re: How to parse Json formatted Kafka message in spark streaming
Hi Ted,

Thanks for your reply. I noticed from the link below that partitions.size will not work for checking an empty RDD in streams. It seems the problem is fixed in Spark 1.3, which is not yet available for download?

https://issues.apache.org/jira/browse/SPARK-5270

Best regards,
Cui Lin
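The pre-1.3 workaround discussed above can be sketched as follows. This is a sketch, not from the thread: it assumes a sqlContext is in scope and that messages is the (key, value) DStream from KafkaUtils.createStream. Since RDD.isEmpty only lands in Spark 1.3 (SPARK-5270), rdd.take(1) serves as the emptiness check:

```scala
messages.foreachRDD { rdd =>
  // take(1) returns an empty array for an empty RDD without triggering
  // the "empty collection" UnsupportedOperationException that
  // reduce-based operations (such as jsonRDD's schema inference) throw.
  if (rdd.take(1).nonEmpty) {
    val message = rdd.map(_._2)
    sqlContext.jsonRDD(message).registerTempTable("tempTable")
    sqlContext.sql("SELECT time, To FROM tempTable")
      .saveToCassandra(cassandra_keyspace, cassandra_table, SomeColumns("key", "msg"))
  }
}
```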
Re: How to parse Json formatted Kafka message in spark streaming
Hi Helena,

I think your new version only fits json that has very limited columns. I could not find MonthlyCommits, but I assume it only has a small number of columns that are defined manually. In my case, I have hundreds of column names, so it is not feasible to define a class for these columns. Is there any way to get the column names instead of hard-coding "time", as in:

    mapper.readValue[Map[String,Any]](x).get("time")

Best regards,
Cui Lin
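One way to answer the column-name question above: since readValue[Map[String,Any]] already yields a Map per message, its key set is the list of top-level column names, so no field name needs hard-coding. A sketch only, reusing the Jackson Scala module from Akhil's snippet (the ScalaObjectMapper import path varies by jackson-module-scala version):

```scala
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper

val columnNames = messages.map { case (_, json) =>
  val mapper = new ObjectMapper() with ScalaObjectMapper
  mapper.registerModule(DefaultScalaModule)
  // The keys of the parsed Map are the top-level field names of the
  // json document; nested objects would need separate flattening.
  mapper.readValue[Map[String, Any]](json).keys.toSeq
}
```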
Re: How to parse Json formatted Kafka message in spark streaming
Great point :)

Cui, here's a cleaner way than I had before, without the use of Spark SQL for the mapping:

    KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
        ssc, kafka.kafkaParams, Map("github" -> 5), StorageLevel.MEMORY_ONLY)
      .map { case (k, v) => JsonParser.parse(v).extract[MonthlyCommits] }
      .saveToCassandra("githubstats", "monthly_commits")

HELENA EDELSON
Senior Software Engineer, DSE Analytics
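For reference, the extract call in the snippet above presupposes a target case class plus implicit json4s Formats. The actual fields of MonthlyCommits are not shown anywhere in the thread, so the layout below is hypothetical:

```scala
import org.json4s._
import org.json4s.DefaultFormats
import org.json4s.native.JsonParser

// Hypothetical field layout; the real MonthlyCommits in the
// github-stats example is not shown in this thread.
case class MonthlyCommits(user: String, month: String, commits: Int)

implicit val formats = DefaultFormats

val sample = """{"user":"octocat","month":"2015-03","commits":42}"""
val mc = JsonParser.parse(sample).extract[MonthlyCommits]
```

This approach works well when the schema is small and known up front, which is exactly the limitation Cui raises in the reply above.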
Re: How to parse Json formatted Kafka message in spark streaming
Cui:
You can check messages.partitions.size to determine whether messages is an empty RDD.

Cheers
Re: How to parse Json formatted Kafka message in spark streaming
Hi Cui,

What version of Spark are you using? There was a bug ticket that may be related to this, fixed in core/src/main/scala/org/apache/spark/rdd/RDD.scala and merged into versions 1.3.0 and 1.2.1. If you are using 1.1.1, that may be the reason, but it's a stretch:
https://issues.apache.org/jira/browse/SPARK-4968

Did you verify that you have data streaming from Kafka?

Helena
https://twitter.com/helenaedelson
Re: How to parse Json formatted Kafka message in spark streaming
When you use KafkaUtils.createStream with StringDecoders, it will return String objects inside your messages stream. To access the elements from the json, you could do something like the following:

    val mapStream = messages.map { x =>
      val mapper = new ObjectMapper() with ScalaObjectMapper
      mapper.registerModule(DefaultScalaModule)
      mapper.readValue[Map[String, Any]](x).get("time")
    }

Thanks
Best Regards
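One caveat with the snippet above: it constructs a new ObjectMapper for every message. A variant (a sketch, not from the thread, assuming messages is the (key, value) DStream from KafkaUtils.createStream) that builds the mapper once per partition:

```scala
val mapStream = messages.mapPartitions { iter =>
  // ObjectMapper construction is comparatively expensive and the
  // instance is not serializable, so create it once per partition on
  // the executor rather than once per record.
  val mapper = new ObjectMapper() with ScalaObjectMapper
  mapper.registerModule(DefaultScalaModule)
  iter.map { case (_, json) =>
    mapper.readValue[Map[String, Any]](json).get("time")
  }
}
```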
How to parse Json formatted Kafka message in spark streaming
Friends,

I'm trying to parse json formatted Kafka messages and then send them back to Cassandra. I have two problems:

1. I got the exception below. How to check for an empty RDD?

Exception in thread "main" java.lang.UnsupportedOperationException: empty collection
    at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
    at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.reduce(RDD.scala:869)
    at org.apache.spark.sql.json.JsonRDD$.inferSchema(JsonRDD.scala:57)
    at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:232)
    at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:204)

    val messages = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](…)

    messages.foreachRDD { rdd =>
      val message: RDD[String] = rdd.map { y => y._2 }
      sqlContext.jsonRDD(message).registerTempTable("tempTable")
      sqlContext.sql("SELECT time,To FROM tempTable")
        .saveToCassandra(cassandra_keyspace, cassandra_table, SomeColumns("key", "msg"))
    }

2. How to get all column names from json messages? I have hundreds of columns in the json formatted message.

Thanks for your help!

Best regards,
Cui Lin
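Both questions above can also be addressed through Spark SQL's inferred schema: jsonRDD infers a StructType whose fieldNames lists every top-level column, so no column needs hard-coding. A sketch under the same assumptions as the code above, guarding with take(1) since RDD.isEmpty is not available before Spark 1.3:

```scala
messages.foreachRDD { rdd =>
  val json = rdd.map(_._2)
  if (json.take(1).nonEmpty) {                 // skip empty micro-batches
    val table = sqlContext.jsonRDD(json)       // schema inferred from the data
    val columnNames = table.schema.fieldNames  // all top-level column names
    table.registerTempTable("tempTable")
    sqlContext.sql(s"SELECT ${columnNames.mkString(", ")} FROM tempTable")
  }
}
```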