Doesn't that result in consuming each RDD twice, in order to infer the JSON schema?
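If so, one way to sidestep the extra pass is to supply the schema up front, so read.json doesn't have to scan the data to infer it. A rough sketch (the field names and types below are guesses based on the query; adjust them to the actual JSON payload):

import org.apache.spark.sql.types._

// Assumed schema; match it to the real messages.
val droneSchema = StructType(Seq(
  StructField("id", StringType),
  StructField("temp", DoubleType),
  StructField("rotor_rpm", DoubleType),
  StructField("winddirection", DoubleType),
  StructField("windspeed", DoubleType)
))

lines.foreachRDD( rdd => {
  // With an explicit schema there is no inference pass over the RDD.
  val df = sqlContext.read.schema(droneSchema).json(rdd)
  df.registerTempTable("drone")
  sqlContext.sql("SELECT id, AVG(temp), AVG(rotor_rpm), AVG(winddirection), AVG(windspeed) FROM drone GROUP BY id").show()
})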
On Wed, Jun 15, 2016 at 11:19 AM, Sivakumaran S <siva.kuma...@me.com> wrote:
> Of course :)
>
> object sparkStreaming {
>   def main(args: Array[String]) {
>     StreamingExamples.setStreamingLogLevels() // Set reasonable logging levels for streaming if the user has not configured log4j.
>     val topics = "test"
>     val brokers = "localhost:9092"
>     val topicsSet = topics.split(",").toSet
>     val sparkConf = new SparkConf().setAppName("KafkaDroneCalc").setMaster("local") // spark://localhost:7077
>     val sc = new SparkContext(sparkConf)
>     val ssc = new StreamingContext(sc, Seconds(30))
>     val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
>     val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
>     val lines = messages.map(_._2)
>     val sqlContext = new org.apache.spark.sql.SQLContext(sc)
>     lines.foreachRDD( rdd => {
>       val df = sqlContext.read.json(rdd)
>       df.registerTempTable("drone")
>       sqlContext.sql("SELECT id, AVG(temp), AVG(rotor_rpm), AVG(winddirection), AVG(windspeed) FROM drone GROUP BY id").show()
>     })
>     ssc.start()
>     ssc.awaitTermination()
>   }
> }
>
> I haven’t checked long-running performance though.
>
> Regards,
>
> Siva
>
> On 15-Jun-2016, at 5:02 PM, Jacek Laskowski <ja...@japila.pl> wrote:
>
> Hi,
>
> Good to hear so! Mind sharing a few snippets of your solution?
>
> Pozdrawiam,
> Jacek Laskowski
> ----
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark http://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
> On Wed, Jun 15, 2016 at 5:03 PM, Sivakumaran S <siva.kuma...@me.com> wrote:
>
> Thanks Jacek,
>
> Job completed!! :) Just used data frames and a SQL query. Very clean and functional code.
>
> Siva
>
> On 15-Jun-2016, at 3:10 PM, Jacek Laskowski <ja...@japila.pl> wrote:
>
> mapWithState