In general, it may be a better idea to convert the records from hashmaps to a specific data structure. Say

  case class Record(id: Int, name: String, mobile: String, score: Int, test_type: String, ...)

Then you should be able to do something like

  val records = jsonf.map(m => convertMapToRecord(m))

Then, to keep only the math results, you can do

  records.filter(r => r.test_type == "math")

If you have to do aggregations (sum, max, etc.), you have to decide whether you want to aggregate within every batch or over a window of time. For each batch:

  filteredRecords.foreachRDD { rdd =>
    // get aggregates for this batch
  }

For a window of time (say 1 minute):

  filteredRecords.window(Minutes(1)).foreachRDD { rdd =>
    // get aggregates over the last 1 minute, every 10 seconds
    // (since 10 seconds is the batch interval)
  }

On Mon, Jul 14, 2014 at 3:06 PM, srinivas <kusamsrini...@gmail.com> wrote:
> Hi,
>  Thanks for your reply. I imported StreamingContext, and right now I am
> getting my DStream as something like
>
>   map(id -> 123, name -> srini, mobile -> 12324214, score -> 123, test_type -> math)
>   map(id -> 321, name -> vasu, mobile -> 73942090, score -> 324, test_type -> sci)
>   map(id -> 432, name -> xxxx, mobile -> 423141234, score -> 322, test_type -> math)
>
> Each map collection comes from a JSON string. Now, if I want to aggregate
> the scores on only math, or find out who got the highest score in math
> (showing both name and score), what transformation should I apply to my
> existing DStream? I am very new to dealing with maps and DStream
> transformations, so please advise on how to proceed from here.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Json-file-groupby-function-tp9618p9661.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
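To make the per-batch logic concrete, here is a minimal sketch of the conversion and aggregation on a plain Scala collection, using the three example maps from your message; inside foreachRDD the same operations would apply to each RDD. Note that convertMapToRecord is a hypothetical helper you would write yourself, not part of any Spark API:

```scala
// Record mirrors the fields seen in the JSON maps.
case class Record(id: Int, name: String, mobile: String, score: Int, test_type: String)

// Hypothetical helper that turns one parsed JSON map into a Record.
def convertMapToRecord(m: Map[String, String]): Record =
  Record(m("id").toInt, m("name"), m("mobile"), m("score").toInt, m("test_type"))

// The three example maps from the question, as Map[String, String].
val maps = Seq(
  Map("id" -> "123", "name" -> "srini", "mobile" -> "12324214",  "score" -> "123", "test_type" -> "math"),
  Map("id" -> "321", "name" -> "vasu",  "mobile" -> "73942090",  "score" -> "324", "test_type" -> "sci"),
  Map("id" -> "432", "name" -> "xxxx",  "mobile" -> "423141234", "score" -> "322", "test_type" -> "math")
)

val records    = maps.map(convertMapToRecord)
val mathOnly   = records.filter(_.test_type == "math")
val totalScore = mathOnly.map(_.score).sum   // sum of math scores
val top        = mathOnly.maxBy(_.score)     // highest math score

println(s"total math score: $totalScore")                    // prints 445
println(s"top math scorer: ${top.name} with ${top.score}")   // xxxx with 322
```

On an RDD inside foreachRDD you would use the same filter/map pattern, with rdd.map(_.score).sum for the total and rdd.reduce((a, b) => if (a.score > b.score) a else b) in place of maxBy.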