Hi Gaurav,

I’m curious - for your use case, what are the windowing & aggregation 
requirements?

E.g. is it a 10 second sliding window?

And what’s the aggregation you’re trying to do?

Thanks,

— Ken


> On Sep 28, 2018, at 4:00 AM, Gaurav Luthra <gauravluthra6...@gmail.com> wrote:
> 
> Hi Chesnay,
> 
> I know this is a known issue, and that it won't be fixed because of the window 
> merging feature used by session windows.
> But I am looking for someone who has implemented an aggregation function using 
> ProcessFunction and the process() method instead of AggregateFunction and the 
> aggregate() method.
> I hope you got my point.
> 
> Thanks & Regards
> Gaurav Luthra
> 
> 
> 
> On Fri, Sep 28, 2018 at 4:22 PM Chesnay Schepler <ches...@apache.org 
> <mailto:ches...@apache.org>> wrote:
> Please see: https://issues.apache.org/jira/browse/FLINK-10250 
> <https://issues.apache.org/jira/browse/FLINK-10250>
> 
> On 28.09.2018 11:27, vino yang wrote:
>> Hi Gaurav,
>> 
>> Yes, you are right. Using a RichFunction there is indeed not allowed. I will 
>> ping Timo; he may be able to give you a more authoritative answer.
>> 
>> Thanks, vino.
>> 
>> Gaurav Luthra <gauravluthra6...@gmail.com 
>> <mailto:gauravluthra6...@gmail.com>> wrote on Fri, Sep 28, 2018 at 4:27 PM:
>> Hi Vino,
>> 
>> Kindly check the Flink code below.
>> 
>> // From org.apache.flink.streaming.api.datastream.WindowedStream
>> 
>>      @PublicEvolving
>>      public <ACC, R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T, ACC, R> function) {
>>              checkNotNull(function, "function");
>> 
>>              if (function instanceof RichFunction) {
>>                      throw new UnsupportedOperationException("This aggregation function cannot be a RichFunction.");
>>              }
>> 
>>              TypeInformation<ACC> accumulatorType = TypeExtractor.getAggregateFunctionAccumulatorType(
>>                              function, input.getType(), null, false);
>> 
>>              TypeInformation<R> resultType = TypeExtractor.getAggregateFunctionReturnType(
>>                              function, input.getType(), null, false);
>> 
>>              return aggregate(function, accumulatorType, resultType);
>>      }
>> 
>> 
>> Kindly check the above snippet of Flink's aggregate() method, which is applied 
>> on a windowed stream.
>> 
>> Thanks & Regards
>> Gaurav Luthra
>> Mob:- +91-9901945206
>> 
>> 
>> On Fri, Sep 28, 2018 at 1:40 PM vino yang <yanghua1...@gmail.com 
>> <mailto:yanghua1...@gmail.com>> wrote:
>> Hi Gaurav,
>> 
>> This is very strange. Can you share your code and the specific exception? Under 
>> normal circumstances, it should not throw an exception.
>> 
>> Thanks, vino.
>> 
>> Gaurav Luthra <gauravluthra6...@gmail.com 
>> <mailto:gauravluthra6...@gmail.com>> wrote on Fri, Sep 28, 2018 at 3:27 PM:
>> Hi Vino,
>> 
>> RichAggregateFunction can certainly access state. The problem is that we 
>> cannot use a RichAggregateFunction in the aggregate() method.
>> If we do, it throws an exception.
>> 
>> So the option is to use AggregateFunction (not Rich) with the aggregate() 
>> method on a windowed stream. But in AggregateFunction we cannot access the 
>> RuntimeContext, hence we cannot use state.
>> 
>> Thanks & Regards
>> Gaurav
>> 
>> 
>> 
>> On Fri, 28 Sep, 2018, 12:40 PM vino yang, <yanghua1...@gmail.com 
>> <mailto:yanghua1...@gmail.com>> wrote:
>> Hi Gaurav,
>> 
>> Why do you think the RichAggregateFunction cannot access the State API? 
>> RichAggregateFunction inherits from AbstractRichFunction (it provides a 
>> RuntimeContext that allows you to access the state API).
>> 
>> Thanks, vino.
>> 
>> Gaurav Luthra <gauravluthra6...@gmail.com 
>> <mailto:gauravluthra6...@gmail.com>> wrote on Fri, Sep 28, 2018 at 1:38 PM:
>> Hi,
>> 
>> As we are aware, we currently cannot use RichAggregateFunction in the
>> aggregate() method on a windowed stream. So, to access state in a
>> custom AggregateFunction, one can implement it using a ProcessFunction.
>> Many developers face this issue,
>> so someone must have implemented it or tried to. Kindly share
>> your feedback on this,
>> as I need to implement it.
>> 
>> Thanks & Regards
>> Gaurav Luthra
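
On the window-merging point raised above: a session window can be merged with a neighbouring one when a new element bridges the gap between them, so whatever an aggregation has accumulated per window has to be mergeable as well. The following is a minimal plain-Java sketch of that idea, not Flink code. AggregateFunctionModel is a hypothetical local stand-in that only borrows the four method names of Flink's AggregateFunction (createAccumulator, add, getResult, merge); AverageModel and SessionMergeSketch are likewise made up for illustration.

```java
// Plain-Java sketch with no Flink dependency. AggregateFunctionModel is a
// local stand-in (an assumption for illustration) that copies the four
// method names of Flink's AggregateFunction.
interface AggregateFunctionModel<IN, ACC, OUT> {
    ACC createAccumulator();
    ACC add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
    ACC merge(ACC a, ACC b);
}

// Running average; the accumulator is a two-element array {sum, count}.
class AverageModel implements AggregateFunctionModel<Long, long[], Double> {
    @Override
    public long[] createAccumulator() {
        return new long[] {0L, 0L};
    }

    @Override
    public long[] add(Long value, long[] acc) {
        return new long[] {acc[0] + value, acc[1] + 1};
    }

    @Override
    public Double getResult(long[] acc) {
        return acc[1] == 0 ? 0.0 : (double) acc[0] / acc[1];
    }

    @Override
    public long[] merge(long[] a, long[] b) {
        return new long[] {a[0] + b[0], a[1] + b[1]};
    }
}

class SessionMergeSketch {
    // Folds the values into a fresh accumulator, one add() call per element.
    static long[] accumulate(AverageModel fn, long... values) {
        long[] acc = fn.createAccumulator();
        for (long v : values) {
            acc = fn.add(v, acc);
        }
        return acc;
    }

    // Two sessions that turn out to belong together: their accumulators are
    // merged first, and the result is computed from the merged accumulator.
    static double mergedAverage() {
        AverageModel fn = new AverageModel();
        long[] sessionA = accumulate(fn, 2, 4);
        long[] sessionB = accumulate(fn, 6);
        return fn.getResult(fn.merge(sessionA, sessionB));
    }

    public static void main(String[] args) {
        System.out.println(mergedAverage());
    }
}
```

In this sketch everything the aggregation needs to remember lives in the accumulator, which is why two sessions can be combined with a single merge() call. Whether an accumulator alone is enough for a given job depends on the windowing and aggregation requirements asked about at the top of the thread.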

--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra
