[jira] [Commented] (FLINK-23721) Flink SQL state TTL has no effect when using non-incremental RocksDBStateBackend

2022-06-05 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17550260#comment-17550260
 ] 

Yun Tang commented on FLINK-23721:
--

[~xiaokunBit], the managed memory is fully used because you are using the 
RocksDB state backend; this is unrelated to whether you have enabled TTL.

> Flink SQL state TTL has no effect when using non-incremental 
> RocksDBStateBackend
> 
>
> Key: FLINK-23721
> URL: https://issues.apache.org/jira/browse/FLINK-23721
> Project: Flink
> Issue Type: Bug
> Components: Runtime / State Backends, Table SQL / Runtime
> Affects Versions: 1.13.0
> Reporter: Q Kang
> Priority: Major
> Attachments: image-2022-06-02-17-59-38-542.png
>
>
> Take the following deduplication SQL program as an example:
> {code:java}
> SET table.exec.state.ttl=30s;
> INSERT INTO tmp.blackhole_order_done_log
> SELECT t.* FROM (
>   SELECT *, ROW_NUMBER() OVER (PARTITION BY suborderid ORDER BY procTime ASC) AS rn
>   FROM rtdw_ods.kafka_order_done_log
> ) AS t WHERE rn = 1;
> {code}
> When using RocksDBStateBackend with incremental checkpoints enabled, the 
> size of the deduplication state seems OK.
> FlinkCompactionFilter is also working, according to the logs below:
> {code:java}
> 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter [] - RocksDB filter native code log: Call FlinkCompactionFilter::FilterV2 - Key: , Data: 017B3481026D01, Value type: 0, State type: 1, TTL: 30000 ms, timestamp_offset: 0
> 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter [] - RocksDB filter native code log: Last access timestamp: 1628673475181 ms, ttlWithoutOverflow: 30000 ms, Current timestamp: 1628673701905 ms
> 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter [] - RocksDB filter native code log: Decision: 1
> 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter [] - RocksDB filter native code log: Call FlinkCompactionFilter::FilterV2 - Key: , Data: 017B3484064901, Value type: 0, State type: 1, TTL: 30000 ms, timestamp_offset: 0
> 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter [] - RocksDB filter native code log: Last access timestamp: 1628673672777 ms, ttlWithoutOverflow: 30000 ms, Current timestamp: 1628673701905 ms
> 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter [] - RocksDB filter native code log: Decision: 0
> 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter [] - RocksDB filter native code log: Call FlinkCompactionFilter::FilterV2 - Key: , Data: 017B3483341D01, Value type: 0, State type: 1, TTL: 30000 ms, timestamp_offset: 0
> 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter [] - RocksDB filter native code log: Last access timestamp: 1628673618973 ms, ttlWithoutOverflow: 30000 ms, Current timestamp: 1628673701905 ms
> 21-08-11 17:21:42 DEBUG org.rocksdb.FlinkCompactionFilter [] - RocksDB filter native code log: Decision: 1
> {code}
> However, after turning off incremental checkpoints, the state TTL seems to 
> have no effect at all: FlinkCompactionFilter logs are not printed, and the 
> size of the deduplication state grows steadily up to several GB (Kafka 
> traffic is somewhat heavy, at about 1K records per second).
> In contrast, FsStateBackend always works well.
>  
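
For readers following the FlinkCompactionFilter log lines quoted above, here is a minimal sketch of the expiry arithmetic they seem to describe. It assumes the TTL is the 30 s set via table.exec.state.ttl (30000 ms) and that "Decision: 1" means the entry is dropped during compaction while "Decision: 0" means it is kept. The class and method names are hypothetical; this is a plain-Java model, not the native filter code.

```java
import java.time.Duration;

// Illustrative model of the expiry check suggested by the log output.
// Names are hypothetical; this is not Flink's or RocksDB's implementation.
final class TtlDecisionSketch {

    // Clamp the TTL so that lastAccess + ttl cannot overflow a long
    // (the log field "ttlWithoutOverflow" hints at such a clamp).
    static long ttlWithoutOverflow(long lastAccessMs, long ttlMs) {
        return lastAccessMs > 0 ? Math.min(Long.MAX_VALUE - lastAccessMs, ttlMs) : ttlMs;
    }

    // An entry counts as expired once lastAccess + ttl is at or before "now".
    static boolean expired(long lastAccessMs, long ttlMs, long nowMs) {
        return lastAccessMs + ttlWithoutOverflow(lastAccessMs, ttlMs) <= nowMs;
    }

    public static void main(String[] args) {
        long ttlMs = Duration.ofSeconds(30).toMillis(); // assumed: table.exec.state.ttl=30s
        long nowMs = 1628673701905L;                    // "Current timestamp" from the log
        long[] lastAccess = {1628673475181L, 1628673672777L, 1628673618973L};
        for (long ts : lastAccess) {
            System.out.println((nowMs - ts) + " ms old -> expired=" + expired(ts, ttlMs, nowMs));
        }
    }
}
```

Under these assumptions the three entries are 226724 ms, 29128 ms and 82932 ms old, so the sketch reports expired, not expired, expired, which lines up with the logged decisions 1, 0, 1. A TTL of Long.MAX_VALUE never expires in this model because of the overflow clamp.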



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-23721) Flink SQL state TTL has no effect when using non-incremental RocksDBStateBackend

2022-06-02 Thread liangxiaokun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17545393#comment-17545393
 ] 

liangxiaokun commented on FLINK-23721:
--

Hello! I have the same question as [~lmagics].
In my main class, I use two StateTtlConfig settings. One is in the DataStream 
API, on a MapState. This is the code:

 
{code:java}
// Keyed MapState with a 10-minute TTL, refreshed on create and write.
MapStateDescriptor<String, String> firstItemState =
        new MapStateDescriptor<>("firstState", String.class, String.class);
StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.minutes(10))
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
        .build();
firstItemState.enableTimeToLive(ttlConfig);
mapState = getRuntimeContext().getMapState(firstItemState);
{code}
 

The other one is in the SQL API:

 
{code:java}
StreamTableEnvironment streamTableEnvironment =
        StreamTableEnvironment.create(env, EnvironmentSettings.fromConfiguration(conf));

// TTL for idle state in the Table/SQL API
TableConfig config = streamTableEnvironment.getConfig();
config.setIdleStateRetention(Duration.ofSeconds(10));
{code}
 

 

I used RocksDBStateBackend 

 
{code:java}
env.enableCheckpointing(4 * 6, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().setCheckpointTimeout(20 * 6);
env.disableOperatorChaining();
env.setStateBackend(new 
FsStateBackend("hdfs:///user/flink_state/order_detail_realtime"));
{code}
But it does not seem to work: in the Flink web UI, managed memory is fully 
used.

 

!image-2022-06-02-17-59-38-542.png!

 


[jira] [Commented] (FLINK-23721) Flink SQL state TTL has no effect when using non-incremental RocksDBStateBackend

2021-08-29 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17406527#comment-17406527
 ] 

Yun Tang commented on FLINK-23721:
--

[~lmagics] could you share a code example that reproduces this in a local 
environment, so that we can run it directly?



[jira] [Commented] (FLINK-23721) Flink SQL state TTL has no effect when using non-incremental RocksDBStateBackend

2021-08-27 Thread Q Kang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17405782#comment-17405782
 ] 

Q Kang commented on FLINK-23721:


[~yunta] [~jark] 

According to my observation, there are 2 `StateTtlConfig` objects on the heap. 
One of them has a TTL of 30000 ms and the `OnCreateAndWrite` update type. The 
other has a TTL of `Long.MAX_VALUE` ms and the `Disabled` update type. Whatever 
the state backend is, the jmap results are the same. 

It appears that `table.exec.state.ttl` has been set successfully. But in the 
case above, the state size behaves completely differently with the 
non-incremental RocksDB backend than with the FileSystem backend. This 
phenomenon can be reproduced both in a local environment and on a production 
cluster (on YARN). We also checked the underlying source code, but could not 
find the cause.

 



[jira] [Commented] (FLINK-23721) Flink SQL state TTL has no effect when using non-incremental RocksDBStateBackend

2021-08-27 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17405676#comment-17405676
 ] 

Jark Wu commented on FLINK-23721:
-

As far as I can tell, {{table.exec.state.ttl}} should have taken effect. 
The {{DeduplicateFunction}} simply applies a {{StateTtlConfig}} with the 
configured TTL to the underlying {{ValueState}}. 
State does expire when incremental checkpointing is enabled, so as far as I 
can tell, it should work. 
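
To make the mechanism described above concrete, here is a rough plain-Java model of "keep the first row per key" with a per-key TTL. It is only a sketch under assumptions: the class, the HashMap, and the explicit `cleanup` call are hypothetical stand-ins for keyed {{ValueState}} and for whatever background cleanup the state backend performs; it is not the actual {{DeduplicateFunction}} code.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of keep-first-row deduplication with a processing-time TTL.
// Hypothetical names; a HashMap stands in for keyed state with a StateTtlConfig.
final class KeepFirstRowSketch {
    private final long ttlMs;
    // key -> timestamp of the write that created the entry
    private final Map<String, Long> seenAt = new HashMap<>();

    KeepFirstRowSketch(long ttlMs) {
        this.ttlMs = ttlMs;
    }

    // Returns true if the record is emitted (first row for its key among
    // unexpired entries), false if it is treated as a duplicate.
    boolean process(String key, long nowMs) {
        Long last = seenAt.get(key);
        // Expired entries are treated as absent, even if not yet physically removed.
        boolean live = last != null && last + ttlMs > nowMs;
        if (live) {
            return false;
        }
        seenAt.put(key, nowMs);
        return true;
    }

    // Models physical cleanup of expired entries; returns how many were removed.
    int cleanup(long nowMs) {
        int before = seenAt.size();
        seenAt.values().removeIf(ts -> ts + ttlMs <= nowMs);
        return before - seenAt.size();
    }

    int size() {
        return seenAt.size();
    }

    public static void main(String[] args) {
        KeepFirstRowSketch dedup = new KeepFirstRowSketch(30_000L);
        System.out.println(dedup.process("order-1", 0L));      // first row for the key
        System.out.println(dedup.process("order-1", 10_000L)); // duplicate within the TTL
        System.out.println(dedup.process("order-1", 30_000L)); // TTL elapsed, treated as new
        System.out.println("entries before cleanup: " + dedup.size());
        System.out.println("removed by cleanup: " + dedup.cleanup(60_000L));
    }
}
```

The model separates two things that are easy to conflate in this issue: whether expired entries are still visible to the function, and whether they have been physically removed. Query results can be correct while the stored state keeps growing if the second step does not run.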



[jira] [Commented] (FLINK-23721) Flink SQL state TTL has no effect when using non-incremental RocksDBStateBackend

2021-08-27 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17405659#comment-17405659
 ] 

Yun Tang commented on FLINK-23721:
--

[~lmagics] Judging from the code, what you describe is really weird. I think 
you could use jmap to dump the task manager heap and see which configuration 
of `org.apache.flink.api.common.state.StateTtlConfig` is taking effect (you 
may have many states, so please check as many StateTtlConfig instances as 
possible).
BTW, [~jark], does SQL require any additional condition for 
table.exec.state.ttl to take effect?



[jira] [Commented] (FLINK-23721) Flink SQL state TTL has no effect when using non-incremental RocksDBStateBackend

2021-08-27 Thread Q Kang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17405637#comment-17405637
 ] 

Q Kang commented on FLINK-23721:


[~yunta] Sorry for the late reply. For RocksDB backend, we configured the 
following parameters:
{code:java}
state.backend: rocksdb
state.backend.incremental: false
state.backend.rocksdb.predefined-options: FLASH_SSD_OPTIMIZED
{code}
For FileSystem backend, we just set `state.backend` to `filesystem` and nothing 
more.

 



[jira] [Commented] (FLINK-23721) Flink SQL state TTL has no effect when using non-incremental RocksDBStateBackend

2021-08-20 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17402322#comment-17402322
 ] 

Yun Tang commented on FLINK-23721:
--

[~lmagics] Whether incremental checkpoints are enabled has no relationship 
with RocksDB state TTL. Could you share the configuration of your state 
backends?
