[ 
https://issues.apache.org/jira/browse/FLINK-18119?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17136362#comment-17136362
 ] 

Hyeonseop Lee commented on FLINK-18119:
---------------------------------------

[~libenchao] {{RowTimeRange*Unbounded*PrecedingFunction}} is not the case. 
{{RowTimeRange*Bounded*PrecedingFunction}} in blink runtime 1.11 still has the 
issue.

It performs cleanup using processing timer when proper {{minRetentionTime}} and 
{{maxRetentionTime}} are configured, but what I want to improve is to retract 
records that is no longer required even the state retention is not set 
(indefinite).

In my case, I first tried to set non-zero {{minRetentionTime}} to enable 
cleanup by retention, but that was applied to whole query and ended up with the 
retract stream instead of append stream. I understand setting state retention 
can be a walkaround to prevent OOM but I think functions must keep state as 
efficiently as possible.

> Fix unlimitedly growing state for time range bounded over aggregate
> -------------------------------------------------------------------
>
>                 Key: FLINK-18119
>                 URL: https://issues.apache.org/jira/browse/FLINK-18119
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Runtime
>    Affects Versions: 1.10.1
>            Reporter: Hyeonseop Lee
>            Priority: Major
>
> For time range bounded over aggregation in streaming query, like below,
> {code:java}
> table
>   .window(Over.partitionBy 'a orderBy 'rowtime preceding 1.hour as 'w)
>   .groupBy('w)
>   .select('a, aggregateFunction('b))
> {code}
> the operator must hold incoming records over the preceding time range in the 
> state, but older records are no longer required and can be cleaned up.
> Current implementation cleans the old records up only when newer records come 
> in and so the operator knows that enough time has passed. However, the clean 
> up never happens unless a new record with the same key comes in and this 
> causes a state that perhaps will never be cleaned up, which leads to an 
> unlimitedly growing state especially when the keyspace mutates over time.
> Since aggregate over bounded preceding time interval doesn't require old 
> records by its nature, we can improve this by adding a timer that notifies 
> the operator to clean up old records, resulting in no changes in query result 
> or severe performance degrade.
> This is a distinct feature from state retention: state retention is to forget 
> some states that are expected to be less important to reduce state memory, so 
> it possibly changes query results. Enabling and disabling state retention 
> both make sense with this change.
> This issue applies to both row time range bound and proc time range bound. 
> That is, we are going to have changes in both 
> RowTimeRangeBoundedPrecedingFunction and 
> ProcTimeRangeBoundedPrecedingFunction in flink-table-runtime-blink. I already 
> have a running-in-production version with this change and would be glad to 
> contribute.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to