I am able to maintain a list state in a process function and aggregate the
values. How do I get a notification/event to remove a value from the
stored list state?
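
One common way to get such a "remove" notification is to register an event-time timer per stored value (in Flink, `ctx.timerService().registerEventTimeTimer(...)` inside a `KeyedProcessFunction`, with the cleanup done in `onTimer`). The sketch below is a plain-Java model of that idea, not Flink code: the class `RollingPurchaseCount`, its method names, and the one-hour constant are assumptions made for illustration, and the map stands in for keyed list state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model (not Flink API) of per-key list state with timer-driven expiry. */
final class RollingPurchaseCount {
    private static final long WINDOW_MS = 60 * 60 * 1000L; // one hour (assumed window length)

    // Stands in for keyed list state: purchase timestamps per user.
    private final Map<String, List<Long>> purchasesByUser = new HashMap<>();

    /** Called per purchase. In a real job, a timer for eventTimeMs + WINDOW_MS would be registered here. */
    void onPurchase(String userId, long eventTimeMs) {
        purchasesByUser.computeIfAbsent(userId, k -> new ArrayList<>()).add(eventTimeMs);
    }

    /** Stands in for the timer callback: drops purchases that are one hour old or older at timerTimeMs. */
    void onTimer(String userId, long timerTimeMs) {
        List<Long> purchases = purchasesByUser.get(userId);
        if (purchases == null) {
            return;
        }
        purchases.removeIf(ts -> ts <= timerTimeMs - WINDOW_MS);
        if (purchases.isEmpty()) {
            purchasesByUser.remove(userId); // free the per-key state once nothing is left
        }
    }

    /** Number of purchases currently retained for the user. */
    int count(String userId) {
        List<Long> purchases = purchasesByUser.get(userId);
        return purchases == null ? 0 : purchases.size();
    }
}
```

With purchases at 10:30 and 10:45, `count` stays at 2 until the simulated timer for 11:30 runs, after which the 10:30 purchase is dropped.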

On Thu, May 6, 2021 at 8:47 PM Swagat Mishra <[email protected]> wrote:

> I meant "Do you recommend the state to be maintained in *ValueState* or an
> external store like Elastic?"
>
> On Thu, May 6, 2021 at 8:46 PM Swagat Mishra <[email protected]> wrote:
>
>> I want to aggregate the user activity, e.g. the number of products the user
>> has purchased in the last 1 hour.
>>
>> So User A (ID = USER-A) purchases 1 product at 10:30 AM, another
>> product at 10:45 AM, and another product at 1:30 AM.
>>
>> My API should give 2 products purchased if the API call happens at 11:29
>> AM (10:30, 10:45) and 1 product if the API call happens at 1:45 AM.
>>
>> The API will access data persisted from the flink streaming output.
>>
>> As of now I am doing a keyBy on (ID = USER-A).
>>
>> Do I have to maintain my own calculated state within the process
>> window function? Is the process window function shared across all keys, or
>> is there one instance per key? Do you recommend the state to be maintained
>> in State or Elastic?
>>
>> Also, if I change the processing to processing time instead of event
>> time, the aggregation happens. Is there any reason why Flink could not
>> provide event-time aggregations like the processing-time aggregation?
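
On the event-time question above: an event-time window fires only once the watermark reaches the window's max timestamp (window end minus 1 ms), so if the watermark does not advance, nothing is emitted. The sketch below is a plain-Java model of that rule for session windows, not Flink code. The class and method names are hypothetical, the input is assumed to be sorted, and timestamps less than one gap apart are treated as one session (Flink's exact boundary handling may differ).

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model (not Flink API) of when event-time session windows fire. */
final class SessionFiring {
    /**
     * Groups sorted timestamps of one key into sessions and returns the element
     * counts of those sessions whose max timestamp is at or below the watermark.
     */
    static List<Integer> firedSessionCounts(long[] sortedTs, long gapMs, long watermark) {
        List<Integer> fired = new ArrayList<>();
        int i = 0;
        while (i < sortedTs.length) {
            int count = 1;
            long last = sortedTs[i];
            // Extend the session while the next element arrives within the gap.
            while (i + count < sortedTs.length && sortedTs[i + count] - last < gapMs) {
                last = sortedTs[i + count];
                count++;
            }
            long maxTimestamp = last + gapMs - 1; // the window end is exclusive
            if (maxTimestamp <= watermark) {
                fired.add(count);
            }
            i += count;
        }
        return fired;
    }
}
```

For events at 1000, 2000, 3000 and 20000 ms with a 5000 ms gap, no session fires while the watermark is at 3000. The first session (3 elements) fires once the watermark reaches 7999, and the second only at 24999.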
>>
>>
>>
>> On Thu, May 6, 2021 at 7:11 PM Arvid Heise <[email protected]> wrote:
>>
>>> I'm not sure what you want to achieve exactly.
>>>
>>> You can always keyby the values by a constant pseudo-key such that all
>>> values will be in the same partition (so instead of using global but with
>>> the same effect). Then you can use a process function to maintain the
>>> state. Just make sure that your data volume is low enough as this part is
>>> not parallelizable by definition.
>>>
>>> On Thu, May 6, 2021 at 10:09 AM Swagat Mishra <[email protected]>
>>> wrote:
>>>
>>>> Thank you.
>>>>
>>>> I will have a look at DataStream#global().
>>>>
>>>> Is there any other way to maintain state, such as by using ValueState?
>>>>
>>>>
>>>> On Thu, 6 May 2021 at 1:26 PM, Arvid Heise <[email protected]> wrote:
>>>>
>>>>> If you keyby then all direct functions see only the elements with the
>>>>> same key. So that's the expected behavior and the base of Flink's parallel
>>>>> processing capabilities.
>>>>>
>>>>> If you want to generate a window over all customers, you have to use a
>>>>> global window. However, that also means that no parallelization can
>>>>> happen, so I'd discourage that.
>>>>>
>>>>> A better way would be to perform as many calculations as possible in
>>>>> the process function (for example create a customer with buy information
>>>>> record) and then have a DataStream#global() reshuffle to collect all
>>>>> aggregated information on one node.
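
The two-stage pattern described above (aggregate per key in parallel, then collect the small per-key results in one place) can be modelled in plain Java as follows. This is only an illustration of the data flow, not Flink code: `TwoStageAggregation` and its methods are hypothetical names, and each input list stands in for the records seen by one parallel instance.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java model (not Flink API) of per-key pre-aggregation followed by a global collect. */
final class TwoStageAggregation {
    /** Stage 1: runs independently per parallel instance and counts purchases per customer. */
    static Map<String, Integer> countPerCustomer(List<String> customerIds) {
        Map<String, Integer> counts = new HashMap<>();
        for (String id : customerIds) {
            counts.merge(id, 1, Integer::sum);
        }
        return counts;
    }

    /** Stage 2: runs on a single node and only sees the already-reduced partial results. */
    static Map<String, Integer> collectGlobal(List<Map<String, Integer>> partials) {
        Map<String, Integer> all = new TreeMap<>();
        for (Map<String, Integer> partial : partials) {
            partial.forEach((id, count) -> all.merge(id, count, Integer::sum));
        }
        return all;
    }
}
```

The point of the split is that the non-parallel stage handles one small record per customer instead of every raw event.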
>>>>>
>>>>> On Thu, May 6, 2021 at 9:20 AM Swagat Mishra <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Thank you.
>>>>>>
>>>>>> sourceContext.collectWithTimestamp(c, c.getEventTime());
>>>>>>
>>>>>> Adding this to the source context worked.
>>>>>>
>>>>>> However, I am still getting only one customer in the process method. I
>>>>>> would expect the iterable to provide all customers in the window. Or do
>>>>>> I have to maintain state?
>>>>>>
>>>>>>
>>>>>> Changes for reference:
>>>>>>
>>>>>> I made the following change, and also removed any lag that I had
>>>>>> introduced for experimentation earlier.
>>>>>>
>>>>>> So the trigger looks like:
>>>>>>
>>>>>>
>>>>>>     @Override
>>>>>>     public TriggerResult onElement(Customer customer, long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
>>>>>>         if (timeWindow.maxTimestamp() <= triggerContext.getCurrentWatermark()) {
>>>>>>             // if the watermark is already past the window fire immediately
>>>>>>             return TriggerResult.FIRE;
>>>>>>         } else {
>>>>>>             //LOGGER.info("Max timestamp for customer: " + customer.getIdentifier() + " is: " + timeWindow.maxTimestamp());
>>>>>>             triggerContext.registerEventTimeTimer(customer.getEventTime());
>>>>>>             return TriggerResult.FIRE;
>>>>>>         }
>>>>>>     }
>>>>>>
>>>>>>     @Override
>>>>>>     public TriggerResult onEventTime(long time, TimeWindow timeWindow, TriggerContext triggerContext) {
>>>>>>         return time == timeWindow.maxTimestamp() ?
>>>>>>                 TriggerResult.FIRE :
>>>>>>                 TriggerResult.CONTINUE;
>>>>>>     }
>>>>>>
>>>>>>     @Override
>>>>>>     public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
>>>>>>         return TriggerResult.CONTINUE;
>>>>>>     }
>>>>>>
>>>>>>     @Override
>>>>>>     public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
>>>>>>         ctx.deleteEventTimeTimer(window.maxTimestamp());
>>>>>>     }
>>>>>>
>>>>>>     @Override
>>>>>>     public boolean canMerge() {
>>>>>>         return true;
>>>>>>     }
>>>>>>
>>>>>> and *removed lateness*
>>>>>>
>>>>>> customerStream
>>>>>>         //.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(7)))
>>>>>>         .assignTimestampsAndWatermarks(new CustomWatermarkStrategy())
>>>>>>         .keyBy((KeySelector<Customer, String>) Customer::getIdentifier)
>>>>>>         .window(EventTimeSessionWindows.withGap(Time.seconds(20)))
>>>>>>         //.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
>>>>>>         .trigger(new EventTimeTrigger()) //EventTimeTrigger.create())
>>>>>>         .process(new CustomAggregateFunction());
>>>>>>
>>>>>>
>>>>>> On Thu, May 6, 2021 at 12:32 PM Arvid Heise <[email protected]> wrote:
>>>>>>
>>>>>>> Your source is not setting the timestamp with collectWithTimestamp.
>>>>>>> I'm assuming that nothing really moves from the event time's
>>>>>>> perspective.
>>>>>>>
>>>>>>> On Thu, May 6, 2021 at 8:58 AM Swagat Mishra <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Yes, the customer generator is setting the event timestamp correctly,
>>>>>>>> as shown below. I debugged and found that the events are getting
>>>>>>>> late, so they are never executed, i.e. in the window operator the method
>>>>>>>> this.isWindowLate(actualWindow) is evaluating to false for the
>>>>>>>> rest of the events except the first, hence the events are getting
>>>>>>>> skipped. I am not able to figure out where exactly the issue is.
>>>>>>>>
>>>>>>>> I have removed the evictor because I don't think I need it yet.
>>>>>>>>
>>>>>>>> The stream looks like:
>>>>>>>>
>>>>>>>> customerStream
>>>>>>>>         .assignTimestampsAndWatermarks(new CustomWatermarkStrategy())
>>>>>>>>         .keyBy((KeySelector<Customer, String>) Customer::getIdentifier)
>>>>>>>>         .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
>>>>>>>>         .allowedLateness(Time.seconds(5))
>>>>>>>>         .trigger(new EventTimeTrigger())
>>>>>>>>         .process(new CustomAggregateFunction());
>>>>>>>>
>>>>>>>>
>>>>>>>> *Customer generator looks like:*
>>>>>>>>
>>>>>>>> while (isRunning) {
>>>>>>>>     Customer c = new Customer(CUSTOMER_KEY[counter % 5], LocalTime.now(), 1000); // that's the event time
>>>>>>>>     System.out.println("Writing customer: " + c);
>>>>>>>>     sourceContext.collect(c);
>>>>>>>>     //sourceContext.emitWatermark(new Watermark(c.getEventTime()));
>>>>>>>>     Thread.sleep(1000);
>>>>>>>>     counter++;
>>>>>>>>     if(counter % 11 == 0) {
>>>>>>>>         System.out.println("Sleeping for 10 seconds");
>>>>>>>>         Thread.sleep(10000);
>>>>>>>>     }
>>>>>>>> }
>>>>>>>>
>>>>>>>>
>>>>>>>> Custom Watermark generator has this:
>>>>>>>>
>>>>>>>> .....
>>>>>>>> @Override
>>>>>>>> public void onEvent(Customer customer, long l, WatermarkOutput watermarkOutput) {
>>>>>>>>     currentMaxTimestamp = Math.max(currentMaxTimestamp, customer.getEventTime());
>>>>>>>> }
>>>>>>>>
>>>>>>>> @Override
>>>>>>>> public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
>>>>>>>>     watermarkOutput.emitWatermark(new Watermark(currentMaxTimestamp));
>>>>>>>> }
>>>>>>>> .....
>>>>>>>>
>>>>>>>> trigger looks like:
>>>>>>>>
>>>>>>>> ------
>>>>>>>>
>>>>>>>>
>>>>>>>>     @Override
>>>>>>>>     public TriggerResult onElement(Customer customer, long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
>>>>>>>>         if (timeWindow.maxTimestamp() <= triggerContext.getCurrentWatermark()) {
>>>>>>>>             // if the watermark is already past the window fire immediately
>>>>>>>>             return TriggerResult.FIRE;
>>>>>>>>         } else {
>>>>>>>>             LOGGER.info("Max timestamp for customer: " + customer.getIdentifier() + " is: " + timeWindow.maxTimestamp());
>>>>>>>>             triggerContext.registerEventTimeTimer(customer.getLocalTime().plusSeconds(8).toNanoOfDay());
>>>>>>>>             return TriggerResult.FIRE;
>>>>>>>>         }
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     @Override
>>>>>>>>     public TriggerResult onEventTime(long time, TimeWindow timeWindow, TriggerContext triggerContext) {
>>>>>>>> //        if (timeWindow.maxTimestamp() > triggerContext.getCurrentWatermark()) {
>>>>>>>> //            triggerContext.registerEventTimeTimer(timeWindow.maxTimestamp());
>>>>>>>> //            return TriggerResult.CONTINUE;
>>>>>>>> //        }
>>>>>>>>
>>>>>>>>         return time == timeWindow.maxTimestamp() ?
>>>>>>>>                 TriggerResult.FIRE :
>>>>>>>>                 TriggerResult.CONTINUE;
>>>>>>>>     }
>>>>>>>>
>>>>>>>>
>>>>>>>> ....
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, May 6, 2021 at 12:02 PM Arvid Heise <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> Is your CustomerGenerator setting the event timestamp correctly?
>>>>>>>>> Are your evictors evicting too early?
>>>>>>>>>
>>>>>>>>> You can try to add some debug output into the watermark assigner
>>>>>>>>> and see if it's indeed progressing as expected.
>>>>>>>>>
>>>>>>>>> On Thu, May 6, 2021 at 12:48 AM Swagat Mishra <[email protected]>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> This seems to be working fine in processing time but doesn't work
>>>>>>>>>> in event time. Is there an issue with the way the watermark is
>>>>>>>>>> defined, or do we need to set up timers?
>>>>>>>>>>
>>>>>>>>>> Please advise.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> WORKING:
>>>>>>>>>>
>>>>>>>>>> customerStream
>>>>>>>>>>         .keyBy((KeySelector<Customer, String>) Customer::getIdentifier)
>>>>>>>>>>         .window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
>>>>>>>>>>         .trigger(new EventTimeTrigger()) //EventTimeTrigger.create())
>>>>>>>>>>         .process(new CustomAggregateFunction());
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> NOT WORKING:
>>>>>>>>>>
>>>>>>>>>> customerStream
>>>>>>>>>>         .assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator(new WaterMarkAssigner()))
>>>>>>>>>>         .keyBy(Customer::getIdentifier)
>>>>>>>>>>         .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
>>>>>>>>>>         .trigger(EventTimeTrigger.create())
>>>>>>>>>>         .evictor(new CustomerEvictor())
>>>>>>>>>>         .process(new CustomAggregateFunction())
>>>>>>>>>>         .print();
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, May 6, 2021 at 1:53 AM Sam <[email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Adding the code for CustomWatermarkGenerator
>>>>>>>>>>>
>>>>>>>>>>> .....
>>>>>>>>>>> @Override
>>>>>>>>>>> public void onEvent(Customer customer, long l, WatermarkOutput watermarkOutput) {
>>>>>>>>>>>     currentMaxTimestamp = Math.max(currentMaxTimestamp, customer.getEventTime());
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>> @Override
>>>>>>>>>>> public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
>>>>>>>>>>>     watermarkOutput.emitWatermark(new Watermark(currentMaxTimestamp));
>>>>>>>>>>> }
>>>>>>>>>>> .....
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Thu, May 6, 2021 at 1:33 AM Swagat Mishra <[email protected]>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>>
>>>>>>>>>>>> A bit of background: I have a stream of customers who have
>>>>>>>>>>>> purchased some product, and I am reading these transactions from a
>>>>>>>>>>>> Kafka topic. I want to aggregate the number of products the customer
>>>>>>>>>>>> has purchased in a particular duration (say 10 seconds) and write it
>>>>>>>>>>>> to a sink.
>>>>>>>>>>>>
>>>>>>>>>>>> I am using session windows to achieve the above.
>>>>>>>>>>>>
>>>>>>>>>>>> For test purposes, I have mocked up a customer stream and
>>>>>>>>>>>> executed session windows like below.
>>>>>>>>>>>>
>>>>>>>>>>>> StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>>>>> DataStream<Customer> customerStream = environment.addSource(new CustomerGenerator());
>>>>>>>>>>>>
>>>>>>>>>>>> customerStream
>>>>>>>>>>>>         //.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(7)))
>>>>>>>>>>>>         .assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator(new WaterMarkAssigner()))
>>>>>>>>>>>>         .keyBy(Customer::getIdentifier)
>>>>>>>>>>>>         .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
>>>>>>>>>>>>         .trigger(EventTimeTrigger.create())
>>>>>>>>>>>>         .evictor(new CustomerEvictor())
>>>>>>>>>>>>         .process(new CustomAggregateFunction())
>>>>>>>>>>>>         .print();
>>>>>>>>>>>>
>>>>>>>>>>>> My watermark assigner looks like:
>>>>>>>>>>>>
>>>>>>>>>>>> public class WaterMarkAssigner implements WatermarkStrategy<Customer> {
>>>>>>>>>>>>     static final Logger logger = LoggerFactory.getLogger(WaterMarkAssigner.class);
>>>>>>>>>>>>
>>>>>>>>>>>>     @Override
>>>>>>>>>>>>     public WatermarkGenerator<Customer> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
>>>>>>>>>>>>         return new CustomWatermarkGenerator();
>>>>>>>>>>>>     }
>>>>>>>>>>>> }
>>>>>>>>>>>>
>>>>>>>>>>>> I notice that the evictor and aggregation functions are getting
>>>>>>>>>>>> called only once, for the first customer in the stream.
>>>>>>>>>>>>
>>>>>>>>>>>> The data stream is generating customers at 1-second intervals, and
>>>>>>>>>>>> there are 5 customer keys for which it's generating transactions.
>>>>>>>>>>>>
>>>>>>>>>>>> Am I doing something wrong with the above?
>>>>>>>>>>>>
>>>>>>>>>>>> I want to be able to capture the event on each transaction getting
>>>>>>>>>>>> added and removed from the window so that I can perform the
>>>>>>>>>>>> aggregation.
>>>>>>>>>>>>
>>>>>>>>>>>> Please advise.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
