Like many people, I'm trying to do hourly counts. The twist is that I don't
want to count per hour of stream processing time, but per the hour in which
the event actually occurred (wall clock, say YYYY-mm-dd HH).

My thought is to make the streaming window large enough that a full hour of
streaming data fits inside it. Since the window slides in small increments,
I want to drop the lowest (oldest) hour from the stream before persisting
the results, since that hour was already fully reduced during a previous
batch and would only be a partial count in the current one. I have gotten
this far:

Every line of the input files is parsed into Event(type, hour), and stream
is a DStream[Event]:

val evtCountsByHour =
  stream.map(evt => (evt, 1))
    .reduceByKeyAndWindow(_ + _, Seconds(secondsInWindow)) // hourly counts per event
    .mapPartitions(iter => iter.map(x => (x._1.hour, x)))  // key by the event's hour

My understanding is that at this point, the event counts are keyed by hour.
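
For concreteness, here is how I've been peeking at the keys (just a sketch;
it assumes hour is a string formatted YYYY-mm-dd HH, so lexicographic order
matches chronological order):

evtCountsByHour.foreachRDD { rdd =>
  // print the distinct hour keys present in the current window
  rdd.keys.distinct().collect().sorted.foreach(println)
}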

1. How do I detect the smallest key? I have seen some examples of
partitionBy + mapPartitionsWithIndex that drop the lowest index, but I
can't figure out how to do that with a DStream (the closest I've come is
the first sketch at the end of this mail). My gut feeling was that the
first RDD in the stream has to contain the oldest data, but that doesn't
seem to be the case (judging by what I printed from inside
evtCountsByHour.foreachRDD).

2. If someone is further ahead with this type of problem, could you give
some insight into how you approached it? I think Streaming is the correct
approach, since I don't want to worry about data that was already processed
and I want to process new data continuously. I opted for
reduceByKeyAndWindow with a large window rather than updateStateByKey,
because the hour the event occurred in is part of the key and I don't care
to keep that key around once the next hour's events start coming in (I'm
assuming RDDs outside the window become unreferenced); the second sketch at
the end shows the window setup I'm considering. But I'd love to hear other
suggestions if my logic is off.
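
In case it helps, here is the closest I've come for (1) -- a sketch only;
it assumes the hour strings sort chronologically and that the window always
spans at least one complete hour (the name withoutOldestHour and the
emptiness check are just mine):

val withoutOldestHour = evtCountsByHour.transform { rdd =>
  if (rdd.take(1).isEmpty) {
    rdd // empty window, nothing to drop
  } else {
    // the smallest hour key is the partial one at the window's trailing edge
    val oldestHour = rdd.keys.min()
    rdd.filter { case (hour, _) => hour != oldestHour }
  }
}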
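
And the second sketch, for (2): since my window is much larger than the
slide, I'm considering the invertible form of reduceByKeyAndWindow, which
updates the window incrementally instead of re-reducing it from scratch.
The slide duration, the checkpoint path, and ssc (the StreamingContext)
are placeholders:

ssc.checkpoint("/tmp/checkpoints") // the invertible form requires checkpointing

val evtCountsByHour =
  stream.map(evt => (evt, 1))
    .reduceByKeyAndWindow(
      _ + _,                    // add counts for events entering the window
      _ - _,                    // subtract counts for events leaving the window
      Seconds(secondsInWindow), // window length, comfortably over an hour
      Seconds(secondsInSlide))  // placeholder slide duration
    .mapPartitions(iter => iter.map(x => (x._1.hour, x)))

One caveat I've seen in the docs: with the invertible form, keys whose
counts drop to zero linger in the window state unless you use the overload
that also takes a filter function to drop them.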
