After some hands-on experience with the current solution I want to give some feedback and maybe start a discussion about event time in streaming. This is not about watermarks or any of the upcoming improvements, just some observations from the current code.
*Starttime for EventTime:*
In the current implementation you can specify a start time; if you don't, it defaults to 0. That default is not feasible when timestamps are the typical milliseconds since 1970. The *TimeTriggerPolicy* has the following implementation of *preNotifyTrigger*:

> @Override
> public synchronized Object[] preNotifyTrigger(DATA datapoint) {
>     LinkedList<Object> fakeElements = new LinkedList<Object>();
>     // check if there is more than one window border missed
>     // use > here. In case >= would fit, the regular call will do the job.
>     while (timestampWrapper.getTimestamp(datapoint) >= startTime + granularity) {
>         startTime += granularity;
>         fakeElements.add(startTime - 1);
>     }
>     return (Object[]) fakeElements.toArray();
> }

In practice this means that using the default start time will crash the program (running out of memory), since it creates fake elements to close every possible window since 1970. So you need to set a start time to make it run, which is not that simple. In production you could initialize it with the system time, but this might lead to problems when consuming events with an older timestamp, e.g. from Kafka. When debugging with old streams you need to know the lowest timestamp in the stream to initialize it.

What is the purpose of the fake elements? Is there a way to avoid the memory problem of creating enormous amounts of empty windows? Could we just use the timestamp of the first event processed as the start time instead of having it as a parameter?

I am testing the following modification of the above code at the moment. Do you see any problem with it?

> @Override
> public synchronized Object[] preNotifyTrigger(DATA datapoint) {
>     LinkedList<Object> fakeElements = new LinkedList<Object>();
>     // check if there is more than one window border missed
>     // use > here. In case >= would fit, the regular call will do the job.
>     // TODO modified here
>     if (startTime == 0) startTime = timestampWrapper.getTimestamp(datapoint);
>     while (timestampWrapper.getTimestamp(datapoint) >= startTime + granularity) {
>         startTime += granularity;
>         fakeElements.add(startTime - 1);
>     }
>     return (Object[]) fakeElements.toArray();
> }

*EventTime api confusion:*
I found several ways to use event time in my program, but I find them not very intuitive. Compare the following two lines of code, both using the Time.of helper, one with system time and one with event time:

    ds.window(Time.of(long windowSize, TimeUnit))
    ds.window(Time.of(long windowSize, Timestamp yourTimeStampExtractor, long startTime))

It's weird that you cannot specify the TimeUnit when using event timestamps. It would feel more natural if it looked like this (also without the start time):

    ds.window(Time.of(long windowSize, TimeUnit, Timestamp yourTimeStampExtractor))

At the moment I'm using the modified TimeTriggerPolicy directly, which leads to this ugly piece of code:

    .window(new TimeTriggerPolicyHack<DataPojo>(100000L,
                new TimestampWrapper<DataPojo>(new EventTimeStampExtractor(), 0L)),
            new TimeEvictionPolicy<DataPojo>(20000,
                new TimestampWrapper<DataPojo>(new EventTimeStampExtractor(), 0L)))

cheers
Martin
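
P.S. To put a number on the start time problem, here is a minimal standalone sketch. It does not use any Flink classes: LazyStartTrigger, missedBorders and the started flag are hypothetical names of mine, and timestamps are plain longs in milliseconds since 1970. It repeats the loop from preNotifyTrigger and counts the fake elements for a start time of 0 versus a start time taken from the first event. Using a separate flag instead of checking startTime == 0 is my assumption, so that an explicit start time of 0 would still be possible.

```java
import java.util.ArrayList;
import java.util.List;

// Standalone sketch: no Flink classes, timestamps are plain longs (ms since 1970).
final class LazyStartTrigger {
    private final long granularity;
    private long startTime;
    private boolean started; // hypothetical flag replacing the "startTime == 0" check

    // Start time is taken from the first timestamp seen.
    LazyStartTrigger(long granularity) {
        this.granularity = granularity;
        this.started = false;
    }

    // Explicit start time, as in the current API.
    LazyStartTrigger(long granularity, long startTime) {
        this.granularity = granularity;
        this.startTime = startTime;
        this.started = true;
    }

    // Same loop as in preNotifyTrigger, returning the fake elements as a list.
    synchronized List<Long> preNotifyTrigger(long timestamp) {
        if (!started) {
            startTime = timestamp;
            started = true;
        }
        List<Long> fakeElements = new ArrayList<>();
        while (timestamp >= startTime + granularity) {
            startTime += granularity;
            fakeElements.add(startTime - 1);
        }
        return fakeElements;
    }

    // Number of fake elements the loop creates for one element, without allocating them.
    static long missedBorders(long timestamp, long startTime, long granularity) {
        if (timestamp < startTime) {
            return 0;
        }
        return (timestamp - startTime) / granularity;
    }

    public static void main(String[] args) {
        long firstEvent = 1_430_000_000_000L; // a timestamp in 2015
        long granularity = 100_000L;          // same trigger size as in my snippet

        // Default start time 0: one fake element per window border since 1970.
        System.out.println(missedBorders(firstEvent, 0L, granularity)); // prints 14300000

        // Start time taken from the first event: nothing to catch up on.
        LazyStartTrigger trigger = new LazyStartTrigger(granularity);
        System.out.println(trigger.preNotifyTrigger(firstEvent).size()); // prints 0
        System.out.println(trigger.preNotifyTrigger(firstEvent + 250_000L).size()); // prints 2
    }
}
```

So with a 100 second trigger the first element alone produces about 14 million fake elements, and the number grows as the granularity shrinks. With the lazy start only the borders actually crossed between two events are generated.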