Thank you.

sourceContext.collectWithTimestamp(c, c.getEventTime());

Adding this to the source context worked.
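
Flink reads the second argument of collectWithTimestamp as milliseconds since the epoch, so getEventTime() has to return a value in that unit. Below is a minimal sketch of such a conversion in plain Java, assuming the customer carries a date plus a LocalTime and that UTC is acceptable. The class EventTimeMillis and the helper toEpochMillis are hypothetical names used only for illustration. Note that LocalTime.toNanoOfDay() returns nanoseconds since midnight, which is a different unit.

```java
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.ZoneOffset;

class EventTimeMillis {
    // Hypothetical helper: combines a date and a time of day into
    // milliseconds since the epoch. UTC is an assumption of this sketch.
    static long toEpochMillis(LocalDate date, LocalTime time) {
        return date.atTime(time).toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    public static void main(String[] args) {
        LocalDate date = LocalDate.of(2021, 5, 6);
        LocalTime time = LocalTime.of(12, 32, 0);

        // Epoch milliseconds: the unit expected for event timestamps and timers.
        System.out.println("epoch millis: " + toEpochMillis(date, time));

        // Nanoseconds since midnight: not comparable with epoch milliseconds.
        System.out.println("nano of day:  " + time.toNanoOfDay());
    }
}
```

Timestamps, watermarks and event-time timers only line up if they all use the same unit.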

However, I am still getting only one customer in the process method. I
would expect the iterable to provide all customers in the window. Or
do I have to maintain state?


changes for reference:

I made the following change and also removed any lag that I had
introduced earlier for experimentation.

So the trigger looks like:


    @Override
    public TriggerResult onElement(Customer customer, long l,
            TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
        if (timeWindow.maxTimestamp() <= triggerContext.getCurrentWatermark()) {
            // if the watermark is already past the window fire immediately
            return TriggerResult.FIRE;
        } else {
            //LOGGER.info("Max timestamp for customer: " + customer.getIdentifier() + " is: " + timeWindow.maxTimestamp());
            triggerContext.registerEventTimeTimer(customer.getEventTime());
            return TriggerResult.FIRE;
        }
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow timeWindow,
            TriggerContext triggerContext) {
        return time == timeWindow.maxTimestamp() ?
                TriggerResult.FIRE :
                TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window,
            TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }

    @Override
    public boolean canMerge() {
        return true;
    }

and *removed lateness*

customerStream
        //.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(7)))
        .assignTimestampsAndWatermarks(new CustomWatermarkStrategy())
        .keyBy((KeySelector<Customer, String>) Customer::getIdentifier)
        .window(EventTimeSessionWindows.withGap(Time.seconds(20)))
        //.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
        .trigger(new EventTimeTrigger()) //EventTimeTrigger.create())
        .process(new CustomAggregateFunction());
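
To make the expectation about the iterable concrete, here is a rough plain-Java sketch of how a session gap groups the events of a single key. It does not use Flink. The class SessionGapSketch and the sample timestamps are made up for illustration, and treating a distance exactly equal to the gap as part of the same session is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SessionGapSketch {
    // Groups ascending timestamps (in milliseconds) of one key into sessions.
    // A new session starts when the distance to the previous event exceeds the gap.
    static List<List<Long>> sessions(List<Long> sortedTimestamps, long gapMillis) {
        List<List<Long>> result = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        for (long ts : sortedTimestamps) {
            if (!current.isEmpty() && ts - current.get(current.size() - 1) > gapMillis) {
                result.add(current);
                current = new ArrayList<>();
            }
            current.add(ts);
        }
        if (!current.isEmpty()) {
            result.add(current);
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical events of one key: 5 s apart, then a longer pause.
        List<Long> events = Arrays.asList(0L, 5000L, 10000L, 40000L);

        // Gap of 20 s: the first three events share one session.
        System.out.println(sessions(events, 20000L)); // [[0, 5000, 10000], [40000]]

        // Gap of 4 s: every event ends up in its own session.
        System.out.println(sessions(events, 4000L));  // [[0], [5000], [10000], [40000]]
    }
}
```

Since the generator cycles through 5 keys at roughly one event per second, events of one key are about 5 seconds apart (longer around the 10-second sleep), so a gap shorter than that would yield one-element sessions, while a 20-second gap should keep them together.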


On Thu, May 6, 2021 at 12:32 PM Arvid Heise <ar...@apache.org> wrote:

> Your source is not setting the timestamp with collectWithTimestamp. I'm
> assuming that nothing really moves from the event time's perspective.
>
> On Thu, May 6, 2021 at 8:58 AM Swagat Mishra <swaga...@gmail.com> wrote:
>
>> Yes, the customer generator is setting the event timestamp correctly, as
>> shown below. I debugged and found that the events are being treated as late,
>> so they are never processed; i.e., in the window operator the method
>> this.isWindowLate(actualWindow) evaluates to false for all events except
>> the first, so those events are skipped. I am not able to figure out where
>> exactly the issue is.
>>
>> I have removed the evictor because I don't think I need it yet.
>>
>> stream looks like
>>
>> customerStream
>>         .assignTimestampsAndWatermarks(new CustomWatermarkStrategy())
>>         .keyBy((KeySelector<Customer, String>) Customer::getIdentifier)
>>         .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
>>         .allowedLateness(Time.seconds(5))
>>         .trigger(new EventTimeTrigger())
>>         .process(new CustomAggregateFunction());
>>
>>
>> *Customer generator looks like:*
>>
>> while (isRunning) {
>>     Customer c = new Customer(CUSTOMER_KEY[counter % 5], LocalTime.now(), 1000); // that's the event time
>>     System.out.println("Writing customer: " + c);
>>     sourceContext.collect(c);
>>     //sourceContext.emitWatermark(new Watermark(c.getEventTime()));
>>     Thread.sleep(1000);
>>     counter++;
>>     if(counter % 11 == 0) {
>>         System.out.println("Sleeping for 10 seconds");
>>         Thread.sleep(10000);
>>     }
>> }
>>
>>
>> Custom Watermark generator has this:
>>
>> .....
>> @Override
>> public void onEvent(Customer customer, long l, WatermarkOutput watermarkOutput) {
>>     currentMaxTimestamp = Math.max(currentMaxTimestamp, customer.getEventTime());
>> }
>>
>> @Override
>> public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
>>     watermarkOutput.emitWatermark(new Watermark(currentMaxTimestamp));
>>
>> }
>> .....
>>
>> trigger looks like:
>>
>> ------
>>
>>
>>     @Override
>>     public TriggerResult onElement(Customer customer, long l, TimeWindow timeWindow,
>>             TriggerContext triggerContext) throws Exception {
>>         if (timeWindow.maxTimestamp() <= triggerContext.getCurrentWatermark()) {
>>             // if the watermark is already past the window fire immediately
>>             return TriggerResult.FIRE;
>>         } else {
>>             LOGGER.info("Max timestamp for customer: " + customer.getIdentifier()
>>                     + " is: " + timeWindow.maxTimestamp());
>>             triggerContext.registerEventTimeTimer(customer.getLocalTime().plusSeconds(8).toNanoOfDay());
>>             return TriggerResult.FIRE;
>>         }
>>     }
>>
>>     @Override
>>     public TriggerResult onEventTime(long time, TimeWindow timeWindow,
>>             TriggerContext triggerContext) {
>> //        if (timeWindow.maxTimestamp() > triggerContext.getCurrentWatermark()) {
>> //            triggerContext.registerEventTimeTimer(timeWindow.maxTimestamp());
>> //            return TriggerResult.CONTINUE;
>> //        }
>>
>>         return time == timeWindow.maxTimestamp() ?
>>                 TriggerResult.FIRE :
>>                 TriggerResult.CONTINUE;
>>     }
>>
>>
>> ....
>>
>>
>> On Thu, May 6, 2021 at 12:02 PM Arvid Heise <ar...@apache.org> wrote:
>>
>>> Hi,
>>>
>>> Is your CustomerGenerator setting the event timestamp correctly? Are
>>> your evictors evicting too early?
>>>
>>> You can try to add some debug output into the watermark assigner and see
>>> if it's indeed progressing as expected.
>>>
>>> On Thu, May 6, 2021 at 12:48 AM Swagat Mishra <swaga...@gmail.com>
>>> wrote:
>>>
>>>> This seems to be working fine in processing time but doesn't work in
>>>> event time. Is there an issue with the way the watermark is defined, or do
>>>> we need to set up timers?
>>>>
>>>> Please advise.
>>>>
>>>>
>>>> WORKING:
>>>>
>>>> customerStream
>>>>         .keyBy((KeySelector<Customer, String>) Customer::getIdentifier)
>>>>         .window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
>>>>         .trigger(new EventTimeTrigger()) //EventTimeTrigger.create())
>>>>         .process(new CustomAggregateFunction());
>>>>
>>>>
>>>> NOT WORKING:
>>>>
>>>> customerStream
>>>>         .assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator(new WaterMarkAssigner()))
>>>>         .keyBy(Customer::getIdentifier)
>>>>         .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
>>>>         .trigger(EventTimeTrigger.create())
>>>>         .evictor(new CustomerEvictor())
>>>>         .process(new CustomAggregateFunction())
>>>>         .print();
>>>>
>>>>
>>>> On Thu, May 6, 2021 at 1:53 AM Sam <swagat....@gmail.com> wrote:
>>>>
>>>>> Adding the code for CustomWatermarkGenerator
>>>>>
>>>>> .....
>>>>> @Override
>>>>> public void onEvent(Customer customer, long l, WatermarkOutput watermarkOutput) {
>>>>>     currentMaxTimestamp = Math.max(currentMaxTimestamp, customer.getEventTime());
>>>>> }
>>>>>
>>>>> @Override
>>>>> public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
>>>>>     watermarkOutput.emitWatermark(new Watermark(currentMaxTimestamp));
>>>>>
>>>>> }
>>>>> .....
>>>>>
>>>>>
>>>>> On Thu, May 6, 2021 at 1:33 AM Swagat Mishra <swaga...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> A bit of background: I have a stream of customers who have purchased
>>>>>> some product, and I am reading these transactions from a Kafka topic. I
>>>>>> want to aggregate the number of products each customer has purchased in a
>>>>>> particular duration (say 10 seconds) and write the result to a sink.
>>>>>>
>>>>>> I am using session windows to achieve the above.
>>>>>>
>>>>>> For test purposes, I have mocked up a customer stream and executed
>>>>>> session windows like below.
>>>>>>
>>>>>> StreamExecutionEnvironment environment =
>>>>>>         StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>> DataStream<Customer> customerStream = environment.addSource(new CustomerGenerator());
>>>>>>
>>>>>> customerStream
>>>>>>         //.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(7)))
>>>>>>         .assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator(new WaterMarkAssigner()))
>>>>>>         .keyBy(Customer::getIdentifier)
>>>>>>         .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
>>>>>>         .trigger(EventTimeTrigger.create())
>>>>>>         .evictor(new CustomerEvictor())
>>>>>>         .process(new CustomAggregateFunction())
>>>>>>         .print();
>>>>>>
>>>>>> My watermark assigner looks like:
>>>>>>
>>>>>> public class WaterMarkAssigner implements WatermarkStrategy<Customer> {
>>>>>>     static final Logger logger = LoggerFactory.getLogger(WaterMarkAssigner.class);
>>>>>>
>>>>>>     @Override
>>>>>>     public WatermarkGenerator<Customer> createWatermarkGenerator(
>>>>>>             WatermarkGeneratorSupplier.Context context) {
>>>>>>         return new CustomWatermarkGenerator();
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>> I notice that the evictor and the aggregation function are getting called
>>>>>> only once, for the first customer in the stream.
>>>>>>
>>>>>> The data stream generates customers at 1-second intervals, and there
>>>>>> are 5 customer keys for which it generates transactions.
>>>>>>
>>>>>> Am I doing something wrong with the above?
>>>>>>
>>>>>> I want to be able to capture the event of each transaction being added to
>>>>>> and removed from the window so that I can perform the aggregation.
>>>>>>
>>>>>> Please advise.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
