Thank you.
sourceContext.collectWithTimestamp(c, c.getEventTime());
Adding this to the source context worked.
However, I am still getting only one customer in the process method. I
would expect the iterable to provide all customers in the window. Or
do I have to maintain state?
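On the state question: for a keyed session window used with a ProcessWindowFunction, the window operator itself buffers the window's elements, so the Iterable passed to process() should contain every element of the merged session for that key, with no extra state needed. The merging rule can be modelled in plain Java as below. This is a sketch, not Flink code: SessionMergeModel and its array layout are hypothetical, and it assumes timestamps arrive in ascending order.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of event-time session merging for ONE key (not Flink code).
// Each session is stored as {start, end, elementCount}; end is exclusive.
class SessionMergeModel {

    // Assumes timestamps are sorted ascending, so only the latest session can overlap.
    static List<long[]> sessions(long[] sortedTimestamps, long gapMs) {
        List<long[]> result = new ArrayList<>();
        for (long t : sortedTimestamps) {
            long start = t;
            long end = t + gapMs; // each element first gets its own window [t, t + gap)
            if (!result.isEmpty()) {
                long[] last = result.get(result.size() - 1);
                // overlapping or touching windows are merged into one session
                if (last[0] <= end && last[1] >= start) {
                    last[1] = Math.max(last[1], end);
                    last[2]++; // the merged window keeps every element
                    continue;
                }
            }
            result.add(new long[] {start, end, 1});
        }
        return result;
    }
}
```

With a 20-second gap, events of one key at 0 s, 5 s and 10 s collapse into a single session holding all three, while an event at 40 s opens a new one.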
Changes for reference:
I made the following change and also removed any lag that I had
introduced earlier for experimentation.
The trigger now looks like:
@Override
public TriggerResult onElement(Customer customer, long l,
TimeWindow timeWindow, TriggerContext triggerContext) throws Exception
{
if (timeWindow.maxTimestamp() <= triggerContext.getCurrentWatermark()) {
// if the watermark is already past the window fire immediately
return TriggerResult.FIRE;
} else {
//LOGGER.info("Max timestamp for customer: " + customer.getIdentifier() + " is: " + timeWindow.maxTimestamp());
triggerContext.registerEventTimeTimer(customer.getEventTime());
return TriggerResult.FIRE;
}
}
@Override
public TriggerResult onEventTime(long time, TimeWindow timeWindow,
TriggerContext triggerContext) {
return time == timeWindow.maxTimestamp() ?
TriggerResult.FIRE :
TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long time, TimeWindow
window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
ctx.deleteEventTimeTimer(window.maxTimestamp());
}
@Override
public boolean canMerge() {
return true;
}
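For comparison, Flink's built-in EventTimeTrigger (the one returned by EventTimeTrigger.create()) does not fire on every element: onElement registers an event-time timer at window.maxTimestamp() and returns CONTINUE unless the watermark has already passed the window, and the window fires when that timer goes off. Because session windows merge, it also overrides onMerge() to register a timer for the merged window; a trigger that returns true from canMerge() is expected to do the same. The decisions can be modelled in plain Java as follows (a sketch; the class and enum are local stand-ins, not Flink types):

```java
import java.util.OptionalLong;

// Plain-Java model of the decisions made by Flink's built-in EventTimeTrigger.
// Result and BuiltInEventTimeTriggerModel are local stand-ins, not Flink types.
class BuiltInEventTimeTriggerModel {

    enum Result { CONTINUE, FIRE }

    // onElement: fire immediately only if the watermark is already past the window.
    // Otherwise the real trigger registers a timer at windowMaxTimestamp and keeps collecting.
    static Result onElement(long windowMaxTimestamp, long currentWatermark) {
        return windowMaxTimestamp <= currentWatermark ? Result.FIRE : Result.CONTINUE;
    }

    // onEventTime: fire only for the timer that belongs to this window.
    static Result onEventTime(long timerTime, long windowMaxTimestamp) {
        return timerTime == windowMaxTimestamp ? Result.FIRE : Result.CONTINUE;
    }

    // onMerge: register a timer for the merged window if it is still in the future.
    static OptionalLong onMerge(long mergedWindowMaxTimestamp, long currentWatermark) {
        return mergedWindowMaxTimestamp > currentWatermark
                ? OptionalLong.of(mergedWindowMaxTimestamp)
                : OptionalLong.empty();
    }
}
```

In this model a window fires once, when the watermark reaches its maxTimestamp, instead of once per incoming element.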
and *removed the allowed lateness*:
customerStream
//.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(7)))
.assignTimestampsAndWatermarks(new CustomWatermarkStrategy())
.keyBy((KeySelector<Customer, String>) Customer::getIdentifier)
.window(EventTimeSessionWindows.withGap(Time.seconds(20)))
//.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
.trigger(new EventTimeTrigger()) //EventTimeTrigger.create())
.process(new CustomAggregateFunction());
On Thu, May 6, 2021 at 12:32 PM Arvid Heise <[email protected]> wrote:
> Your source is not setting the timestamp with collectWithTimestamp. I'm
> assuming that nothing really moves from the event time's perspective.
>
> On Thu, May 6, 2021 at 8:58 AM Swagat Mishra <[email protected]> wrote:
>
>> Yes, the customer generator is setting the event timestamp correctly, as
>> shown below. I debugged and found that the events are arriving late, so
>> they are never processed, i.e., in the window operator the method
>> this.isWindowLate(actualWindow) is evaluating to false for all events
>> except the first, hence the events are getting skipped. I am not able to
>> figure out where exactly the issue is.
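For reference, for event-time windows the window operator treats a window as late once its cleanup time (window.maxTimestamp() plus the allowed lateness) is at or below the current watermark, and an element whose windows are all late is not added to any window. A plain-Java sketch of that comparison (WindowLatenessModel and its methods are names local to this sketch, not Flink's API):

```java
// Plain-Java model of the event-time lateness check (not Flink code).
// A window is late once maxTimestamp + allowedLateness <= current watermark.
class WindowLatenessModel {

    static long cleanupTime(long windowMaxTimestamp, long allowedLatenessMs) {
        long cleanup = windowMaxTimestamp + allowedLatenessMs;
        // overflow guard: saturate at Long.MAX_VALUE instead of wrapping negative
        return cleanup >= windowMaxTimestamp ? cleanup : Long.MAX_VALUE;
    }

    static boolean isWindowLate(long windowMaxTimestamp, long allowedLatenessMs, long currentWatermark) {
        return cleanupTime(windowMaxTimestamp, allowedLatenessMs) <= currentWatermark;
    }
}
```

If the watermark runs ahead of the element timestamps, for example because the two are derived from different clocks or units, new session windows can look late in this check.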
>>
>> I have removed the evictor because I don't think I need it yet.
>>
>> The stream looks like:
>>
>> customerStream
>> .assignTimestampsAndWatermarks(new CustomWatermarkStrategy())
>> .keyBy((KeySelector<Customer, String>) Customer::getIdentifier)
>> .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
>> .allowedLateness(Time.seconds(5))
>> .trigger(new EventTimeTrigger())
>> .process(new CustomAggregateFunction());
>>
>>
>> *Customer generator looks like:*
>>
>> while (isRunning) {
>> Customer c = new Customer(CUSTOMER_KEY[counter % 5], LocalTime.now(), 1000); // LocalTime.now() is the event time
>> System.out.println("Writing customer: " + c);
>> sourceContext.collect(c);
>> //sourceContext.emitWatermark(new Watermark(c.getEventTime()));
>> Thread.sleep(1000);
>> counter++;
>> if(counter % 11 == 0) {
>> System.out.println("Sleeping for 10 seconds");
>> Thread.sleep(10000);
>> }
>> }
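Flink's event timestamps, timers and watermarks are long values conventionally measured in milliseconds since the epoch. The generator above stamps events with LocalTime.now(), which has no date part, and the trigger further down registers a timer from toNanoOfDay(), which counts nanoseconds since midnight; how Customer.getEventTime() turns the LocalTime into a long is not shown in the thread. The sketch below only illustrates how far apart these scales are (EventTimeUnits is a hypothetical helper, and UTC is assumed for the conversion):

```java
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.ZoneOffset;

// Hypothetical helper contrasting two "time as long" scales (not Flink code).
class EventTimeUnits {

    // Epoch milliseconds for a date plus time-of-day, assuming UTC.
    static long epochMillis(LocalDate date, LocalTime time) {
        return date.atTime(time).toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    // Nanoseconds since midnight, which is what LocalTime.toNanoOfDay() returns.
    static long nanoOfDay(LocalTime time) {
        return time.toNanoOfDay();
    }
}
```

Eight seconds after midnight is 8,000,000,000 as nanos-of-day but 8,000 as epoch milliseconds on 1970-01-01, while midnight UTC on 2021-05-06 is already 1,620,259,200,000 epoch milliseconds.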
>>
>>
>> The custom watermark generator has this:
>>
>> .....
>> @Override
>> public void onEvent(Customer customer, long l, WatermarkOutput
>> watermarkOutput) {
>> currentMaxTimestamp = Math.max(currentMaxTimestamp,
>> customer.getEventTime() );
>> }
>>
>> @Override
>> public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
>> watermarkOutput.emitWatermark(new Watermark(currentMaxTimestamp));
>>
>> }
>> .....
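The generator above emits the largest timestamp seen so far as the watermark, which leaves no slack for out-of-order events. For comparison, the commented-out WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(7)) line elsewhere in this thread holds the watermark back: Flink's bounded-out-of-orderness generator emits the maximum timestamp minus the allowed delay minus 1 ms. A plain-Java model of that formula (BoundedWatermarkModel is a hypothetical name, not the Flink class):

```java
// Plain-Java model of a bounded-out-of-orderness watermark generator (not Flink code).
// watermark = maxSeenTimestamp - outOfOrdernessMillis - 1
class BoundedWatermarkModel {
    private final long outOfOrdernessMillis;
    private long maxTimestamp;

    BoundedWatermarkModel(long outOfOrdernessMillis) {
        this.outOfOrdernessMillis = outOfOrdernessMillis;
        // start as low as possible without letting currentWatermark() underflow
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    // called per event with that event's timestamp (epoch millis)
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    // the value a periodic emit would publish as the watermark
    long currentWatermark() {
        return maxTimestamp - outOfOrdernessMillis - 1;
    }
}
```

With a 7-second bound, seeing an event at 10,000 ms yields a watermark of 2,999 ms; a later, older event at 8,000 ms does not move it back.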
>>
>> The trigger looks like:
>>
>> ------
>>
>>
>> @Override
>> public TriggerResult onElement(Customer customer, long l, TimeWindow
>> timeWindow, TriggerContext triggerContext) throws Exception {
>> if (timeWindow.maxTimestamp() <=
>> triggerContext.getCurrentWatermark()) {
>> // if the watermark is already past the window fire immediately
>> return TriggerResult.FIRE;
>> } else {
>> LOGGER.info("Max timestamp for customer: " +
>> customer.getIdentifier() + " is: " + timeWindow.maxTimestamp());
>>
>> triggerContext.registerEventTimeTimer(customer.getLocalTime().plusSeconds(8).toNanoOfDay());
>> return TriggerResult.FIRE;
>> }
>> }
>>
>> @Override
>> public TriggerResult onEventTime(long time, TimeWindow timeWindow,
>> TriggerContext triggerContext) {
>> // if (timeWindow.maxTimestamp() > triggerContext.getCurrentWatermark()) {
>> //     triggerContext.registerEventTimeTimer(timeWindow.maxTimestamp());
>> //     return TriggerResult.CONTINUE;
>> // }
>>
>> return time == timeWindow.maxTimestamp() ?
>> TriggerResult.FIRE :
>> TriggerResult.CONTINUE;
>> }
>>
>>
>> ....
>>
>>
>> On Thu, May 6, 2021 at 12:02 PM Arvid Heise <[email protected]> wrote:
>>
>>> Hi,
>>>
>>> Is your CustomerGenerator setting the event timestamp correctly? Are
>>> your evictors evicting too early?
>>>
>>> You can try to add some debug output into the watermark assigner and see
>>> if it's indeed progressing as expected.
>>>
>>> On Thu, May 6, 2021 at 12:48 AM Swagat Mishra <[email protected]>
>>> wrote:
>>>
>>>> This seems to be working fine in processing time but doesn't work in
>>>> event time. Is there an issue with the way the watermark is defined, or do
>>>> we need to set up timers?
>>>>
>>>> Please advise.
>>>>
>>>>
>>>> WORKING:
>>>>
>>>> customerStream
>>>> .keyBy((KeySelector<Customer, String>) Customer::getIdentifier)
>>>> .window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
>>>> .trigger(new EventTimeTrigger()) //EventTimeTrigger.create())
>>>> .process(new CustomAggregateFunction());
>>>>
>>>>
>>>> NOT WORKING:
>>>>
>>>> customerStream
>>>> .assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator(new
>>>> WaterMarkAssigner()))
>>>> .keyBy(Customer::getIdentifier)
>>>> .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
>>>> .trigger(EventTimeTrigger.create())
>>>> .evictor(new CustomerEvictor())
>>>> .process(new CustomAggregateFunction())
>>>> .print();
>>>>
>>>>
>>>> On Thu, May 6, 2021 at 1:53 AM Sam <[email protected]> wrote:
>>>>
>>>>> Adding the code for CustomWatermarkGenerator
>>>>>
>>>>> .....
>>>>> @Override
>>>>> public void onEvent(Customer customer, long l, WatermarkOutput
>>>>> watermarkOutput) {
>>>>> currentMaxTimestamp = Math.max(currentMaxTimestamp,
>>>>> customer.getEventTime() );
>>>>> }
>>>>>
>>>>> @Override
>>>>> public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
>>>>> watermarkOutput.emitWatermark(new Watermark(currentMaxTimestamp));
>>>>>
>>>>> }
>>>>> .....
>>>>>
>>>>>
>>>>> On Thu, May 6, 2021 at 1:33 AM Swagat Mishra <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> A bit of background: I have a stream of customers who have purchased
>>>>>> some product, and I am reading these transactions from a Kafka topic. I
>>>>>> want to aggregate the number of products each customer has purchased in
>>>>>> a particular duration (say, 10 seconds) and write the result to a sink.
>>>>>>
>>>>>> I am using session windows to achieve the above.
>>>>>>
>>>>>> For test purposes, I have mocked up a customer stream and executed
>>>>>> session windows as below.
>>>>>>
>>>>>> StreamExecutionEnvironment environment =
>>>>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>> DataStream<Customer> customerStream = environment.addSource( new
>>>>>> CustomerGenerator() );
>>>>>>
>>>>>> customerStream
>>>>>>
>>>>>> //.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(7)))
>>>>>>
>>>>>> .assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator(new
>>>>>> WaterMarkAssigner()))
>>>>>> .keyBy(Customer::getIdentifier)
>>>>>> .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
>>>>>> .trigger(EventTimeTrigger.create())
>>>>>> .evictor(new CustomerEvictor())
>>>>>> .process(new CustomAggregateFunction())
>>>>>> .print();
>>>>>>
>>>>>> My watermark assigner looks like:
>>>>>>
>>>>>> public class WaterMarkAssigner implements WatermarkStrategy<Customer> {
>>>>>> static final Logger logger =
>>>>>> LoggerFactory.getLogger(WaterMarkAssigner.class);
>>>>>>
>>>>>> @Override
>>>>>> public WatermarkGenerator<Customer>
>>>>>> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
>>>>>> return new CustomWatermarkGenerator();
>>>>>> }
>>>>>> }
>>>>>>
>>>>>> I notice that the evictor and aggregation functions are getting called
>>>>>> only once, for the first customer in the stream.
>>>>>>
>>>>>> The data stream generates customers at 1-second intervals, and there
>>>>>> are 5 customer keys for which it generates transactions.
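A quick check of this setup against the 5-second gap in the pipeline above, assuming keys are picked round-robin (counter % 5) with one event per second: each key reappears about every 5 seconds. Two consecutive session windows of one key, [t, t + gap) and [t + d, t + d + gap), are merged only if they overlap or touch (TimeWindow.intersects treats touching windows as intersecting), i.e. when d <= gap. A spacing of exactly 5 s therefore sits on the boundary, and any extra delay, such as sleep overhead, can split a key into one-element sessions. A sketch of the arithmetic (KeySpacingSketch is a hypothetical name):

```java
// Hypothetical arithmetic helper (not Flink code) for round-robin keys vs. a session gap.
class KeySpacingSketch {

    // Milliseconds between two consecutive events of the same key,
    // assuming keys are visited round-robin at a fixed emit interval.
    static long sameKeySpacing(int numKeys, long emitIntervalMs) {
        return numKeys * emitIntervalMs;
    }

    // [t, t + gap) and [t + d, t + d + gap) overlap or touch only when d <= gap,
    // so only then do the two events end up in the same session.
    static boolean sameSession(long spacingMs, long gapMs) {
        return spacingMs <= gapMs;
    }
}
```

With a 20-second gap the same ~5-second spacing falls comfortably inside one session.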
>>>>>>
>>>>>> Am I doing something wrong with the above?
>>>>>>
>>>>>> I want to be able to capture an event each time a transaction is added
>>>>>> to or removed from the window so that I can perform the aggregation.
>>>>>>
>>>>>> Please advise.