Q Kang created FLINK-23721:
------------------------------

             Summary: Flink SQL state TTL has no effect when using 
non-incremental RocksDBStateBackend
                 Key: FLINK-23721
                 URL: https://issues.apache.org/jira/browse/FLINK-23721
             Project: Flink
          Issue Type: Bug
          Components: Runtime / State Backends, Table SQL / Runtime
    Affects Versions: 1.13.0
            Reporter: Q Kang


Take the following deduplication SQL program as an example:
{code:java}
SET table.exec.state.ttl=30s;

INSERT INTO tmp.blackhole_order_done_log
SELECT t.* FROM (
  SELECT *,ROW_NUMBER() OVER(PARTITION BY suborderid ORDER BY procTime ASC) AS 
rn
  FROM rtdw_ods.kafka_order_done_log
) AS t WHERE rn = 1;
{code}
When using RocksDBStateBackend with incremental checkpoint enabled, the size of 
deduplication state seems OK.

FlinkCompactionFilter is also working, regarding to logs below:
{code:java}
21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter                       
     [] - RocksDB filter native code log: Call FlinkCompactionFilter::FilterV2 
- Key: , Data: 0000017B3481026D01, Value type: 0, State type: 1, TTL: 30000 
ms, timestamp_offset: 0
21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter                       
     [] - RocksDB filter native code log: Last access timestamp: 1628673475181 
ms, ttlWithoutOverflow: 30000 ms, Current timestamp: 1628673701905 ms
21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter                       
     [] - RocksDB filter native code log: Decision: 1
21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter                       
     [] - RocksDB filter native code log: Call FlinkCompactionFilter::FilterV2 
- Key: , Data: 0000017B3484064901, Value type: 0, State type: 1, TTL: 30000 
ms, timestamp_offset: 0
21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter                       
     [] - RocksDB filter native code log: Last access timestamp: 1628673672777 
ms, ttlWithoutOverflow: 30000 ms, Current timestamp: 1628673701905 ms
21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter                       
     [] - RocksDB filter native code log: Decision: 0
21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter                       
     [] - RocksDB filter native code log: Call FlinkCompactionFilter::FilterV2 
- Key: , Data: 0000017B3483341D01, Value type: 0, State type: 1, TTL: 30000 
ms, timestamp_offset: 0
21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter                       
     [] - RocksDB filter native code log: Last access timestamp: 1628673618973 
ms, ttlWithoutOverflow: 30000 ms, Current timestamp: 1628673701905 ms
21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter                       
     [] - RocksDB filter native code log: Decision: 1
{code}
However, after turning off incremental checkpoint, the state TTL seems not 
effective at all: FlinkCompactionFilter logs are not printed, and the size of 
deduplication state grows steadily up to several GBs (Kafka traffic is somewhat 
heavy, at about 1K records per sec).

In contrast, FsStateBackend always works well.

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to