Hi All,

I have the following requirement:

1. I have an Avro/JSON message containing {eventid, usage, starttime, endtime}.
2. I am reading this from a Kafka source.
3. If a record overlaps an hour boundary, split the record by rounding
off to hourly boundaries.
4. My objective is to a) read the message and b) aggregate the usage within
the hour.
5. Send the aggregated data to another Kafka topic.

I don't want to aggregate based on a clock window. If I see the next hour in
endtime, then I would like to close the window and send the aggregated usage
down to the Kafka sink topic.


e.g.:
input data
4.55 - 5.00
5.00 - 5.25
5.25 - 5.55
5.55 - 6.25

after split
4.55 - 5.00 - expect a record to go out with this
5.00 - 5.25
5.25 - 5.55
5.55 - 6.00 - expect a record to go out with this
6.00 - 6.25
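
The split in the example above can be sketched as a pure function outside
Flink, assuming start/end are epoch milliseconds; the name
splitAtHourBoundaries is hypothetical, not from my actual code:

```scala
object SplitSketch {
  val HourMs: Long = 60 * 60 * 1000L

  // Split [start, end) at every hour boundary it crosses, so each
  // resulting interval lies entirely within a single clock hour.
  def splitAtHourBoundaries(start: Long, end: Long): List[(Long, Long)] = {
    // first hour boundary strictly after `start`
    val firstBoundary = (start / HourMs + 1) * HourMs
    if (end <= firstBoundary) List((start, end))
    else (start, firstBoundary) :: splitAtHourBoundaries(firstBoundary, end)
  }

  def main(args: Array[String]): Unit = {
    val h = HourMs
    // 5.55 - 6.25 splits into 5.55 - 6.00 and 6.00 - 6.25
    println(splitAtHourBoundaries(5 * h + 55 * 60000L, 6 * h + 25 * 60000L))
    // 5.00 - 5.25 stays whole
    println(splitAtHourBoundaries(5 * h, 5 * h + 25 * 60000L))
  }
}
```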




1. I have set event time:
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

2. val hourlyAggregate: SingleOutputStreamOperator[Tuple2[String,
Report]] = stream
  .flatMap(new SplitFlatMap()) // if the record overlaps an hour,
                               // create split records at the hourly boundary
  .assignTimestampsAndWatermarks(new ReportTimestampExtractor)
  .keyBy(0)
  .window(TumblingEventTimeWindows.of(Time.seconds(intervalsecond.toLong)))
  .reduce(new Counter()) // aggregates the usage collected within the window
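
Outside Flink, what I expect the Counter reduce to do amounts to folding the
records of one key/window into a single usage sum. A minimal sketch, assuming
Report carries a numeric usage field (the case class here stands in for the
generated Avro type; field names are assumptions):

```scala
object CounterSketch {
  // Stand-in for the Avro-generated Report type (field names assumed).
  case class Report(eventId: String, usage: Double, startTime: Long, endTime: Long)

  // What Counter's reduce(a, b) is expected to do: accumulate usage and
  // keep the overall extent of the window.
  def combine(a: Report, b: Report): Report =
    a.copy(usage = a.usage + b.usage,
           startTime = math.min(a.startTime, b.startTime),
           endTime = math.max(a.endTime, b.endTime))

  def main(args: Array[String]): Unit = {
    val records = List(
      Report("e1", 10.0, 0L, 1500L),
      Report("e1", 5.0, 1500L, 3300L),
      Report("e1", 2.0, 3300L, 3600L))
    println(records.reduce(combine)) // one record with usage 17.0 over [0, 3600)
  }
}
```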

3. Here is the implementation of the timestamp extractor:

class ReportTimestampExtractor extends
    AssignerWithPeriodicWatermarks[Tuple2[String, Report]] with
    Serializable {
  override def extractTimestamp(e: Tuple2[String, Report],
      prevElementTimestamp: Long) = {
    e.f1.getEndTime
  }

  override def getCurrentWatermark(): Watermark = {
    new Watermark(System.currentTimeMillis - 36000) // allow a delay
    // (note: 36000 ms is 36 seconds; a 1 hour delay would be 3600000)
  }
}
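
As a side check on that delay constant (an aside, not part of my code):
Watermark takes milliseconds, so a one-hour allowed lateness is 3,600,000 ms,
not 36,000:

```scala
import java.util.concurrent.TimeUnit

object DelaySketch {
  def main(args: Array[String]): Unit = {
    // one hour expressed in milliseconds
    println(TimeUnit.HOURS.toMillis(1)) // 3600000
    // the constant currently in the extractor, expressed in seconds
    println(TimeUnit.MILLISECONDS.toSeconds(36000)) // 36
  }
}
```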


I see the aggregation is generated only on the clock window rather than
when the window sees the next hour in the record.

Is there anything I am missing? By definition, if I set event time, the
windows should respect the message time rather than the clock.




-- 
Thanks
Rohan
