Dear Jacek and Cody,

Once every 30 seconds, I receive a batch of JSON from Kafka: exactly five JSON 
objects, as shown below (I have changed my data source to include more 
fields):
{"windspeed":4.23,"pressure":1008.39,"location":"Dundee","latitude":56.5,"longitude":-2.96667,"id":2650752,"humidity":97.0,"temp":12.54,"winddirection":12.0003}
{"windspeed":4.23,"pressure":1008.39,"location":"Saint 
Andrews","latitude":56.338711,"longitude":-2.79902,"id":2638864,"humidity":97.0,"temp":12.54,"winddirection":12.0003}
{"windspeed":5.53,"pressure":1016.25,"location":"Arbroath","latitude":56.563171,"longitude":-2.58736,"id":2657215,"humidity":96.0,"temp":11.59,"winddirection":9.50031}
{"windspeed":4.73,"pressure":994.0,"location":"Aberdeen","latitude":57.143688,"longitude":-2.09814,"id":2657832,"humidity":1.0,"temp":0.0,"winddirection":357.0}
{"windspeed":6.13,"pressure":994.0,"location":"Peterhead","latitude":57.50584,"longitude":-1.79806,"id":2640351,"humidity":1.0,"temp":0.0,"winddirection":8.50031}

In my Spark app, I have set the batch duration to 60 seconds. Now, as per the 
1.6.1 documentation, "Spark SQL can automatically infer the schema of a JSON 
dataset and load it as a DataFrame. This conversion can be done using 
SQLContext.read.json() on either an RDD of String, or a JSON file." But what 
both of you pointed out is correct: the RDD is consumed twice, and I do not 
understand why (a snapshot of the DAG is further below).
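
If the double consumption comes from schema inference (inference needs one 
full pass over the data before the actual load), one fix I am considering is 
declaring the schema up front so no inference pass is needed. A minimal 
sketch, assuming the field names from the sample above; weatherSchema and the 
"weather" table name are placeholders of mine:

import org.apache.spark.sql.types._

// Explicit schema matching the sample records; with this declared,
// sqlContext.read no longer scans the RDD once just to infer types.
val weatherSchema = StructType(Seq(
  StructField("windspeed", DoubleType),
  StructField("pressure", DoubleType),
  StructField("location", StringType),
  StructField("latitude", DoubleType),
  StructField("longitude", DoubleType),
  StructField("id", LongType),
  StructField("humidity", DoubleType),
  StructField("temp", DoubleType),
  StructField("winddirection", DoubleType)
))

lines.foreachRDD { rdd =>
  // schema(...) bypasses inference, so the RDD is read only once
  val df = sqlContext.read.schema(weatherSchema).json(rdd)
  df.registerTempTable("weather")
  sqlContext.sql("SELECT id, AVG(temp), AVG(winddirection), " +
    "AVG(windspeed) FROM weather GROUP BY id").show()
}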

I do not need stateful calculations, and I will need to write the results to a 
database at a later stage (a rough sketch of that follows). Any input to 
improve this solution is appreciated. 
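
For the database write, this is roughly what I have in mind, reusing 
weatherSchema from the sketch above. The JDBC URL, table name, and credentials 
are placeholders, and the driver would need to be on the classpath:

import java.util.Properties
import org.apache.spark.sql.functions.avg

// Placeholder connection details -- not a real database.
val jdbcUrl = "jdbc:postgresql://localhost:5432/weatherdb"
val props = new Properties()
props.setProperty("user", "spark")
props.setProperty("password", "secret")

lines.foreachRDD { rdd =>
  val df = sqlContext.read.schema(weatherSchema).json(rdd)
  // Append this batch's per-station averages to the target table.
  df.groupBy("id")
    .agg(avg("temp"), avg("winddirection"), avg("windspeed"))
    .write.mode("append")
    .jdbc(jdbcUrl, "weather_aggregates", props)
}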




Regards,

Siva

> On 16-Jun-2016, at 12:48 PM, Sivakumaran S <siva.kuma...@me.com> wrote:
> 
> Hi Jacek and Cody,
> 
> First of all, thanks for helping me out.
> 
> I started with using combineByKey while testing with just one field. Of 
> course it worked fine, but I was worried that the code would become 
> unreadable if there were many fields. That is why I shifted to sqlContext: 
> the code is more comprehensible. Let me work out the stream statistics and 
> update you in a while. 
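
For reference, the single-field combineByKey version looked roughly like this 
(a sketch from memory; pairs here is a hypothetical RDD of (id, temp) built 
from parsed records). Even one field needs a (sum, count) accumulator threaded 
through three functions, and every extra field widens those tuples:

// Hypothetical (id, temp) pairs standing in for one parsed field per record.
val pairs = sc.parallelize(Seq((2650752L, 12.54), (2657215L, 11.59), (2650752L, 12.60)))

val avgTempById = pairs.combineByKey(
  (t: Double) => (t, 1L),                                         // start a (sum, count)
  (acc: (Double, Long), t: Double) => (acc._1 + t, acc._2 + 1L),  // fold in one value
  (a: (Double, Long), b: (Double, Long)) => (a._1 + b._1, a._2 + b._2) // merge partitions
).mapValues { case (sum, count) => sum / count }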
> 
> 
> 
> Regards,
> 
> Siva
> 
> 
> 
>> On 16-Jun-2016, at 11:29 AM, Jacek Laskowski <ja...@japila.pl> wrote:
>> 
>> Rather
>> 
>> val df = sqlContext.read.json(rdd)
>> 
>> Pozdrawiam,
>> Jacek Laskowski
>> ----
>> https://medium.com/@jaceklaskowski/
>> Mastering Apache Spark http://bit.ly/mastering-apache-spark
>> Follow me at https://twitter.com/jaceklaskowski
>> 
>> 
>> On Wed, Jun 15, 2016 at 11:55 PM, Sivakumaran S <siva.kuma...@me.com> wrote:
>>> Cody,
>>> 
>>> Are you referring to the val lines = messages.map(_._2)?
>>> 
>>> Regards,
>>> 
>>> Siva
>>> 
>>>> On 15-Jun-2016, at 10:32 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>>> 
>>>> Doesn't that result in consuming each RDD twice, in order to infer the
>>>> json schema?
>>>> 
>>>> On Wed, Jun 15, 2016 at 11:19 AM, Sivakumaran S <siva.kuma...@me.com> 
>>>> wrote:
>>>>> Of course :)
>>>>> 
>>>>> import kafka.serializer.StringDecoder
>>>>> import org.apache.spark.{SparkConf, SparkContext}
>>>>> import org.apache.spark.streaming.{Seconds, StreamingContext}
>>>>> import org.apache.spark.streaming.kafka.KafkaUtils
>>>>> import org.apache.spark.examples.streaming.StreamingExamples // from the Spark examples tree
>>>>> 
>>>>> object sparkStreaming {
>>>>>   def main(args: Array[String]) {
>>>>>     // Set reasonable logging levels for streaming if the user has not
>>>>>     // configured log4j.
>>>>>     StreamingExamples.setStreamingLogLevels()
>>>>>     val topics = "test"
>>>>>     val brokers = "localhost:9092"
>>>>>     val topicsSet = topics.split(",").toSet
>>>>>     val sparkConf = new SparkConf()
>>>>>       .setAppName("KafkaDroneCalc")
>>>>>       .setMaster("local") // spark://localhost:7077
>>>>>     val sc = new SparkContext(sparkConf)
>>>>>     val ssc = new StreamingContext(sc, Seconds(30))
>>>>>     val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
>>>>>     val messages = KafkaUtils.createDirectStream[String, String,
>>>>>       StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
>>>>>     val lines = messages.map(_._2) // keep the JSON value, drop the Kafka key
>>>>>     val sqlContext = new org.apache.spark.sql.SQLContext(sc)
>>>>>     lines.foreachRDD { rdd =>
>>>>>       val df = sqlContext.read.json(rdd) // infers the schema on every batch
>>>>>       df.registerTempTable("drone")
>>>>>       sqlContext.sql("SELECT id, AVG(temp), AVG(rotor_rpm), " +
>>>>>         "AVG(winddirection), AVG(windspeed) FROM drone GROUP BY id").show()
>>>>>     }
>>>>>     ssc.start()
>>>>>     ssc.awaitTermination()
>>>>>   }
>>>>> }
>>>>> 
>>>>> I haven’t checked long-running performance, though.
>>>>> 
>>>>> Regards,
>>>>> 
>>>>> Siva
>>>>> 
>>>>> On 15-Jun-2016, at 5:02 PM, Jacek Laskowski <ja...@japila.pl> wrote:
>>>>> 
>>>>> Hi,
>>>>> 
>>>>> Good to hear so! Mind sharing a few snippets of your solution?
>>>>> 
>>>>> Pozdrawiam,
>>>>> Jacek Laskowski
>>>>> ----
>>>>> https://medium.com/@jaceklaskowski/
>>>>> Mastering Apache Spark http://bit.ly/mastering-apache-spark
>>>>> Follow me at https://twitter.com/jaceklaskowski
>>>>> 
>>>>> 
>>>>> On Wed, Jun 15, 2016 at 5:03 PM, Sivakumaran S <siva.kuma...@me.com> 
>>>>> wrote:
>>>>> 
>>>>> Thanks Jacek,
>>>>> 
>>>>> Job completed!! :) Just used data frames and a SQL query. Very clean and
>>>>> functional code.
>>>>> 
>>>>> Siva
>>>>> 
>>>>> On 15-Jun-2016, at 3:10 PM, Jacek Laskowski <ja...@japila.pl> wrote:
>>>>> 
>>>>> mapWithState
>>>>> 
>>>>> 
>>>>> 
>>> 
>>> 