[jira] [Commented] (FLINK-23721) Flink SQL state TTL has no effect when using non-incremental RocksDBStateBackend
[ https://issues.apache.org/jira/browse/FLINK-23721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17550260#comment-17550260 ] Yun Tang commented on FLINK-23721: -- [~xiaokunBit], the managed memory is fully used because you are using the RocksDB state backend; this is unrelated to whether TTL is enabled. > Flink SQL state TTL has no effect when using non-incremental > RocksDBStateBackend > > > Key: FLINK-23721 > URL: https://issues.apache.org/jira/browse/FLINK-23721 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends, Table SQL / Runtime > Affects Versions: 1.13.0 > Reporter: Q Kang > Priority: Major > Attachments: image-2022-06-02-17-59-38-542.png > > > Take the following deduplication SQL program as an example: > {code:java} > SET table.exec.state.ttl=30s; > INSERT INTO tmp.blackhole_order_done_log > SELECT t.* FROM ( > SELECT *, ROW_NUMBER() OVER (PARTITION BY suborderid ORDER BY procTime ASC) AS rn > FROM rtdw_ods.kafka_order_done_log > ) AS t WHERE rn = 1; > {code} > When using RocksDBStateBackend with incremental checkpoint enabled, the size > of deduplication state seems OK. 
> FlinkCompactionFilter is also working, regarding to logs below: > {code:java} > 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter >[] - RocksDB filter native code log: Call > FlinkCompactionFilter::FilterV2 - Key: , Data: 017B3481026D01, Value > type: 0, State type: 1, TTL: 3 ms, timestamp_offset: 0 > 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter >[] - RocksDB filter native code log: Last access timestamp: > 1628673475181 ms, ttlWithoutOverflow: 3 ms, Current timestamp: > 1628673701905 ms > 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter >[] - RocksDB filter native code log: Decision: 1 > 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter >[] - RocksDB filter native code log: Call > FlinkCompactionFilter::FilterV2 - Key: , Data: 017B3484064901, Value > type: 0, State type: 1, TTL: 3 ms, timestamp_offset: 0 > 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter >[] - RocksDB filter native code log: Last access timestamp: > 1628673672777 ms, ttlWithoutOverflow: 3 ms, Current timestamp: > 1628673701905 ms > 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter >[] - RocksDB filter native code log: Decision: 0 > 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter >[] - RocksDB filter native code log: Call > FlinkCompactionFilter::FilterV2 - Key: , Data: 017B3483341D01, Value > type: 0, State type: 1, TTL: 3 ms, timestamp_offset: 0 > 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter >[] - RocksDB filter native code log: Last access timestamp: > 1628673618973 ms, ttlWithoutOverflow: 3 ms, Current timestamp: > 1628673701905 ms > 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter >[] - RocksDB filter native code log: Decision: 1 > {code} > However, after turning off incremental checkpoint, the state TTL seems not > effective at all: FlinkCompactionFilter logs are not printed, and the size of > deduplication state grows steadily up to several GBs (Kafka traffic is > somewhat heavy, at about 1K records per 
sec). > In contrast, FsStateBackend always works well. > -- This message was sent by Atlassian Jira (v8.20.7#820007)
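The `Decision` lines in the quoted log come from the native FlinkCompactionFilter comparing the entry's last-access timestamp plus the TTL against the current time. A minimal sketch of that check in plain Java, using timestamps from the log above and the 30 s TTL from `table.exec.state.ttl=30s` (the class and method names are illustrative, not Flink's actual implementation):

```java
// Illustrative sketch of the TTL expiry decision made by the compaction filter.
// Not Flink's actual code; names here are hypothetical.
public class TtlExpiryCheck {

    /**
     * Returns true ("Decision: 1", remove the entry) when lastAccess + ttl
     * has passed, guarding against long overflow for very large TTLs.
     */
    public static boolean isExpired(long lastAccessMillis, long ttlMillis, long nowMillis) {
        long deadline = lastAccessMillis + ttlMillis;
        if (deadline < lastAccessMillis) { // overflow: treat as "never expires"
            return false;
        }
        return deadline <= nowMillis;
    }

    public static void main(String[] args) {
        long now = 1628673701905L;  // "Current timestamp" from the log
        long ttl = 30_000L;         // table.exec.state.ttl = 30s
        System.out.println(isExpired(1628673475181L, ttl, now)); // stale entry  -> true
        System.out.println(isExpired(1628673672777L, ttl, now)); // recent entry -> false
    }
}
```

With these inputs the sketch reproduces the log's decisions: the entry last accessed at 1628673475181 is expired (Decision: 1), while the one at 1628673672777 is still within the TTL (Decision: 0).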
[jira] [Commented] (FLINK-23721) Flink SQL state TTL has no effect when using non-incremental RocksDBStateBackend
[ https://issues.apache.org/jira/browse/FLINK-23721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17545393#comment-17545393 ] liangxiaokun commented on FLINK-23721: -- Hello! I have the same issue as [~lmagics]. In my main class I use two StateTtlConfig instances. One is in the DataStream API, applied to a MapState; this is the code: {code:java} MapStateDescriptor<String, String> firstItemState = new MapStateDescriptor<>("firstState", String.class, String.class); StateTtlConfig ttlConfig = StateTtlConfig .newBuilder(Time.minutes(10)) .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) .build(); firstItemState.enableTimeToLive(ttlConfig); mapState = getRuntimeContext().getMapState(firstItemState); {code} The other is in the SQL API: {code:java} StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(env, EnvironmentSettings.fromConfiguration(conf)); // ttl TableConfig config = streamTableEnvironment.getConfig(); config.setIdleStateRetention(Duration.ofSeconds(10)); {code} I used RocksDBStateBackend: {code:java} env.enableCheckpointing(4 * 6, CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); env.getCheckpointConfig().setCheckpointTimeout(20 * 6); env.disableOperatorChaining(); env.setStateBackend(new FsStateBackend("hdfs:///user/flink_state/order_detail_realtime")); {code} But it does not seem to work, because in the Flink web UI the managed memory is fully used. !image-2022-06-02-17-59-38-542.png! 
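Note that the pasted snippet actually constructs an FsStateBackend, even though the comment says RocksDBStateBackend is used. Assuming RocksDB is the intended backend, one way to select it in Flink 1.13 is via flink-conf.yaml; a sketch (the checkpoint path is reused from the code above):

```yaml
# flink-conf.yaml — selecting the RocksDB state backend (Flink 1.13)
state.backend: rocksdb
state.backend.incremental: true
state.checkpoints.dir: hdfs:///user/flink_state/order_detail_realtime
```

With `state.backend: rocksdb`, managed memory being fully used in the web UI is expected, since RocksDB claims all managed memory regardless of TTL settings.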
[jira] [Commented] (FLINK-23721) Flink SQL state TTL has no effect when using non-incremental RocksDBStateBackend
[ https://issues.apache.org/jira/browse/FLINK-23721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17406527#comment-17406527 ] Yun Tang commented on FLINK-23721: -- [~lmagics], could you share a code example that reproduces this in a local environment, so that we can run it directly?
> -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-23721) Flink SQL state TTL has no effect when using non-incremental RocksDBStateBackend
[ https://issues.apache.org/jira/browse/FLINK-23721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17405782#comment-17405782 ] Q Kang commented on FLINK-23721: -- [~yunta] [~jark] According to my observation, there are 2 `StateTtlConfig` objects on the heap. One of them has a TTL of 3ms and the `OnCreateAndWrite` update type; the other has a TTL of `Long.MAX_VALUE` ms and the `Disabled` update type. Whatever the state backend is, the jmap results are the same, so it appears that `table.exec.state.ttl` has been set successfully. But in the case above, the state sizes with the non-incremental RocksDB and FileSystem backends behave completely differently. This phenomenon can be reproduced both in a local environment and on a production cluster (on YARN). We also checked the underlying source code, but had no luck.
[jira] [Commented] (FLINK-23721) Flink SQL state TTL has no effect when using non-incremental RocksDBStateBackend
[ https://issues.apache.org/jira/browse/FLINK-23721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17405676#comment-17405676 ] Jark Wu commented on FLINK-23721: -- As far as I can tell, {{table.exec.state.ttl}} should have taken effect. The {{DeduplicateFunction}} simply applies a {{StateTtlConfig}} with the configured TTL to the underlying {{ValueState}}. State does expire when incremental checkpointing is enabled, so it should work.
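The wiring described above can be sketched as follows: a {{StateTtlConfig}} built from the configured TTL is enabled on the state descriptor before the state is created. This is a simplified, hypothetical sketch (the class name, method, and descriptor name are illustrative, not Flink's actual internal utility):

```java
// Simplified sketch of applying a configured TTL to keyed state,
// roughly what DeduplicateFunction does internally. Not Flink's actual code.
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.Types;

public class TtlStateSetup {
    public static ValueStateDescriptor<Boolean> seenDescriptor(long ttlMillis) {
        ValueStateDescriptor<Boolean> desc =
                new ValueStateDescriptor<>("seen", Types.BOOLEAN);
        if (ttlMillis > 0) { // e.g. table.exec.state.ttl converted to millis
            StateTtlConfig ttlConfig = StateTtlConfig
                    .newBuilder(Time.milliseconds(ttlMillis))
                    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                    .build();
            desc.enableTimeToLive(ttlConfig);
        }
        return desc;
    }
}
```

Since the TTL is attached to the descriptor before the backend creates the state, the configuration path is identical for all state backends; only the cleanup mechanism (compaction filter vs. on-read filtering) differs.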
[jira] [Commented] (FLINK-23721) Flink SQL state TTL has no effect when using non-incremental RocksDBStateBackend
[ https://issues.apache.org/jira/browse/FLINK-23721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17405659#comment-17405659 ] Yun Tang commented on FLINK-23721: -- [~lmagics] From the code implementation, what you described is really weird. I think you could use jmap to dump the task manager heap and check which `org.apache.flink.api.common.state.StateTtlConfig` configuration is taking effect (you may have many states, so check as many StateTtlConfig instances as possible). BTW, [~jark], does SQL have any additional condition for table.exec.state.ttl to take effect?
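The jmap inspection suggested above can be done with standard JDK tooling; for example (the `<tm-pid>` placeholder must be replaced with the TaskManager JVM's pid):

```shell
# Count StateTtlConfig instances on the TaskManager heap
jmap -histo:live <tm-pid> | grep StateTtlConfig

# Or take a full heap dump and inspect the objects' fields in MAT / VisualVM
jmap -dump:live,format=b,file=/tmp/tm-heap.hprof <tm-pid>
```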
[jira] [Commented] (FLINK-23721) Flink SQL state TTL has no effect when using non-incremental RocksDBStateBackend
[ https://issues.apache.org/jira/browse/FLINK-23721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17405637#comment-17405637 ] Q Kang commented on FLINK-23721: -- [~yunta] Sorry for the late reply. For the RocksDB backend, we configured the following parameters: {code:java} state.backend: rocksdb state.backend.incremental: false state.backend.rocksdb.predefined-options: FLASH_SSD_OPTIMIZED {code} For the FileSystem backend, we just set `state.backend` to `filesystem` and nothing more.
[jira] [Commented] (FLINK-23721) Flink SQL state TTL has no effect when using non-incremental RocksDBStateBackend
[ https://issues.apache.org/jira/browse/FLINK-23721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17402322#comment-17402322 ] Yun Tang commented on FLINK-23721: -- [~lmagics] Whether incremental checkpoints are enabled has no relationship with RocksDB state TTL. Could you share the configuration of your state backends?