Thank you Fabian. This works really well.

Best Regards,

On Fri, 16 Aug 2019 at 09:22, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Ahmad,
>
> The ProcessFunction should not rely on new records arriving (i.e., it should
> not do the processing in the processElement() method). Instead, register a
> timer every 5 minutes and perform the processing when the timer fires in
> onTimer(). Essentially, you'd only collect the data in `processElement()` and
> process it in `onTimer()`.
> You need to make sure that you have timers registered, as long as there's
> data in the ring buffer.
>
> Best, Fabian
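
The timer-driven approach described above can be modelled in plain Java. This is only a sketch, not Flink code: `TimerDrivenWindow`, `hasTimer()` and the count-based aggregate are hypothetical names and choices made for illustration, ordinary maps and fields stand in for keyed state, and a boolean stands in for a registered timer. In a real KeyedProcessFunction the timer would presumably be registered through `ctx.timerService().registerProcessingTimeTimer(...)`.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java model of "collect in processElement(), process in onTimer()". */
class TimerDrivenWindow {
    private final int slots;                                    // e.g. 288 for 24 h / 5 min
    private final Map<Integer, Long> buffer = new HashMap<>();  // ring buffer, non-empty slots only
    private int pointer = 0;                                    // slot that is overwritten next
    private long result = 0L;                                   // count over the whole window
    private long pending = 0L;                                  // records seen since the last timer
    private boolean timerRegistered = false;                    // stands in for a Flink timer

    TimerDrivenWindow(int slots) {
        this.slots = slots;
    }

    /** Only collects data and makes sure a timer is pending. */
    void processElement() {
        pending++;
        timerRegistered = true;
    }

    /** Fires once per slide interval, whether or not any records arrived. */
    long onTimer() {
        long oldest = buffer.getOrDefault(pointer, 0L);  // slice that leaves the window
        if (pending > 0) {
            buffer.put(pointer, pending);
        } else {
            buffer.remove(pointer);                      // an empty slice needs no state
        }
        pointer = (pointer + 1) % slots;
        result = result - oldest + pending;
        pending = 0L;
        // Keep a timer registered only as long as there is data in the ring buffer.
        timerRegistered = !buffer.isEmpty();
        return result;
    }

    boolean hasTimer() {
        return timerRegistered;
    }
}
```

With two slots, three records followed by three timer firings give results 3, 3 and 0, and the timer is dropped after the third firing because the buffer has drained.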
>
> On Thu, 15 Aug 2019 at 19:20, Ahmad Hassan <ahmad.has...@gmail.com> wrote:
>
>> Hi Fabian,
>>
>> In this case, how do we emit a tumbling window when there are no events?
>> Otherwise it’s not possible to emulate a sliding window in a process function
>> and move the ring buffer every 5 mins when there are no events.
>>
>> Yes, I can create a periodic source function, but how can it be associated
>> with all the keyed windows?
>>
>> Thanks.
>>
>> Best,
>>
>> On 2 Aug 2019, at 12:49, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>> Ok, I won't go into the implementation detail.
>>
>> The idea is to track all products that were observed in the last five
>> minutes (i.e., unique product ids) in a five minute tumbling window.
>> Every five minutes, the observed products are sent to a process function
>> that collects the data of the last 24 hours and updates the current result
>> by adding the data of the latest 5 minutes and removing the data of the 5
>> minutes that fell out of the 24 hour window.
>>
>> I don't know your exact business logic, but this is the rough scheme that
>> I would follow.
>>
>> Cheers, Fabian
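
One way to read the scheme above for sets of product ids, sketched in plain Java rather than Flink: because a product can appear in several 5 minute slices, this sketch keeps a reference count per product so that a slice falling out of the 24 hour window only removes products no other slice still contains. The reference counting, the class name `DistinctProductsWindow` and the method `slide()` are assumptions made for illustration; they are not taken from the thread.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Plain-Java model: distinct product ids over the last `slots` slices. */
class DistinctProductsWindow {
    private final int slots;                                           // e.g. 288
    private final Map<Integer, Set<String>> buffer = new HashMap<>();  // one id set per slice
    private final Map<String, Integer> refCounts = new HashMap<>();    // id -> number of slices containing it
    private int pointer = 0;                                           // slice that is overwritten next

    DistinctProductsWindow(int slots) {
        this.slots = slots;
    }

    /** Takes the ids seen in the latest slice and returns the ids currently in the long window. */
    Set<String> slide(Set<String> latest) {
        // Overwrite the oldest slice and remember what it contained.
        Set<String> expired = buffer.put(pointer, new HashSet<>(latest));
        pointer = (pointer + 1) % slots;

        // Remove the data that fell out of the window.
        if (expired != null) {
            for (String id : expired) {
                // Returning null from the remapping function removes the entry.
                refCounts.computeIfPresent(id, (k, c) -> c == 1 ? null : c - 1);
            }
        }
        // Add the data of the latest slice.
        for (String id : latest) {
            refCounts.merge(id, 1, Integer::sum);
        }
        return new HashSet<>(refCounts.keySet());
    }
}
```

With two slots, feeding {a, b}, then {b}, then {c} leaves {b, c} in the window: `a` has expired, while `b` survives because the second slice still contains it.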
>>
>> On Fri, 2 Aug 2019 at 12:25, Ahmad Hassan <ahmad.has...@gmail.com> wrote:
>>
>>> Hi Fabian,
>>>
>>> Thanks for this detail. However, our pipeline keeps track of the list
>>> of products seen in 24 hours with a 5 min slide (288 windows).
>>>
>>> inStream
>>>     .filter(Objects::nonNull)
>>>     .keyBy(TENANT)
>>>     // 24 hour window sliding every 5 minutes (288 windows)
>>>     .window(SlidingProcessingTimeWindows.of(Time.hours(24), Time.minutes(5)))
>>>     .trigger(TimeTrigger.create())
>>>     .evictor(CountEvictor.of(1))
>>>     .process(new MetricProcessWindowFunction());
>>>
>>>
>>> The trigger just fires in onElement, and MetricProcessWindowFunction just
>>> stores stats for each product in MapState and emits only when it reaches
>>> expiry. The evictor just empties the window, as all product state is held
>>> in MapState. With Flink 1.7.0, checkpointing just hangs and expires while
>>> processing our pipeline.
>>>
>>>
>>> However, with your proposed solution, how would we be able to achieve
>>> this sliding window mechanism of emitting a 24 hour window every 5 minutes
>>> using a ProcessFunction?
>>>
>>>
>>> Best,
>>>
>>>
>>> On Fri, 2 Aug 2019 at 09:48, Fabian Hueske <fhue...@gmail.com> wrote:
>>>
>>>> Hi Ahmad,
>>>>
>>>> First of all, you need to preaggregate the data in a 5 minute tumbling
>>>> window. For example, if your aggregation function is count or sum, this is
>>>> simple.
>>>> You have a 5 min tumbling window that just emits a count or sum every 5
>>>> minutes.
>>>>
>>>> The ProcessFunction then has a MapState<Integer, IntermediateAgg>
>>>> (called buffer). IntermediateAgg is the result type of the tumbling window
>>>> and the MapState is used like an array with the Integer key being the
>>>> position pointer to the value. You will only use the pointers 0 to 287 to
>>>> store the 288 intermediate aggregation values and use the MapState as a
>>>> ring buffer. For that you need a ValueState<Integer> (called pointer) that
>>>> is a pointer to the position that is overwritten next. Finally, you have a
>>>> ValueState<Result> (called result) that stores the result of the last
>>>> window.
>>>>
>>>> When the ProcessFunction receives a new intermediate result, it will
>>>> perform the following steps:
>>>>
>>>> 1) get the oldest intermediate result: buffer.get(pointer)
>>>> 2) overwrite the oldest intermediate result with the newly received
>>>> intermediate result: buffer.put(pointer, new-intermediate-result)
>>>> 3) increment the pointer by 1 and reset it to 0 if it became 288
>>>> 4) subtract the oldest intermediate result from the result
>>>> 5) add the newly received intermediate result to the result. Update the
>>>> result state and emit the result
>>>>
>>>> Note that this only works for certain aggregation functions. Some
>>>> functions cannot be pre-aggregated, and pre-aggregation is a hard
>>>> requirement for this approach.
>>>>
>>>> Best, Fabian
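
The five steps above can be sketched in plain Java for a sum or count. This is a minimal model, not Flink code: a `HashMap` stands in for the `MapState<Integer, IntermediateAgg>` buffer, plain fields stand in for the two `ValueState`s, and the class name `SlidingSumRingBuffer` and method `add()` are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java model of the ring-buffer state for a sliding sum. */
class SlidingSumRingBuffer {
    private final int slots;                                    // 288 for a 24 h window with a 5 min slide
    private final Map<Integer, Long> buffer = new HashMap<>();  // stands in for MapState<Integer, IntermediateAgg>
    private int pointer = 0;                                    // stands in for ValueState<Integer>
    private long result = 0L;                                   // stands in for ValueState<Result>

    SlidingSumRingBuffer(int slots) {
        this.slots = slots;
    }

    /** Handles one intermediate result from the tumbling window and returns the updated window result. */
    long add(long newIntermediate) {
        // 1) get the oldest intermediate result (0 until the buffer has wrapped around once)
        long oldest = buffer.getOrDefault(pointer, 0L);
        // 2) overwrite it with the newly received intermediate result
        buffer.put(pointer, newIntermediate);
        // 3) increment the pointer and reset it to 0 once it reaches the number of slots
        pointer = (pointer + 1) % slots;
        // 4) subtract the oldest intermediate result, 5) add the new one
        result = result - oldest + newIntermediate;
        return result;
    }
}
```

With three slots, feeding 1, 2, 3, 4 yields 1, 3, 6, 9: the fourth call overwrites the slot holding 1, so the result is 6 - 1 + 4. Step 4 is why the aggregation has to support subtraction.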
>>>>
>>>> On Thu, 1 Aug 2019 at 20:00, Ahmad Hassan <ahmad.has...@gmail.com> wrote:
>>>>
>>>>>
>>>>> Hi Fabian,
>>>>>
>>>>> > On 4 Jul 2018, at 11:39, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>> >
>>>>> > - Pre-aggregate records in a 5 minute Tumbling window. However,
>>>>> > pre-aggregation does not work for FoldFunctions.
>>>>> > - Implement the window as a custom ProcessFunction that maintains a
>>>>> > state of 288 events and aggregates and retracts the pre-aggregated
>>>>> > records.
>>>>> >
>>>>> > Best, Fabian
>>>>>
>>>>> We are finally implementing a ProcessFunction to replace the Flink sliding
>>>>> window. Could you please elaborate on how we can implement the sliding
>>>>> window as a ProcessFunction, as you explained above? I am struggling to
>>>>> understand how to keep track of which events belong to which window. We
>>>>> have a 24 hr running sliding window with a 5 min slide (288 windows). How
>>>>> do I emulate 288 windows in a ProcessFunction with a 5 min slide?
>>>>>
>>>>> 288 sliding windows cause Flink checkpoints to hang and never finish,
>>>>> even after an hour and even with MapState on RocksDB. So we decided to get
>>>>> rid of the sliding window and use a process function to implement the
>>>>> sliding window logic.
>>>>>
>>>>> Best,
>>>>
>>>>
