[ https://issues.apache.org/jira/browse/FLINK-23721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17545393#comment-17545393 ]
liangxiaokun commented on FLINK-23721:
--------------------------------------

Hello! I have the same question as [~lmagics].

In my main class I use two state TTL configurations. The first is in the DataStream API, on a MapState. This is the code:
{code:java}
MapStateDescriptor<String, String> firstItemState =
        new MapStateDescriptor<String, String>("firstState", String.class, String.class);
StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.minutes(10))
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
        .build();
firstItemState.enableTimeToLive(ttlConfig);
mapState = getRuntimeContext().getMapState(firstItemState);
{code}
The second is in the SQL API:
{code:java}
StreamTableEnvironment streamTableEnvironment =
        StreamTableEnvironment.create(env, EnvironmentSettings.fromConfiguration(conf));
// TTL
TableConfig config = streamTableEnvironment.getConfig();
config.setIdleStateRetention(Duration.ofSeconds(10));
{code}
I used RocksDBStateBackend:
{code:java}
env.enableCheckpointing(4 * 60000, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().setCheckpointTimeout(20 * 60000);
env.disableOperatorChaining();
env.setStateBackend(new FsStateBackend("hdfs:///user/flink_state/order_detail_realtime"));
{code}
But the TTL does not seem to take effect: in the Flink web UI, managed memory is fully used.

!image-2022-06-02-17-59-38-542.png!

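As a reading aid for the first configuration in the comment, here is a minimal plain-Java sketch of what `UpdateType.OnCreateAndWrite` and `StateVisibility.NeverReturnExpired` request: only writes refresh an entry's timestamp, and an expired entry is never returned to the caller. `TtlMapSketch` and all of its members are hypothetical names for illustration. The class does not use Flink, and it deliberately simplifies cleanup into one explicit `cleanUp` call, so it says nothing about when a real state backend frees space.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical plain-Java model of TTL visibility; not Flink's implementation.
class TtlMapSketch {
    private final long ttlMs;
    private final Map<String, String> values = new HashMap<>();
    private final Map<String, Long> lastWriteMs = new HashMap<>();

    TtlMapSketch(long ttlMs) {
        this.ttlMs = ttlMs;
    }

    // OnCreateAndWrite: creating or overwriting an entry refreshes its timestamp.
    void put(String key, String value, long nowMs) {
        values.put(key, value);
        lastWriteMs.put(key, nowMs);
    }

    // NeverReturnExpired: an expired entry is hidden even if it is still stored.
    // Reads do not refresh the timestamp in this mode.
    String get(String key, long nowMs) {
        Long written = lastWriteMs.get(key);
        if (written == null || written + ttlMs <= nowMs) {
            return null;
        }
        return values.get(key);
    }

    // Number of entries physically held, including expired ones not yet cleaned up.
    int storedEntries() {
        return values.size();
    }

    // Simplified stand-in for background cleanup (assumption: one explicit pass).
    void cleanUp(long nowMs) {
        lastWriteMs.entrySet().removeIf(e -> e.getValue() + ttlMs <= nowMs);
        values.keySet().retainAll(lastWriteMs.keySet());
    }

    public static void main(String[] args) {
        TtlMapSketch state = new TtlMapSketch(10 * 60_000L); // 10 minutes, as in the comment
        state.put("order-1", "first", 0L);
        System.out.println("after 5 min:  " + state.get("order-1", 5 * 60_000L));
        System.out.println("after 10 min: " + state.get("order-1", 10 * 60_000L));
        System.out.println("stored before cleanup: " + state.storedEntries());
        state.cleanUp(10 * 60_000L);
        System.out.println("stored after cleanup:  " + state.storedEntries());
    }
}
```

In this model an entry can be invisible to reads while still occupying storage, so visibility and space reclamation are separate questions.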
> Flink SQL state TTL has no effect when using non-incremental RocksDBStateBackend
> --------------------------------------------------------------------------------
>
>                 Key: FLINK-23721
>                 URL: https://issues.apache.org/jira/browse/FLINK-23721
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / State Backends, Table SQL / Runtime
>    Affects Versions: 1.13.0
>            Reporter: Q Kang
>            Priority: Major
>         Attachments: image-2022-06-02-17-59-38-542.png
>
>
> Take the following deduplication SQL program as an example:
> {code:java}
> SET table.exec.state.ttl=30s;
> INSERT INTO tmp.blackhole_order_done_log
> SELECT t.* FROM (
>   SELECT *,ROW_NUMBER() OVER(PARTITION BY suborderid ORDER BY procTime ASC) AS rn
>   FROM rtdw_ods.kafka_order_done_log
> ) AS t WHERE rn = 1;
> {code}
> When using RocksDBStateBackend with incremental checkpoints enabled, the size
> of the deduplication state seems OK.
> FlinkCompactionFilter is also working, according to the logs below:
> {code:java}
> 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter [] - RocksDB filter native code log: Call FlinkCompactionFilter::FilterV2 - Key: , Data: 0000017B3481026D01, Value type: 0, State type: 1, TTL: 30000 ms, timestamp_offset: 0
> 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter [] - RocksDB filter native code log: Last access timestamp: 1628673475181 ms, ttlWithoutOverflow: 30000 ms, Current timestamp: 1628673701905 ms
> 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter [] - RocksDB filter native code log: Decision: 1
> 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter [] - RocksDB filter native code log: Call FlinkCompactionFilter::FilterV2 - Key: , Data: 0000017B3484064901, Value type: 0, State type: 1, TTL: 30000 ms, timestamp_offset: 0
> 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter [] - RocksDB filter native code log: Last access timestamp: 1628673672777 ms, ttlWithoutOverflow: 30000 ms, Current timestamp: 1628673701905 ms
> 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter [] - RocksDB filter native code log: Decision: 0
> 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter [] - RocksDB filter native code log: Call FlinkCompactionFilter::FilterV2 - Key: , Data: 0000017B3483341D01, Value type: 0, State type: 1, TTL: 30000 ms, timestamp_offset: 0
> 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter [] - RocksDB filter native code log: Last access timestamp: 1628673618973 ms, ttlWithoutOverflow: 30000 ms, Current timestamp: 1628673701905 ms
> 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter [] - RocksDB filter native code log: Decision: 1
> {code}
> However, after turning off incremental checkpoints, the state TTL seems to
> have no effect at all: FlinkCompactionFilter logs are not printed, and the
> size of the deduplication state grows steadily up to several GBs (Kafka
> traffic is somewhat heavy, at about 1K records per second).
> In contrast, FsStateBackend always works well.
>

--
This message was sent by Atlassian Jira
(v8.20.7#820007)
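
For reference, the three decisions in the FlinkCompactionFilter log quoted in the issue description can be checked by hand. With a TTL of 30000 ms, the three entries are 226724 ms, 29128 ms and 82932 ms old at the logged current timestamp, which matches the logged decisions 1, 0 and 1 if an entry is dropped once its last access timestamp plus the TTL is no later than the current timestamp. The sketch below only redoes that arithmetic. The class and method names are hypothetical, and the exact comparison and the reading of "Decision: 1" as "remove" are assumptions inferred from the log, not taken from the RocksDB or Flink source.

```java
// Hypothetical helper that re-checks the arithmetic of the quoted log lines.
// It is not part of Flink or RocksDB.
class CompactionFilterDecisionSketch {

    // Assumed rule: expired once lastAccess + ttl is not after the current time.
    static boolean isExpired(long lastAccessMs, long ttlMs, long currentMs) {
        return lastAccessMs + ttlMs <= currentMs;
    }

    public static void main(String[] args) {
        long ttlMs = 30_000L;
        long currentMs = 1_628_673_701_905L; // "Current timestamp" from the log
        long[] lastAccessMs = {
            1_628_673_475_181L, // logged Decision: 1
            1_628_673_672_777L, // logged Decision: 0
            1_628_673_618_973L  // logged Decision: 1
        };
        for (long ts : lastAccessMs) {
            long ageMs = currentMs - ts;
            System.out.println("age=" + ageMs + " ms, expired=" + isExpired(ts, ttlMs, currentMs));
        }
    }
}
```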