Hi Jacek and Cody,

First of all, thanks for helping me out.

I started with using combineByKey while testing with just one field. Of course 
it worked fine, but I was worried that the code would become unreadable if 
there were many fields. Which is why I shifted to sqlContext because the code 
is comprehensible. Let me work out the stream statistics and update you in a 
while. 



Regards,

Siva



> On 16-Jun-2016, at 11:29 AM, Jacek Laskowski <ja...@japila.pl> wrote:
> 
> Rather
> 
> val df = sqlContext.read.json(rdd)
> 
> Pozdrawiam,
> Jacek Laskowski
> ----
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark http://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
> 
> 
> On Wed, Jun 15, 2016 at 11:55 PM, Sivakumaran S <siva.kuma...@me.com> wrote:
>> Cody,
>> 
>> Are you referring to the  val lines = messages.map(_._2)?
>> 
>> Regards,
>> 
>> Siva
>> 
>>> On 15-Jun-2016, at 10:32 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>> 
>>> Doesn't that result in consuming each RDD twice, in order to infer the
>>> json schema?
>>> 
>>> On Wed, Jun 15, 2016 at 11:19 AM, Sivakumaran S <siva.kuma...@me.com> wrote:
>>>> Of course :)
>>>> 
>>>> object sparkStreaming {
>>>> def main(args: Array[String]) {
>>>>   StreamingExamples.setStreamingLogLevels() //Set reasonable logging
>>>> levels for streaming if the user has not configured log4j.
>>>>   val topics = "test"
>>>>   val brokers = "localhost:9092"
>>>>   val topicsSet = topics.split(",").toSet
>>>>   val sparkConf = new
>>>> SparkConf().setAppName("KafkaDroneCalc").setMaster("local")
>>>> //spark://localhost:7077
>>>>   val sc = new SparkContext(sparkConf)
>>>>   val ssc = new StreamingContext(sc, Seconds(30))
>>>>   val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
>>>>   val messages = KafkaUtils.createDirectStream[String, String,
>>>> StringDecoder, StringDecoder] (ssc, kafkaParams, topicsSet)
>>>>   val lines = messages.map(_._2)
>>>>   val sqlContext = new org.apache.spark.sql.SQLContext(sc)
>>>>   lines.foreachRDD( rdd => {
>>>>     val df = sqlContext.read.json(rdd)
>>>>     df.registerTempTable(“drone")
>>>>     sqlContext.sql("SELECT id, AVG(temp), AVG(rotor_rpm),
>>>> AVG(winddirection), AVG(windspeed) FROM drone GROUP BY id").show()
>>>>   })
>>>>   ssc.start()
>>>>   ssc.awaitTermination()
>>>> }
>>>> }
>>>> 
>>>> I haven’t checked long running performance though.
>>>> 
>>>> Regards,
>>>> 
>>>> Siva
>>>> 
>>>> On 15-Jun-2016, at 5:02 PM, Jacek Laskowski <ja...@japila.pl> wrote:
>>>> 
>>>> Hi,
>>>> 
>>>> Good to hear so! Mind sharing a few snippets of your solution?
>>>> 
>>>> Pozdrawiam,
>>>> Jacek Laskowski
>>>> ----
>>>> https://medium.com/@jaceklaskowski/
>>>> Mastering Apache Spark http://bit.ly/mastering-apache-spark
>>>> Follow me at https://twitter.com/jaceklaskowski
>>>> 
>>>> 
>>>> On Wed, Jun 15, 2016 at 5:03 PM, Sivakumaran S <siva.kuma...@me.com> wrote:
>>>> 
>>>> Thanks Jacek,
>>>> 
>>>> Job completed!! :) Just used data frames and sql query. Very clean and
>>>> functional code.
>>>> 
>>>> Siva
>>>> 
>>>> On 15-Jun-2016, at 3:10 PM, Jacek Laskowski <ja...@japila.pl> wrote:
>>>> 
>>>> mapWithState
>>>> 
>>>> 
>>>> 
>> 
>> 
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>> 
> 
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 


---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Reply via email to