Yes customer generator is setting the event timestamp correctly like I see
below. I debugged and found that the events are getting late, so never
executed. i.e,. in the window operator the method  this.isWindowLate(
actualWindow) is getting executed to false for the rest of the events
except the first, hence the events are getting skipped, not able to figure
out where exactly the issue is.

i have removed evictot=r because I don't think I need it yet.

stream looks like

        .assignTimestampsAndWatermarks(new CustomWatermarkStrategy())
        .keyBy((KeySelector<Customer, String>) Customer::getIdentifier)
        .trigger(new EventTimeTrigger())
        .process(new CustomAggregateFunction());

*Customer generator looks like:*

while (isRunning) {
    Customer c = new Customer(CUSTOMER_KEY[counter % 5],**, 1000); // that's the event time
    System.out.println("Writing customer: " + c);
    //sourceContext.emitWatermark(new Watermark(c.getEventTime()));
    if(counter % 11 == 0) {
        System.out.println("Sleeping for 10 seconds");

Custom Watermark generator has this:

public void onEvent(Customer customer, long l, WatermarkOutput
watermarkOutput) {
    currentMaxTimestamp = Math.max(currentMaxTimestamp,
customer.getEventTime()  );

public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
    watermarkOutput.emitWatermark(new Watermark(currentMaxTimestamp));


trigger looks like:


    public TriggerResult onElement(Customer customer, long l,
TimeWindow timeWindow, TriggerContext triggerContext) throws Exception
        if (timeWindow.maxTimestamp() <= triggerContext.getCurrentWatermark()) {
            // if the watermark is already past the window fire immediately
            return TriggerResult.FIRE;
        } else {
  "Max timestamp for customer: " +
customer.getIdentifier() + " is: " + timeWindow.maxTimestamp());
            return TriggerResult.FIRE;

    public TriggerResult onEventTime(long time, TimeWindow timeWindow,
TriggerContext triggerContext) {
//        if (timeWindow.maxTimestamp() >
triggerContext.getCurrentWatermark()) {
//            triggerContext.registerEventTimeTimer(timeWindow.maxTimestamp());
//            return TriggerResult.CONTINUE;
//        }

        return time == timeWindow.maxTimestamp() ?
                TriggerResult.FIRE :


On Thu, May 6, 2021 at 12:02 PM Arvid Heise <> wrote:

> Hi,
> Is your CustomerGenerator setting the event timestamp correctly? Are your
> evictors evicting too early?
> You can try to add some debug output into the watermark assigner and see
> if it's indeed progressing as expected.
> On Thu, May 6, 2021 at 12:48 AM Swagat Mishra <> wrote:
>> This seems to be working fine in processing time but doesn't work in
>> event time. Is there an issue with the way the water mark is defined or do
>> we need to set up timers?
>> Please advise.
>> customerStream
>>         .keyBy((KeySelector<Customer, String>) Customer::getIdentifier)
>>         .window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
>>         .trigger(new EventTimeTrigger()) //EventTimeTrigger.create())
>>         .process(new CustomAggregateFunction());
>> customerStream
>>         .assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator(new 
>> WaterMarkAssigner()))
>>         .keyBy(Customer::getIdentifier)
>>         .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
>>         .trigger(EventTimeTrigger.create())
>>         .evictor(new CustomerEvictor())
>>         .process(new CustomAggregateFunction())
>>         .print();
>> On Thu, May 6, 2021 at 1:53 AM Sam <> wrote:
>>> Adding the code for CustomWatermarkGenerator
>>> .....
>>> @Override
>>> public void onEvent(Customer customer, long l, WatermarkOutput 
>>> watermarkOutput) {
>>>     currentMaxTimestamp = Math.max(currentMaxTimestamp, 
>>> customer.getEventTime()  );
>>> }
>>> @Override
>>> public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
>>>     watermarkOutput.emitWatermark(new Watermark(currentMaxTimestamp));
>>> }
>>> .....
>>> On Thu, May 6, 2021 at 1:33 AM Swagat Mishra <> wrote:
>>>> Hi,
>>>> Bit of background, I have a stream of customers who have purchased some
>>>> product, reading these transactions on a KAFKA topic. I want to aggregate
>>>> the number of products the customer has purchased in a particular duration
>>>> ( say 10 seconds ) and write to a sink.
>>>> I am using session windows to achieve the above.
>>>> For test purposes, i have mocked  up a customer stream and executed
>>>> session windows like below.
>>>> StreamExecutionEnvironment environment = 
>>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>> DataStream<Customer> customerStream = environment.addSource( new 
>>>> CustomerGenerator() );
>>>> customerStream
>>>> //.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(7)))
>>>>         .assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator(new 
>>>> WaterMarkAssigner()))
>>>>         .keyBy(Customer::getIdentifier)
>>>>         .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
>>>>         .trigger(EventTimeTrigger.create())
>>>>         .evictor(new CustomerEvictor())
>>>>         .process(new CustomAggregateFunction())
>>>>         .print();
>>>> My watermark assigner looks like:
>>>> public class WaterMarkAssigner implements WatermarkStrategy<Customer> {
>>>>     static final Logger logger = 
>>>> LoggerFactory.getLogger(WaterMarkAssigner.class);
>>>>     @Override
>>>>     public WatermarkGenerator<Customer> 
>>>> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
>>>>         return new CustomWatermarkGenerator();
>>>>     }
>>>> }
>>>> I notice that the evictor, and aggregation functions are getting called 
>>>> only once for the first customer in the stream.
>>>> The data stream is generating customers at 1 seconds interval and there 
>>>> are 5 customer keys for which it's generating transactions.
>>>> Am I doing something wrong with the above?
>>>> I want to be able to capture the event on each transaction getting added 
>>>> and removed from the window so that I can perform the aggregation.
>>>> please advise.

Reply via email to