Of course :)

/**
 * Spark Streaming job that consumes JSON messages from Kafka and, for every
 * batch, prints the per-`id` averages of `temp`, `rotor_rpm`, `winddirection`
 * and `windspeed`.
 *
 * Each batch is loaded into a DataFrame, registered as the temporary table
 * `drone`, and aggregated with a SQL query.
 */
object sparkStreaming {

  /**
   * Entry point: builds the streaming context, wires the Kafka direct stream
   * to the per-batch aggregation, and blocks until the stream terminates.
   *
   * @param args command-line arguments (unused; topic and broker are hard-coded)
   */
  def main(args: Array[String]): Unit = {
    // Set reasonable logging levels for streaming if the user has not configured log4j.
    StreamingExamples.setStreamingLogLevels()

    val topics = "test"
    val brokers = "localhost:9092"
    // Comma-separated topic list -> set of topic names.
    val topicsSet = topics.split(",").toSet

    // Runs with a local master; the original author noted "spark://localhost:7077"
    // as the alternative master URL.
    val sparkConf = new SparkConf().setAppName("KafkaDroneCalc").setMaster("local")
    val sc = new SparkContext(sparkConf)
    // One micro-batch every 30 seconds.
    val ssc = new StreamingContext(sc, Seconds(30))

    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages =
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, topicsSet)

    // Keep only the second element of each (key, value) pair: the message payload.
    val lines = messages.map(_._2)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)

    lines.foreachRDD { rdd =>
      // An empty batch gives JSON schema inference nothing to work with, so the
      // resulting DataFrame has no columns and the query below would fail to
      // resolve `id`, `temp`, etc. Skip such batches instead of crashing the stream.
      if (!rdd.isEmpty()) {
        val df = sqlContext.read.json(rdd)
        df.registerTempTable("drone")
        sqlContext.sql(
          "SELECT id, AVG(temp), AVG(rotor_rpm), AVG(winddirection), AVG(windspeed) " +
            "FROM drone GROUP BY id"
        ).show()
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
I haven’t checked long-running performance, though.

Regards,

Siva

> On 15-Jun-2016, at 5:02 PM, Jacek Laskowski <ja...@japila.pl> wrote:
> 
> Hi,
> 
> Good to hear so! Mind sharing a few snippets of your solution?
> 
> Pozdrawiam,
> Jacek Laskowski
> ----
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark http://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
> 
> 
> On Wed, Jun 15, 2016 at 5:03 PM, Sivakumaran S <siva.kuma...@me.com> wrote:
>> Thanks Jacek,
>> 
>> Job completed!! :) Just used data frames and sql query. Very clean and
>> functional code.
>> 
>> Siva
>> 
>> On 15-Jun-2016, at 3:10 PM, Jacek Laskowski <ja...@japila.pl> wrote:
>> 
>> mapWithState
>> 
>> 

Reply via email to