Dear Jacek and Cody,
Once every 30 seconds, I receive a batch of JSON from Kafka (exactly five JSON objects), as follows (I have changed my data source to include more fields):

{"windspeed":4.23,"pressure":1008.39,"location":"Dundee","latitude":56.5,"longitude":-2.96667,"id":2650752,"humidity":97.0,"temp":12.54,"winddirection":12.0003}
{"windspeed":4.23,"pressure":1008.39,"location":"Saint Andrews","latitude":56.338711,"longitude":-2.79902,"id":2638864,"humidity":97.0,"temp":12.54,"winddirection":12.0003}
{"windspeed":5.53,"pressure":1016.25,"location":"Arbroath","latitude":56.563171,"longitude":-2.58736,"id":2657215,"humidity":96.0,"temp":11.59,"winddirection":9.50031}
{"windspeed":4.73,"pressure":994.0,"location":"Aberdeen","latitude":57.143688,"longitude":-2.09814,"id":2657832,"humidity":1.0,"temp":0.0,"winddirection":357.0}
{"windspeed":6.13,"pressure":994.0,"location":"Peterhead","latitude":57.50584,"longitude":-1.79806,"id":2640351,"humidity":1.0,"temp":0.0,"winddirection":8.50031}

In my Spark app, I have set the batch duration to 60 seconds. As per the Spark 1.6.1 documentation, "Spark SQL can automatically infer the schema of a JSON dataset and load it as a DataFrame. This conversion can be done using SQLContext.read.json() on either an RDD of String, or a JSON file."

What both of you pointed out is correct: each RDD is consumed twice, and I do not understand why. I have attached a snapshot of the DAG. I do not need stateful calculations, and I need to write the result to a database at a later stage. Any input to improve this solution is appreciated.

Regards,

Siva

> On 16-Jun-2016, at 12:48 PM, Sivakumaran S <siva.kuma...@me.com> wrote:
>
> Hi Jacek and Cody,
>
> First of all, thanks for helping me out.
>
> I started out using combineByKey while testing with just one field. Of course it worked fine, but I was worried that the code would become unreadable if there were many fields, which is why I shifted to sqlContext: the code is more comprehensible. Let me work out the stream statistics and update you in a while.
>
> Regards,
>
> Siva
>
>> On 16-Jun-2016, at 11:29 AM, Jacek Laskowski <ja...@japila.pl> wrote:
>>
>> Rather
>>
>> val df = sqlContext.read.json(rdd)
>>
>> Pozdrawiam,
>> Jacek Laskowski
>> ----
>> https://medium.com/@jaceklaskowski/
>> Mastering Apache Spark http://bit.ly/mastering-apache-spark
>> Follow me at https://twitter.com/jaceklaskowski
>>
>> On Wed, Jun 15, 2016 at 11:55 PM, Sivakumaran S <siva.kuma...@me.com> wrote:
>>> Cody,
>>>
>>> Are you referring to val lines = messages.map(_._2)?
>>>
>>> Regards,
>>>
>>> Siva
>>>
>>>> On 15-Jun-2016, at 10:32 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>>>
>>>> Doesn't that result in consuming each RDD twice, in order to infer the JSON schema?
>>>>
>>>> On Wed, Jun 15, 2016 at 11:19 AM, Sivakumaran S <siva.kuma...@me.com> wrote:
>>>>> Of course :)
>>>>>
>>>>> import kafka.serializer.StringDecoder
>>>>> import org.apache.spark.{SparkConf, SparkContext}
>>>>> import org.apache.spark.streaming.{Seconds, StreamingContext}
>>>>> import org.apache.spark.streaming.kafka.KafkaUtils
>>>>> import org.apache.spark.examples.streaming.StreamingExamples
>>>>>
>>>>> object sparkStreaming {
>>>>>   def main(args: Array[String]) {
>>>>>     // Set reasonable logging levels for streaming if the user has not configured log4j.
>>>>>     StreamingExamples.setStreamingLogLevels()
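>>>>>     // Kafka source configuration: a comma-separated topic list and the
>>>>>     // broker address for the direct stream, which yields (key, message)
>>>>>     // pairs; only the message (the raw JSON string) is kept as lines.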
>>>>> val topics = "test" >>>>> val brokers = "localhost:9092" >>>>> val topicsSet = topics.split(",").toSet >>>>> val sparkConf = new >>>>> SparkConf().setAppName("KafkaDroneCalc").setMaster("local") >>>>> //spark://localhost:7077 >>>>> val sc = new SparkContext(sparkConf) >>>>> val ssc = new StreamingContext(sc, Seconds(30)) >>>>> val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers) >>>>> val messages = KafkaUtils.createDirectStream[String, String, >>>>> StringDecoder, StringDecoder] (ssc, kafkaParams, topicsSet) >>>>> val lines = messages.map(_._2) >>>>> val sqlContext = new org.apache.spark.sql.SQLContext(sc) >>>>> lines.foreachRDD( rdd => { >>>>> val df = sqlContext.read.json(rdd) >>>>> df.registerTempTable(“drone") >>>>> sqlContext.sql("SELECT id, AVG(temp), AVG(rotor_rpm), >>>>> AVG(winddirection), AVG(windspeed) FROM drone GROUP BY id").show() >>>>> }) >>>>> ssc.start() >>>>> ssc.awaitTermination() >>>>> } >>>>> } >>>>> >>>>> I haven’t checked long running performance though. >>>>> >>>>> Regards, >>>>> >>>>> Siva >>>>> >>>>> On 15-Jun-2016, at 5:02 PM, Jacek Laskowski <ja...@japila.pl> wrote: >>>>> >>>>> Hi, >>>>> >>>>> Good to hear so! Mind sharing a few snippets of your solution? >>>>> >>>>> Pozdrawiam, >>>>> Jacek Laskowski >>>>> ---- >>>>> https://medium.com/@jaceklaskowski/ >>>>> Mastering Apache Spark http://bit.ly/mastering-apache-spark >>>>> Follow me at https://twitter.com/jaceklaskowski >>>>> >>>>> >>>>> On Wed, Jun 15, 2016 at 5:03 PM, Sivakumaran S <siva.kuma...@me.com> >>>>> wrote: >>>>> >>>>> Thanks Jacek, >>>>> >>>>> Job completed!! :) Just used data frames and sql query. Very clean and >>>>> functional code. >>>>> >>>>> Siva >>>>> >>>>> On 15-Jun-2016, at 3:10 PM, Jacek Laskowski <ja...@japila.pl> wrote: >>>>> >>>>> mapWithState >>>>> >>>>> >>>>> >>> >>> >>> --------------------------------------------------------------------- >>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org >>> For additional commands, e-mail: user-h...@spark.apache.org >>> >> >> --------------------------------------------------------------------- >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org >> For additional commands, e-mail: user-h...@spark.apache.org >> > > > --------------------------------------------------------------------- > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org >