Doesn't that result in consuming each RDD twice, in order to infer the
json schema?

On Wed, Jun 15, 2016 at 11:19 AM, Sivakumaran S <siva.kuma...@me.com> wrote:
> Of course :)
>
> object sparkStreaming {
>   def main(args: Array[String]) {
>     StreamingExamples.setStreamingLogLevels() //Set reasonable logging
> levels for streaming if the user has not configured log4j.
>     val topics = "test"
>     val brokers = "localhost:9092"
>     val topicsSet = topics.split(",").toSet
>     val sparkConf = new
> SparkConf().setAppName("KafkaDroneCalc").setMaster("local")
> //spark://localhost:7077
>     val sc = new SparkContext(sparkConf)
>     val ssc = new StreamingContext(sc, Seconds(30))
>     val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
>     val messages = KafkaUtils.createDirectStream[String, String,
> StringDecoder, StringDecoder] (ssc, kafkaParams, topicsSet)
>     val lines = messages.map(_._2)
>     val sqlContext = new org.apache.spark.sql.SQLContext(sc)
>     lines.foreachRDD( rdd => {
>       val df = sqlContext.read.json(rdd)
>       df.registerTempTable(“drone")
>       sqlContext.sql("SELECT id, AVG(temp), AVG(rotor_rpm),
> AVG(winddirection), AVG(windspeed) FROM drone GROUP BY id").show()
>     })
>     ssc.start()
>     ssc.awaitTermination()
>   }
> }
>
> I haven’t checked long running performance though.
>
> Regards,
>
> Siva
>
> On 15-Jun-2016, at 5:02 PM, Jacek Laskowski <ja...@japila.pl> wrote:
>
> Hi,
>
> Good to hear so! Mind sharing a few snippets of your solution?
>
> Pozdrawiam,
> Jacek Laskowski
> ----
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark http://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
>
> On Wed, Jun 15, 2016 at 5:03 PM, Sivakumaran S <siva.kuma...@me.com> wrote:
>
> Thanks Jacek,
>
> Job completed!! :) Just used data frames and sql query. Very clean and
> functional code.
>
> Siva
>
> On 15-Jun-2016, at 3:10 PM, Jacek Laskowski <ja...@japila.pl> wrote:
>
> mapWithState
>
>
>

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Reply via email to