Hi, I need some clarity on how windows behave for my use case. I have defined my stream as follows:

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.setParallelism(1);

    DataStream<String> live = env.addSource(new JsonTestSource());
    DataStream<FlatObject> jsonToTuple = live.flatMap(new Splitter());

    // Key the stream by intersection name
    KeyedStream<FlatObject, String> keyStream = jsonToTuple.keyBy(
            new KeySelector<FlatObject, String>() {
                public String getKey(FlatObject value) throws Exception {
                    return value.getIntersectionName();
                }
            });

    // Global window with a custom trigger
    DataStream<FlatObject> flatStream = keyStream
            .window(GlobalWindows.create())
            .trigger(new WindowCustomTrigger())
            .apply(new TrafficWindow());

    flatStream.print();

For a given set of JSON objects, the ideal case is:

    {"event":[{"CurrentTimeInCycle":20,"S01":"G"}]}
    {"event":[{"CurrentTimeInCycle":30,"S01":"G"}]}
    {"event":[{"CurrentTimeInCycle":40,"S01":"G"}]}
    {"event":[{"CurrentTimeInCycle":60,"S01":"G"}]}
    {"event":[{"CurrentTimeInCycle":10,"S01":"G"}]}
    ------------------------------------------------------------
    Trigger (because CurrentTimeInCycle of the current event is less than that of the previous event)

    {"event":[{"CurrentTimeInCycle":20,"S01":"G"}]}
    {"event":[{"CurrentTimeInCycle":30,"S01":"G"}]}
    {"event":[{"CurrentTimeInCycle":40,"S01":"G"}]}
    {"event":[{"CurrentTimeInCycle":60,"S01":"G"}]}
    {"event":[{"CurrentTimeInCycle":5,"S01":"G"}]}
    ------------------------------------------------------------
    Trigger (because CurrentTimeInCycle of the current event is less than that of the previous event)

In my current program, the output is as follows (all the objects from the previous window are also part of the next window, and they keep accumulating from one window to the next):

    {"event":[{"CurrentTimeInCycle":20,"S01":"G"}]}
    {"event":[{"CurrentTimeInCycle":30,"S01":"G"}]}
    {"event":[{"CurrentTimeInCycle":40,"S01":"G"}]}
    {"event":[{"CurrentTimeInCycle":60,"S01":"G"}]}
    {"event":[{"CurrentTimeInCycle":10,"S01":"G"}]}
    ------------------------------------------------------------
    Trigger (because CurrentTimeInCycle of the current event is less than that of the previous event)

    {"event":[{"CurrentTimeInCycle":20,"S01":"G"}]}
    {"event":[{"CurrentTimeInCycle":30,"S01":"G"}]}
    {"event":[{"CurrentTimeInCycle":40,"S01":"G"}]}
    {"event":[{"CurrentTimeInCycle":60,"S01":"G"}]}
    {"event":[{"CurrentTimeInCycle":10,"S01":"G"}]}
    {"event":[{"CurrentTimeInCycle":20,"S01":"G"}]}
    {"event":[{"CurrentTimeInCycle":30,"S01":"G"}]}
    {"event":[{"CurrentTimeInCycle":40,"S01":"G"}]}
    {"event":[{"CurrentTimeInCycle":60,"S01":"G"}]}
    {"event":[{"CurrentTimeInCycle":5,"S01":"G"}]}
    ------------------------------------------------------------
    Trigger (because CurrentTimeInCycle of the current event is less than that of the previous event)

I am not sure whether this is the expected behaviour of windows. Is there anything I can do to make my program produce the ideal case described above?

Thanks in anticipation!
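
P.S. To make the two behaviours concrete, here is a minimal plain-Java sketch (no Flink involved) of the grouping I am describing. The class `CycleWindowSketch`, the method `emitWindows` and the flag `purgeAfterFire` are hypothetical names made up for this illustration; they are not part of my job or of the Flink API. The sketch only mimics "emit the buffer when CurrentTimeInCycle drops", with and without clearing the buffer afterwards.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CycleWindowSketch {

    /**
     * Hypothetical helper: buffers CurrentTimeInCycle readings and emits the
     * buffer whenever the current reading is lower than the previous one.
     * If purgeAfterFire is true the buffer is cleared after each emission
     * (the ideal case); if false it is kept (what I currently observe).
     */
    static List<List<Integer>> emitWindows(List<Integer> readings, boolean purgeAfterFire) {
        List<List<Integer>> emitted = new ArrayList<>();
        List<Integer> buffer = new ArrayList<>();
        Integer previous = null;

        for (int current : readings) {
            // The element that causes the drop is still part of the emitted window.
            buffer.add(current);
            if (previous != null && current < previous) {
                emitted.add(new ArrayList<>(buffer)); // emit a copy of the buffer
                if (purgeAfterFire) {
                    buffer.clear();                   // start the next window empty
                }
            }
            previous = current;
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Integer> readings = Arrays.asList(20, 30, 40, 60, 10, 20, 30, 40, 60, 5);
        System.out.println("with purge:    " + emitWindows(readings, true));
        System.out.println("without purge: " + emitWindows(readings, false));
    }
}
```

With the buffer cleared after each emission, the two windows are [20, 30, 40, 60, 10] and [20, 30, 40, 60, 5], which is the ideal case. Without clearing, the second window holds all ten readings, which matches my current output. My guess is that in Flink this corresponds to the trigger returning TriggerResult.FIRE (contents kept) instead of TriggerResult.FIRE_AND_PURGE (contents cleared), but that is an assumption, since WindowCustomTrigger is not shown here.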