Hi Chesnay,

I know it is an issue, And won't be fixed because of window merging feature
in case of session window.
But I am looking if someone has implemented aggregation function using
ProcessFunction and process() method instead of AggregationFunction and
aggregate() method.
I hope you got my point.

Thanks & Regards
Gaurav Luthra



On Fri, Sep 28, 2018 at 4:22 PM Chesnay Schepler <ches...@apache.org> wrote:

> Please see: https://issues.apache.org/jira/browse/FLINK-10250
>
> On 28.09.2018 11:27, vino yang wrote:
>
> Hi Gaurav,
>
> Yes, you are right. It is really not allowed to use RichFunction. I will
> Ping Timo, he may give you a more professional answer.
>
> Thanks, vino.
>
> Gaurav Luthra <gauravluthra6...@gmail.com> 于2018年9月28日周五 下午4:27写道:
>
>> Hi Vino,
>>
>> Kindly check below flink code.
>>
>> package org.apache.flink.streaming.api.datastream.WindowedStream
>>
>> @PublicEvolving
>> public <ACC, R> SingleOutputStreamOperator<R>
>> aggregate(AggregateFunction<T, ACC, R> function) {
>> checkNotNull(function, "function");
>>
>> if (*function instanceof RichFunction*) {
>> throw new *UnsupportedOperationException("This aggregation function
>> cannot be a RichFunction.")*;
>> }
>>
>> TypeInformation<ACC> accumulatorType =
>> TypeExtractor.getAggregateFunctionAccumulatorType(
>> function, input.getType(), null, false);
>>
>> TypeInformation<R> resultType =
>> TypeExtractor.getAggregateFunctionReturnType(
>> function, input.getType(), null, false);
>>
>> return aggregate(function, accumulatorType, resultType);
>> }
>>
>>
>> Kindly, check above snapshot of flink;s aggregate() method, that got
>> applied on windowed stream.
>>
>> Thanks & Regards
>> Gaurav Luthra
>> Mob:- +91-9901945206
>>
>>
>> On Fri, Sep 28, 2018 at 1:40 PM vino yang <yanghua1...@gmail.com> wrote:
>>
>>> Hi Gaurav,
>>>
>>> This is very strange, can you share your code and specific exceptions?
>>> Under normal circumstances, it should not throw an exception.
>>>
>>> Thanks, vino.
>>>
>>> Gaurav Luthra <gauravluthra6...@gmail.com> 于2018年9月28日周五 下午3:27写道:
>>>
>>>> Hi Vino,
>>>>
>>>> RichAggregateFunction can surely access the state. But the problem is,
>>>> In aggregate() method we can not use RichAggregateFunction.
>>>> If we use then it throws exception.
>>>>
>>>> So, the option is to use AggregateFunction (not Rich) with aggregate()
>>>> method on windowed stream. Now, In AggregateFunction, we cannot access
>>>> RuntimeContext. Hence we can not use state.
>>>>
>>>> Thanks & Regards
>>>> Gaurav
>>>>
>>>>
>>>>
>>>> On Fri, 28 Sep, 2018, 12:40 PM vino yang, <yanghua1...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Gaurav,
>>>>>
>>>>> Why do you think the RichAggregateFunction cannot access the State
>>>>> API?
>>>>> RichAggregateFunction inherits from AbstractRichFunction (it provides
>>>>> a RuntimeContext that allows you to access the state API).
>>>>>
>>>>> Thanks, vino.
>>>>>
>>>>> Gaurav Luthra <gauravluthra6...@gmail.com> 于2018年9月28日周五 下午1:38写道:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> As we are aware, Currently we cannot use RichAggregateFunction in
>>>>>> aggregate() method upon windowed stream. So, To access the state in
>>>>>> your
>>>>>> customAggregateFunction, you can implement it using a ProcessFuntion.
>>>>>> This issue is faced by many developers.
>>>>>> So, someone must have implemented or tried to implement it. So,
>>>>>> kindly share
>>>>>> your feedback on this.
>>>>>> As I need to implement this.
>>>>>>
>>>>>> Thanks & Regards
>>>>>> Gaurav Luthra
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Sent from:
>>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>>>>
>>>>>
>

Reply via email to