Hi All,
I have the following requirement:

1. I have an Avro/JSON message containing {eventid, usage, starttime, endtime}.
2. I am reading this from a Kafka source.
3. If a record spans an hour boundary, I split it into separate records at the hourly boundaries.
4. My objective is to (a) read the message and (b) aggregate the usage within each hour.
5. The aggregated data is sent to another Kafka topic.

I do not want to aggregate based on a clock (processing-time) window. When I see the next hour in endtime, I would like to close the window and send the aggregated usage down to the Kafka sink topic.

Example input:

    4.55-5.00, 5.00-5.25, 5.25-5.55, 5.55-6.25

After the split:

    4.55-5.00   <- expect an aggregated record to go out here (4:00 hour closes)
    5.00-5.25
    5.25-5.55
    5.55-6.00   <- expect an aggregated record to go out here (5:00 hour closes)
    6.00-6.25

What I have done:

1. I have set event time:

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

2. The pipeline:

    val hourlyAggregate: SingleOutputStreamOperator[Tuple2[String, Report]] = stream
      .flatMap(new SplitFlatMap()) // if a record overlaps an hour boundary, split it at the boundary
      .assignTimestampsAndWatermarks(new ReportTimestampExtractor)
      .keyBy(0)
      .window(TumblingEventTimeWindows.of(Time.seconds(intervalsecond.toLong)))
      .reduce(new Counter()) // aggregates the usage collected within the window

3. Here is the implementation of the timestamp extractor:

    class ReportTimestampExtractor extends AssignerWithPeriodicWatermarks[Tuple2[String, Report]] with Serializable {
      override def extractTimestamp(e: Tuple2[String, Report], prevElementTimestamp: Long): Long = {
        e.f1.getEndTime
      }

      override def getCurrentWatermark(): Watermark = {
        // intended to respect a 1-hour delay, though 36000 ms is only 36 s (1 hour would be 3600000 ms)
        new Watermark(System.currentTimeMillis - 36000)
      }
    }

I see that the aggregation is emitted only on the clock window, rather than when the window sees the next hour in a record. Is there anything I am missing? By definition, if I set event time, the windows should respect the message timestamps rather than the wall clock.

--
Thanks,
Rohan
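PS: to make the intended splitting concrete, here is a pure-Scala sketch of the logic I described for SplitFlatMap (the object and method names here are hypothetical, not my actual Flink operator; times are epoch milliseconds):

```scala
// Sketch only: splits a usage interval into sub-intervals that never
// cross a clock-hour boundary, e.g. 5:55-6:25 -> (5:55-6:00, 6:00-6:25).
object HourSplitter {
  val HourMillis = 3600000L

  // Returns (start, end) pairs, each lying within a single clock hour.
  def splitByHour(start: Long, end: Long): List[(Long, Long)] = {
    if (start >= end) Nil
    else {
      // the first hour boundary strictly after `start`
      val nextBoundary = (start / HourMillis + 1) * HourMillis
      if (end <= nextBoundary) List((start, end))
      else (start, nextBoundary) :: splitByHour(nextBoundary, end)
    }
  }
}
```

Inside the real FlatMapFunction this would emit one record per returned pair, with the usage apportioned between them.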
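And, for reference, the aggregation I expect the window/reduce step to perform, again as a pure-Scala sketch outside Flink (the Report case class and names below are illustrative, assuming records have already been split so none crosses an hour boundary):

```scala
// Illustrative model of the input record: {eventid, usage, starttime, endtime}
case class Report(eventId: String, usage: Double, startTime: Long, endTime: Long)

object HourlyUsage {
  val HourMillis = 3600000L

  // Buckets records by (event id, clock hour of startTime) and sums usage,
  // mirroring keyBy on the event id plus an hourly tumbling window.
  def aggregate(records: Seq[Report]): Map[(String, Long), Double] =
    records
      .groupBy(r => (r.eventId, r.startTime / HourMillis))
      .map { case (key, rs) => key -> rs.map(_.usage).sum }
}
```

The point of my question is that I want each (eventId, hour) bucket to be emitted as soon as a record from the next hour arrives, not on wall-clock time.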