Hi Fabian,

In this case, how do we make the tumbling window emit a result when there are 
no events? Without that, it’s not possible to emulate a sliding window in a 
process function, because the ring buffer has to advance every 5 minutes even 
when no events arrive. 

Yes, I can create a periodic source function, but how can it be associated with 
all the keyed windows?

Thanks. 

Best,

> On 2 Aug 2019, at 12:49, Fabian Hueske <fhue...@gmail.com> wrote:
> 
> Ok, I won't go into the implementation detail.
> 
> The idea is to track all products that were observed in the last five minutes 
> (i.e., unique product ids) in a five minute tumbling window.
> Every five minutes, the observed products are sent to a process function that 
> collects the data of the last 24 hours and updates the current result by 
> adding the data of the latest 5 minutes and removing the data of the 5 
> minutes that fell out of the 24 hour window.
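
A rough plain-Java sketch of the scheme quoted above, assuming the result is the set of distinct product ids seen in the window. The class `RecentProducts` and its methods are hypothetical names, and ordinary collections stand in for Flink keyed state; the real business logic may need something different.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper: distinct product ids over the last N five-minute buckets. */
final class RecentProducts {
    private final int maxBuckets;                                      // 288 for 24 hours
    private final Deque<Set<String>> buckets = new ArrayDeque<>();     // oldest bucket first
    private final Map<String, Integer> bucketCount = new HashMap<>();  // id -> buckets containing it

    RecentProducts(int maxBuckets) {
        if (maxBuckets <= 0) {
            throw new IllegalArgumentException("maxBuckets must be positive");
        }
        this.maxBuckets = maxBuckets;
    }

    /** Adds the ids observed in the latest bucket and returns all ids still in the window. */
    Set<String> addBucket(Set<String> observed) {
        Set<String> latest = new HashSet<>(observed);
        buckets.addLast(latest);
        for (String id : latest) {
            bucketCount.merge(id, 1, Integer::sum);   // add the latest 5 minutes
        }
        if (buckets.size() > maxBuckets) {
            for (String id : buckets.removeFirst()) { // remove the bucket that fell out
                // Returning null drops the id once no remaining bucket contains it.
                bucketCount.computeIfPresent(id, (k, c) -> c == 1 ? null : c - 1);
            }
        }
        return new HashSet<>(bucketCount.keySet());
    }
}
```

The per-id counter is one way to make "removing" well defined for sets: an id leaves the result only when the last bucket that contained it has fallen out of the window.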
> 
> I don't know your exact business logic, but this is the rough scheme that I 
> would follow.
> 
> Cheers, Fabian
> 
>> On Fri, 2 Aug 2019 at 12:25, Ahmad Hassan 
>> <ahmad.has...@gmail.com> wrote:
>> Hi Fabian,
>> 
>> Thanks for this detail. However, our pipeline keeps track of the list of 
>> products seen in 24 hours with a 5 min slide (288 windows). 
>> 
>> inStream
>>     .filter(Objects::nonNull)
>>     .keyBy(TENANT)
>>     // 24 hour window sliding every 5 minutes (288 overlapping windows)
>>     .window(SlidingProcessingTimeWindows.of(Time.hours(24), Time.minutes(5)))
>>     .trigger(TimeTrigger.create())
>>     .evictor(CountEvictor.of(1))
>>     .process(new MetricProcessWindowFunction());
>> 
>> The trigger just fires in onElement, and MetricProcessWindowFunction just 
>> stores stats for each product in MapState and emits only when a product 
>> reaches expiry. The evictor just empties the window, since all product state 
>> is in MapState. With Flink 1.7.0, checkpointing just hangs and expires while 
>> processing our pipeline. 
>> 
>> However, with your proposed solution, how would we achieve this sliding 
>> window mechanism of emitting a 24 hour window every 5 minutes using a 
>> ProcessFunction?
>> 
>> Best,  
>> 
>> 
>>> On Fri, 2 Aug 2019 at 09:48, Fabian Hueske <fhue...@gmail.com> wrote:
>>> Hi Ahmad,
>>> 
>>> First of all, you need to preaggregate the data in a 5 minute tumbling 
>>> window. For example, if your aggregation function is count or sum, this is 
>>> simple.
>>> You have a 5 min tumbling window that just emits a count or sum every 5 
>>> minutes.
>>> 
>>> The ProcessFunction then has a MapState<Integer, IntermediateAgg> (called 
>>> buffer). IntermediateAgg is the result type of the tumbling window and the 
>>> MapState is used like an array with the Integer key being the position 
>>> pointer to the value. You will only use the pointers 0 to 287 to store the 
>>> 288 intermediate aggregation values and use the MapState as a ring buffer. 
>>> For that you need a ValueState<Integer> (called pointer) that is a pointer 
>>> to the position that is overwritten next. Finally, you have a 
>>> ValueState<Result> (called result) that stores the result of the last 
>>> window.
>>> 
>>> When the ProcessFunction receives a new intermediate result, it will 
>>> perform the following steps:
>>> 
>>> 1) get the oldest intermediate result: buffer.get(pointer)
>>> 2) overwrite the oldest intermediate result with the newly received 
>>> intermediate result: buffer.put(pointer, new-intermediate-result)
>>> 3) increment the pointer by 1 and reset it to 0 if it became 288
>>> 4) subtract the oldest intermediate result from the result
>>> 5) add the newly received intermediate result to the result. Update the 
>>> result state and emit the result
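
Steps 1 to 5 above can be sketched in plain Java, assuming the aggregate is a sum of long values. The class name `SlidingSumRingBuffer` is hypothetical, and a `HashMap` plus two fields stand in for the `MapState` and `ValueState` handles a real ProcessFunction would keep per key. Treating an empty slot as zero is also an assumption.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the per-key state of the ProcessFunction described above. */
final class SlidingSumRingBuffer {
    private final int slots;                                    // 288 for 24 hours / 5 minutes
    private final Map<Integer, Long> buffer = new HashMap<>();  // plays the role of "buffer"
    private int pointer = 0;                                    // plays the role of "pointer"
    private long result = 0L;                                   // plays the role of "result"

    SlidingSumRingBuffer(int slots) {
        if (slots <= 0) {
            throw new IllegalArgumentException("slots must be positive");
        }
        this.slots = slots;
    }

    /** Applies steps 1 to 5 for one pre-aggregated value and returns the updated result. */
    long add(long newest) {
        long oldest = buffer.getOrDefault(pointer, 0L); // 1) oldest value; empty slot counts as 0
        buffer.put(pointer, newest);                    // 2) overwrite it with the new value
        pointer = (pointer + 1) % slots;                // 3) advance, wrapping back to 0
        result -= oldest;                               // 4) retract the value that fell out
        result += newest;                               // 5) add the new value
        return result;                                  //    caller emits this
    }
}
```

With three slots, feeding 5, 7, 1 and then 2 yields 5, 12, 13 and 10: the fourth call retracts the 5 that fell out of the window. A call such as `add(0)` rotates the buffer for an interval without data; how to trigger that call per key in Flink is not covered by this sketch.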
>>> 
>>> Note that this only works for certain aggregation functions. Some 
>>> functions cannot be pre-aggregated, and pre-aggregation is a hard 
>>> requirement for this approach.
>>> 
>>> Best, Fabian
>>> 
>>>> On Thu, 1 Aug 2019 at 20:00, Ahmad Hassan 
>>>> <ahmad.has...@gmail.com> wrote:
>>>> 
>>>> Hi Fabian,
>>>> 
>>>> > On 4 Jul 2018, at 11:39, Fabian Hueske <fhue...@gmail.com> wrote:
>>>> > 
>>>> > - Pre-aggregate records in a 5 minute Tumbling window. However, 
>>>> > pre-aggregation does not work for FoldFunctions.
>>>> > - Implement the window as a custom ProcessFunction that maintains a 
>>>> > state of 288 events and aggregates and retracts the pre-aggregated 
>>>> > records.
>>>> > 
>>>> > Best, Fabian
>>>> 
>>>> We are finally implementing a ProcessFunction to replace the Flink 
>>>> sliding window. Could you please elaborate on how we can implement the 
>>>> sliding window as a ProcessFunction, as you explained above? I am 
>>>> struggling to understand how I would keep track of which events belong to 
>>>> which window. We have a 24 hr running sliding window with a 5 min slide 
>>>> (288 windows). How do I emulate 288 windows in a ProcessFunction with a 
>>>> 5 min slide?
>>>> 
>>>> 288 sliding windows cause Flink checkpoints to hang and never finish, 
>>>> even after an hour and even with MapState on RocksDB. So we decided to 
>>>> get rid of the sliding window and use a process function to implement the 
>>>> sliding window logic. 
>>>> 
>>>> Best,
