Hi Chesnay,

I know it is a known issue, and that it won't be fixed because of the window-merging feature used by session windows. What I am looking for is whether someone has implemented an aggregation function using ProcessFunction and the process() method instead of AggregateFunction and the aggregate() method. I hope you get my point.
Thanks & Regards
Gaurav Luthra

On Fri, Sep 28, 2018 at 4:22 PM Chesnay Schepler <ches...@apache.org> wrote:

> Please see: https://issues.apache.org/jira/browse/FLINK-10250
>
> On 28.09.2018 11:27, vino yang wrote:
>
> Hi Gaurav,
>
> Yes, you are right. It is indeed not allowed to use a RichFunction. I will
> ping Timo; he may be able to give you a more expert answer.
>
> Thanks, vino.
>
> Gaurav Luthra <gauravluthra6...@gmail.com> wrote on Fri, Sep 28, 2018 at 4:27 PM:
>
>> Hi Vino,
>>
>> Kindly check the Flink code below.
>>
>> package org.apache.flink.streaming.api.datastream.WindowedStream
>>
>> @PublicEvolving
>> public <ACC, R> SingleOutputStreamOperator<R>
>> aggregate(AggregateFunction<T, ACC, R> function) {
>>     checkNotNull(function, "function");
>>
>>     if (function instanceof RichFunction) {
>>         throw new UnsupportedOperationException(
>>             "This aggregation function cannot be a RichFunction.");
>>     }
>>
>>     TypeInformation<ACC> accumulatorType =
>>         TypeExtractor.getAggregateFunctionAccumulatorType(
>>             function, input.getType(), null, false);
>>
>>     TypeInformation<R> resultType =
>>         TypeExtractor.getAggregateFunctionReturnType(
>>             function, input.getType(), null, false);
>>
>>     return aggregate(function, accumulatorType, resultType);
>> }
>>
>> Kindly check the above snippet of Flink's aggregate() method, which is
>> applied on a windowed stream.
>>
>> Thanks & Regards
>> Gaurav Luthra
>> Mob:- +91-9901945206
>>
>> On Fri, Sep 28, 2018 at 1:40 PM vino yang <yanghua1...@gmail.com> wrote:
>>
>>> Hi Gaurav,
>>>
>>> This is very strange. Can you share your code and the specific exception?
>>> Under normal circumstances, it should not throw an exception.
>>>
>>> Thanks, vino.
>>>
>>> Gaurav Luthra <gauravluthra6...@gmail.com> wrote on Fri, Sep 28, 2018 at 3:27 PM:
>>>
>>>> Hi Vino,
>>>>
>>>> RichAggregateFunction can certainly access state. The problem is that
>>>> we cannot use a RichAggregateFunction in the aggregate() method.
>>>> If we do, it throws an exception.
>>>>
>>>> So the only option is to use a plain AggregateFunction (not Rich) with the
>>>> aggregate() method on a windowed stream. But in an AggregateFunction we
>>>> cannot access the RuntimeContext, hence we cannot use state.
>>>>
>>>> Thanks & Regards
>>>> Gaurav
>>>>
>>>> On Fri, 28 Sep, 2018, 12:40 PM vino yang, <yanghua1...@gmail.com> wrote:
>>>>
>>>>> Hi Gaurav,
>>>>>
>>>>> Why do you think the RichAggregateFunction cannot access the State API?
>>>>> RichAggregateFunction inherits from AbstractRichFunction (it provides
>>>>> a RuntimeContext that allows you to access the state API).
>>>>>
>>>>> Thanks, vino.
>>>>>
>>>>> Gaurav Luthra <gauravluthra6...@gmail.com> wrote on Fri, Sep 28, 2018 at 1:38 PM:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> As we are aware, we currently cannot use a RichAggregateFunction in the
>>>>>> aggregate() method on a windowed stream. So, to access state in a
>>>>>> custom aggregate function, one can implement it using a ProcessFunction.
>>>>>> Many developers face this issue, so someone must have implemented it
>>>>>> or tried to. Kindly share your feedback on this, as I need to
>>>>>> implement it myself.
>>>>>>
>>>>>> Thanks & Regards
>>>>>> Gaurav Luthra
>>>>>>
>>>>>> --
>>>>>> Sent from:
>>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
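
For readers following the thread, here is a rough sketch of the idea being asked about: running aggregation logic from a process()-style method that receives all elements of a window at once. It is plain Java with no Flink dependency, so every name in it (ProcessStyleAggregation, SimpleAggregate, Average, processWindow) is a hypothetical stand-in and not Flink API. SimpleAggregate only mirrors the method shape of AggregateFunction (createAccumulator, add, getResult). In a real job the loop would presumably sit inside ProcessWindowFunction.process(), where the function may be rich and can reach keyed state.

```java
import java.util.Arrays;
import java.util.List;

public class ProcessStyleAggregation {

    /** Hypothetical stand-in mirroring the method shape of an aggregate function. */
    interface SimpleAggregate<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC accumulator);
        OUT getResult(ACC accumulator);
    }

    /** Average of long values; the accumulator is {sum, count}. */
    static class Average implements SimpleAggregate<Long, long[], Double> {
        @Override
        public long[] createAccumulator() {
            return new long[] {0L, 0L};
        }

        @Override
        public long[] add(Long value, long[] acc) {
            acc[0] += value; // running sum
            acc[1]++;        // running count
            return acc;
        }

        @Override
        public Double getResult(long[] acc) {
            // An empty window yields 0.0 instead of dividing by zero.
            return acc[1] == 0 ? 0.0 : (double) acc[0] / acc[1];
        }
    }

    /**
     * Plays the role of process(): it is handed every element of one window
     * and folds them through the aggregate logic in a single pass.
     */
    static <IN, ACC, OUT> OUT processWindow(
            SimpleAggregate<IN, ACC, OUT> agg, Iterable<IN> elements) {
        ACC acc = agg.createAccumulator();
        for (IN element : elements) {
            acc = agg.add(element, acc);
        }
        return agg.getResult(acc);
    }

    public static void main(String[] args) {
        List<Long> window = Arrays.asList(2L, 4L, 9L);
        System.out.println(processWindow(new Average(), window)); // prints 5.0
    }
}
```

One trade-off to keep in mind: a process()-style function sees the whole window at once, so the window contents have to be buffered until the window fires, whereas aggregate() folds each element into the accumulator as it arrives.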