Hello folks, I have a few questions. Say I have a source like this:

    final DataStream<Data> data = env.fromSource(
            source,
            WatermarkStrategy.<Data>forBoundedOutOfOrderness(Duration.ofSeconds(60))
                    .withTimestampAssigner((event, timestamp) -> event.timestamp));

My pipeline after this is as follows:

    data.flatMap(new MyFlattendData())
            .keyBy(new MyKeySelector())
            .window(TumblingEventTimeWindows.of(Time.seconds(60)))
            .reduce(new MyReducer());

My first question: is the timestamp I assign at the source carried through all the steps below, down to my window?

For example, say I have timestamped data from the source as:

    => [ (10, data1), (12, data2), (59, data3), (61, data4), ... ]

Would this get flattened to, say:

    => [ (10, flatdata1), (12, flatdata2), (61, flatdata4), ... ]

then keyed to, say:

    => [ (10, [key1, flatdata1]), (12, [key1, flatdata2]), (61, [key1, flatdata4]), ... ]

and windowed as:

    1st => [ flatdata1, flatdata2 ]
    2nd => [ flatdata4, ... ]

Would the windows created before the reduce function is applied look like what I have illustrated? Or, to get this behaviour, do I need to output a record at each step together with the timestamp assigned to that record? Basically, is the timestamp assigned when reading from the source retained by all the downstream steps when doing an event-time window operation?

My next question is about my watermark strategy: how do I set the period of the watermarking? Basically, from what I have read: "An out-of-order bound B means that once an event with timestamp T was encountered, no events older than T - B will follow any more" when the watermarking is done. However, how frequently is watermarking done? And if, at the moment a watermark is generated, the last encountered event had timestamp T, does this mean the watermark timestamp would be T - B? How can we control the watermarking period?

Thanks
Sachin
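
P.S. To make both questions concrete, here is a minimal plain-Java sketch of how I currently picture it. It has no Flink dependency and is not Flink's implementation. The class name WatermarkSketch and its methods are hypothetical, and the sketch rests on two assumptions that are exactly what I am asking about: that a 60-unit tumbling window starts at ts - (ts % 60), and that the watermark equals the highest timestamp seen so far minus the bound B.

```java
public class WatermarkSketch {

    // Assumed rule: start of the tumbling window containing ts
    // (zero offset, ts >= 0, size in the same unit as ts).
    static long windowStart(long ts, long size) {
        return ts - (ts % size);
    }

    // Assumed rule: watermark = highest timestamp seen so far minus the bound B.
    static long watermark(long maxTimestampSeen, long bound) {
        return maxTimestampSeen - bound;
    }

    public static void main(String[] args) {
        long[] timestamps = {10, 12, 59, 61}; // timestamps from my example
        long size = 60;                       // window size
        long bound = 60;                      // out-of-orderness bound B

        long maxSeen = Long.MIN_VALUE;
        for (long ts : timestamps) {
            // Track the highest timestamp encountered so far.
            maxSeen = Math.max(maxSeen, ts);
            System.out.println("ts=" + ts
                    + " windowStart=" + windowStart(ts, size)
                    + " watermark=" + watermark(maxSeen, bound));
        }
    }
}
```

Under these assumptions, timestamps 10, 12 and 59 would all fall in the first window and 61 in the second. Whether a record that survives the flatMap keeps its timestamp, and how often the watermark is actually emitted, are the parts I am unsure about.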