Hi Sachin,

`performing incremental aggregation using stateful processing` does the same
job as `windows with agg`, but the former is more flexible. If the Flink
window cannot satisfy your performance needs, and your business logic has
features that can be exploited for custom optimization, you can choose the
former.
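
For illustration, here is a minimal plain-Java sketch of the per-key logic
behind the former approach. It is not Flink code: the class
IncrementalAggregationSketch, its methods, and the simple long sum are
hypothetical stand-ins. In a real job I would expect the slot to live in a
ValueState inside a KeyedProcessFunction, with onWatermark replaced by an
event-time timer and onTimer. It assumes non-negative timestamps and that
records for one key arrive in timestamp order.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of per-key incremental aggregation (hypothetical, not Flink API). */
public class IncrementalAggregationSketch {

    /** Per-key state: a single running value plus the end of its current window. */
    private static final class Slot {
        final long windowEnd;
        long sum;

        Slot(long windowEnd) {
            this.windowEnd = windowEnd;
        }
    }

    private final long windowMillis;
    private final Map<String, Slot> state = new LinkedHashMap<>();
    private final List<String> emitted = new ArrayList<>();

    public IncrementalAggregationSketch(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    /** Folds one record into the key's running value; the record itself is not stored. */
    public void onElement(String key, long timestamp, long value) {
        Slot slot = state.get(key);
        if (slot != null && timestamp >= slot.windowEnd) {
            // The record belongs to a later window: emit the finished one first.
            emit(key, slot);
            slot = null;
        }
        if (slot == null) {
            // Align the window end to the next multiple of windowMillis.
            long windowEnd = timestamp - (timestamp % windowMillis) + windowMillis;
            slot = new Slot(windowEnd);
            state.put(key, slot);
        }
        slot.sum += value;
    }

    /** Stands in for a timer firing: emits and clears every window that has ended. */
    public void onWatermark(long watermark) {
        Iterator<Map.Entry<String, Slot>> it = state.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Slot> entry = it.next();
            if (entry.getValue().windowEnd <= watermark) {
                emit(entry.getKey(), entry.getValue());
                it.remove();
            }
        }
    }

    private void emit(String key, Slot slot) {
        emitted.add(key + "@" + slot.windowEnd + "=" + slot.sum);
    }

    public List<String> emitted() {
        return emitted;
    }

    public int stateSize() {
        return state.size();
    }
}
```

However long the window is, the state holds one slot per key, which is the
property you described. The flexibility is that you decide what goes into the
slot and when it is emitted or cleared.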

Best,
Zhongqiang Gong

On Fri, May 17, 2024 at 19:39, Sachin Mittal <sjmit...@gmail.com> wrote:

> Hi,
> I am doing the following
> 1. Use the reduce function where the data type of the output after
> windowing is the same as the input.
> 2. Where the data type of the output after windowing is different from that
> of the input, I use the aggregate function. For example:
>
> SingleOutputStreamOperator<AggregatedData> data =
>     reducedPlayerStatsData
>         .keyBy(new KeySelector())
>         .window(
>             TumblingEventTimeWindows.of(Time.seconds(secs)))
>         .aggregate(new DataAggregator())
>         .name("aggregate");
>
> In this case data which is aggregated is of a different type than the
> input so I had to use aggregate function.
> However in cases where data is of the same type using reduce function is
> very simple to use.
> Is there any fundamental difference between aggregate and reduce function
> in terms of performance?
> 3. I have enabled incremental checkpoints at the Flink conf level using:
> state.backend.type: "rocksdb"
> state.backend.incremental: "true"
>
> 4. I am really not sure how I can use TTL. I assumed that Flink would
> automatically clean the state of windows that have expired. Is there any
> way I can use TTL in the steps I have mentioned?
> 5. When you talk about pre-aggregation, is this what you mean: first
> compute the minute aggregation and use that as input for the hour
> aggregation? So my pipeline would be something like this:
>
> SingleOutputStreamOperator<ReducedData> reducedData =
>     data
>         .keyBy(new KeySelector())
>         .window(
>             TumblingEventTimeWindows.of(Time.seconds(60)))
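>         .reduce(new DataReducer())
>         // re-key before the second window: reduce() returns a non-keyed stream
>         .keyBy(new KeySelector())
>         .window(
>             TumblingEventTimeWindows.of(Time.seconds(3600)))
>         .reduce(new DataReducer())
>         .name("reduce");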
>
>
> I was thinking of performing incremental aggregation using stateful
> processing.
>
> Basically, read one record, reduce it, and store it in state; then read the
> next record, reduce it with the current state, and write the new reduced
> value back to the state, and so on.
>
> When the event-time timer I registered fires, emit the final reduced value
> from the state, then register the timer for the next event time and also
> clear the state.
>
> This way each state would always keep only one record, no matter what
> period we aggregate data over.
>
> Is this a better approach than windowing?
>
>
> Thanks
> Sachin
>
>
> On Fri, May 17, 2024 at 1:14 PM gongzhongqiang <gongzhongqi...@apache.org>
> wrote:
>
>> Hi Sachin,
>>
>> We can optimize this problem in the following ways:
>> - use
>> org.apache.flink.streaming.api.datastream.WindowedStream#aggregate(org.apache.flink.api.common.functions.AggregateFunction<T,ACC,R>)
>> to reduce the amount of data
>> - use TTL to clean up data that is no longer needed
>> - enable incremental checkpoints
>> - use multi-level time window granularity for pre-aggregation, which can
>> significantly improve performance and reduce computation latency
>>
>> Best,
>> Zhongqiang Gong
>>
>> On Fri, May 17, 2024 at 03:48, Sachin Mittal <sjmit...@gmail.com> wrote:
>>
>>> Hi,
>>> My pipeline step is something like this:
>>>
>>> SingleOutputStreamOperator<ReducedData> reducedData =
>>>     data
>>>         .keyBy(new KeySelector())
>>>         .window(
>>>             TumblingEventTimeWindows.of(Time.seconds(secs)))
>>>         .reduce(new DataReducer())
>>>         .name("reduce");
>>>
>>>
>>> This works fine for secs = 300.
>>> However, once I increase the time window to, say, 1 hour (secs = 3600),
>>> the state size increases as it now has a lot more records to reduce.
>>>
>>> Hence I need to allocate much more memory to the task manager.
>>>
>>> However there is no upper limit to this memory allocated. If the volume
>>> of data increases by say 10 fold I would have no option but to again
>>> increase the memory.
>>>
>>> Is there a better way to perform long window aggregation so that overall
>>> this step has a small memory footprint?
>>>
>>> Thanks
>>> Sachin
>>>
>>>
