This works fine with processing time but not with event time. Is there an
issue with the way the watermark is defined, or do we need to set up timers?

Please advise.


WORKING:

customerStream
        .keyBy((KeySelector<Customer, String>) Customer::getIdentifier)
        .window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
        .trigger(new EventTimeTrigger()) //EventTimeTrigger.create())
        .process(new CustomAggregateFunction());


NOT WORKING:

customerStream
        .assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator(new WaterMarkAssigner()))
        .keyBy(Customer::getIdentifier)
        .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
        .trigger(EventTimeTrigger.create())
        .evictor(new CustomerEvictor())
        .process(new CustomAggregateFunction())
        .print();
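On timers: Flink's EventTimeTrigger registers its own event-time timer at the window's maxTimestamp(), which is the window end minus 1 ms. It fires when the watermark reaches that timer, so extra timers should not be needed. The catch is that the window fires only if the watermark actually gets that far. Below is a minimal plain-Java model of that rule. It is not Flink code, and the class and method names are hypothetical.

```java
// Plain-Java model (not Flink code) of EventTimeTrigger-style firing for
// one session window. Class and method names are hypothetical.
class EventTimeFiringModel {

    // A session window for an event at t covers [t, t + gap); the trigger's
    // timer sits at maxTimestamp = end - 1.
    static long maxTimestamp(long start, long gapMillis) {
        return start + gapMillis - 1;
    }

    // The registered timer fires once the watermark reaches it.
    static boolean fires(long windowMaxTimestamp, long watermark) {
        return watermark >= windowMaxTimestamp;
    }

    // First watermark in the sequence that fires the window, or -1 if none does
    // (-1 is only a sentinel for this model).
    static long firstFiringWatermark(long windowMaxTimestamp, long[] watermarks) {
        for (long wm : watermarks) {
            if (fires(windowMaxTimestamp, wm)) {
                return wm;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        long timer = maxTimestamp(10_000, 5_000); // event at 10 s, 5 s gap
        System.out.println(timer);
        System.out.println(firstFiringWatermark(timer, new long[]{12_000, 14_998}));
        System.out.println(firstFiringWatermark(timer, new long[]{12_000, 15_000}));
    }
}
```

The watermark is tracked per parallel operator instance rather than per key. Later events from other customers can therefore also advance it far enough to fire one customer's window.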


On Thu, May 6, 2021 at 1:53 AM Sam <swagat....@gmail.com> wrote:

> Adding the code for CustomWatermarkGenerator
>
> .....
> @Override
> public void onEvent(Customer customer, long l, WatermarkOutput watermarkOutput) {
>     currentMaxTimestamp = Math.max(currentMaxTimestamp, customer.getEventTime());
> }
>
> @Override
> public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
>     watermarkOutput.emitWatermark(new Watermark(currentMaxTimestamp));
> }
> .....
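The generator above emits currentMaxTimestamp itself as the watermark. How currentMaxTimestamp is initialized is in the elided part.

For comparison, Flink's built-in WatermarkStrategy.forBoundedOutOfOrderness(...), used in the commented-out line further down, differs in two ways:
- It emits maxTimestamp - outOfOrderness - 1.
- It seeds the maximum so that the first watermark cannot underflow.

Below is a plain-Java model of that arithmetic. It is not the Flink class, and the name is hypothetical. The 7 s value only mirrors the commented-out Duration.ofSeconds(7).

```java
// Plain-Java model (not the Flink class) of a periodic watermark generator with
// bounded out-of-orderness, mirroring onEvent/onPeriodicEmit from the thread.
class BoundedOutOfOrdernessModel {
    private final long outOfOrdernessMillis;
    private long currentMaxTimestamp;

    BoundedOutOfOrdernessModel(long outOfOrdernessMillis) {
        this.outOfOrdernessMillis = outOfOrdernessMillis;
        // Seed low enough that the first emitted watermark is exactly Long.MIN_VALUE
        // instead of wrapping around.
        this.currentMaxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    void onEvent(long eventTime) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTime);
    }

    // A watermark t promises no more events with timestamp <= t, hence the - 1.
    long onPeriodicEmit() {
        return currentMaxTimestamp - outOfOrdernessMillis - 1;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessModel gen = new BoundedOutOfOrdernessModel(7_000);
        System.out.println(gen.onPeriodicEmit()); // Long.MIN_VALUE before any event
        for (long t : new long[]{10_000, 13_000, 12_000}) {
            gen.onEvent(t);
        }
        System.out.println(gen.onPeriodicEmit()); // 13_000 - 7_000 - 1 = 5_999
    }
}
```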
>
>
> On Thu, May 6, 2021 at 1:33 AM Swagat Mishra <swaga...@gmail.com> wrote:
>
>> Hi,
>>
>> A bit of background: I have a stream of customers who have purchased
>> products, and I read these transactions from a Kafka topic. I want to
>> aggregate the number of products each customer has purchased within a
>> particular duration (say, 10 seconds) and write the result to a sink.
>>
>> I am using session windows to achieve the above.
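Suppose the requirement is literally a count per customer per fixed 10-second span. Then a tumbling event-time window, TumblingEventTimeWindows.of(Time.seconds(10)), may fit more directly. A session window has no fixed length; it closes only after 5 s without events for that key.

Below is a plain-Java model of tumbling assignment. It is not Flink code, the names are hypothetical, and it assumes non-negative timestamps and zero window offset.

```java
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model (not Flink code) of per-key counting in fixed,
// non-overlapping (tumbling) event-time windows.
class TumblingCountModel {

    // Start of the tumbling window containing t (assumes t >= 0 and no offset).
    static long windowStart(long t, long sizeMillis) {
        return t - (t % sizeMillis);
    }

    // Counts purchases per (customer, window start); map keys look like "c1@10000".
    static Map<String, Integer> countPerWindow(String[] customers, long[] times, long sizeMillis) {
        Map<String, Integer> counts = new TreeMap<>();
        for (int i = 0; i < times.length; i++) {
            String key = customers[i] + "@" + windowStart(times[i], sizeMillis);
            counts.merge(key, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        String[] customers = {"c1", "c2", "c1", "c1"};
        long[] times = {1_000, 4_000, 9_999, 10_000};
        System.out.println(countPerWindow(customers, times, 10_000));
    }
}
```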
>>
>> For testing, I have mocked up a customer stream and run session windows
>> as follows:
>>
>> StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
>> DataStream<Customer> customerStream = environment.addSource(new CustomerGenerator());
>>
>> customerStream
>>         //.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(7)))
>>         .assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator(new WaterMarkAssigner()))
>>         .keyBy(Customer::getIdentifier)
>>         .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
>>         .trigger(EventTimeTrigger.create())
>>         .evictor(new CustomerEvictor())
>>         .process(new CustomAggregateFunction())
>>         .print();
>>
>> My watermark assigner looks like:
>>
>> public class WaterMarkAssigner implements WatermarkStrategy<Customer> {
>>     static final Logger logger = LoggerFactory.getLogger(WaterMarkAssigner.class);
>>
>>     @Override
>>     public WatermarkGenerator<Customer> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
>>         return new CustomWatermarkGenerator();
>>     }
>> }
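One thing that may be worth checking: WaterMarkAssigner only overrides createWatermarkGenerator. As far as I know, the strategy therefore keeps Flink's default timestamp assigner, which reuses whatever timestamp the record already carries. The watermark generator reads customer.getEventTime(), but that value never becomes the record timestamp.

Suppose CustomerGenerator emits with ctx.collect(...) rather than ctx.collectWithTimestamp(...). That is an assumption, since its code isn't shown. Records would then reach the window with the Long.MIN_VALUE "no timestamp" value, while the watermark still advances with getEventTime().

The plain-Java model below is not Flink code, and its names are hypothetical. It assumes allowed lateness is 0 and that the watermark catches up between the 1-second events. It shows the effect: only the first record lands in a window (which then fires once the watermark moves), and every later record is dropped as late. That would match the symptom described below.

If that is the cause, attaching an assigner to the strategy may fix it, for example .withTimestampAssigner((customer, ts) -> customer.getEventTime()).

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model (not Flink code): the window's lateness check uses the
// *record* timestamp, while the watermark follows the customer's own event time.
class MissingTimestampModel {
    static final long NO_TIMESTAMP = Long.MIN_VALUE;

    // Returns, per record, whether it was accepted (true) or dropped as late (false).
    static List<Boolean> accepted(long[] eventTimes, boolean assignTimestamps, long gapMillis) {
        List<Boolean> result = new ArrayList<>();
        long watermark = Long.MIN_VALUE;                  // no watermark seen yet
        for (long eventTime : eventTimes) {
            long recordTs = assignTimestamps ? eventTime : NO_TIMESTAMP;
            long windowMaxTs = recordTs + gapMillis - 1;  // session window [ts, ts + gap)
            // Late (allowed lateness 0) once the watermark has reached the window's max timestamp.
            result.add(windowMaxTs > watermark);
            // The generator reads getEventTime(), so the watermark advances either way
            // (assumed to catch up before the next event arrives).
            watermark = Math.max(watermark, eventTime);
        }
        return result;
    }

    public static void main(String[] args) {
        long[] times = {1_000, 2_000, 3_000};
        System.out.println(accepted(times, false, 5_000)); // [true, false, false]
        System.out.println(accepted(times, true, 5_000));  // [true, true, true]
    }
}
```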
>>
>> I notice that the evictor and the aggregation function are called only
>> once, for the first customer in the stream.
>>
>> The mocked source generates one customer transaction every second, and
>> there are 5 customer keys for which it generates transactions.
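A second, independent possibility depends on an assumption, since the generator's code isn't shown: that it cycles through the five keys in order and stamps events exactly 1000 ms apart. Each key then sees an event every 5000 ms, which equals the 5 s gap.

As far as I know, Flink's TimeWindow.intersects also treats windows that merely touch as intersecting. Each key's session [t, t + 5 s) would therefore keep merging with the next one and never close while the source runs.

Below is a plain-Java model of that merge rule. It is not Flink code, and the names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model (not Flink code) of event-time session merging for one key:
// each event t opens [t, t + gap); windows that overlap or touch are merged.
class SessionMergeModel {

    // Returns merged sessions as {start, end} pairs (end exclusive).
    static List<long[]> sessions(long[] eventTimes, long gapMillis) {
        long[] sorted = eventTimes.clone();
        Arrays.sort(sorted);
        List<long[]> result = new ArrayList<>();
        for (long t : sorted) {
            long[] last = result.isEmpty() ? null : result.get(result.size() - 1);
            if (last != null && t <= last[1]) {          // touching counts as intersecting
                last[1] = Math.max(last[1], t + gapMillis);
            } else {
                result.add(new long[]{t, t + gapMillis});
            }
        }
        return result;
    }

    // Event times seen by one key when `keys` customers are generated round-robin
    // (hypothetical generator behaviour).
    static long[] roundRobinTimes(int keyIndex, int keys, long intervalMillis, int totalEvents) {
        List<Long> times = new ArrayList<>();
        for (int i = keyIndex; i < totalEvents; i += keys) {
            times.add(i * intervalMillis);
        }
        long[] out = new long[times.size()];
        for (int i = 0; i < out.length; i++) {
            out[i] = times.get(i);
        }
        return out;
    }

    public static void main(String[] args) {
        // 1000 ms spacing: per-key gap is exactly 5000 ms, so one session keeps growing.
        System.out.println(sessions(roundRobinTimes(0, 5, 1_000, 50), 5_000).size());
        // 1001 ms spacing: per-key gap is 5005 ms, so the sessions close.
        System.out.println(sessions(roundRobinTimes(0, 5, 1_001, 50), 5_000).size());
    }
}
```

With 1001 ms spacing, the ten events for one key split into ten separate sessions instead of one.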
>>
>> Am I doing something wrong with the above?
>>
>> I want to be able to capture an event each time a transaction is added to
>> or removed from the window, so that I can perform the aggregation.
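On capturing every addition: as far as I know, both the evictor (evictBefore/evictAfter) and the ProcessWindowFunction run only when the trigger returns FIRE. EventTimeTrigger does that once per window, after the watermark passes the window end.

Seeing each addition would need a trigger that fires from onElement. Options include Flink's CountTrigger.of(1) or a custom Trigger that returns TriggerResult.FIRE there. Note that .trigger(...) replaces the window's default trigger rather than adding to it. I am not aware of a callback for elements leaving a window; eviction happens only at firing time.

Below is a plain-Java model of how often the window function runs under the two policies. It assumes all events fall into one session, and it is not Flink code; the names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model (not Flink code): window sizes seen by the window function
// at each firing, for one session, under two trigger policies. Ignores evictors
// and assumes every event falls into the same session (spacing <= gap).
class TriggerPolicyModel {

    static List<Integer> firings(long[] eventTimes, long gapMillis,
                                 boolean fireOnEveryElement, long finalWatermark) {
        List<Integer> seen = new ArrayList<>();
        int count = 0;
        long sessionEnd = Long.MIN_VALUE;
        for (long t : eventTimes) {
            count++;                                   // element added to the window
            sessionEnd = Math.max(sessionEnd, t + gapMillis);
            if (fireOnEveryElement) {
                seen.add(count);                       // like returning FIRE from onElement
            }
        }
        // Event-time policy: one firing once the watermark reaches maxTimestamp = end - 1.
        if (!fireOnEveryElement && count > 0 && finalWatermark >= sessionEnd - 1) {
            seen.add(count);
        }
        return seen;
    }

    public static void main(String[] args) {
        long[] times = {0, 2_000, 4_000};
        System.out.println(firings(times, 5_000, true, Long.MIN_VALUE)); // [1, 2, 3]
        System.out.println(firings(times, 5_000, false, 8_999));         // [3]
    }
}
```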
>>
>> Please advise.
