I am able to maintain a list state in the process function and aggregate the values. How do I get a notification/event to remove a value from the stored list state?
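This is a suggested pattern, not something confirmed in the thread. Keep the purchases in `ListState` inside a `KeyedProcessFunction`, register an event-time timer at `timestamp + retention` for each element, and prune the list in `onTimer`. Flink calls `onTimer` once the watermark reaches the timer's timestamp. The sketch models that logic in plain Java so it runs without Flink. `ExpiringListStateModel` and its methods are hypothetical. The `Map` stands in for per-key `ListState`, and the `TreeMap` of timers stands in for the `TimerService`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical plain-Java model (not Flink code) of a KeyedProcessFunction that keeps
// per-key event timestamps in "list state" and prunes them with event-time timers.
class ExpiringListStateModel {
    private final long retentionMillis;
    // Stands in for ListState<Long>: one list of event timestamps per key.
    private final Map<String, List<Long>> listState = new HashMap<>();
    // Stands in for the TimerService: timer timestamp -> keys that registered it.
    private final TreeMap<Long, Set<String>> timers = new TreeMap<>();

    ExpiringListStateModel(long retentionMillis) {
        this.retentionMillis = retentionMillis;
    }

    // Analogue of processElement: append to the list, register a cleanup timer.
    void processElement(String key, long eventTime) {
        listState.computeIfAbsent(key, k -> new ArrayList<>()).add(eventTime);
        timers.computeIfAbsent(eventTime + retentionMillis, t -> new TreeSet<>()).add(key);
    }

    // Fires, in timestamp order, every timer the new watermark has reached.
    void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.firstKey() <= watermark) {
            Map.Entry<Long, Set<String>> due = timers.pollFirstEntry();
            for (String key : due.getValue()) {
                onTimer(key, due.getKey());
            }
        }
    }

    // Analogue of onTimer: drop entries that are now outside the retention period.
    private void onTimer(String key, long timerTime) {
        List<Long> values = listState.get(key);
        if (values == null) {
            return;
        }
        values.removeIf(ts -> ts + retentionMillis <= timerTime);
        if (values.isEmpty()) {
            listState.remove(key); // corresponds to clearing the state for this key
        }
    }

    // Current number of stored (non-expired) events for a key.
    int count(String key) {
        return listState.getOrDefault(key, Collections.emptyList()).size();
    }
}
```

In an actual job, the corresponding calls would be `ctx.timerService().registerEventTimeTimer(eventTime + retention)` in `processElement`. In `onTimer`, you would read the `ListState`, filter it, and write it back with `update(...)`, or call `clear()` when it is empty. Emitting the new count from `onTimer` as well lets the sink see decreases, not only increases.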

On Thu, May 6, 2021 at 8:47 PM Swagat Mishra wrote:
> I meant "Do you recommend the state to be maintained in *ValueState* or an
> external store like Elasticsearch?"
>
> On Thu, May 6, 2021 at 8:46 PM Swagat Mishra wrote:
>> I want to aggregate the user activity, e.g. the number of products the user
>> has purchased in the last 1 hour.
>>
>> So User A (ID = USER-A) purchases a product at 10:30, another product at
>> 10:45 AM and another product at 1:30 AM.
>>
>> My API should return 2 products purchased if the API call happens at
>> 11:29 AM (10:30, 10:45) and 1 product if the API call happens at 1:45 AM.
>>
>> The API will access data persisted from the Flink streaming output.
>>
>> As of now I am doing keyBy on (ID = USER-A).
>>
>> Do I have to maintain my own calculated state within the process window
>> function? Is the process window function shared across all keys, or is
>> there one instance per key? Do you recommend the state to be maintained in
>> State or Elasticsearch?
>>
>> Also, if I change the processing to processing time instead of event time,
>> the aggregation happens. Is there any reason why Flink could not provide
>> event-time aggregations like the processing-time aggregation?
>>
>> On Thu, May 6, 2021 at 7:11 PM Arvid Heise wrote:
>>> I'm not sure what you want to achieve exactly.
>>>
>>> You can always keyBy the values by a constant pseudo-key such that all
>>> values will be in the same partition (instead of using global(), but with
>>> the same effect). Then you can use a process function to maintain the
>>> state. Just make sure that your data volume is low enough, as this part is
>>> not parallelizable by definition.
>>>
>>> On Thu, May 6, 2021 at 10:09 AM Swagat Mishra wrote:
>>>> Thank you.
>>>>
>>>> I will have a look at DataStream.global().
>>>>
>>>> Is there any other way to maintain state, like by using ValueState?
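Because the answer depends on when the API is called, a count that the stream writes only when a purchase arrives goes stale as time passes. One option is to store the per-user event timestamps and let the read side count those in the half-open interval `(queryTime - 1h, queryTime]`. This is an assumption about the design, not the thread's conclusion. `PurchaseLookup` is a hypothetical in-memory stand-in for whichever store the API reads, whether Flink state or an external store such as Elasticsearch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical read-side model: each user's purchase timestamps kept sorted,
// counted over the half-open interval (queryTime - lookback, queryTime].
class PurchaseLookup {
    private final Map<String, NavigableMap<Long, Integer>> purchasesByUser = new HashMap<>();

    // Records one purchase; several purchases may share the same timestamp.
    void record(String userId, long eventTimeMillis) {
        purchasesByUser
                .computeIfAbsent(userId, u -> new TreeMap<>())
                .merge(eventTimeMillis, 1, Integer::sum);
    }

    // Number of purchases in the lookback period ending at queryTimeMillis.
    int countInLookback(String userId, long queryTimeMillis, long lookbackMillis) {
        NavigableMap<Long, Integer> purchases = purchasesByUser.get(userId);
        if (purchases == null) {
            return 0;
        }
        int total = 0;
        // Lower bound exclusive, upper bound inclusive.
        for (int n : purchases.subMap(queryTimeMillis - lookbackMillis, false, queryTimeMillis, true).values()) {
            total += n;
        }
        return total;
    }
}
```

With purchases at 10:30 and 10:45, a call at 11:29 counts both. At 11:31, only the 10:45 purchase remains inside the hour.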
>>>>
>>>> On Thu, 6 May 2021 at 1:26 PM, Arvid Heise wrote:
>>>>> If you keyBy, then all directly following functions see only the
>>>>> elements with the same key. So that's the expected behavior and the basis
>>>>> of Flink's parallel processing capabilities.
>>>>>
>>>>> If you want to generate a window over all customers, you have to use a
>>>>> global window. However, that also means that no parallelization can
>>>>> happen, so I'd discourage that.
>>>>>
>>>>> A better way would be to perform as many calculations as possible in
>>>>> the process function (for example, create a record per customer with the
>>>>> buy information) and then use a DataStream#global() reshuffle to collect
>>>>> all aggregated information on one node.
>>>>>
>>>>> On Thu, May 6, 2021 at 9:20 AM Swagat Mishra wrote:
>>>>>> Thank you.
>>>>>>
>>>>>>   sourceContext.collectWithTimestamp(c, c.getEventTime());
>>>>>>
>>>>>> Adding this to the source context worked.
>>>>>>
>>>>>> However, I am still getting only one customer in the process method. I
>>>>>> would expect the iterable to provide all customers in the window, or do
>>>>>> I have to maintain state?
>>>>>>
>>>>>> Changes for reference:
>>>>>>
>>>>>> I made the following change and also removed any lag that I had
>>>>>> introduced for experimentation earlier.
>>>>>>
>>>>>> So the trigger looks like:
>>>>>>
>>>>>>   @Override
>>>>>>   public TriggerResult onElement(Customer customer, long l, TimeWindow timeWindow,
>>>>>>           TriggerContext triggerContext) throws Exception {
>>>>>>       if (timeWindow.maxTimestamp() <= triggerContext.getCurrentWatermark()) {
>>>>>>           // if the watermark is already past the window, fire immediately
>>>>>>           return TriggerResult.FIRE;
>>>>>>       } else {
>>>>>>           //LOGGER.info("Max timestamp for customer: " + customer.getIdentifier() + " is: " + timeWindow.maxTimestamp());
>>>>>>           triggerContext.registerEventTimeTimer(customer.getEventTime());
>>>>>>           return TriggerResult.FIRE;
>>>>>>       }
>>>>>>   }
>>>>>>
>>>>>>   @Override
>>>>>>   public TriggerResult onEventTime(long time, TimeWindow timeWindow,
>>>>>>           TriggerContext triggerContext) {
>>>>>>       return time == timeWindow.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
>>>>>>   }
>>>>>>
>>>>>>   @Override
>>>>>>   public TriggerResult onProcessingTime(long time, TimeWindow window,
>>>>>>           TriggerContext ctx) throws Exception {
>>>>>>       return TriggerResult.CONTINUE;
>>>>>>   }
>>>>>>
>>>>>>   @Override
>>>>>>   public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
>>>>>>       ctx.deleteEventTimeTimer(window.maxTimestamp());
>>>>>>   }
>>>>>>
>>>>>>   @Override
>>>>>>   public boolean canMerge() {
>>>>>>       return true;
>>>>>>   }
>>>>>>
>>>>>> and *removed lateness*:
>>>>>>
>>>>>>   customerStream
>>>>>>       //.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(7)))
>>>>>>       .assignTimestampsAndWatermarks(new CustomWatermarkStrategy())
>>>>>>       .keyBy((KeySelector<Customer, String>) Customer::getIdentifier)
>>>>>>       .window(EventTimeSessionWindows.withGap(Time.seconds(20)))
>>>>>>       //.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
>>>>>>       .trigger(new EventTimeTrigger()) //EventTimeTrigger.create())
>>>>>>       .process(new CustomAggregateFunction());
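For comparison with the custom trigger quoted above, here is a rough single-key model of event-time session windows under the default firing rule of Flink's built-in `EventTimeTrigger`. The quoted trigger returns `FIRE` for every element and registers a timer at the element's own timestamp. In the model, each element gets a window `[ts, ts + gap)`. Windows that overlap or touch are merged, which is my reading of `TimeWindow.intersects`. A window fires once the watermark reaches `maxTimestamp()`, i.e. `end - 1`. With zero allowed lateness, an element whose window is already behind the watermark is dropped. `SessionWindowModel` is hypothetical and runs without Flink.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical single-key model (not Flink code) of event-time session windows
// fired by the default event-time rule: fire once watermark >= maxTimestamp().
class SessionWindowModel {
    private final long gapMillis;
    // Open sessions as {start, endExclusive, elementCount}.
    private final List<long[]> sessions = new ArrayList<>();
    private long watermark = Long.MIN_VALUE;

    SessionWindowModel(long gapMillis) {
        this.gapMillis = gapMillis;
    }

    // Assigns [ts, ts + gap), merges it with overlapping or touching sessions,
    // and returns false if the resulting window is already behind the watermark.
    boolean add(long ts) {
        long start = ts;
        long end = ts + gapMillis;
        long count = 1;
        boolean merged = true;
        while (merged) {
            merged = false;
            Iterator<long[]> it = sessions.iterator();
            while (it.hasNext()) {
                long[] s = it.next();
                if (s[0] <= end && s[1] >= start) {
                    start = Math.min(start, s[0]);
                    end = Math.max(end, s[1]);
                    count += s[2];
                    it.remove();
                    merged = true;
                }
            }
        }
        // Open sessions have not fired yet, so only an unmerged window can be late here.
        if (end - 1 <= watermark) {
            return false; // dropped, as with allowed lateness 0
        }
        sessions.add(new long[] {start, end, count});
        return true;
    }

    // Advances the watermark; returns the element counts of the sessions that fire.
    List<Long> advanceWatermark(long newWatermark) {
        watermark = Math.max(watermark, newWatermark);
        List<Long> fired = new ArrayList<>();
        Iterator<long[]> it = sessions.iterator();
        while (it.hasNext()) {
            long[] s = it.next();
            if (s[1] - 1 <= watermark) { // maxTimestamp() is end - 1
                fired.add(s[2]);
                it.remove(); // purged after firing, as with allowed lateness 0
            }
        }
        return fired;
    }
}
```

Measured against this model, the quoted trigger asks for a FIRE on every element instead of waiting for `maxTimestamp()`. The timer it registers at the element's own timestamp can never equal `maxTimestamp()`, which is at least `ts + gap - 1`, so its `onEventTime` never fires for that timer. Because it returns `true` from `canMerge()` (session windows require that), it also needs an `onMerge` implementation. The built-in `EventTimeTrigger` re-registers its timer for the merged window there.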
>>>>>>
>>>>>> On Thu, May 6, 2021 at 12:32 PM Arvid Heise wrote:
>>>>>>> Your source is not setting the timestamp with collectWithTimestamp.
>>>>>>> I'm assuming that nothing really moves from the event time's
>>>>>>> perspective.
>>>>>>>
>>>>>>> On Thu, May 6, 2021 at 8:58 AM Swagat Mishra wrote:
>>>>>>>> Yes, the customer generator is setting the event timestamp correctly,
>>>>>>>> as far as I can see (below). I debugged and found that the events are
>>>>>>>> arriving late, so they are never processed. In the window operator,
>>>>>>>> the method this.isWindowLate(actualWindow) evaluates to false for all
>>>>>>>> events except the first, hence the events are getting skipped. I am
>>>>>>>> not able to figure out where exactly the issue is.
>>>>>>>>
>>>>>>>> I have removed the evictor because I don't think I need it yet.
>>>>>>>>
>>>>>>>> The stream looks like:
>>>>>>>>
>>>>>>>>   customerStream
>>>>>>>>       .assignTimestampsAndWatermarks(new CustomWatermarkStrategy())
>>>>>>>>       .keyBy((KeySelector<Customer, String>) Customer::getIdentifier)
>>>>>>>>       .window(EventTimeSessionWindows.withGap(Time.seconds(5))).allowedLateness(Time.seconds(5))
>>>>>>>>       .trigger(new EventTimeTrigger())
>>>>>>>>       .process(new CustomAggregateFunction());
>>>>>>>>
>>>>>>>> *Customer generator looks like:*
>>>>>>>>
>>>>>>>>   while (isRunning) {
>>>>>>>>       Customer c = new Customer(CUSTOMER_KEY[counter % 5], LocalTime.now(), 1000); // that's the event time
>>>>>>>>       System.out.println("Writing customer: " + c);
>>>>>>>>       sourceContext.collect(c);
>>>>>>>>       //sourceContext.emitWatermark(new Watermark(c.getEventTime()));
>>>>>>>>       Thread.sleep(1000);
>>>>>>>>       counter++;
>>>>>>>>       if (counter % 11 == 0) {
>>>>>>>>           System.out.println("Sleeping for 10 seconds");
>>>>>>>>           Thread.sleep(10000);
>>>>>>>>       }
>>>>>>>>   }
>>>>>>>>
>>>>>>>> The custom watermark generator has this:
>>>>>>>>
>>>>>>>>   .....
>>>>>>>> @Override >>>>>>>> public void onEvent(Customer customer, long l, WatermarkOutput >>>>>>>> watermarkOutput) { >>>>>>>> currentMaxTimestamp = Math.max(currentMaxTimestamp, >>>>>>>> customer.getEventTime() ); >>>>>>>> } >>>>>>>> >>>>>>>> @Override >>>>>>>> public void onPeriodicEmit(WatermarkOutput watermarkOutput) { >>>>>>>> watermarkOutput.emitWatermark(new Watermark(currentMaxTimestamp)); >>>>>>>> >>>>>>>> } >>>>>>>> ..... >>>>>>>> >>>>>>>> trigger looks like: >>>>>>>> >>>>>>>> ------ >>>>>>>> >>>>>>>> >>>>>>>> @Override >>>>>>>> public TriggerResult onElement(Customer customer, long l, >>>>>>>> TimeWindow timeWindow, TriggerContext triggerContext) throws Exception >>>>>>>> { >>>>>>>> if (timeWindow.maxTimestamp() <= >>>>>>>> triggerContext.getCurrentWatermark()) { >>>>>>>> // if the watermark is already past the window fire >>>>>>>> immediately >>>>>>>> return TriggerResult.FIRE; >>>>>>>> } else { >>>>>>>> LOGGER.info("Max timestamp for customer: " + >>>>>>>> customer.getIdentifier() + " is: " + timeWindow.maxTimestamp()); >>>>>>>> >>>>>>>> triggerContext.registerEventTimeTimer(customer.getLocalTime().plusSeconds(8).toNanoOfDay()); >>>>>>>> return TriggerResult.FIRE; >>>>>>>> } >>>>>>>> } >>>>>>>> >>>>>>>> @Override >>>>>>>> public TriggerResult onEventTime(long time, TimeWindow timeWindow, >>>>>>>> TriggerContext triggerContext) { >>>>>>>> // if (timeWindow.maxTimestamp() > >>>>>>>> triggerContext.getCurrentWatermark()) { >>>>>>>> // >>>>>>>> triggerContext.registerEventTimeTimer(timeWindow.maxTimestamp()); >>>>>>>> // return TriggerResult.CONTINUE; >>>>>>>> // } >>>>>>>> >>>>>>>> return time == timeWindow.maxTimestamp() ? >>>>>>>> TriggerResult.FIRE : >>>>>>>> TriggerResult.CONTINUE; >>>>>>>> } >>>>>>>> >>>>>>>> >>>>>>>> .... >>>>>>>> >>>>>>>> >>>>>>>> On Thu, May 6, 2021 at 12:02 PM Arvid Heise <[email protected]> >>>>>>>> wrote: >>>>>>>> >>>>>>>>> Hi, >>>>>>>>> >>>>>>>>> Is your CustomerGenerator setting the event timestamp correctly? 
>>>>>>>>> Are your evictors evicting too early?
>>>>>>>>>
>>>>>>>>> You can try to add some debug output into the watermark assigner
>>>>>>>>> and see if it's indeed progressing as expected.
>>>>>>>>>
>>>>>>>>> On Thu, May 6, 2021 at 12:48 AM Swagat Mishra wrote:
>>>>>>>>>> This seems to be working fine in processing time but doesn't work
>>>>>>>>>> in event time. Is there an issue with the way the watermark is
>>>>>>>>>> defined, or do we need to set up timers?
>>>>>>>>>>
>>>>>>>>>> Please advise.
>>>>>>>>>>
>>>>>>>>>> WORKING:
>>>>>>>>>>
>>>>>>>>>>   customerStream
>>>>>>>>>>       .keyBy((KeySelector<Customer, String>) Customer::getIdentifier)
>>>>>>>>>>       .window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
>>>>>>>>>>       .trigger(new EventTimeTrigger()) //EventTimeTrigger.create())
>>>>>>>>>>       .process(new CustomAggregateFunction());
>>>>>>>>>>
>>>>>>>>>> NOT WORKING:
>>>>>>>>>>
>>>>>>>>>>   customerStream
>>>>>>>>>>       .assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator(new WaterMarkAssigner()))
>>>>>>>>>>       .keyBy(Customer::getIdentifier)
>>>>>>>>>>       .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
>>>>>>>>>>       .trigger(EventTimeTrigger.create())
>>>>>>>>>>       .evictor(new CustomerEvictor())
>>>>>>>>>>       .process(new CustomAggregateFunction())
>>>>>>>>>>       .print();
>>>>>>>>>>
>>>>>>>>>> On Thu, May 6, 2021 at 1:53 AM Sam wrote:
>>>>>>>>>>> Adding the code for CustomWatermarkGenerator:
>>>>>>>>>>>
>>>>>>>>>>>   .....
>>>>>>>>>>>   @Override
>>>>>>>>>>>   public void onEvent(Customer customer, long l, WatermarkOutput watermarkOutput) {
>>>>>>>>>>>       currentMaxTimestamp = Math.max(currentMaxTimestamp, customer.getEventTime());
>>>>>>>>>>>   }
>>>>>>>>>>>
>>>>>>>>>>>   @Override
>>>>>>>>>>>   public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
>>>>>>>>>>>       watermarkOutput.emitWatermark(new Watermark(currentMaxTimestamp));
>>>>>>>>>>>   }
>>>>>>>>>>>   .....
>>>>>>>>>>>
>>>>>>>>>>> On Thu, May 6, 2021 at 1:33 AM Swagat Mishra wrote:
>>>>>>>>>>>> Hi,
>>>>>>>>>>>>
>>>>>>>>>>>> A bit of background: I have a stream of customers who have
>>>>>>>>>>>> purchased some product, and I am reading these transactions from a
>>>>>>>>>>>> Kafka topic. I want to aggregate the number of products each
>>>>>>>>>>>> customer has purchased in a particular duration (say 10 seconds)
>>>>>>>>>>>> and write it to a sink.
>>>>>>>>>>>>
>>>>>>>>>>>> I am using session windows to achieve the above.
>>>>>>>>>>>>
>>>>>>>>>>>> For test purposes, I have mocked up a customer stream and
>>>>>>>>>>>> executed session windows like below.
>>>>>>>>>>>>
>>>>>>>>>>>>   StreamExecutionEnvironment environment =
>>>>>>>>>>>>           StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>>>>>   DataStream<Customer> customerStream = environment.addSource(new CustomerGenerator());
>>>>>>>>>>>>
>>>>>>>>>>>>   customerStream
>>>>>>>>>>>>       //.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(7)))
>>>>>>>>>>>>       .assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator(new WaterMarkAssigner()))
>>>>>>>>>>>>       .keyBy(Customer::getIdentifier)
>>>>>>>>>>>>       .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
>>>>>>>>>>>>       .trigger(EventTimeTrigger.create())
>>>>>>>>>>>>       .evictor(new CustomerEvictor())
>>>>>>>>>>>>       .process(new CustomAggregateFunction())
>>>>>>>>>>>>       .print();
>>>>>>>>>>>>
>>>>>>>>>>>> My watermark assigner looks like:
>>>>>>>>>>>>
>>>>>>>>>>>>   public class WaterMarkAssigner implements WatermarkStrategy<Customer> {
>>>>>>>>>>>>       static final Logger logger = LoggerFactory.getLogger(WaterMarkAssigner.class);
>>>>>>>>>>>>
>>>>>>>>>>>>       @Override
>>>>>>>>>>>>       public WatermarkGenerator<Customer> createWatermarkGenerator(
>>>>>>>>>>>>               WatermarkGeneratorSupplier.Context context) {
>>>>>>>>>>>>           return new CustomWatermarkGenerator();
>>>>>>>>>>>>       }
>>>>>>>>>>>>   }
>>>>>>>>>>>>
>>>>>>>>>>>> I notice that the evictor and aggregation functions are getting
>>>>>>>>>>>> called only once, for the first customer in the stream.
>>>>>>>>>>>>
>>>>>>>>>>>> The data stream is generating customers at 1-second intervals, and
>>>>>>>>>>>> there are 5 customer keys for which it's generating transactions.
>>>>>>>>>>>>
>>>>>>>>>>>> Am I doing something wrong with the above?
>>>>>>>>>>>>
>>>>>>>>>>>> I want to be able to capture the event of each transaction getting
>>>>>>>>>>>> added to and removed from the window so that I can perform the
>>>>>>>>>>>> aggregation.
>>>>>>>>>>>>
>>>>>>>>>>>> Please advise.
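The watermark generator quoted in the thread emits `currentMaxTimestamp` unchanged. That amounts to assuming no out-of-order data at all: any event that is even slightly behind the newest one is already behind the watermark. The hypothetical model below follows the arithmetic of Flink's `BoundedOutOfOrdernessWatermarks`, which `WatermarkStrategy.forBoundedOutOfOrderness` uses. There, the watermark trails the largest timestamp seen by the configured bound plus one millisecond. The 7-second bound in the test mirrors the commented-out line in the job. `BoundedWatermarkModel` is plain Java, not a Flink class.

```java
// Hypothetical plain-Java model of a bounded-out-of-orderness watermark generator,
// following the arithmetic of Flink's BoundedOutOfOrdernessWatermarks:
// watermark = maxTimestampSeen - maxOutOfOrderness - 1.
class BoundedWatermarkModel {
    private final long outOfOrdernessMillis;
    private long maxTimestamp;

    BoundedWatermarkModel(long outOfOrdernessMillis) {
        this.outOfOrdernessMillis = outOfOrdernessMillis;
        // Start low so the first watermark is Long.MIN_VALUE instead of overflowing.
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    // Analogue of WatermarkGenerator.onEvent: track the largest timestamp seen.
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    // Analogue of onPeriodicEmit: the watermark that would be emitted now.
    long currentWatermark() {
        return maxTimestamp - outOfOrdernessMillis - 1;
    }

    // An event is late if its timestamp is not ahead of the current watermark.
    boolean isLate(long eventTimestamp) {
        return eventTimestamp <= currentWatermark();
    }
}
```

With a bound of 0, an event one second behind the newest one is already late. With the 7-second bound, it is still on time.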
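The generator quoted in the thread builds each `Customer` from `LocalTime.now()`. One version of the trigger registers `customer.getLocalTime().plusSeconds(8).toNanoOfDay()` as a timer. `toNanoOfDay()` counts nanoseconds since midnight. Flink's time-based windows such as `Time.seconds(5)` interpret timestamps as milliseconds, conventionally since the epoch, so the two values do not line up. The thread does not show how `getEventTime()` is computed, so this is only a guess at a possible mismatch. The hypothetical `EventTimes` helper converts a wall-clock time to epoch milliseconds.

```java
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.ZoneId;

// Hypothetical helper: turns a wall-clock LocalTime into epoch milliseconds,
// the unit Flink's time-based windows and timers are normally used with.
final class EventTimes {
    private EventTimes() {
    }

    // A LocalTime alone has no date or zone, so both must be supplied.
    static long toEpochMillis(LocalDate date, LocalTime time, ZoneId zone) {
        return time.atDate(date).atZone(zone).toInstant().toEpochMilli();
    }

    // For a test source, reading the clock directly avoids the conversion entirely.
    static long nowEpochMillis() {
        return System.currentTimeMillis();
    }
}
```

For 10:30, `toNanoOfDay()` gives 37,800,000,000,000. The epoch-millisecond value for 10:30 on 6 May 2021 in UTC is 1,620,297,000,000. Mixing the two puts timers and windows on unrelated time lines. In the source, `sourceContext.collectWithTimestamp(c, eventTimeMillis)` would then carry the same epoch-millisecond value that the watermark generator and any timers use.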
