Your source is not setting the timestamp with collectWithTimestamp, so I'm
assuming that nothing really moves from the event-time perspective.

On Thu, May 6, 2021 at 8:58 AM Swagat Mishra <swaga...@gmail.com> wrote:

> Yes, the customer generator is setting the event timestamp correctly, as
> shown below. I debugged and found that the events are treated as late, so
> they are never processed, i.e., in the window operator the method
> this.isWindowLate(actualWindow) returns true for every event except the
> first, hence those events are skipped. I am not able to figure out where
> exactly the issue is.
>
> I have removed the evictor because I don't think I need it yet.
>
> The stream looks like this:
>
> customerStream
>         .assignTimestampsAndWatermarks(new CustomWatermarkStrategy())
>         .keyBy((KeySelector<Customer, String>) Customer::getIdentifier)
>         .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
>         .allowedLateness(Time.seconds(5))
>         .trigger(new EventTimeTrigger())
>         .process(new CustomAggregateFunction());
>
>
> The customer generator looks like this:
>
> while (isRunning) {
>     // LocalTime.now() is the event time
>     Customer c = new Customer(CUSTOMER_KEY[counter % 5], LocalTime.now(), 1000);
>     System.out.println("Writing customer: " + c);
>     sourceContext.collect(c);
>     //sourceContext.emitWatermark(new Watermark(c.getEventTime()));
>     Thread.sleep(1000);
>     counter++;
>     if (counter % 11 == 0) {
>         System.out.println("Sleeping for 10 seconds");
>         Thread.sleep(10000);
>     }
> }
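
A note on units: the session gap Time.seconds(5) becomes 5000, so it only
means 5 seconds if the event timestamps are milliseconds since the epoch.
LocalTime.now() carries no date and resets at midnight, and depending on how
getEventTime() turns it into a long, it may also use a different unit. The
JDK-only sketch below (helper names are hypothetical) contrasts the two
representations.

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneOffset;

// Hypothetical JDK-only helpers contrasting a time-of-day value with an
// epoch-millisecond timestamp, the scale Flink's Time.seconds(...) assumes.
final class EventTimeUnits {

    // Milliseconds since 1970-01-01T00:00Z for a date and time taken as UTC.
    static long epochMillis(LocalDate date, LocalTime time) {
        return LocalDateTime.of(date, time).toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    // Nanoseconds since midnight: restarts every day and is not milliseconds.
    static long nanoOfDay(LocalTime time) {
        return time.toNanoOfDay();
    }

    // Something a generator could use instead of LocalTime.now().
    static long nowMillis() {
        return System.currentTimeMillis();
    }
}
```

In the generator, one option would be to take the timestamp once, for
example with System.currentTimeMillis(), keep it on the Customer, and emit
the record with sourceContext.collectWithTimestamp(c, timestamp).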
>
>
> The custom watermark generator has this:
>
> .....
> @Override
> public void onEvent(Customer customer, long l, WatermarkOutput watermarkOutput) {
>     currentMaxTimestamp = Math.max(currentMaxTimestamp, customer.getEventTime());
> }
>
> @Override
> public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
>     watermarkOutput.emitWatermark(new Watermark(currentMaxTimestamp));
> }
> .....
>
> The trigger looks like this:
>
> ------
>
>
>     @Override
>     public TriggerResult onElement(Customer customer, long l, TimeWindow timeWindow,
>                                    TriggerContext triggerContext) throws Exception {
>         if (timeWindow.maxTimestamp() <= triggerContext.getCurrentWatermark()) {
>             // if the watermark is already past the window fire immediately
>             return TriggerResult.FIRE;
>         } else {
>             LOGGER.info("Max timestamp for customer: " + customer.getIdentifier()
>                     + " is: " + timeWindow.maxTimestamp());
>             triggerContext.registerEventTimeTimer(
>                     customer.getLocalTime().plusSeconds(8).toNanoOfDay());
>             return TriggerResult.FIRE;
>         }
>     }
>
>     @Override
>     public TriggerResult onEventTime(long time, TimeWindow timeWindow,
>                                      TriggerContext triggerContext) {
> //        if (timeWindow.maxTimestamp() > triggerContext.getCurrentWatermark()) {
> //            triggerContext.registerEventTimeTimer(timeWindow.maxTimestamp());
> //            return TriggerResult.CONTINUE;
> //        }
>
>         return time == timeWindow.maxTimestamp() ?
>                 TriggerResult.FIRE :
>                 TriggerResult.CONTINUE;
>     }
>
>
> ....
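
For comparison, here is a plain-Java model of the decisions made by Flink's
built-in EventTimeTrigger (onElement and onEventTime only; merging is left
out, and Decision/ElementDecision are hypothetical stand-ins for
TriggerResult and the trigger context). The built-in trigger registers its
timer at window.maxTimestamp() and returns CONTINUE until the watermark
passes it. The custom trigger above instead returns FIRE for every element
and registers its timer at toNanoOfDay(), i.e. nanoseconds since midnight,
which is not on the epoch-millisecond scale of maxTimestamp(), so its
onEventTime check time == timeWindow.maxTimestamp() will generally not match.

```java
// Plain-Java model of the decisions made by Flink's built-in EventTimeTrigger.
// Decision, ElementDecision and the method names are hypothetical stand-ins.
final class EventTimeTriggerModel {

    enum Decision { FIRE, CONTINUE }

    // Result of onElement: what to return and which timer (if any) to register.
    static final class ElementDecision {
        final Decision decision;
        final Long timerToRegister; // null means no timer is registered

        ElementDecision(Decision decision, Long timerToRegister) {
            this.decision = decision;
            this.timerToRegister = timerToRegister;
        }
    }

    static ElementDecision onElement(long windowMaxTimestamp, long currentWatermark) {
        if (windowMaxTimestamp <= currentWatermark) {
            // The watermark is already past the window: fire immediately.
            return new ElementDecision(Decision.FIRE, null);
        }
        // Otherwise wait for a timer at the window's last timestamp,
        // on the same (epoch-millisecond) scale as the watermark.
        return new ElementDecision(Decision.CONTINUE, windowMaxTimestamp);
    }

    // Fire only for the timer that was registered at the window's end.
    static Decision onEventTime(long timerTime, long windowMaxTimestamp) {
        return timerTime == windowMaxTimestamp ? Decision.FIRE : Decision.CONTINUE;
    }
}
```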
>
>
> On Thu, May 6, 2021 at 12:02 PM Arvid Heise <ar...@apache.org> wrote:
>
>> Hi,
>>
>> Is your CustomerGenerator setting the event timestamp correctly? Are your
>> evictors evicting too early?
>>
>> You can try to add some debug output into the watermark assigner and see
>> if it's indeed progressing as expected.
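
As a sketch of that kind of debug output: a plain-Java model (not Flink's
WatermarkGenerator interface; the class name and trace list are made up)
that tracks the maximum timestamp and records every event and watermark. It
follows the convention of Flink's bounded-out-of-orderness generator, which
emits maxTimestamp - outOfOrderness - 1. In the real generator, logging the
eventTimestamp argument of onEvent (the "long l" parameter) next to
customer.getEventTime() would show whether Flink ever saw a timestamp on the
record.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of a bounded-out-of-orderness watermark generator that
// keeps a trace of what it sees, for debugging. Not Flink's interface.
final class TracingWatermarkModel {
    private final long outOfOrdernessMillis;
    private long maxTimestamp;
    final List<String> trace = new ArrayList<>();

    TracingWatermarkModel(long outOfOrdernessMillis) {
        this.outOfOrdernessMillis = outOfOrdernessMillis;
        // Start so that the lowest emitted watermark is Long.MIN_VALUE.
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    // recordTimestamp plays the role of the "long l" argument of onEvent;
    // payloadEventTime plays the role of customer.getEventTime().
    void onEvent(long payloadEventTime, long recordTimestamp) {
        trace.add("event: payload=" + payloadEventTime + " record=" + recordTimestamp);
        maxTimestamp = Math.max(maxTimestamp, payloadEventTime);
    }

    // Returns (and traces) the watermark a periodic emit would produce.
    long onPeriodicEmit() {
        long watermark = maxTimestamp - outOfOrdernessMillis - 1;
        trace.add("watermark: " + watermark);
        return watermark;
    }
}
```

If the record value in such a trace stays at Long.MIN_VALUE while the
payload value advances, the records never received a Flink timestamp.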
>>
>> On Thu, May 6, 2021 at 12:48 AM Swagat Mishra <swaga...@gmail.com> wrote:
>>
>>> This seems to work fine in processing time but doesn't work in event
>>> time. Is there an issue with the way the watermark is defined, or do we
>>> need to set up timers?
>>>
>>> Please advise.
>>>
>>>
>>> WORKING:
>>>
>>> customerStream
>>>         .keyBy((KeySelector<Customer, String>) Customer::getIdentifier)
>>>         .window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
>>>         .trigger(new EventTimeTrigger()) //EventTimeTrigger.create())
>>>         .process(new CustomAggregateFunction());
>>>
>>>
>>> NOT WORKING:
>>>
>>> customerStream
>>>         .assignTimestampsAndWatermarks(
>>>                 WatermarkStrategy.forGenerator(new WaterMarkAssigner()))
>>>         .keyBy(Customer::getIdentifier)
>>>         .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
>>>         .trigger(EventTimeTrigger.create())
>>>         .evictor(new CustomerEvictor())
>>>         .process(new CustomAggregateFunction())
>>>         .print();
>>>
>>>
>>> On Thu, May 6, 2021 at 1:53 AM Sam <swagat....@gmail.com> wrote:
>>>
>>>> Adding the code for CustomWatermarkGenerator
>>>>
>>>> .....
>>>> @Override
>>>> public void onEvent(Customer customer, long l, WatermarkOutput watermarkOutput) {
>>>>     currentMaxTimestamp = Math.max(currentMaxTimestamp, customer.getEventTime());
>>>> }
>>>>
>>>> @Override
>>>> public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
>>>>     watermarkOutput.emitWatermark(new Watermark(currentMaxTimestamp));
>>>> }
>>>> .....
>>>>
>>>>
>>>> On Thu, May 6, 2021 at 1:33 AM Swagat Mishra <swaga...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Bit of background, I have a stream of customers who have purchased
>>>>> some product, reading these transactions on a KAFKA topic. I want to
>>>>> aggregate the number of products the customer has purchased in a 
>>>>> particular
>>>>> duration  ( say 10 seconds ) and write to a sink.
>>>>>
>>>>> I am using session windows to achieve the above.
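
One point to keep in mind: a session window has no fixed length. Per key, it
stays open until no element has arrived for the gap, so "10 seconds" would
correspond more closely to a tumbling window. The plain-Java sketch below
(names are hypothetical) models how one key's event times group into
sessions, using the overlap rule of Flink's TimeWindow.intersects: an element
at t opens [t, t + gap) and merges with the current session when t is not
after that session's end.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of event-time session windows for a single key.
// Each element at t opens [t, t + gap); windows that intersect
// (start <= other.end && end >= other.start) are merged.
final class SessionModel {

    // Returns the number of elements in each session, in time order.
    static List<Integer> sessionSizes(long[] timestamps, long gapMillis) {
        long[] sorted = timestamps.clone();
        Arrays.sort(sorted);
        List<Integer> sizes = new ArrayList<>();
        long sessionEnd = Long.MIN_VALUE;
        int count = 0;
        for (long t : sorted) {
            if (count > 0 && t <= sessionEnd) {
                // [t, t + gap) intersects the current session: merge.
                sessionEnd = Math.max(sessionEnd, t + gapMillis);
                count++;
            } else {
                // Gap exceeded: close the current session, start a new one.
                if (count > 0) {
                    sizes.add(count);
                }
                sessionEnd = t + gapMillis;
                count = 1;
            }
        }
        if (count > 0) {
            sizes.add(count);
        }
        return sizes;
    }
}
```

For example, sessionSizes(new long[] {0, 1000, 2000, 9000, 14000}, 5000)
returns [3, 2]. With five keys emitted round-robin once per second, each key
sees an element roughly every 5 seconds plus loop overhead, which sits right
at the 5-second gap, so consecutive elements of one key may or may not share
a session.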
>>>>>
>>>>> For test purposes, I have mocked up a customer stream and run session
>>>>> windows as below:
>>>>>
>>>>> StreamExecutionEnvironment environment =
>>>>>         StreamExecutionEnvironment.getExecutionEnvironment();
>>>>> DataStream<Customer> customerStream = environment.addSource(new CustomerGenerator());
>>>>>
>>>>> customerStream
>>>>>         //.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(7)))
>>>>>         .assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator(new WaterMarkAssigner()))
>>>>>         .keyBy(Customer::getIdentifier)
>>>>>         .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
>>>>>         .trigger(EventTimeTrigger.create())
>>>>>         .evictor(new CustomerEvictor())
>>>>>         .process(new CustomAggregateFunction())
>>>>>         .print();
>>>>>
>>>>> My watermark assigner looks like:
>>>>>
>>>>> public class WaterMarkAssigner implements WatermarkStrategy<Customer> {
>>>>>     static final Logger logger = LoggerFactory.getLogger(WaterMarkAssigner.class);
>>>>>
>>>>>     @Override
>>>>>     public WatermarkGenerator<Customer> createWatermarkGenerator(
>>>>>             WatermarkGeneratorSupplier.Context context) {
>>>>>         return new CustomWatermarkGenerator();
>>>>>     }
>>>>> }
>>>>>
>>>>> I notice that the evictor and the aggregation function are called only
>>>>> once, for the first customer in the stream.
>>>>>
>>>>> The data stream generates one customer per second, and there are 5
>>>>> customer keys for which it generates transactions.
>>>>>
>>>>> Am I doing something wrong with the above?
>>>>>
>>>>> I want to capture an event each time a transaction is added to or
>>>>> removed from the window so that I can perform the aggregation.
>>>>>
>>>>> Please advise.
