Solved! I solved the problem by combining both solutions. The result is this:

messages.foreachRDD { rdd =>
  // Extract the JSON value from each Kafka (key, value) message.
  val message: RDD[String] = rdd.map { y => y._2 }
  // Reuse the lazily instantiated singleton SQLContext from the Spark documentation.
  val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
  import sqlContext.implicits._
  // Let the SQLContext infer the schema from the JSON records.
  val df: DataFrame = sqlContext.jsonRDD(message).toDF()
  df.groupBy("classification").count().show()
  println("")
}

This uses the SQLContextSingleton helper from the Spark documentation. Thanks for all!
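For completeness, SQLContextSingleton here is the lazily instantiated singleton helper from the streaming programming guide linked below; a minimal sketch of it:

import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

/** Lazily instantiated singleton instance of SQLContext. */
object SQLContextSingleton {
  @transient private var instance: SQLContext = null

  // Instantiate the SQLContext on demand and reuse it afterwards.
  def getInstance(sparkContext: SparkContext): SQLContext = synchronized {
    if (instance == null) {
      instance = new SQLContext(sparkContext)
    }
    instance
  }
}

Reusing one instance this way avoids creating a new SQLContext for every micro-batch inside foreachRDD.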
2015-04-23 10:29 GMT+02:00 Sergio Jiménez Barrio <drarse.a...@gmail.com>:

> Thank you very much, Tathagata!
>
> On Wednesday, April 22, 2015, Tathagata Das <t...@databricks.com> wrote:
>
>> Aaah, that. That is probably a limitation of the SQLContext (cc'ing Yin
>> for more information).
>>
>> On Wed, Apr 22, 2015 at 7:07 AM, Sergio Jiménez Barrio
>> <drarse.a...@gmail.com> wrote:
>>
>>> Sorry, this is the error:
>>>
>>> [error] /home/sergio/Escritorio/hello/streaming.scala:77: Implementation
>>> restriction: case classes cannot have more than 22 parameters.
>>>
>>> 2015-04-22 16:06 GMT+02:00 Sergio Jiménez Barrio <drarse.a...@gmail.com>:
>>>
>>>> I tried the solution from the guide, but I exceeded the size limit of
>>>> the case class Row:
>>>>
>>>> 2015-04-22 15:22 GMT+02:00 Tathagata Das <tathagata.das1...@gmail.com>:
>>>>
>>>>> Did you check out the latest streaming programming guide?
>>>>>
>>>>> http://spark.apache.org/docs/latest/streaming-programming-guide.html#dataframe-and-sql-operations
>>>>>
>>>>> You also need to be aware that, to convert JSON RDDs to a DataFrame,
>>>>> the sqlContext has to make a pass over the data to learn the schema.
>>>>> This will fail if a batch has no data, so you have to safeguard
>>>>> against that.
>>>>>
>>>>> On Wed, Apr 22, 2015 at 6:19 AM, ayan guha <guha.a...@gmail.com> wrote:
>>>>>
>>>>>> What about sqlContext.createDataFrame(rdd)?
>>>>>> On 22 Apr 2015 23:04, "Sergio Jiménez Barrio" <drarse.a...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I am using Kafka with Spark Streaming to send JSON to Apache Spark:
>>>>>>>
>>>>>>> val messages = KafkaUtils.createDirectStream[String, String,
>>>>>>>   StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
>>>>>>>
>>>>>>> Now I want to parse the created DStream to a DataFrame, but I don't
>>>>>>> know if Spark 1.3 has an easy way to do this. Any suggestions? I can
>>>>>>> get the message with:
>>>>>>>
>>>>>>> val lines = messages.map(_._2)
>>>>>>>
>>>>>>> Thank you for all. Sergio J.
>
> --
> Regards,
> Sergio Jiménez
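As Tathagata points out in the thread above, jsonRDD has to make a pass over the data to infer the schema, so a batch with no records will fail. A minimal sketch of one safeguard, assuming Spark 1.3's RDD.isEmpty, combined with the solution at the top:

messages.foreachRDD { rdd =>
  // Schema inference scans the records, so skip empty batches entirely.
  if (!rdd.isEmpty()) {
    val message = rdd.map(_._2)
    val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
    val df = sqlContext.jsonRDD(message)
    df.groupBy("classification").count().show()
  }
}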