[ https://issues.apache.org/jira/browse/FLINK-23721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17405782#comment-17405782 ]

Q Kang commented on FLINK-23721:
--------------------------------

[~yunta] [~jark] 

According to my observation, there are two `StateTtlConfig` objects on the heap. 
One of them has a TTL of 30000 ms with the `OnCreateAndWrite` update type; the 
other has a TTL of `Long.MAX_VALUE` ms with the `Disabled` update type. 
Whichever state backend is used, the jmap results are the same.

It appears that `table.exec.state.ttl` has been set successfully. But in the 
case above, the state sizes with the non-incremental RocksDB and FileSystem 
backends behave completely differently. This phenomenon can be reproduced both 
in a local environment and on a production cluster (on YARN). We also checked 
the underlying source code, but had no luck.

 

> Flink SQL state TTL has no effect when using non-incremental 
> RocksDBStateBackend
> --------------------------------------------------------------------------------
>
>                 Key: FLINK-23721
>                 URL: https://issues.apache.org/jira/browse/FLINK-23721
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / State Backends, Table SQL / Runtime
>    Affects Versions: 1.13.0
>            Reporter: Q Kang
>            Priority: Major
>
> Take the following deduplication SQL program as an example:
> {code:java}
> SET table.exec.state.ttl=30s;
> INSERT INTO tmp.blackhole_order_done_log
> SELECT t.* FROM (
>   SELECT *, ROW_NUMBER() OVER (PARTITION BY suborderid ORDER BY procTime ASC) AS rn
>   FROM rtdw_ods.kafka_order_done_log
> ) AS t WHERE rn = 1;
> {code}
> When using RocksDBStateBackend with incremental checkpoints enabled, the size 
> of the deduplication state seems OK.
> The FlinkCompactionFilter is also working, as shown by the logs below:
> {code:java}
> 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter [] - RocksDB filter native code log: Call FlinkCompactionFilter::FilterV2 - Key: , Data: 0000017B3481026D01, Value type: 0, State type: 1, TTL: 30000 ms, timestamp_offset: 0
> 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter [] - RocksDB filter native code log: Last access timestamp: 1628673475181 ms, ttlWithoutOverflow: 30000 ms, Current timestamp: 1628673701905 ms
> 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter [] - RocksDB filter native code log: Decision: 1
> 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter [] - RocksDB filter native code log: Call FlinkCompactionFilter::FilterV2 - Key: , Data: 0000017B3484064901, Value type: 0, State type: 1, TTL: 30000 ms, timestamp_offset: 0
> 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter [] - RocksDB filter native code log: Last access timestamp: 1628673672777 ms, ttlWithoutOverflow: 30000 ms, Current timestamp: 1628673701905 ms
> 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter [] - RocksDB filter native code log: Decision: 0
> 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter [] - RocksDB filter native code log: Call FlinkCompactionFilter::FilterV2 - Key: , Data: 0000017B3483341D01, Value type: 0, State type: 1, TTL: 30000 ms, timestamp_offset: 0
> 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter [] - RocksDB filter native code log: Last access timestamp: 1628673618973 ms, ttlWithoutOverflow: 30000 ms, Current timestamp: 1628673701905 ms
> 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter [] - RocksDB filter native code log: Decision: 1
> {code}
> However, after turning off incremental checkpoints, the state TTL seems to 
> have no effect at all: no FlinkCompactionFilter logs are printed, and the size 
> of the deduplication state grows steadily up to several GBs (the Kafka traffic 
> is somewhat heavy, at about 1K records per second).
> In contrast, FsStateBackend always works well.
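> For completeness, a minimal sketch of the flink-conf.yaml entries that distinguish the two scenarios above (standard Flink 1.13 option keys; the exact values used for reproduction are an assumption):
> {code:yaml}
> state.backend: rocksdb
> # Scenario 1: TTL compaction filter fires, state size stays bounded
> state.backend.incremental: true
> # Scenario 2: TTL appears to have no effect (this report)
> # state.backend.incremental: false
> {code}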
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
