Re: Behaviour of Process Window Function

2018-09-10 Thread Harshvardhan Agrawal
Hi,

Our application does financial data enrichment. We want to first key the
positions by Account Number and then window them. Within a window, I want to
collect all the unique products across all the accounts and make an external
service call to hydrate the cache for that window. A product could be owned
by multiple accounts within a window, and in that case we don't want to make
the external call more than once. In my case, all the incremental
aggregation does is collect the unique product keys in a set and pass that
set to the process window function, which takes care of hydrating and
clearing the Guava cache.
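
To make the deduplication concrete, here is a minimal, library-free sketch of
the idea described above. It is not Flink code: the class and method names are
hypothetical, a plain HashMap stands in for the Guava cache, and the lookup
lambda stands in for the external service call. It assumes the product sets
are built per account and that the shared cache check is what prevents a
second call for a product held by several accounts.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Library-free sketch; class and method names are hypothetical, not Flink API.
class UniqueProductHydration {

    // Incremental step: fold each {accountNumber, productKey} pair into a
    // per-account set, so duplicates within one account collapse.
    static Map<String, Set<String>> uniqueProductsPerAccount(List<String[]> positions) {
        Map<String, Set<String>> perAccount = new LinkedHashMap<>();
        for (String[] position : positions) {
            perAccount.computeIfAbsent(position[0], a -> new LinkedHashSet<>()).add(position[1]);
        }
        return perAccount;
    }

    // Window-function step: call the lookup only for keys the shared cache
    // does not hold yet. Returns the number of external calls made.
    static int hydrate(Set<String> productKeys, Map<String, String> cache,
                       Function<String, String> lookup) {
        int calls = 0;
        for (String key : productKeys) {
            if (!cache.containsKey(key)) {
                cache.put(key, lookup.apply(key));
                calls++;
            }
        }
        return calls;
    }

    public static void main(String[] args) {
        List<String[]> positions = List.of(
            new String[] {"ACC-1", "IBM"},
            new String[] {"ACC-1", "AAPL"},
            new String[] {"ACC-1", "IBM"},
            new String[] {"ACC-2", "AAPL"},
            new String[] {"ACC-2", "MSFT"});

        Map<String, String> cache = new HashMap<>(); // stands in for the Guava cache
        int calls = 0;
        for (Set<String> keys : uniqueProductsPerAccount(positions).values()) {
            calls += hydrate(keys, cache, key -> "reference data for " + key);
        }
        System.out.println("external calls: " + calls + ", cached products: " + cache.size());
    }
}
```

With the five sample positions, only three lookups are made, because AAPL is
already cached by the time the second account is processed. A HashMap is not
thread-safe, so a real job would still need a concurrent cache for this.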

Is there any way we could share some managed state across multiple keys? If
not, I will have to use the Guava cache and I won't be able to take
advantage of async checkpointing.

On Fri, Sep 7, 2018 at 9:49 AM Hequn Cheng wrote:

> Hi Harshvardhan,
>
> *> 1) Does the state in the process window function qualify as KeyedState
> or OperatorState? *
> KeyedState
>
> *> We want to be able to rehydrate the Guava cache at the beginning of
> each window by making an external REST call and clear the cache at the end
> of that respective window. How can we enforce this behaviour in Flink?*
> Why do you want to clear the cache after the window if the cache is shared
> across all keys? Do you want to load the cache per key?
> If you want to aggregate elements incrementally, I think it is hard to get
> the window start and end in `ProcessWindowFunction` or in the
> `IncrementalAggregation` function. However, I think we can get the start
> and end in the trigger function, i.e., do the cache load and clear in the
> trigger function.
>
> Best, Hequn
>
>
> On Fri, Sep 7, 2018 at 11:28 AM vino yang wrote:
>
>> Hi Harshvardhan,
>>
>> 1) Yes, ProcessWindowFunction extends AbstractRichFunction, so you can
>> access the keyed state API through getRuntimeContext.
>> 2) ProcessWindowFunction gives you considerable flexibility: you can base
>> the cache lifetime on processing time, event time, a timer, its clear
>> method, or a customized implementation. The specific design depends on
>> your business logic and how long you need to keep the cache.
>>
>> Thanks, vino.
>>
>> On Thu, Sep 6, 2018 at 10:10 PM Harshvardhan Agrawal wrote:
>>
>>> Hello,
>>>
>>> We have a Flink pipeline where we are windowing our data after a keyBy.
>>> i.e.
>>> myStream.keyBy().window().aggregate(MyIncrementalAggregation(),
>>> MyProcessFunction()).
>>>
>>> I have two questions about the above line of code:
>>> 1) Does the state in the process window function qualify as KeyedState
>>> or OperatorState? If it is operator state, can we access KeyedState from
>>> the process window function?
>>> 2) We also have certain reference information that we want to share
>>> across all keys in the process window function. We are currently storing
>>> all that info in a Guava cache. We want to be able to rehydrate the Guava
>>> cache at the beginning of each window by making an external REST call and
>>> clear the cache at the end of that window. How can we enforce this
>>> behaviour in Flink? Do I need to use a timer service for this, with the
>>> callback registered at window.maxTimestamp(), or will just clearing the
>>> cache in the clear method do the trick?
>>>
>>> --
>>>
>>> *Regards,Harshvardhan Agrawal*
>>>
>>

-- 

*Regards,Harshvardhan Agrawal*


Re: Behaviour of Process Window Function

2018-09-07 Thread Hequn Cheng
Hi Harshvardhan,

*> 1) Does the state in the process window function qualify as KeyedState
or OperatorState? *
KeyedState

*> We want to be able to rehydrate the Guava cache at the beginning of each
window by making an external REST call and clear the cache at the end of
that respective window. How can we enforce this behaviour in Flink?*
Why do you want to clear the cache after the window if the cache is shared
across all keys? Do you want to load the cache per key?
If you want to aggregate elements incrementally, I think it is hard to get
the window start and end in `ProcessWindowFunction` or in the
`IncrementalAggregation` function. However, I think we can get the start
and end in the trigger function, i.e., do the cache load and clear in the
trigger function.
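
As a rough illustration of that load-at-start, clear-at-end lifecycle, here is
a library-free sketch. It is not Flink API: `WindowScopedCache` and its
methods are hypothetical names, the loader lambda stands in for the external
REST call, and windows are assumed to be tumbling with non-negative
timestamps. In a real job the `get` and `clear` calls would be driven by
whatever callbacks know the window boundaries.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Library-free sketch; WindowScopedCache is a hypothetical helper, not Flink API.
class WindowScopedCache {
    private final Map<Long, Map<String, String>> byWindowStart = new HashMap<>();
    private final Function<String, String> loader; // stands in for the REST call
    private int loads = 0;

    WindowScopedCache(Function<String, String> loader) {
        this.loader = loader;
    }

    // Start of the tumbling window containing the timestamp
    // (assumes non-negative timestamps and no window offset).
    static long windowStart(long timestamp, long windowSize) {
        return timestamp - (timestamp % windowSize);
    }

    // Load a key at most once per window; later reads in the same window hit the cache.
    String get(long windowStart, String key) {
        Map<String, String> cache =
            byWindowStart.computeIfAbsent(windowStart, w -> new HashMap<>());
        return cache.computeIfAbsent(key, k -> {
            loads++;
            return loader.apply(k);
        });
    }

    // Drop everything cached for a window once that window has ended.
    void clear(long windowStart) {
        byWindowStart.remove(windowStart);
    }

    int loads() {
        return loads;
    }

    int openWindows() {
        return byWindowStart.size();
    }

    public static void main(String[] args) {
        WindowScopedCache cache = new WindowScopedCache(key -> "reference data for " + key);
        long size = 1000L;
        cache.get(windowStart(100L, size), "IBM");
        cache.get(windowStart(900L, size), "IBM");  // same window, served from the cache
        cache.get(windowStart(1500L, size), "IBM"); // next window, loaded again
        cache.clear(windowStart(100L, size));       // first window has ended
        System.out.println("loads: " + cache.loads() + ", open windows: " + cache.openWindows());
    }
}
```

Keying the cache by window start keeps entries from overlapping windows apart,
so clearing one window cannot evict data another window still needs.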

Best, Hequn


On Fri, Sep 7, 2018 at 11:28 AM vino yang wrote:

> Hi Harshvardhan,
>
> 1) Yes, ProcessWindowFunction extends AbstractRichFunction, so you can
> access the keyed state API through getRuntimeContext.
> 2) ProcessWindowFunction gives you considerable flexibility: you can base
> the cache lifetime on processing time, event time, a timer, its clear
> method, or a customized implementation. The specific design depends on
> your business logic and how long you need to keep the cache.
>
> Thanks, vino.
>
> On Thu, Sep 6, 2018 at 10:10 PM Harshvardhan Agrawal wrote:
>
>> Hello,
>>
>> We have a Flink pipeline where we are windowing our data after a keyBy.
>> i.e.
>> myStream.keyBy().window().aggregate(MyIncrementalAggregation(),
>> MyProcessFunction()).
>>
>> I have two questions about the above line of code:
>> 1) Does the state in the process window function qualify as KeyedState or
>> OperatorState? If it is operator state, can we access KeyedState from the
>> process window function?
>> 2) We also have certain reference information that we want to share
>> across all keys in the process window function. We are currently storing
>> all that info in a Guava cache. We want to be able to rehydrate the Guava
>> cache at the beginning of each window by making an external REST call and
>> clear the cache at the end of that window. How can we enforce this
>> behaviour in Flink? Do I need to use a timer service for this, with the
>> callback registered at window.maxTimestamp(), or will just clearing the
>> cache in the clear method do the trick?
>>
>> --
>>
>> *Regards,Harshvardhan Agrawal*
>>
>


Re: Behaviour of Process Window Function

2018-09-06 Thread vino yang
Hi Harshvardhan,

1) Yes, ProcessWindowFunction extends AbstractRichFunction, so you can
access the keyed state API through getRuntimeContext.
2) ProcessWindowFunction gives you considerable flexibility: you can base
the cache lifetime on processing time, event time, a timer, its clear
method, or a customized implementation. The specific design depends on
your business logic and how long you need to keep the cache.

Thanks, vino.

On Thu, Sep 6, 2018 at 10:10 PM Harshvardhan Agrawal wrote:

> Hello,
>
> We have a Flink pipeline where we are windowing our data after a keyBy.
> i.e.
> myStream.keyBy().window().aggregate(MyIncrementalAggregation(),
> MyProcessFunction()).
>
> I have two questions about the above line of code:
> 1) Does the state in the process window function qualify as KeyedState or
> OperatorState? If it is operator state, can we access KeyedState from the
> process window function?
> 2) We also have certain reference information that we want to share across
> all keys in the process window function. We are currently storing all that
> info in a Guava cache. We want to be able to rehydrate the Guava cache at
> the beginning of each window by making an external REST call and clear the
> cache at the end of that window. How can we enforce this behaviour in
> Flink? Do I need to use a timer service for this, with the callback
> registered at window.maxTimestamp(), or will just clearing the cache in
> the clear method do the trick?
>
> --
>
> *Regards,Harshvardhan Agrawal*
>


Behaviour of Process Window Function

2018-09-06 Thread Harshvardhan Agrawal
Hello,

We have a Flink pipeline where we are windowing our data after a keyBy. i.e.
myStream.keyBy().window().aggregate(MyIncrementalAggregation(),
MyProcessFunction()).

I have two questions about the above line of code:
1) Does the state in the process window function qualify as KeyedState or
OperatorState? If it is operator state, can we access KeyedState from the
process window function?
2) We also have certain reference information that we want to share across
all keys in the process window function. We are currently storing all that
info in a Guava cache. We want to be able to rehydrate the Guava cache at
the beginning of each window by making an external REST call and clear the
cache at the end of that window. How can we enforce this behaviour in
Flink? Do I need to use a timer service for this, with the callback
registered at window.maxTimestamp(), or will just clearing the cache in
the clear method do the trick?

-- 

*Regards,Harshvardhan Agrawal*