In general it may be a better idea to convert the records from hashmaps
to a specific data structure, say a case class:

case class Record(id: Int, name: String, mobile: String, score: Int,
  test_type: String) // plus any other fields your records carry

Then you should be able to do something like

val records = jsonf.map(m => convertMapToRecord(m))
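
A minimal sketch of the convertMapToRecord helper, assuming each element
of jsonf is a Map[String, String] parsed from the JSON (the body here is
an assumption, not something from the thread):

def convertMapToRecord(m: Map[String, String]): Record =
  Record(
    id = m("id").toInt,       // numeric fields arrive as strings, so parse them
    name = m("name"),
    mobile = m("mobile"),
    score = m("score").toInt,
    test_type = m("test_type"))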

Then, to filter only the math results, you can do
records.filter(r => r.test_type == "math"), and so on.

If you have to do aggregations (sum, max, etc.), you have to figure out
whether you want to aggregate within every batch, or aggregate over a
window of time.

If you want per-batch aggregates, then

filteredRecords.foreachRDD(rdd => {
   // get aggregates for each batch
})
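
For example, a per-batch sum and top scorer could look like this (a
sketch; it assumes filteredRecords is the DStream[Record] of math
results from the filter above):

filteredRecords.foreachRDD(rdd => {
  if (rdd.take(1).nonEmpty) {   // reduce throws on an empty batch, so guard first
    val total = rdd.map(_.score).reduce(_ + _)
    val top   = rdd.reduce((a, b) => if (a.score >= b.score) a else b)
    println(s"total math score = $total, top scorer = ${top.name} (${top.score})")
  }
})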

If you want to aggregate across a window of time (say, 1 minute), then

filteredRecords.window(Minutes(1)).foreachRDD(rdd => {
   // get aggregates over the last 1 minute, computed every 10 seconds
   // (since 10 seconds is the batch interval)
})
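
Alternatively, for a simple windowed aggregate such as a running sum of
math scores, reduceByWindow avoids the explicit foreachRDD (a sketch;
Minutes and Seconds come from org.apache.spark.streaming):

filteredRecords
  .map(_.score)
  .reduceByWindow(_ + _, Minutes(1), Seconds(10)) // sum over the last minute, emitted every batch
  .print()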

On Mon, Jul 14, 2014 at 3:06 PM, srinivas <kusamsrini...@gmail.com> wrote:

> Hi,
>   Thanks for your reply... I imported StreamingContext, and right now I am
> getting my DStream as something like
>  map(id -> 123, name -> srini, mobile -> 12324214, score -> 123, test_type -> math)
>  map(id -> 321, name -> vasu, mobile -> 73942090, score -> 324, test_type -> sci)
>  map(id -> 432, name -> xxxx, mobile -> 423141234, score -> 322, test_type -> math)
>
> Each map collection is from a JSON string. Now, if I want to aggregate the
> scores for only math, or if I want to find out who got the highest score in
> math (showing both name and score), what transformation should I apply to
> my existing DStream? I am very new to dealing with maps and DStream
> transformations, so please advise on how to proceed from here.
