Hi,

I needed some clarity on the behaviour of the windows for my use case.
I have defined my stream as follows:

        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
*          env.setParallelism(1);*
        DataStream<String> live = env.addSource(new JsonTestSource());
        DataStream<FlatObject> jsonToTuple = live.flatMap(new Splitter());

        KeyedStream<FlatObject, String> keyStream =  jsonToTuple.keyBy(new
KeySelector<FlatObject,String>() {
        public String getKey(FlatObject value) throws Exception {
            return value.getIntersectionName();
        }
        });

        DataStream<FlatObject> flatStream =
keyStream.window(GlobalWindows.create())

.trigger(new WindowCustomTrigger())

.apply(new TrafficWindow());
        flatStream.print();

*For a given set of Json Objects(Ideal case): *

{"event":[{"CurrentTimeInCycle":*20*,"S01":"G"}]}
{"event":[{"CurrentTimeInCycle":*30*,"S01":"G"}]}
{"event":[{"CurrentTimeInCycle":*40*,"S01":"G"}]}
{"event":[{"CurrentTimeInCycle":*60*,"S01":"G"}]}
{"event":[{"CurrentTimeInCycle":*10*,"S01":"G"}]}
------------------------------------------------------------ Trigger
(Because the CurrentTimeInCycle is less for the current event than the
previous event)
{"event":[{"CurrentTimeInCycle":*20*,"S01":"G"}]}
{"event":[{"CurrentTimeInCycle":*30*,"S01":"G"}]}
{"event":[{"CurrentTimeInCycle":*40*,"S01":"G"}]}
{"event":[{"CurrentTimeInCycle":*60*,"S01":"G"}]}
{"event":[{"CurrentTimeInCycle":*5*,"S01":"G"}]}
------------------------------------------------------------ Trigger
(Because the CurrentTimeInCycle is less for the current event than the
previous event)

*In my current program, the output is as the following (All the objects
from the previous window are a part of the next window and it keeps on
adding up to the next): *

{"event":[{"CurrentTimeInCycle":*20*,"S01":"G"}]}
{"event":[{"CurrentTimeInCycle":*30*,"S01":"G"}]}
{"event":[{"CurrentTimeInCycle":*40*,"S01":"G"}]}
{"event":[{"CurrentTimeInCycle":*60*,"S01":"G"}]}
{"event":[{"CurrentTimeInCycle":*10*,"S01":"G"}]}
------------------------------------------------------------ Trigger
(Because the CurrentTimeInCycle is less for the current event than the
previous event)
{"event":[{"CurrentTimeInCycle":*20*,"S01":"G"}]}
{"event":[{"CurrentTimeInCycle":*30*,"S01":"G"}]}
{"event":[{"CurrentTimeInCycle":*40*,"S01":"G"}]}
{"event":[{"CurrentTimeInCycle":*60*,"S01":"G"}]}
{"event":[{"CurrentTimeInCycle":*10*,"S01":"G"}]}
{"event":[{"CurrentTimeInCycle":*20*,"S01":"G"}]}
{"event":[{"CurrentTimeInCycle":*30*,"S01":"G"}]}
{"event":[{"CurrentTimeInCycle":*40*,"S01":"G"}]}
{"event":[{"CurrentTimeInCycle":*60*,"S01":"G"}]}
{"event":[{"CurrentTimeInCycle":*5*,"S01":"G"}]}
------------------------------------------------------------ Trigger
(Because the CurrentTimeInCycle is less for the current event than the
previous event)


I am not sure if this is what is the expected behaviour of the windows. Is
there anything which I can do to get my program working to the ideal case(I
mentioned above).

Thanks in anticipation!

Reply via email to