[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16541387#comment-16541387 ]

ASF GitHub Bot commented on FLINK-9514:
---

Github user Aitozi commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r201971238

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java ---
    @@ -0,0 +1,171 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.runtime.state.internal.InternalListState;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.NoSuchElementException;
    +import java.util.stream.Collectors;
    +import java.util.stream.StreamSupport;
    +
    +/**
    + * This class wraps list state with TTL logic.
    + *
    + * @param <K> The type of key the state is associated to
    + * @param <N> The type of the namespace
    + * @param <T> Type of the user entry value of state with TTL
    + */
    +class TtlListState<K, N, T> extends
    +    AbstractTtlState<K, N, List<T>, List<TtlValue<T>>, InternalListState<K, N, TtlValue<T>>>
    +    implements InternalListState<K, N, T> {
    +    TtlListState(
    +        InternalListState<K, N, TtlValue<T>> originalState,
    +        TtlConfig config,
    +        TtlTimeProvider timeProvider,
    +        TypeSerializer<List<T>> valueSerializer) {
    +        super(originalState, config, timeProvider, valueSerializer);
    +    }
    +
    +    @Override
    +    public void update(List<T> values) throws Exception {
    +        updateInternal(values);
    +    }
    +
    +    @Override
    +    public void addAll(List<T> values) throws Exception {
    +        Preconditions.checkNotNull(values, "List of values to add cannot be null.");
    +        original.addAll(withTs(values));
    +    }
    +
    +    @Override
    +    public Iterable<T> get() throws Exception {
    +        Iterable<TtlValue<T>> ttlValue = original.get();
    +        ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue;
    +        if (updateTsOnRead) {
    +            List<TtlValue<T>> collected = collect(ttlValue);
    +            ttlValue = collected;
    +            updateTs(collected);
    +        }
    +        final Iterable<TtlValue<T>> finalResult = ttlValue;
    +        return () -> new IteratorWithCleanup(finalResult.iterator());
    +    }
    +
    +    private void updateTs(List<TtlValue<T>> ttlValue) throws Exception {
    +        List<TtlValue<T>> unexpiredWithUpdatedTs = ttlValue.stream()
    +            .filter(v -> !expired(v))
    +            .map(this::rewrapWithNewTs)
    +            .collect(Collectors.toList());
    +        if (!unexpiredWithUpdatedTs.isEmpty()) {
    +            original.update(unexpiredWithUpdatedTs);
    +        }
    +    }
    +
    +    @Override
    +    public void add(T value) throws Exception {
    +        Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
    +        original.add(wrapWithTs(value));
    +    }
    +
    +    @Override
    +    public void clear() {
    +        original.clear();
    +    }
    +
    +    @Override
    +    public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
    +        original.mergeNamespaces(target, sources);
    +    }
    +
    +    @Override
    +    public List<T> getInternal() throws Exception {
    +        return collect(get());
    +    }
    +
    +    private <E> List<E> collect(Iterable<E> iterable) {
    --- End diff --

    Got it, thanks for your explanation.

> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
> Issue Type: Sub-task
> Components: State
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16541352#comment-16541352 ]

ASF GitHub Bot commented on FLINK-9514:
---

Github user azagrebin commented on the issue:

    https://github.com/apache/flink/pull/6186

    Hi @Aitozi, the main issue FLINK-9510 contains a design doc for this effort, including a roadmap.

> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
> Issue Type: Sub-task
> Components: State Backends, Checkpointing
> Affects Versions: 1.6.0
> Reporter: Andrey Zagrebin
> Assignee: Andrey Zagrebin
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.6.0
>
>
> The TTL state decorator uses the original state with packed TTL and adds TTL logic
> using a time provider:
> {code:java}
> TtlValueState<V> implements ValueState<V> {
>   ValueState<TtlValue<V>> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
>     TtlValue<V> valueWithTtl = underlyingState.get();
>     // ttl logic here (e.g. update timestamp)
>     return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ... }
> }
> {code}
> TTL decorators are applied to state produced by the normal state binder in its TTL
> wrapper from FLINK-9513

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
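The decorator idea from the issue description can be illustrated with a minimal, self-contained sketch. It does not use Flink's classes: `TimestampedValue` and `TtlValueSketch` are hypothetical names, a plain field stands in for the underlying state, and a `LongSupplier` stands in for the time provider. The expiration rule (`now - timestamp >= ttl`) is an assumption made for illustration only.

```java
import java.util.function.LongSupplier;

/** Hypothetical stand-in for a value packed with its last-update timestamp. */
final class TimestampedValue<V> {
    final V value;
    final long timestamp;

    TimestampedValue(V value, long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }
}

/** Hypothetical TTL decorator around a single stored value (not Flink's TtlValueState). */
final class TtlValueSketch<V> {
    private final long ttlMillis;
    private final LongSupplier clock;     // plays the role of the time provider
    private TimestampedValue<V> stored;   // plays the role of the underlying state

    TtlValueSketch(long ttlMillis, LongSupplier clock) {
        this.ttlMillis = ttlMillis;
        this.clock = clock;
    }

    /** Stores the value together with the current timestamp. */
    void update(V value) {
        stored = new TimestampedValue<>(value, clock.getAsLong());
    }

    /** Returns the value, or null if it is absent or expired; expired data is dropped on access. */
    V value() {
        if (stored == null) {
            return null;
        }
        if (clock.getAsLong() - stored.timestamp >= ttlMillis) {
            stored = null; // cleanup happens only when the state is read
            return null;
        }
        return stored.value;
    }
}
```

Note that in this sketch expired data is only removed when `value()` is called, so an entry that is never read again stays in memory.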
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16541285#comment-16541285 ]

ASF GitHub Bot commented on FLINK-9514:
---

Github user azagrebin commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r201941972

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java ---
    +    private <E> List<E> collect(Iterable<E> iterable) {
    --- End diff --

    Hi @Aitozi, for the current implementation of list state in RocksDB you are right. But there was, for example, an effort to make lists scalable like maps in RocksDB, and in that case the access could be lazy. This TTL implementation does not make any assumptions about the underlying state backend. The generic state user API returns `Iterabl
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16541073#comment-16541073 ]

ASF GitHub Bot commented on FLINK-9514:
---

Github user Aitozi commented on the issue:

    https://github.com/apache/flink/pull/6186

    Hi, after reading the whole implementation, I found that state is only expired when it is accessed. If dirty data is stored in state and never queried again, how can it be expired? Or is there ongoing work for this? @azagrebin
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16541061#comment-16541061 ]

ASF GitHub Bot commented on FLINK-9514:
---

Github user Aitozi commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r201895445

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java ---
    +    private <E> List<E> collect(Iterable<E> iterable) {
    --- End diff --

    Hi @azagrebin, I have a small doubt about what you said:
    > return Iterable and avoid querying backend if not needed

    When dealing with ListState, `original.get()` has already queried the original `Iterable` from RocksDB, hasn't it? Does this approach just lazily iterate over elements that are already in memory?
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16527547#comment-16527547 ]

ASF GitHub Bot commented on FLINK-9514:
---

Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/6186
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16527543#comment-16527543 ]

ASF GitHub Bot commented on FLINK-9514:
---

Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/6186

    Merging this.
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16527369#comment-16527369 ]

ASF GitHub Bot commented on FLINK-9514:
---

Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/6186

    Had one more minor comment. Besides, this looks good 👍 Nice job!
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16527364#comment-16527364 ]

ASF GitHub Bot commented on FLINK-9514:
---

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r199109435

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java ---
    @@ -0,0 +1,134 @@
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.runtime.state.internal.InternalMapState;
    +import org.apache.flink.util.FlinkRuntimeException;
    +
    +import java.util.AbstractMap;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.stream.Stream;
    +import java.util.stream.StreamSupport;
    +
    +/**
    + * This class wraps map state with TTL logic.
    + *
    + * @param <K> The type of key the state is associated to
    + * @param <N> The type of the namespace
    + * @param <UK> Type of the user entry key of state with TTL
    + * @param <UV> Type of the user entry value of state with TTL
    + */
    +class TtlMapState<K, N, UK, UV>
    +    extends AbstractTtlState<K, N, Map<UK, UV>, Map<UK, TtlValue<UV>>, InternalMapState<K, N, UK, TtlValue<UV>>>
    +    implements InternalMapState<K, N, UK, UV> {
    +    TtlMapState(
    +        InternalMapState<K, N, UK, TtlValue<UV>> original,
    +        TtlConfig config,
    +        TtlTimeProvider timeProvider,
    +        TypeSerializer<Map<UK, UV>> valueSerializer) {
    +        super(original, config, timeProvider, valueSerializer);
    +    }
    +
    +    @Override
    +    public UV get(UK key) throws Exception {
    +        return getWithTtlCheckAndUpdate(() -> original.get(key), v -> original.put(key, v), () -> original.remove(key));
    +    }
    +
    +    @Override
    +    public void put(UK key, UV value) throws Exception {
    +        original.put(key, wrapWithTs(value));
    +    }
    +
    +    @Override
    +    public void putAll(Map<UK, UV> map) throws Exception {
    +        if (map == null) {
    +            return;
    +        }
    +        Map<UK, TtlValue<UV>> ttlMap = new HashMap<>();
    --- End diff --

    We can already initialize the new map with `map.size()`.
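The review suggestion above, presizing the target map with `map.size()`, can be sketched in isolation as follows. This is only an illustration: `PresizedPutAllSketch` and `wrapAllWithTs` are hypothetical names, and a `SimpleImmutableEntry` of (value, timestamp) stands in for the TTL wrapper type. One caveat worth keeping in mind: the `HashMap(int)` constructor argument is an initial capacity rather than an expected element count, so with the default load factor of 0.75 a resize can still occur before all entries are inserted.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.HashMap;
import java.util.Map;

final class PresizedPutAllSketch {

    /**
     * Wraps every value of the input map with the given timestamp.
     * The target map is created with an initial capacity derived from the
     * input size, as suggested in the review comment.
     */
    static <K, V> Map<K, SimpleImmutableEntry<V, Long>> wrapAllWithTs(Map<K, V> map, long timestamp) {
        Map<K, SimpleImmutableEntry<V, Long>> ttlMap = new HashMap<>(map.size());
        for (Map.Entry<K, V> e : map.entrySet()) {
            // Pair the user value with the timestamp (hypothetical stand-in for a TTL value).
            ttlMap.put(e.getKey(), new SimpleImmutableEntry<>(e.getValue(), timestamp));
        }
        return ttlMap;
    }
}
```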
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16527302#comment-16527302 ]

ASF GitHub Bot commented on FLINK-9514:
---

Github user azagrebin commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r199082008

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java ---
    +    public Iterable<T> get() throws Exception {
    --- End diff --

    My idea was to keep this method lazy when possible. Currently this implementation does not assume that the underlying state returns a `List`, but rather works as if it is an `Iterable` and lazily wraps it with `IteratorWithCleanup`. Only when it has to update timestamps on read does it materialise the `Iterable` into a `List`.
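The lazy wrapping described in this comment can be sketched with a small filtering iterator. This is not the `IteratorWithCleanup` class from the pull request, whose body is not shown in this thread; `LazyUnexpiredIterator` is a hypothetical name, the expiration check is passed in as a `Predicate`, and elements are assumed to be non-null. The point of the sketch is that the source iterator is only advanced when the caller asks for the next element.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Predicate;

/** Hypothetical lazy iterator that skips expired elements while iterating. */
final class LazyUnexpiredIterator<E> implements Iterator<E> {
    private final Iterator<E> source;
    private final Predicate<E> expired;
    private E nextUnexpired; // null means "nothing buffered yet" (elements are assumed non-null)

    LazyUnexpiredIterator(Iterator<E> source, Predicate<E> expired) {
        this.source = source;
        this.expired = expired;
    }

    @Override
    public boolean hasNext() {
        // Pull from the source only until one unexpired element has been found.
        while (nextUnexpired == null && source.hasNext()) {
            E candidate = source.next();
            if (!expired.test(candidate)) {
                nextUnexpired = candidate;
            }
        }
        return nextUnexpired != null;
    }

    @Override
    public E next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        E result = nextUnexpired;
        nextUnexpired = null;
        return result;
    }
}
```

If the underlying collection is already fully in memory, as discussed elsewhere in this thread for the RocksDB list state of the time, this laziness only saves the filtering work; it pays off more when the source itself is produced incrementally.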
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16526671#comment-16526671 ]

ASF GitHub Bot commented on FLINK-9514:
---

Github user azagrebin commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r198947650

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlFoldFunction.java ---
    @@ -0,0 +1,43 @@
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.api.common.functions.FoldFunction;
    +
    +/**
    + * This class wraps folding function with TTL logic.
    + *
    + * @param <T> Type of the values folded into the state
    + * @param <ACC> Type of the value in the state
    + *
    + * @deprecated use {@link TtlAggregateFunction} instead
    + */
    +@Deprecated
    +class TtlFoldFunction<T, ACC>
    +    extends AbstractTtlDecorator<FoldFunction<T, ACC>>
    +    implements FoldFunction<T, TtlValue<ACC>> {
    +    TtlFoldFunction(FoldFunction<T, ACC> original, TtlConfig config, TtlTimeProvider timeProvider) {
    +        super(original, config, timeProvider);
    +    }
    +
    +    @Override
    +    public TtlValue<ACC> fold(TtlValue<ACC> accumulator, T value) throws Exception {
    +        return wrapWithTs(original.fold(getUnexpried(accumulator), value));
    --- End diff --

    It should be covered with `updateExpired` in `testExactExpirationOnWrite`
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16526115#comment-16526115 ]

ASF GitHub Bot commented on FLINK-9514:
---

Github user azagrebin commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r198757244

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlFoldFunction.java ---
    +        return wrapWithTs(original.fold(getUnexpried(accumulator), value));
    --- End diff --

    As I understand it, the wrapped states should already provide the default values. My idea was to wrap the original default value [in TTL factory](https://github.com/apache/flink/pull/6196/commits/4dedb9a20244a2addd337617778b17fe8349#diff-13011fbe7c28b56b994783572b461aaeR174) with the expiration timestamp `Long.MAX_VALUE`, so that it basically never expires. Good point about test cases for it, I will add them for appending states.
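To see why a timestamp of `Long.MAX_VALUE` makes a default value effectively never expire, here is a minimal sketch of an expiration check. The names `NeverExpiringDefaultSketch`, `expirationTimestamp`, and `expired` are hypothetical, and the overflow guard is an assumption of this sketch rather than something shown in the thread: without it, adding a TTL to `Long.MAX_VALUE` would wrap around to a negative number and the default would look expired immediately.

```java
final class NeverExpiringDefaultSketch {

    /** Timestamp used to mark a value that should never expire. */
    static final long NEVER = Long.MAX_VALUE;

    /** Computes timestamp + ttl, capped at Long.MAX_VALUE to avoid overflow (ttl assumed non-negative). */
    static long expirationTimestamp(long timestamp, long ttl) {
        long cappedTtl = timestamp > 0 ? Math.min(Long.MAX_VALUE - timestamp, ttl) : ttl;
        return timestamp + cappedTtl;
    }

    /** A value is considered expired once the current time reaches its expiration timestamp. */
    static boolean expired(long timestamp, long ttl, long now) {
        return expirationTimestamp(timestamp, ttl) <= now;
    }
}
```

With this rule, `expired(NEVER, ttl, now)` is false for any realistic clock value, while an ordinary value written at time 0 with a TTL of 100 is reported as expired from time 100 onwards.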
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16526096#comment-16526096 ] ASF GitHub Bot commented on FLINK-9514: --- Github user azagrebin commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r198751723 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java --- @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.internal.InternalMapState; +import org.apache.flink.util.FlinkRuntimeException; + +import java.util.AbstractMap; +import java.util.Collections; +import java.util.Iterator; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +/** + * This class wraps map state with TTL logic. 
+ * + * @param The type of key the state is associated to + * @param The type of the namespace + * @param Type of the user entry key of state with TTL + * @param Type of the user entry value of state with TTL + */ +class TtlMapState + extends AbstractTtlState, Map>, InternalMapState>> + implements InternalMapState { + TtlMapState( + InternalMapState> original, + TtlConfig config, + TtlTimeProvider timeProvider, + TypeSerializer> valueSerializer) { + super(original, config, timeProvider, valueSerializer); + } + + @Override + public UV get(UK key) throws Exception { + return getWithTtlCheckAndUpdate(() -> original.get(key), v -> original.put(key, v), () -> original.remove(key)); + } + + @Override + public void put(UK key, UV value) throws Exception { + original.put(key, wrapWithTs(value)); + } + + @Override + public void putAll(Map map) throws Exception { + if (map == null) { + return; + } + Map> ttlMap = map.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, e -> wrapWithTs(e.getValue()))); + original.putAll(ttlMap); + } + + @Override + public void remove(UK key) throws Exception { + original.remove(key); + } + + @Override + public boolean contains(UK key) throws Exception { + return get(key) != null; + } + + @Override + public Iterable> entries() throws Exception { + return entriesStream()::iterator; + } + + private Stream> entriesStream() throws Exception { + Iterable>> withTs = original.entries(); + withTs = withTs == null ? Collections.emptyList() : withTs; + return StreamSupport + .stream(withTs.spliterator(), false) --- End diff -- As I understand it, this depends on the use case. For parallelizable, lazy operations such as filter and map over big collections, streams give a boost over loops, but for short collections or non-parallelizable spliterators the overhead kills the performance. It might be hard to predict the type of spliterator used, though. I agree that real benchmarking should be done to make sure.
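To make the stream-versus-loop trade-off above concrete, here is a small sketch of the two equivalent ways to filter an `Iterable`. The class and method names are hypothetical, and the sketch makes no performance claim; it only shows the two shapes that a benchmark would compare.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

final class IterableFilterSketch {
    /** Stream variant: a sequential stream over the spliterator, as in the quoted entriesStream(). */
    static <T> List<T> filterWithStream(Iterable<T> source, Predicate<T> keep) {
        return StreamSupport.stream(source.spliterator(), false)
            .filter(keep)
            .collect(Collectors.toList());
    }

    /** Loop variant: same result, without setting up a stream pipeline. */
    static <T> List<T> filterWithLoop(Iterable<T> source, Predicate<T> keep) {
        List<T> result = new ArrayList<>();
        for (T element : source) {
            if (keep.test(element)) {
                result.add(element);
            }
        }
        return result;
    }
}
```

Both methods return the same elements in the same order, so they can be swapped freely once measurements show which one is cheaper for the state backend's iterables.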
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16524864#comment-16524864 ] ASF GitHub Bot commented on FLINK-9514: --- Github user azagrebin commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r198439894 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java --- @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.function.SupplierWithException; +import org.apache.flink.util.function.ThrowingConsumer; +import org.apache.flink.util.function.ThrowingRunnable; + +/** + * Base class for TTL logic wrappers. + * + * @param Type of originally wrapped object + */ +abstract class AbstractTtlDecorator { --- End diff -- Currently this class is used to wrap any object with TTL logic, not only state objects: e.g. aggregating functions. It can be injected as a "TTL time service" but then the common member `original` will have to be duplicated in function wrappers. 
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16524863#comment-16524863 ] ASF GitHub Bot commented on FLINK-9514: --- Github user azagrebin commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r198439148 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateVisibility.java --- @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +/** + * This option configures whether to return expired user value or not. + */ +public enum TtlStateVisibility { + /** Return still available expired user value (not yet cleaned up). */ + Relaxed, + /** Hide expired user value and behave as if it does not exist any more. */ + Exact --- End diff -- I think I will also rename enum entries. 
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16524855#comment-16524855 ] ASF GitHub Bot commented on FLINK-9514: --- Github user azagrebin commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r198436490 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlUpdateType.java --- @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +/** + * This option value configures when to prolong state TTL. + */ +public enum TtlUpdateType { + /** TTL is disabled. State does not expire. */ + Disabled, --- End diff -- This is more of an internal configuration value, basically for later use in the TTL wrapper factory to decide whether to wrap with TTL or not. The alternative would probably be to store a null or `Optional` config, or a negative TTL, in the state descriptor when it is configured without TTL by default. I found the enum value more explicit, although it is user-facing; I expect users will simply not configure TTL at all rather than use this option.
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16524835#comment-16524835 ] ASF GitHub Bot commented on FLINK-9514: --- Github user azagrebin commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r198431042 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java --- @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.function.SupplierWithException; +import org.apache.flink.util.function.ThrowingConsumer; +import org.apache.flink.util.function.ThrowingRunnable; + +/** + * Base class for TTL logic wrappers. 
+ * + * @param Type of originally wrapped object + */ +abstract class AbstractTtlDecorator { + final T original; + final TtlConfig config; + final TtlTimeProvider timeProvider; + final boolean updateTsOnRead; + final boolean returnExpired; + final long ttl; + + AbstractTtlDecorator( + T original, + TtlConfig config, + TtlTimeProvider timeProvider) { + Preconditions.checkNotNull(original); + Preconditions.checkNotNull(config); + Preconditions.checkNotNull(timeProvider); + Preconditions.checkArgument(config.getTtlUpdateType() != TtlUpdateType.Disabled, + "State does not need to be wrapped with TTL if it is configured as disabled."); + this.original = original; + this.config = config; + this.timeProvider = timeProvider; + this.updateTsOnRead = config.getTtlUpdateType() == TtlUpdateType.OnReadAndWrite; + this.returnExpired = config.getStateVisibility() == TtlStateVisibility.Relaxed; + this.ttl = config.getTtl().toMilliseconds(); + } + +V getUnexpried(TtlValue ttlValue) { + return ttlValue == null || (expired(ttlValue) && !returnExpired) ? null : ttlValue.getUserValue(); + } + +boolean expired(TtlValue ttlValue) { + return ttlValue != null && ttlValue.getExpirationTimestamp() <= timeProvider.currentTimestamp(); + } + +TtlValue wrapWithTs(V value) { --- End diff -- It actually wraps with expiration timestamp, TTL is not stored with value. 
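The point made above, that the wrapper stores an expiration timestamp rather than the TTL itself, can be sketched as follows. This is a simplified illustration under stated assumptions: `ExpirationTimestampSketch`, its nested `Wrapped` class, and the manually advanced `currentTimestamp` field are hypothetical stand-ins for `AbstractTtlDecorator`, `TtlValue`, and `TtlTimeProvider`.

```java
final class ExpirationTimestampSketch {
    /** Hypothetical, simplified stand-in for TtlValue. */
    static final class Wrapped<V> {
        final V userValue;
        final long expirationTimestamp;

        Wrapped(V userValue, long expirationTimestamp) {
            this.userValue = userValue;
            this.expirationTimestamp = expirationTimestamp;
        }
    }

    private final long ttlMillis;

    /** Manually advanced clock, standing in for a time provider. */
    long currentTimestamp;

    ExpirationTimestampSketch(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Stores "now + TTL" with the value; the TTL itself is not stored. */
    <V> Wrapped<V> wrapWithTs(V value) {
        return new Wrapped<>(value, currentTimestamp + ttlMillis);
    }

    /** Same comparison as in the quoted expired(...) method. */
    <V> boolean expired(Wrapped<V> wrapped) {
        return wrapped != null && wrapped.expirationTimestamp <= currentTimestamp;
    }
}
```

Because only the absolute expiration timestamp travels with the value, the expiry check needs nothing but the current time.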
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16524834#comment-16524834 ] ASF GitHub Bot commented on FLINK-9514: --- Github user azagrebin commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r198430741 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlConfig.java --- @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.util.Preconditions; + +/** + * Configuration of state TTL logic. + * TODO: builder --- End diff -- Yes, config is only sketched. It is planned for later step when TTL will be activated in a State Descriptor API. 
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16522079#comment-16522079 ] ASF GitHub Bot commented on FLINK-9514: --- Github user sihuazhou commented on the issue: https://github.com/apache/flink/pull/6186 @StefanRRichter Thanks for your reply! I think that makes sense; there is still a workaround available to the user. `+1` to implementing the current approach first.
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16522049#comment-16522049 ] ASF GitHub Bot commented on FLINK-9514: --- Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/6186 @sihuazhou I think your concern and suggestion clearly make sense for some cases. However, I don't think it should be the general default either, because I can also find cases where this is not what a user might want. For example, TTL could be used for compliance with some user data privacy law. In this case, the law does not care about technical details like restores, only about wall-clock time, e.g. since the last user interaction. As this is a new feature, there is also no regression, and users can still go for a custom timer-based solution as an alternative. I agree that for some cases this time shift makes sense, and so does your outlined approach. However, I think this does not have to block this PR, because I would consider it a feature/improvement on top of this work. We could still aim to have your suggested follow-up before the release.
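For reference, the time-shift check under discussion could be sketched as below, using the condition sihuazhou gives in this thread: `(current_ts - update_ts) - time_shift_offset >= TTL`, with an offset of `0` for records updated after the checkpoint and `recovery_ts - checkpoint_ts` otherwise. The class and method names are hypothetical, and this is only an illustration of the proposed follow-up, not code from the pull request.

```java
final class TimeShiftTtlSketch {
    /** Offset is 0 for records updated after the checkpoint, else the downtime between checkpoint and recovery. */
    static long timeShiftOffset(long updateTs, long checkpointTs, long recoveryTs) {
        return updateTs > checkpointTs ? 0L : recoveryTs - checkpointTs;
    }

    /** Expired when the record's age, minus the time shift, reaches the TTL. */
    static boolean expired(long currentTs, long updateTs, long ttl, long checkpointTs, long recoveryTs) {
        long offset = timeShiftOffset(updateTs, checkpointTs, recoveryTs);
        return (currentTs - updateTs) - offset >= ttl;
    }
}
```

As a worked case: with a TTL of 100, a record updated at 50, a checkpoint at 80 and a recovery at 180, the record is still alive at time 200 under the shifted check (age 150 minus offset 100 is 50), whereas the wall-clock check of the current approach would already treat it as expired.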
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16522050#comment-16522050 ] ASF GitHub Bot commented on FLINK-9514: --- Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/6186 @azagrebin I think the overall idea is well implemented. I just had a couple of comments inline.
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16522030#comment-16522030 ] ASF GitHub Bot commented on FLINK-9514: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r197719642 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlListStateTest.java --- @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.runtime.state.internal.InternalListState; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +/** Test suite for {@link TtlListState}. 
*/ +public class TtlListStateTest extends TtlStateTestBase, List, Iterable> { + @Override + TtlListState createState() { + return new TtlListState<>(new MockInternalListState<>(), ttlConfig, timeProvider, null); + } + + @Override + void initTestValues() { + updater = v -> ttlState.addAll(v); + getter = () -> StreamSupport.stream(ttlState.get().spliterator(), false).collect(Collectors.toList()); + originalGetter = () -> ttlState.original.get(); + + emptyValue = Collections.emptyList(); + + updateValue1 = Arrays.asList(5, 7, 10); + updateValue2 = Arrays.asList(8, 9, 11); + + getValue1 = updateValue1; + getValue2 = updateValue2; + } + + private static class MockInternalListState + extends MockInternalKvState> + implements InternalListState { + + MockInternalListState() { + value = new ArrayList<>(); + } + + @Override + public void update(List elements) { + updateInternal(elements); + } + + @Override + public void addAll(List elements) { + value.addAll(elements); + } + + @Override + public void mergeNamespaces(N target, Collection sources) { --- End diff -- Wouldn't it make sense to check that merging namespaces also works correctly with the TTL state, and to fix and validate the contract for what has to happen to the timestamps in this case?
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16522027#comment-16522027 ] ASF GitHub Bot commented on FLINK-9514: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r197719310 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java --- @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.util.function.SupplierWithException; +import org.apache.flink.util.function.ThrowingConsumer; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +abstract class TtlStateTestBase { --- End diff -- I think it would make sense to extend this general test a bit to consider multiple keys and namespaces. Ideally, a test should really test the full contract specification of the tested subject. What I mean is, you could currently pass this test even if TTL accidentally cleared all states on the timeout of one state, or cleared all the states in the same namespace. The mock states can easily be extended to truly scope values by key and namespace. Then the test can, for example, create two keys in the same namespace and two keys in a different namespace and check that their timeouts are isolated from each other and that the interaction works as expected.
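The kind of key- and namespace-scoped mock suggested above might look roughly like this. It is a hypothetical sketch, not the mock from the pull request: `ScopedTtlMockSketch`, its `String` keys and namespaces, and the manually advanced `now` clock are all assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ScopedTtlMockSketch {
    /** Stored value together with its expiration timestamp. */
    private static final class Entry {
        final int value;
        final long expirationTimestamp;

        Entry(int value, long expirationTimestamp) {
            this.value = value;
            this.expirationTimestamp = expirationTimestamp;
        }
    }

    /** Entries are scoped by the pair (key, namespace); a two-element list serves as the map key. */
    private final Map<List<String>, Entry> entries = new HashMap<>();
    private final long ttl;

    /** Manually advanced clock, standing in for a time provider. */
    long now;

    ScopedTtlMockSketch(long ttl) {
        this.ttl = ttl;
    }

    void update(String key, String namespace, int value) {
        entries.put(Arrays.asList(key, namespace), new Entry(value, now + ttl));
    }

    /** Returns the value for exactly this scope, or null if absent or expired; removes only the expired entry. */
    Integer value(String key, String namespace) {
        List<String> scope = Arrays.asList(key, namespace);
        Entry entry = entries.get(scope);
        if (entry == null) {
            return null;
        }
        if (entry.expirationTimestamp <= now) {
            entries.remove(scope);
            return null;
        }
        return entry.value;
    }
}
```

A test built on such a mock can write under several (key, namespace) pairs at different times, advance the clock past the first expiration, and assert that only that one entry disappears.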
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16522000#comment-16522000 ]

ASF GitHub Bot commented on FLINK-9514:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r197714098

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java ---
    @@ -0,0 +1,172 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.runtime.state.internal.InternalListState;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.NoSuchElementException;
    +import java.util.stream.Collectors;
    +import java.util.stream.StreamSupport;
    +
    +/**
    + * This class wraps list state with TTL logic.
    + *
    + * @param <K> The type of key the state is associated to
    + * @param <N> The type of the namespace
    + * @param <T> Type of the user entry value of state with TTL
    + */
    +class TtlListState<K, N, T> extends
    +    AbstractTtlState<K, N, List<T>, List<TtlValue<T>>, InternalListState<K, N, TtlValue<T>>>
    +    implements InternalListState<K, N, T> {
    +    TtlListState(
    +        InternalListState<K, N, TtlValue<T>> originalState,
    +        TtlConfig config,
    +        TtlTimeProvider timeProvider,
    +        TypeSerializer<List<T>> valueSerializer) {
    +        super(originalState, config, timeProvider, valueSerializer);
    +    }
    +
    +    @Override
    +    public void update(List<T> values) throws Exception {
    +        updateInternal(values);
    +    }
    +
    +    @Override
    +    public void addAll(List<T> values) throws Exception {
    +        Preconditions.checkNotNull(values, "List of values to add cannot be null.");
    +        original.addAll(withTs(values));
    +    }
    +
    +    @Override
    +    public Iterable<T> get() throws Exception {
    --- End diff --

    Currently this could return `List` instead of `Iterable`, and you might get around the `instanceof` check.
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16521801#comment-16521801 ]

ASF GitHub Bot commented on FLINK-9514:
---------------------------------------

Github user sihuazhou commented on the issue:

    https://github.com/apache/flink/pull/6186

    Let me elaborate on the TTL checking condition in detail. Overall, the condition has two parts and looks like `(current_ts - update_ts) - time_shift_offset >= TTL`. The `time_shift_offset` is the offset that we should apply when checking the TTL.

    - For records with `update_ts` > `checkpoint_ts`, we know they were created (or updated) after the last restore, so we don't need to apply any shift to them. The shift offset is `0`.
    - For records with `update_ts` <= `checkpoint_ts`, we know they were created (or updated) before the last restore, so we need to apply the shift to them. The shift offset is `recovery_ts - checkpoint_ts`.

    Our current code does no time alignment, which is the special case of the above condition where `time_shift_offset` is always `0`.
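The two cases in the comment above can be sketched as follows. This only illustrates the proposed condition; it is not code from the PR. The class and method names are hypothetical, and all timestamps are plain `long` values in the same unit as the TTL.

```java
/** Hypothetical sketch of the proposed time-shifted TTL check. */
final class TimeShiftedTtlCheck {

    /** Shift is 0 for records written after the checkpoint, else the downtime between checkpoint and recovery. */
    static long timeShiftOffset(long updateTs, long checkpointTs, long recoveryTs) {
        return updateTs > checkpointTs ? 0L : recoveryTs - checkpointTs;
    }

    /** Implements (current_ts - update_ts) - time_shift_offset >= TTL. */
    static boolean expired(long updateTs, long checkpointTs, long recoveryTs, long currentTs, long ttl) {
        long offset = timeShiftOffset(updateTs, checkpointTs, recoveryTs);
        return (currentTs - updateTs) - offset >= ttl;
    }
}
```

With `time_shift_offset` fixed at `0`, `expired` reduces to the unaligned check `current_ts - update_ts >= TTL`, which is the special case the comment attributes to the current code.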
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16521498#comment-16521498 ]

ASF GitHub Bot commented on FLINK-9514:
---------------------------------------

Github user sihuazhou commented on the issue:

    https://github.com/apache/flink/pull/6186

    I'm still a bit worried about the time-align problem on recovery, because I've seen several cases that would become a disaster in production if we don't do the time alignment on recovery. One instance: we used the DFS to store the checkpoint data, and the DFS went into safe mode because of some problems. It took us several hours to notice that, and some more time to address the issue. After the DFS issue was addressed, users' jobs were resumed and began to run correctly. In this case, if we don't do the time alignment on recovery, the user's state may already be totally expired (when TTL <= the `system down time`).

    I had a second thought on this problem, and I think maybe we could do that without a full scan of the records. The approach is outlined below.

    - 1. We need to remember the timestamp when performing the checkpoint; let's call it `checkpoint_ts`.
    - 2. We also need to remember the timestamp when recovering from the checkpoint; let's call it `recovery_ts`.
    - 3. For each record, we remember its last update timestamp; let's call it `update_ts`.
    - 4. The current timestamp is `current_ts`.
    - 5. Then we could use the following condition, `checkpoint_ts - update_ts + current_ts - recovery_ts >= TTL`, to check whether the record is expired. If it is true, the record is expired; otherwise the record is still alive.

    What do you think? @azagrebin and @StefanRRichter, it would be really nice to learn your opinion about this problem.
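As a worked illustration of the condition proposed above, the sketch below compares the aligned check with the plain one for an outage like the DFS incident described in the comment. The class and method names are hypothetical. The formula is taken as written and therefore assumes a record that was last updated before the checkpoint (`update_ts <= checkpoint_ts`).

```java
/** Hypothetical sketch: time spent "alive" excludes the downtime between checkpoint and recovery. */
final class AlignedTtlCheck {

    /** Implements checkpoint_ts - update_ts + current_ts - recovery_ts >= TTL. */
    static boolean expired(long updateTs, long checkpointTs, long recoveryTs, long currentTs, long ttl) {
        long aliveBeforeCheckpoint = checkpointTs - updateTs;
        long aliveAfterRecovery = currentTs - recoveryTs;
        return aliveBeforeCheckpoint + aliveAfterRecovery >= ttl;
    }

    /** The check without time alignment, for comparison. */
    static boolean expiredWithoutAlignment(long updateTs, long currentTs, long ttl) {
        return currentTs - updateTs >= ttl;
    }
}
```

Taking minutes as the unit: a record updated at 0, checkpointed at 10, recovered at 190 after a three-hour outage and read at 195 has been alive for 10 + 5 = 15 minutes, so a 60-minute TTL has not elapsed under the aligned check, while the unaligned check sees 195 minutes and treats the record as expired.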
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16520577#comment-16520577 ]

ASF GitHub Bot commented on FLINK-9514:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r197503773

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlAggregatingState.java ---
    @@ -0,0 +1,80 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.runtime.state.internal.InternalAggregatingState;
    +
    +import java.util.Collection;
    +
    +/**
    + * This class wraps aggregating state with TTL logic.
    + *
    + * @param <K> The type of key the state is associated to
    + * @param <N> The type of the namespace
    + * @param <IN> Type of the value added to the state
    + * @param <ACC> The type of the accumulator (intermediate aggregate state).
    + * @param <OUT> Type of the value extracted from the state
    + *
    + */
    +class TtlAggregatingState<K, N, IN, ACC, OUT>
    +    extends AbstractTtlState<K, N, ACC, TtlValue<ACC>, InternalAggregatingState<K, N, IN, TtlValue<ACC>, OUT>>
    +    implements InternalAggregatingState<K, N, IN, ACC, OUT> {
    +
    +    TtlAggregatingState(
    +        InternalAggregatingState<K, N, IN, TtlValue<ACC>, OUT> originalState,
    +        TtlConfig config,
    +        TtlTimeProvider timeProvider,
    +        TypeSerializer<ACC> valueSerializer,
    +        TtlAggregateFunction<IN, ACC, OUT> aggregateFunction) {
    +        super(originalState, config, timeProvider, valueSerializer);
    +        aggregateFunction.stateClear = originalState::clear;
    --- End diff --

    Maybe it is better to pass a `TtlAggregateFunctionBuilder` and then supply `stateClear` and `updater` before the object is created. I think then they can also become immutable. Similar changes in the other states.
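The builder idea from the comment above could be sketched like this. It is only an illustration of the pattern: `TtlFunctionSketch` and its `Builder` are hypothetical stand-ins, not the `TtlAggregateFunctionBuilder` named in the review, and both callbacks are modelled as plain `Runnable`s.

```java
import java.util.Objects;

/** Hypothetical stand-in for a TTL function whose callbacks are fixed at construction time. */
final class TtlFunctionSketch {
    private final Runnable stateClear;
    private final Runnable updater;

    private TtlFunctionSketch(Runnable stateClear, Runnable updater) {
        this.stateClear = stateClear;
        this.updater = updater;
    }

    /** Invoked when the wrapped value turns out to be expired. */
    void onExpired() {
        stateClear.run();
    }

    /** Invoked when the timestamp should be refreshed. */
    void onAccess() {
        updater.run();
    }

    /** Collects the callbacks first, so the built object never needs mutable fields. */
    static final class Builder {
        private Runnable stateClear;
        private Runnable updater;

        Builder stateClear(Runnable stateClear) {
            this.stateClear = stateClear;
            return this;
        }

        Builder updater(Runnable updater) {
            this.updater = updater;
            return this;
        }

        TtlFunctionSketch build() {
            return new TtlFunctionSketch(
                Objects.requireNonNull(stateClear, "stateClear"),
                Objects.requireNonNull(updater, "updater"));
        }
    }
}
```

The state constructor would then call something like `builder.stateClear(originalState::clear).updater(...).build()` instead of assigning fields on an already constructed function.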
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16520570#comment-16520570 ]

ASF GitHub Bot commented on FLINK-9514:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r197502165

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlFoldFunction.java ---
    @@ -0,0 +1,43 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.api.common.functions.FoldFunction;
    +
    +/**
    + * This class wraps folding function with TTL logic.
    + *
    + * @param <T> Type of the values folded into the state
    + * @param <ACC> Type of the value in the state
    + *
    + * @deprecated use {@link TtlAggregateFunction} instead
    + */
    +@Deprecated
    +class TtlFoldFunction<T, ACC>
    +    extends AbstractTtlDecorator<FoldFunction<T, ACC>>
    +    implements FoldFunction<T, TtlValue<ACC>> {
    +    TtlFoldFunction(FoldFunction<T, ACC> original, TtlConfig config, TtlTimeProvider timeProvider) {
    +        super(original, config, timeProvider);
    +    }
    +
    +    @Override
    +    public TtlValue<ACC> fold(TtlValue<ACC> accumulator, T value) throws Exception {
    +        return wrapWithTs(original.fold(getUnexpried(accumulator), value));
    --- End diff --

    If this is true, it is a valuable case to test for all the appending states, to check that they work correctly.
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16520566#comment-16520566 ]

ASF GitHub Bot commented on FLINK-9514:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r197501902

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlFoldFunction.java ---
    @@ -0,0 +1,43 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.api.common.functions.FoldFunction;
    +
    +/**
    + * This class wraps folding function with TTL logic.
    + *
    + * @param <T> Type of the values folded into the state
    + * @param <ACC> Type of the value in the state
    + *
    + * @deprecated use {@link TtlAggregateFunction} instead
    + */
    +@Deprecated
    +class TtlFoldFunction<T, ACC>
    +    extends AbstractTtlDecorator<FoldFunction<T, ACC>>
    +    implements FoldFunction<T, TtlValue<ACC>> {
    +    TtlFoldFunction(FoldFunction<T, ACC> original, TtlConfig config, TtlTimeProvider timeProvider) {
    +        super(original, config, timeProvider);
    +    }
    +
    +    @Override
    +    public TtlValue<ACC> fold(TtlValue<ACC> accumulator, T value) throws Exception {
    +        return wrapWithTs(original.fold(getUnexpried(accumulator), value));
    --- End diff --

    I think that (similar to `TtlAggregateFunction`) you need to intercept `null` values of the accumulator here and replace them with the default value.
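The point raised above, that an expired (and therefore `null`) accumulator should be replaced by the default value before folding, can be sketched in isolation. `DefaultingFold` is a hypothetical helper built on `java.util.function.BiFunction`, not Flink's `FoldFunction`, and how the default value reaches the wrapper is left as an assumption (here it is a constructor argument).

```java
import java.util.function.BiFunction;

/** Hypothetical wrapper that substitutes a default accumulator when the stored one is missing or expired. */
final class DefaultingFold<T, ACC> {
    private final BiFunction<ACC, T, ACC> fold;
    private final ACC defaultAccumulator;

    DefaultingFold(BiFunction<ACC, T, ACC> fold, ACC defaultAccumulator) {
        this.fold = fold;
        this.defaultAccumulator = defaultAccumulator;
    }

    /** {@code unexpiredAccumulator} is null when nothing is stored or the stored value has expired. */
    ACC fold(ACC unexpiredAccumulator, T value) {
        ACC accumulator = unexpiredAccumulator == null ? defaultAccumulator : unexpiredAccumulator;
        return fold.apply(accumulator, value);
    }
}
```

Without the substitution, a user fold function such as `(acc, v) -> acc + v` would fail on the `null` accumulator of an expired entry.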
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16520549#comment-16520549 ]

ASF GitHub Bot commented on FLINK-9514:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r197498126

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java ---
    @@ -0,0 +1,132 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.runtime.state.internal.InternalMapState;
    +import org.apache.flink.util.FlinkRuntimeException;
    +
    +import java.util.AbstractMap;
    +import java.util.Collections;
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.stream.Collectors;
    +import java.util.stream.Stream;
    +import java.util.stream.StreamSupport;
    +
    +/**
    + * This class wraps map state with TTL logic.
    + *
    + * @param <K> The type of key the state is associated to
    + * @param <N> The type of the namespace
    + * @param <UK> Type of the user entry key of state with TTL
    + * @param <UV> Type of the user entry value of state with TTL
    + */
    +class TtlMapState<K, N, UK, UV>
    +    extends AbstractTtlState<K, N, Map<UK, UV>, Map<UK, TtlValue<UV>>, InternalMapState<K, N, UK, TtlValue<UV>>>
    +    implements InternalMapState<K, N, UK, UV> {
    +    TtlMapState(
    +        InternalMapState<K, N, UK, TtlValue<UV>> original,
    +        TtlConfig config,
    +        TtlTimeProvider timeProvider,
    +        TypeSerializer<Map<UK, UV>> valueSerializer) {
    +        super(original, config, timeProvider, valueSerializer);
    +    }
    +
    +    @Override
    +    public UV get(UK key) throws Exception {
    +        return getWithTtlCheckAndUpdate(() -> original.get(key), v -> original.put(key, v), () -> original.remove(key));
    +    }
    +
    +    @Override
    +    public void put(UK key, UV value) throws Exception {
    +        original.put(key, wrapWithTs(value));
    +    }
    +
    +    @Override
    +    public void putAll(Map<UK, UV> map) throws Exception {
    +        if (map == null) {
    +            return;
    +        }
    +        Map<UK, TtlValue<UV>> ttlMap = map.entrySet().stream()
    +            .collect(Collectors.toMap(Map.Entry::getKey, e -> wrapWithTs(e.getValue())));
    +        original.putAll(ttlMap);
    +    }
    +
    +    @Override
    +    public void remove(UK key) throws Exception {
    +        original.remove(key);
    +    }
    +
    +    @Override
    +    public boolean contains(UK key) throws Exception {
    +        return get(key) != null;
    +    }
    +
    +    @Override
    +    public Iterable<Map.Entry<UK, UV>> entries() throws Exception {
    +        return entriesStream()::iterator;
    +    }
    +
    +    private Stream<Map.Entry<UK, UV>> entriesStream() throws Exception {
    +        Iterable<Map.Entry<UK, TtlValue<UV>>> withTs = original.entries();
    +        withTs = withTs == null ? Collections.emptyList() : withTs;
    +        return StreamSupport
    +            .stream(withTs.spliterator(), false)
    +            .filter(this::unexpiredAndUpdateOrCleanup)
    +            .map(TtlMapState::dropTs);
    +    }
    +
    +    private boolean unexpiredAndUpdateOrCleanup(Map.Entry<UK, TtlValue<UV>> e) {
    +        UV unexpiredValue;
    +        try {
    +            unexpiredValue = getWithTtlCheckAndUpdate(
    +                e::getValue,
    +                v -> original.put(e.getKey(), v),
    +                () -> original.remove(e.getKey()));
    +        } catch (Exception ex) {
    +            throw new FlinkRuntimeException(ex);
    +        }
    +        return unexpiredValue != null;
    +    }
    +
    +    private static <UK, UV> Map.Entry<UK, UV> dropTs(Map.Entry<UK, TtlValue<UV>> e) {
    --- End diff --

    Again, to keep it clearer, I would spell out `dropTs` as `unwrapTtlState` or something similar.
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16520540#comment-16520540 ]

ASF GitHub Bot commented on FLINK-9514:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r197494851

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java ---
    @@ -0,0 +1,132 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.runtime.state.internal.InternalMapState;
    +import org.apache.flink.util.FlinkRuntimeException;
    +
    +import java.util.AbstractMap;
    +import java.util.Collections;
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.stream.Collectors;
    +import java.util.stream.Stream;
    +import java.util.stream.StreamSupport;
    +
    +/**
    + * This class wraps map state with TTL logic.
    + *
    + * @param <K> The type of key the state is associated to
    + * @param <N> The type of the namespace
    + * @param <UK> Type of the user entry key of state with TTL
    + * @param <UV> Type of the user entry value of state with TTL
    + */
    +class TtlMapState<K, N, UK, UV>
    +    extends AbstractTtlState<K, N, Map<UK, UV>, Map<UK, TtlValue<UV>>, InternalMapState<K, N, UK, TtlValue<UV>>>
    +    implements InternalMapState<K, N, UK, UV> {
    +    TtlMapState(
    +        InternalMapState<K, N, UK, TtlValue<UV>> original,
    +        TtlConfig config,
    +        TtlTimeProvider timeProvider,
    +        TypeSerializer<Map<UK, UV>> valueSerializer) {
    +        super(original, config, timeProvider, valueSerializer);
    +    }
    +
    +    @Override
    +    public UV get(UK key) throws Exception {
    +        return getWithTtlCheckAndUpdate(() -> original.get(key), v -> original.put(key, v), () -> original.remove(key));
    +    }
    +
    +    @Override
    +    public void put(UK key, UV value) throws Exception {
    +        original.put(key, wrapWithTs(value));
    +    }
    +
    +    @Override
    +    public void putAll(Map<UK, UV> map) throws Exception {
    +        if (map == null) {
    +            return;
    +        }
    +        Map<UK, TtlValue<UV>> ttlMap = map.entrySet().stream()
    +            .collect(Collectors.toMap(Map.Entry::getKey, e -> wrapWithTs(e.getValue())));
    +        original.putAll(ttlMap);
    +    }
    +
    +    @Override
    +    public void remove(UK key) throws Exception {
    +        original.remove(key);
    +    }
    +
    +    @Override
    +    public boolean contains(UK key) throws Exception {
    +        return get(key) != null;
    +    }
    +
    +    @Override
    +    public Iterable<Map.Entry<UK, UV>> entries() throws Exception {
    +        return entriesStream()::iterator;
    +    }
    +
    +    private Stream<Map.Entry<UK, UV>> entriesStream() throws Exception {
    +        Iterable<Map.Entry<UK, TtlValue<UV>>> withTs = original.entries();
    +        withTs = withTs == null ? Collections.emptyList() : withTs;
    +        return StreamSupport
    +            .stream(withTs.spliterator(), false)
    --- End diff --

    I saw that some of the classes make heavy use of streams by getting spliterators from collections. While the code is concise, this creates a default adapter from iterator to spliterator under the hood. IIRC this can have a significant performance impact, especially when used in hot code paths. State access in Flink can be considered a hot path in some cases. It is hard to quantify the impact just from looking at it, but when using this kind of API adapter in "low-level" classes, please be aware of the potential impact. We might want to have a look at the performance and tune if needed. It should be OK for now because there is no regression in existing code, but we might want to measure this for heap-based state eventually.
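One stream-free alternative to the `StreamSupport`/spliterator adapter discussed above is a hand-written filtering iterator, sketched below. `FilteringIterator` is a hypothetical class written for illustration, not part of the PR, and whether it is actually faster for a given state backend would have to be measured, as the comment says.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Predicate;

/** Hypothetical lazy filter over an iterator, with no stream or spliterator adapter involved. */
final class FilteringIterator<E> implements Iterator<E> {
    private final Iterator<E> source;
    private final Predicate<? super E> keep;
    private E buffered;
    private boolean hasBuffered;

    FilteringIterator(Iterator<E> source, Predicate<? super E> keep) {
        this.source = source;
        this.keep = keep;
    }

    @Override
    public boolean hasNext() {
        // Advance the source until an element passes the predicate or the source is exhausted.
        while (!hasBuffered && source.hasNext()) {
            E candidate = source.next();
            if (keep.test(candidate)) {
                buffered = candidate;
                hasBuffered = true;
            }
        }
        return hasBuffered;
    }

    @Override
    public E next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        E result = buffered;
        buffered = null;
        hasBuffered = false;
        return result;
    }
}
```

In the map state, the predicate would play the role of `unexpiredAndUpdateOrCleanup`, and the unwrapping step done by `dropTs` would need an analogous mapping wrapper.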
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16520498#comment-16520498 ]

ASF GitHub Bot commented on FLINK-9514:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r197482482

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java ---
    @@ -0,0 +1,99 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.util.function.SupplierWithException;
    +import org.apache.flink.util.function.ThrowingConsumer;
    +import org.apache.flink.util.function.ThrowingRunnable;
    +
    +/**
    + * Base class for TTL logic wrappers.
    + *
    + * @param <T> Type of originally wrapped object
    + */
    +abstract class AbstractTtlDecorator<T> {
    +    final T original;
    +    final TtlConfig config;
    +    final TtlTimeProvider timeProvider;
    +    final boolean updateTsOnRead;
    +    final boolean returnExpired;
    +    final long ttl;
    +
    +    AbstractTtlDecorator(
    +        T original,
    +        TtlConfig config,
    +        TtlTimeProvider timeProvider) {
    +        Preconditions.checkNotNull(original);
    +        Preconditions.checkNotNull(config);
    +        Preconditions.checkNotNull(timeProvider);
    +        Preconditions.checkArgument(config.getTtlUpdateType() != TtlUpdateType.Disabled,
    +            "State does not need to be wrapped with TTL if it is configured as disabled.");
    +        this.original = original;
    +        this.config = config;
    +        this.timeProvider = timeProvider;
    +        this.updateTsOnRead = config.getTtlUpdateType() == TtlUpdateType.OnReadAndWrite;
    +        this.returnExpired = config.getStateVisibility() == TtlStateVisibility.Relaxed;
    +        this.ttl = config.getTtl().toMilliseconds();
    +    }
    +
    +    <V> V getUnexpried(TtlValue<V> ttlValue) {
    +        return ttlValue == null || (expired(ttlValue) && !returnExpired) ? null : ttlValue.getUserValue();
    +    }
    +
    +    <V> boolean expired(TtlValue<V> ttlValue) {
    +        return ttlValue != null && ttlValue.getExpirationTimestamp() <= timeProvider.currentTimestamp();
    +    }
    +
    +    <V> TtlValue<V> wrapWithTs(V value) {
    +        return wrapWithTs(value, newExpirationTimestamp());
    +    }
    +
    +    static <V> TtlValue<V> wrapWithTs(V value, long ts) {
    +        return value == null ? null : new TtlValue<>(value, ts);
    +    }
    +
    +    <V> TtlValue<V> rewrapWithNewTs(TtlValue<V> ttlValue) {
    +        return wrapWithTs(ttlValue.getUserValue());
    +    }
    +
    +    private long newExpirationTimestamp() {
    +        long currentTs = timeProvider.currentTimestamp();
    +        long ttlWithoutOverflow = currentTs > 0 ? Math.min(Long.MAX_VALUE - currentTs, ttl) : ttl;
    +        return currentTs + ttlWithoutOverflow;
    +    }
    +
    +    V getWithTtlCheckAndUpdate(
    --- End diff --

    It also seems like this method would fit better into `AbstractTtlState`; for example, you could access `clear()` directly there.
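The overflow guard in `newExpirationTimestamp` from the diff above can be exercised in isolation. The sketch below copies that arithmetic into a hypothetical static helper that takes the current timestamp and TTL as arguments, so it needs no Flink classes; `naiveExpirationTimestamp` is added only for comparison.

```java
/** Hypothetical standalone copy of the expiration arithmetic shown in the diff. */
final class ExpirationTimestamps {

    /** Caps the TTL so that currentTs + ttl cannot exceed Long.MAX_VALUE. */
    static long newExpirationTimestamp(long currentTs, long ttl) {
        long ttlWithoutOverflow = currentTs > 0 ? Math.min(Long.MAX_VALUE - currentTs, ttl) : ttl;
        return currentTs + ttlWithoutOverflow;
    }

    /** Unguarded sum, which wraps around to a negative value on overflow. */
    static long naiveExpirationTimestamp(long currentTs, long ttl) {
        return currentTs + ttl;
    }

    /** Same comparison as expired() in the diff: a value is expired once its timestamp is reached. */
    static boolean expired(long expirationTs, long currentTs) {
        return expirationTs <= currentTs;
    }
}
```

With a very large TTL, the naive sum would produce a negative expiration timestamp and the value would look expired immediately; the capped version saturates at `Long.MAX_VALUE` instead.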
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16520492#comment-16520492 ]

ASF GitHub Bot commented on FLINK-9514:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r197480035

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java ---
    @@ -0,0 +1,99 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.util.function.SupplierWithException;
    +import org.apache.flink.util.function.ThrowingConsumer;
    +import org.apache.flink.util.function.ThrowingRunnable;
    +
    +/**
    + * Base class for TTL logic wrappers.
    + *
    + * @param <T> Type of originally wrapped object
    + */
    +abstract class AbstractTtlDecorator<T> {
    --- End diff --

    I wonder whether it is better to have this as an abstract base class or to prefer composition over inheritance for `AbstractTtlState`. It has no abstract methods, and no methods are overridden by subclasses, so it could just as well be a non-abstract class that `AbstractTtlState` holds a reference to.
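The composition alternative raised above could be sketched as follows: the TTL logic lives in a concrete helper, and the state wrapper holds a reference to it instead of extending it. Both classes are hypothetical, simplified stand-ins (a single in-memory value, a `LongSupplier` as clock, no overflow handling), not the classes from the PR.

```java
import java.util.function.LongSupplier;

/** Hypothetical non-abstract helper that owns the TTL arithmetic. */
final class TtlLogic {
    private final long ttl;
    private final LongSupplier clock;

    TtlLogic(long ttl, LongSupplier clock) {
        this.ttl = ttl;
        this.clock = clock;
    }

    boolean expired(long expirationTs) {
        return expirationTs <= clock.getAsLong();
    }

    long newExpirationTimestamp() {
        // Overflow handling omitted to keep the sketch short.
        return clock.getAsLong() + ttl;
    }
}

/** Hypothetical state wrapper that delegates to TtlLogic rather than inheriting from it. */
final class ComposedTtlValue<V> {
    private final TtlLogic ttlLogic;
    private V value;
    private long expirationTs;

    ComposedTtlValue(TtlLogic ttlLogic) {
        this.ttlLogic = ttlLogic;
    }

    void update(V newValue) {
        value = newValue;
        expirationTs = ttlLogic.newExpirationTimestamp();
    }

    V value() {
        return value == null || ttlLogic.expired(expirationTs) ? null : value;
    }
}
```

The trade-off is an extra field and delegation calls in exchange for a helper that can be instantiated and tested on its own.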
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16520486#comment-16520486 ] ASF GitHub Bot commented on FLINK-9514: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r197477831 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateVisibility.java --- @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +/** + * This option configures whether to return expired user value or not. --- End diff -- "can" be returned. We accept it in relaxed mode, but we don't enforce it. 
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16520477#comment-16520477 ] ASF GitHub Bot commented on FLINK-9514: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r197476710 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateVisibility.java --- @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +/** + * This option configures whether to return expired user value or not. + */ +public enum TtlStateVisibility { + /** Return still available expired user value (not yet cleaned up). */ + Relaxed, --- End diff -- I would explain this differently in the comment. It means that expired state can be returned if it has not been cleaned up yet. 
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16520478#comment-16520478 ] ASF GitHub Bot commented on FLINK-9514: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r197476858 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateVisibility.java --- @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +/** + * This option configures whether to return expired user value or not. + */ +public enum TtlStateVisibility { + /** Return still available expired user value (not yet cleaned up). */ + Relaxed, + /** Hide expired user value and behave as if it does not exist any more. */ + Exact --- End diff -- And here: expired state is never returned to the user. I think it does not matter if it is just hidden, or deleted. 
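The two visibility comments above can be illustrated with a small sketch of the semantics the reviewer describes. This is an assumption about the intended behavior, not the PR's code: `TtlVisibilitySketch`, its nested `Visibility` enum and the `visibleValue` helper are hypothetical, and timestamps are passed in explicitly rather than read from a time provider.

```java
final class TtlVisibilitySketch {

    /** Hypothetical counterpart of TtlStateVisibility, with the reworded documentation. */
    enum Visibility {
        /** Expired state can still be returned as long as it has not been cleaned up yet. */
        RELAXED,
        /** Expired state is never returned to the user. */
        EXACT
    }

    /**
     * Returns what the user would observe for a stored value.
     * Whether an expired value is merely hidden or already deleted makes no
     * difference to the caller: in both cases EXACT yields null.
     */
    static <V> V visibleValue(V userValue, long expirationTimestamp, long now, Visibility visibility) {
        boolean expired = expirationTimestamp <= now;
        if (expired && visibility == Visibility.EXACT) {
            return null;
        }
        return userValue;
    }

    private TtlVisibilitySketch() {
    }
}
```

Note that the relaxed mode only permits returning an expired value; a backend that has already cleaned the entry up would simply have nothing to return.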
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16520474#comment-16520474 ] ASF GitHub Bot commented on FLINK-9514: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r197475873 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlUpdateType.java --- @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +/** + * This option value configures when to prolong state TTL. + */ +public enum TtlUpdateType { + /** TTL is disabled. State does not expire. */ + Disabled, --- End diff -- Seems like this is never needed? Why would somebody register TTL state and then declare it disabled? 
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16520469#comment-16520469 ] ASF GitHub Bot commented on FLINK-9514: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r197475080 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlUpdateType.java --- @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +/** + * This option value configures when to prolong state TTL. + */ +public enum TtlUpdateType { --- End diff -- Why not declare the enums inside the `TtlConfig` class, where they belong? 
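A minimal sketch of the suggestion to nest the option enums in the configuration class follows. It is only an illustration: `TtlConfigSketch` is a hypothetical name, `java.time.Duration` replaces Flink's `Time` to keep the example self-contained, and the constants `OnCreateAndWrite` and `ProcessingTime` are assumptions, since the quoted diffs only show `Disabled`, `OnReadAndWrite`, `Relaxed` and `Exact`.

```java
import java.time.Duration;
import java.util.Objects;

/** Hypothetical configuration class that owns its option enums as nested types. */
final class TtlConfigSketch {

    /** When to prolong the TTL of a state entry. */
    enum UpdateType { OnCreateAndWrite, OnReadAndWrite }

    /** Whether expired but not yet cleaned up values may be returned. */
    enum StateVisibility { Relaxed, Exact }

    /** Time scale used for TTL (constant name is an assumption). */
    enum TimeCharacteristic { ProcessingTime }

    private final UpdateType updateType;
    private final StateVisibility stateVisibility;
    private final TimeCharacteristic timeCharacteristic;
    private final Duration ttl;

    TtlConfigSketch(
            UpdateType updateType,
            StateVisibility stateVisibility,
            TimeCharacteristic timeCharacteristic,
            Duration ttl) {
        this.updateType = Objects.requireNonNull(updateType);
        this.stateVisibility = Objects.requireNonNull(stateVisibility);
        this.timeCharacteristic = Objects.requireNonNull(timeCharacteristic);
        this.ttl = Objects.requireNonNull(ttl);
    }

    UpdateType getUpdateType() { return updateType; }

    StateVisibility getStateVisibility() { return stateVisibility; }

    TimeCharacteristic getTimeCharacteristic() { return timeCharacteristic; }

    Duration getTtl() { return ttl; }
}
```

Callers then refer to `TtlConfigSketch.UpdateType.OnReadAndWrite`, which makes it visible at the use site that the option belongs to the TTL configuration.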
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16520471#comment-16520471 ] ASF GitHub Bot commented on FLINK-9514: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r197475245 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlTimeCharacteristic.java --- @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +/** + * This option configures time scale to use for ttl. + */ +public enum TtlTimeCharacteristic { --- End diff -- Same here. 
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16520470#comment-16520470 ] ASF GitHub Bot commented on FLINK-9514: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r197475154 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateVisibility.java --- @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +/** + * This option configures whether to return expired user value or not. + */ +public enum TtlStateVisibility { --- End diff -- Same here. 
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16520464#comment-16520464 ] ASF GitHub Bot commented on FLINK-9514: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r197474567 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlConfig.java --- @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.util.Preconditions; + +/** + * Configuration of state TTL logic. + * TODO: builder --- End diff -- Was this forgotten or planned for a later commit? 
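For the `TODO: builder` note questioned above, one possible shape of such a builder is sketched here. Nothing in it is taken from the pull request: `TtlConfigBuilderSketch`, its method names and the two boolean options are hypothetical, and `java.time.Duration` is used in place of Flink's `Time`.

```java
import java.time.Duration;
import java.util.Objects;

/** Hypothetical immutable TTL configuration created through a builder. */
final class TtlConfigBuilderSketch {
    private final Duration ttl;
    private final boolean updateOnRead;
    private final boolean returnExpired;

    private TtlConfigBuilderSketch(Builder builder) {
        this.ttl = builder.ttl;
        this.updateOnRead = builder.updateOnRead;
        this.returnExpired = builder.returnExpired;
    }

    /** The TTL is the only mandatory setting, so it is a parameter of the factory method. */
    static Builder newBuilder(Duration ttl) {
        return new Builder(ttl);
    }

    Duration getTtl() { return ttl; }

    boolean isUpdateOnRead() { return updateOnRead; }

    boolean isReturnExpired() { return returnExpired; }

    static final class Builder {
        private final Duration ttl;
        private boolean updateOnRead = false;  // default: prolong TTL on writes only
        private boolean returnExpired = false; // default: never return expired values

        private Builder(Duration ttl) {
            Objects.requireNonNull(ttl, "ttl");
            if (ttl.isZero() || ttl.isNegative()) {
                throw new IllegalArgumentException("TTL must be positive.");
            }
            this.ttl = ttl;
        }

        Builder updateOnRead(boolean updateOnRead) {
            this.updateOnRead = updateOnRead;
            return this;
        }

        Builder returnExpired(boolean returnExpired) {
            this.returnExpired = returnExpired;
            return this;
        }

        TtlConfigBuilderSketch build() {
            return new TtlConfigBuilderSketch(this);
        }
    }
}
```

A call such as `TtlConfigBuilderSketch.newBuilder(Duration.ofHours(1)).updateOnRead(true).build()` keeps optional settings readable and leaves the resulting configuration immutable.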
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16520409#comment-16520409 ] ASF GitHub Bot commented on FLINK-9514: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r197460522 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java --- @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.function.SupplierWithException; +import org.apache.flink.util.function.ThrowingConsumer; +import org.apache.flink.util.function.ThrowingRunnable; + +/** + * Base class for TTL logic wrappers. 
+ * + * @param <T> Type of originally wrapped object + */ +abstract class AbstractTtlDecorator<T> { + final T original; + final TtlConfig config; + final TtlTimeProvider timeProvider; + final boolean updateTsOnRead; + final boolean returnExpired; + final long ttl; + + AbstractTtlDecorator( + T original, + TtlConfig config, + TtlTimeProvider timeProvider) { + Preconditions.checkNotNull(original); + Preconditions.checkNotNull(config); + Preconditions.checkNotNull(timeProvider); + Preconditions.checkArgument(config.getTtlUpdateType() != TtlUpdateType.Disabled, + "State does not need to be wrapped with TTL if it is configured as disabled."); + this.original = original; + this.config = config; + this.timeProvider = timeProvider; + this.updateTsOnRead = config.getTtlUpdateType() == TtlUpdateType.OnReadAndWrite; + this.returnExpired = config.getStateVisibility() == TtlStateVisibility.Relaxed; + this.ttl = config.getTtl().toMilliseconds(); + } + + <V> V getUnexpried(TtlValue<V> ttlValue) { + return ttlValue == null || (expired(ttlValue) && !returnExpired) ? null : ttlValue.getUserValue(); + } + + <V> boolean expired(TtlValue<V> ttlValue) { + return ttlValue != null && ttlValue.getExpirationTimestamp() <= timeProvider.currentTimestamp(); + } + + <V> TtlValue<V> wrapWithTs(V value) { --- End diff -- Better `wrapWithTtl`
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16520406#comment-16520406 ] ASF GitHub Bot commented on FLINK-9514: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r197459764 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java --- @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.function.SupplierWithException; +import org.apache.flink.util.function.ThrowingConsumer; +import org.apache.flink.util.function.ThrowingRunnable; + +/** + * Base class for TTL logic wrappers. 
+ * + * @param <T> Type of originally wrapped object + */ +abstract class AbstractTtlDecorator<T> { + final T original; + final TtlConfig config; + final TtlTimeProvider timeProvider; + final boolean updateTsOnRead; + final boolean returnExpired; + final long ttl; + + AbstractTtlDecorator( + T original, + TtlConfig config, + TtlTimeProvider timeProvider) { + Preconditions.checkNotNull(original); + Preconditions.checkNotNull(config); + Preconditions.checkNotNull(timeProvider); + Preconditions.checkArgument(config.getTtlUpdateType() != TtlUpdateType.Disabled, + "State does not need to be wrapped with TTL if it is configured as disabled."); + this.original = original; + this.config = config; + this.timeProvider = timeProvider; + this.updateTsOnRead = config.getTtlUpdateType() == TtlUpdateType.OnReadAndWrite; + this.returnExpired = config.getStateVisibility() == TtlStateVisibility.Relaxed; + this.ttl = config.getTtl().toMilliseconds(); + } + + <V> V getUnexpried(TtlValue<V> ttlValue) { --- End diff -- This method does two things: checking the ttl and unwrapping the value. I would make this as two methods to separate concerns and make it more readable.
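The split proposed in this comment could be sketched as follows. The class is a self-contained approximation rather than the PR's code: `TtlUnwrapSketch`, `visibleOrNull` and `unwrap` are hypothetical names, the nested `TtlValue` is a simplified stand-in, and a `LongSupplier` replaces `TtlTimeProvider`.

```java
import java.util.function.LongSupplier;

final class TtlUnwrapSketch {

    /** Simplified stand-in for the PR's TtlValue. */
    static final class TtlValue<V> {
        final V userValue;
        final long expirationTimestamp;

        TtlValue(V userValue, long expirationTimestamp) {
            this.userValue = userValue;
            this.expirationTimestamp = expirationTimestamp;
        }
    }

    private final LongSupplier clock;    // assumption: replaces TtlTimeProvider
    private final boolean returnExpired; // true corresponds to relaxed visibility

    TtlUnwrapSketch(LongSupplier clock, boolean returnExpired) {
        this.clock = clock;
        this.returnExpired = returnExpired;
    }

    <V> boolean expired(TtlValue<V> ttlValue) {
        return ttlValue != null && ttlValue.expirationTimestamp <= clock.getAsLong();
    }

    /** Concern 1: the TTL check. Yields null if the value must not be visible. */
    <V> TtlValue<V> visibleOrNull(TtlValue<V> ttlValue) {
        return expired(ttlValue) && !returnExpired ? null : ttlValue;
    }

    /** Concern 2: unwrapping. Knows nothing about expiration. */
    static <V> V unwrap(TtlValue<V> ttlValue) {
        return ttlValue == null ? null : ttlValue.userValue;
    }

    /** The original one-step behavior, now a composition of the two methods above. */
    <V> V getUnexpired(TtlValue<V> ttlValue) {
        return unwrap(visibleOrNull(ttlValue));
    }
}
```

The composed `getUnexpired` returns the same results as the single conditional expression in the diff, while each half can be read and tested separately.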
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16520401#comment-16520401 ] ASF GitHub Bot commented on FLINK-9514: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r197458219 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java --- @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.function.SupplierWithException; +import org.apache.flink.util.function.ThrowingConsumer; +import org.apache.flink.util.function.ThrowingRunnable; + +/** + * Base class for TTL logic wrappers. + * + * @param <T> Type of originally wrapped object + */ +abstract class AbstractTtlDecorator<T> { + final T original; + final TtlConfig config; + final TtlTimeProvider timeProvider; + final boolean updateTsOnRead; + final boolean returnExpired; --- End diff -- A few more field comments would be good, especially about the exact meaning of those boolean flags. 
For example, it is not super obvious what `returnExpired` means for the code.
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16520397#comment-16520397 ] ASF GitHub Bot commented on FLINK-9514: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r197457017 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java --- @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.function.SupplierWithException; +import org.apache.flink.util.function.ThrowingConsumer; +import org.apache.flink.util.function.ThrowingRunnable; + +/** + * Base class for TTL logic wrappers. 
+ * + * @param Type of originally wrapped object + */ +abstract class AbstractTtlDecorator { + final T original; + final TtlConfig config; + final TtlTimeProvider timeProvider; + final boolean updateTsOnRead; + final boolean returnExpired; + final long ttl; + + AbstractTtlDecorator( + T original, + TtlConfig config, + TtlTimeProvider timeProvider) { + Preconditions.checkNotNull(original); + Preconditions.checkNotNull(config); + Preconditions.checkNotNull(timeProvider); + Preconditions.checkArgument(config.getTtlUpdateType() != TtlUpdateType.Disabled, + "State does not need to be wrapped with TTL if it is configured as disabled."); + this.original = original; + this.config = config; + this.timeProvider = timeProvider; + this.updateTsOnRead = config.getTtlUpdateType() == TtlUpdateType.OnReadAndWrite; + this.returnExpired = config.getStateVisibility() == TtlStateVisibility.Relaxed; + this.ttl = config.getTtl().toMilliseconds(); + } + +V getUnexpried(TtlValue ttlValue) { --- End diff -- typo in method name > Create wrapper with TTL logic for value state > - > > Key: FLINK-9514 > URL: https://issues.apache.org/jira/browse/FLINK-9514 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Affects Versions: 1.6.0 >Reporter: Andrey Zagrebin >Assignee: Andrey Zagrebin >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > TTL state decorator uses original state with packed TTL and add TTL logic > using time provider: > {code:java} > TtlValueState implements ValueState { > ValueState> underlyingState; > InternalTimeService timeProvider; > V value() { > TtlValue valueWithTtl = underlyingState.get(); > // ttl logic here (e.g. update timestamp) > return valueWithTtl.getValue(); > } > void update() { ... underlyingState.update(valueWithTtl) ... } > } > {code} > TTL decorators are apply to state produced by normal state binder in its TTL > wrapper from FLINK-9513 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
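To make the effect of the `returnExpired` flag from the constructor above concrete, here is a minimal sketch. It mirrors the `expired` / `getUnexpried` bodies quoted elsewhere in this thread, with the method name spelled `getUnexpired`. `TtlValueSketch`, `TtlVisibilitySketch` and the `LongSupplier` clock are hypothetical stand-ins for `TtlValue`, the decorator and `TtlTimeProvider`, not Flink classes.

```java
import java.util.function.LongSupplier;

/** Hypothetical stand-in for TtlValue: a user value plus an absolute expiration timestamp. */
final class TtlValueSketch<V> {
    final V userValue;
    final long expirationTimestamp;

    TtlValueSketch(V userValue, long expirationTimestamp) {
        this.userValue = userValue;
        this.expirationTimestamp = expirationTimestamp;
    }
}

/** Illustrative core of a TTL decorator; names are assumptions, not the PR's API. */
final class TtlVisibilitySketch {
    private final LongSupplier clock;    // stand-in for TtlTimeProvider
    private final boolean returnExpired; // true corresponds to TtlStateVisibility.Relaxed in the diff

    TtlVisibilitySketch(LongSupplier clock, boolean returnExpired) {
        this.clock = clock;
        this.returnExpired = returnExpired;
    }

    /** A value is expired once its expiration timestamp is not after the current time. */
    boolean expired(TtlValueSketch<?> ttlValue) {
        return ttlValue != null && ttlValue.expirationTimestamp <= clock.getAsLong();
    }

    /** Hides expired values unless returnExpired is set. */
    <V> V getUnexpired(TtlValueSketch<V> ttlValue) {
        return ttlValue == null || (expired(ttlValue) && !returnExpired) ? null : ttlValue.userValue;
    }

    public static void main(String[] args) {
        TtlValueSketch<String> stale = new TtlValueSketch<>("a", 100L);
        TtlVisibilitySketch strict = new TtlVisibilitySketch(() -> 200L, false);
        TtlVisibilitySketch relaxed = new TtlVisibilitySketch(() -> 200L, true);
        System.out.println(strict.getUnexpired(stale));  // prints "null"
        System.out.println(relaxed.getUnexpired(stale)); // prints "a"
    }
}
```

In this reading, `returnExpired` only changes what a reader sees; it does not decide when the expired entry is physically removed.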
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16520159#comment-16520159 ] ASF GitHub Bot commented on FLINK-9514: --- Github user azagrebin commented on the issue: https://github.com/apache/flink/pull/6186 Thanks for the review @sihuazhou, I think all concerns have now been addressed. The CI failure is unrelated; the build passes in [my CI for the same commit](https://travis-ci.org/azagrebin/flink/builds/395137069?utm_source=github_status&utm_medium=notification).
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16520168#comment-16520168 ] ASF GitHub Bot commented on FLINK-9514: --- Github user sihuazhou commented on the issue: https://github.com/apache/flink/pull/6186 @azagrebin thanks for addressing the concerns, this looks good from my side now. Let's wait for @StefanRRichter's review.
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16519551#comment-16519551 ] ASF GitHub Bot commented on FLINK-9514: --- Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r197192870 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlConfig.java --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.util.Preconditions; + +/** + * Configuration of state TTL logic. 
+ * TODO: builder + */ +public class TtlConfig { + private final TtlUpdateType ttlUpdateType; + private final TtlStateVisibility stateVisibility; + private final TtlTimeCharacteristic timeCharacteristic; + private final Time ttl; + + public TtlConfig( + TtlUpdateType ttlUpdateType, + TtlStateVisibility stateVisibility, + TtlTimeCharacteristic timeCharacteristic, + Time ttl) { + Preconditions.checkNotNull(ttlUpdateType); + Preconditions.checkNotNull(stateVisibility); + Preconditions.checkNotNull(timeCharacteristic); + Preconditions.checkArgument(ttl.toMilliseconds() >= 0, --- End diff -- Maybe we should first check that `ttl` is not null. I also wonder whether `ttl.toMilliseconds() == 0` would make any sense.
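One way the suggested pre-check could look, as a sketch only: it uses `java.time.Duration` as a stand-in for Flink's `Time`, and the class and method names are hypothetical. Rejecting a zero TTL is one possible answer to the question above, not something the PR has decided.

```java
import java.time.Duration;
import java.util.Objects;

/** Hypothetical validation helper; Duration stands in for org.apache.flink.api.common.time.Time. */
final class TtlConfigCheckSketch {

    /** Returns the TTL in milliseconds after checking that it is non-null and strictly positive. */
    static long validatedTtlMillis(Duration ttl) {
        // Fail with a clear message instead of an NPE from inside the range check.
        Objects.requireNonNull(ttl, "TTL must not be null.");
        long millis = ttl.toMillis();
        // Assumption: a zero TTL would expire every value immediately, so it is rejected here.
        if (millis <= 0) {
            throw new IllegalArgumentException("TTL is expected to be positive.");
        }
        return millis;
    }

    public static void main(String[] args) {
        System.out.println(validatedTtlMillis(Duration.ofHours(2))); // prints 7200000
    }
}
```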
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16519539#comment-16519539 ] ASF GitHub Bot commented on FLINK-9514: --- Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r197188305 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java --- @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; + +/** + * This class wraps user value of state with TTL. + * + * @param Type of the user value of state with TTL + */ +class TtlValue implements Serializable { + private final T userValue; + private final long expirationTimestamp; --- End diff -- Okay, I see this is tricky, I agree that this should be addressed in another PR. We need to figure out a proper way to do that. 
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16519511#comment-16519511 ] ASF GitHub Bot commented on FLINK-9514: --- Github user azagrebin commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r197179238 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java --- @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; + +/** + * This class wraps user value of state with TTL. + * + * @param Type of the user value of state with TTL + */ +class TtlValue implements Serializable { + private final T userValue; + private final long expirationTimestamp; --- End diff -- This operation fits more for checkpoint full scan restoration with custom transformation of each state entry where expiration timestamp is optionally prolonged for downtime. The same as cleanup of expired state during full scan. I think it should be another issue and PR, because how can wrappers distinguish between old and new state before and after restart? 
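The restoration-time transformation described above could be sketched roughly as follows. This is only an illustration under assumptions: state is modelled as a map from key to expiration timestamp, the checkpoint and restore timestamps are assumed to be known to the restore code, and `RestoreShiftSketch` and `prolong` are hypothetical names rather than anything in the PR.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical full-scan transformation applied to each state entry on restore. */
final class RestoreShiftSketch {

    /**
     * Drops entries that were already expired when the checkpoint was taken and
     * prolongs the expiration timestamp of the remaining ones by the downtime.
     */
    static Map<String, Long> prolong(Map<String, Long> expirationByKey, long checkpointTs, long restoreTs) {
        long downtime = Math.max(0L, restoreTs - checkpointTs);
        Map<String, Long> result = new LinkedHashMap<>();
        for (Map.Entry<String, Long> entry : expirationByKey.entrySet()) {
            // Same convention as the wrappers: expired means expirationTimestamp <= current time.
            if (entry.getValue() > checkpointTs) {
                result.put(entry.getKey(), entry.getValue() + downtime);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Long> state = new LinkedHashMap<>();
        state.put("a", 150L); // alive at checkpoint time 100
        state.put("b", 90L);  // already expired at checkpoint time 100
        System.out.println(prolong(state, 100L, 160L)); // prints {a=210}
    }
}
```

Doing this once during the scan sidesteps the question raised above, because the wrappers themselves never need to tell old state from new state.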
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16519301#comment-16519301 ] ASF GitHub Bot commented on FLINK-9514: --- Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r197114442 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java --- @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; + +/** + * This class wraps user value of state with TTL. + * + * @param Type of the user value of state with TTL + */ +class TtlValue implements Serializable { + private final T userValue; + private final long expirationTimestamp; --- End diff -- Additionally, if we don't align the time on recovery, then what is a safe `TTL` value to set for a job?
(This is the question users always ask us when they try to use the `TTL` to control the state's size; we implemented it in a hacky way based on `TtlDB`.)
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16519288#comment-16519288 ] ASF GitHub Bot commented on FLINK-9514: --- Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r197111980 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java --- @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; + +/** + * This class wraps user value of state with TTL. + * + * @param Type of the user value of state with TTL + */ +class TtlValue implements Serializable { + private final T userValue; + private final long expirationTimestamp; --- End diff -- I'm really not sure whether we should leave it for now. If we leave it for now, then it will be a headache problem on practical production. 
A very common situation: a job reads data from Kafka, and the user sets `TTL = 2hours` because they believe the data's latency is definitely less than 2 hours. This way they can use the TTL to safely bound the total state size and still get an exact result. But if the user finds that the job needs to scale up, they have to trigger a savepoint and rescale the job from it. What if some problem prevents them from recovering the job from the savepoint quickly? Say it takes 30 min to recover the job; then the result becomes inaccurate. Even if the user never needs to trigger a savepoint, what if the job hits a problem (maybe with some machine) and loops in "failed-restart-failed-..."? If we fix the problem after 2 hours and the job resumes automatically, all of the state has already expired. I think this is a disaster for the user. Yes, with `EventTime` this problem does not arise, but `ProcessingTime` is a very common use case (in our production, most jobs use `ProcessingTime` as their time characteristic). I know Flink's TimeService also does not align time on recovery, but state TTL is a bit different from timers. When registering a timer, the user passes an absolute time to the API, but when setting the TTL the user passes only a relative time; it is we who convert the relative time into an absolute time to implement the TTL.
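The arithmetic behind this concern, as a small sketch: the relative TTL is turned into an absolute expiration timestamp at write time, so wall-clock downtime counts against it. The class and method names are hypothetical, and the timestamps are plain milliseconds measured from the moment of the write.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical illustration of how downtime eats into a processing-time TTL. */
final class DowntimeExpirySketch {

    /** The relative TTL is converted into an absolute timestamp when the value is written. */
    static long expirationTimestamp(long writeTs, long ttlMillis) {
        return writeTs + ttlMillis;
    }

    /** Same convention as the wrappers: expired means expirationTimestamp <= current time. */
    static boolean expired(long expirationTs, long nowTs) {
        return expirationTs <= nowTs;
    }

    public static void main(String[] args) {
        long ttl = TimeUnit.HOURS.toMillis(2);
        long writeTs = 0L;
        long expiration = expirationTimestamp(writeTs, ttl);

        // Job is back 30 minutes after the write: the value survives, but only 1.5h of TTL is left.
        long afterShortOutage = writeTs + TimeUnit.MINUTES.toMillis(30);
        // Job is stuck in a restart loop for 2 hours: the value is gone although no data was processed.
        long afterLongOutage = writeTs + TimeUnit.HOURS.toMillis(2);

        System.out.println(expired(expiration, afterShortOutage)); // prints false
        System.out.println(expired(expiration, afterLongOutage));  // prints true
    }
}
```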
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16519279#comment-16519279 ] ASF GitHub Bot commented on FLINK-9514: --- Github user azagrebin commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r197110068 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java --- @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.internal.InternalListState; +import org.apache.flink.util.Preconditions; + +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +/** + * This class wraps list state with TTL logic. 
+ * + * @param The type of key the state is associated to + * @param The type of the namespace + * @param Type of the user entry value of state with TTL + */ +class TtlListState extends + AbstractTtlState, List>, InternalListState>> + implements InternalListState { + TtlListState( + InternalListState> originalState, + TtlConfig config, + TtlTimeProvider timeProvider, + TypeSerializer> valueSerializer) { + super(originalState, config, timeProvider, valueSerializer); + } + + @Override + public void update(List values) throws Exception { + updateInternal(values); + } + + @Override + public void addAll(List values) throws Exception { + Preconditions.checkNotNull(values, "List of values to add cannot be null."); + original.addAll(withTs(values)); + } + + @Override + public Iterable get() throws Exception { + Iterable> ttlValue = original.get(); + ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue; + if (updateTsOnRead) { + List> collected = collect(ttlValue); + ttlValue = collected; + updateTs(collected); + } + final Iterable> finalResult = ttlValue; + return () -> new IteratorWithCleanup(finalResult.iterator()); + } + + private void updateTs(List> ttlValue) throws Exception { + List> unexpiredWithUpdatedTs = ttlValue.stream() + .filter(v -> !expired(v)) + .map(this::rewrapWithNewTs) + .collect(Collectors.toList()); + if (!unexpiredWithUpdatedTs.isEmpty()) { + original.update(unexpiredWithUpdatedTs); + } + } + + @Override + public void add(T value) throws Exception { + Preconditions.checkNotNull(value, "You cannot add null to a ListState."); + original.add(wrapWithTs(value)); + } + + @Override + public void clear() { + original.clear(); + } + + @Override + public void mergeNamespaces(N target, Collection sources) throws Exception { + original.mergeNamespaces(target, sources); + } + + @Override + public List getInternal() throws Exception { + return collect(get()); + } + + private List collect(Iterable iterable) { + return 
StreamSupport.stream(iterable.spliterator(), false).collect(Collectors.toList()); + } + + @Override + public void updateInternal(List valueToStore) throws Exception { + Preconditions.checkNotNull(valueToStore, "List of values to update cannot be null."); + original.addAll(withTs(valu
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16519277#comment-16519277 ] ASF GitHub Bot commented on FLINK-9514: --- Github user azagrebin commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r197109577 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java --- @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.function.SupplierWithException; +import org.apache.flink.util.function.ThrowingConsumer; +import org.apache.flink.util.function.ThrowingRunnable; + +/** + * Base class for TTL logic wrappers. 
+ * + * @param Type of originally wrapped object + */ +abstract class AbstractTtlDecorator { + final T original; + final TtlConfig config; + final TtlTimeProvider timeProvider; + final boolean updateTsOnRead; + final boolean returnExpired; + + AbstractTtlDecorator( + T original, + TtlConfig config, + TtlTimeProvider timeProvider) { + Preconditions.checkNotNull(original); + Preconditions.checkNotNull(config); + Preconditions.checkNotNull(timeProvider); + Preconditions.checkArgument(config.getTtlUpdateType() != TtlUpdateType.Disabled, + "State does not need to be wrapped with TTL if it is configured as disabled."); + this.original = original; + this.config = config; + this.timeProvider = timeProvider; + this.updateTsOnRead = config.getTtlUpdateType() == TtlUpdateType.OnReadAndWrite; + this.returnExpired = config.getStateVisibility() == TtlStateVisibility.Relaxed; + } + +V getUnexpried(TtlValue ttlValue) { + return ttlValue == null || (expired(ttlValue) && !returnExpired) ? null : ttlValue.getUserValue(); + } + +boolean expired(TtlValue ttlValue) { + return ttlValue != null && ttlValue.getExpirationTimestamp() <= timeProvider.currentTimestamp(); + } + +TtlValue wrapWithTs(V value) { + return wrapWithTs(value, newExpirationTimestamp()); + } + + static TtlValue wrapWithTs(V value, long ts) { + return value == null ? 
null : new TtlValue<>(value, ts); + } + +TtlValue rewrapWithNewTs(TtlValue ttlValue) { + return wrapWithTs(ttlValue.getUserValue()); + } + + private long newExpirationTimestamp() { + long currentTs = timeProvider.currentTimestamp(); + long ttl = config.getTtl().toMilliseconds(); --- End diff -- I think the JIT will optimise this since it is the only usage, but I will do it just in case.
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16519276#comment-16519276 ] ASF GitHub Bot commented on FLINK-9514: --- Github user azagrebin commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r197108498 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java --- @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.internal.InternalListState; +import org.apache.flink.util.Preconditions; + +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +/** + * This class wraps list state with TTL logic. 
+ * + * @param The type of key the state is associated to + * @param The type of the namespace + * @param Type of the user entry value of state with TTL + */ +class TtlListState extends + AbstractTtlState, List>, InternalListState>> + implements InternalListState { + TtlListState( + InternalListState> originalState, + TtlConfig config, + TtlTimeProvider timeProvider, + TypeSerializer> valueSerializer) { + super(originalState, config, timeProvider, valueSerializer); + } + + @Override + public void update(List values) throws Exception { + updateInternal(values); + } + + @Override + public void addAll(List values) throws Exception { + Preconditions.checkNotNull(values, "List of values to add cannot be null."); + original.addAll(withTs(values)); + } + + @Override + public Iterable get() throws Exception { + Iterable> ttlValue = original.get(); + ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue; + if (updateTsOnRead) { + List> collected = collect(ttlValue); + ttlValue = collected; + updateTs(collected); + } + final Iterable> finalResult = ttlValue; + return () -> new IteratorWithCleanup(finalResult.iterator()); + } + + private void updateTs(List> ttlValue) throws Exception { + List> unexpiredWithUpdatedTs = ttlValue.stream() + .filter(v -> !expired(v)) + .map(this::rewrapWithNewTs) + .collect(Collectors.toList()); + if (!unexpiredWithUpdatedTs.isEmpty()) { + original.update(unexpiredWithUpdatedTs); + } + } + + @Override + public void add(T value) throws Exception { + Preconditions.checkNotNull(value, "You cannot add null to a ListState."); + original.add(wrapWithTs(value)); + } + + @Override + public void clear() { + original.clear(); + } + + @Override + public void mergeNamespaces(N target, Collection sources) throws Exception { + original.mergeNamespaces(target, sources); --- End diff -- I think this is out of TTL scope now. Merge namespace methods are not part of API which immediately face user. 
It is used only internally for windowing; I am not sure it will ever need TTL. Anyway, the read part of the API should eventually evict expired state, merged or not. For aggregating states, merge already uses the wrapped reading methods from the aggregate functions underneath, which should also speed up cleanup if they are ever used with `mergeNamespaces`.
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16519247#comment-16519247 ] ASF GitHub Bot commented on FLINK-9514: --- Github user azagrebin commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r197102118 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java --- @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.internal.InternalListState; +import org.apache.flink.util.Preconditions; + +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +/** + * This class wraps list state with TTL logic. 
+ * + * @param The type of key the state is associated to + * @param The type of the namespace + * @param Type of the user entry value of state with TTL + */ +class TtlListState extends + AbstractTtlState, List>, InternalListState>> + implements InternalListState { + TtlListState( + InternalListState> originalState, + TtlConfig config, + TtlTimeProvider timeProvider, + TypeSerializer> valueSerializer) { + super(originalState, config, timeProvider, valueSerializer); + } + + @Override + public void update(List values) throws Exception { + updateInternal(values); + } + + @Override + public void addAll(List values) throws Exception { + Preconditions.checkNotNull(values, "List of values to add cannot be null."); + original.addAll(withTs(values)); + } + + @Override + public Iterable get() throws Exception { + Iterable> ttlValue = original.get(); + ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue; + if (updateTsOnRead) { + List> collected = collect(ttlValue); + ttlValue = collected; + updateTs(collected); + } + final Iterable> finalResult = ttlValue; + return () -> new IteratorWithCleanup(finalResult.iterator()); + } + + private void updateTs(List> ttlValue) throws Exception { + List> unexpiredWithUpdatedTs = ttlValue.stream() + .filter(v -> !expired(v)) + .map(this::rewrapWithNewTs) + .collect(Collectors.toList()); + if (!unexpiredWithUpdatedTs.isEmpty()) { + original.update(unexpiredWithUpdatedTs); + } + } + + @Override + public void add(T value) throws Exception { + Preconditions.checkNotNull(value, "You cannot add null to a ListState."); + original.add(wrapWithTs(value)); + } + + @Override + public void clear() { + original.clear(); + } + + @Override + public void mergeNamespaces(N target, Collection sources) throws Exception { + original.mergeNamespaces(target, sources); + } + + @Override + public List getInternal() throws Exception { + return collect(get()); + } + + private List collect(Iterable iterable) { --- End diff -- Well, not sure, it matters 
at this point, it is private and used just once with a collected `List`. My thinking was that it is less verbose: `list.stream()` vs `StreamSupport.stream(iterable.spliterator(), false)`.
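To make the verbosity trade-off concrete, here is a minimal sketch that collects the same elements both ways. The class and method names are hypothetical and are not taken from the PR; only `List.stream()` and `StreamSupport.stream(...)` are real JDK calls.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

// Hypothetical demo class; not part of the Flink PR.
class StreamSourceDemo {
    // Accepts any Iterable, at the cost of the more verbose StreamSupport call.
    static <T> List<T> collectFromIterable(Iterable<T> iterable) {
        return StreamSupport.stream(iterable.spliterator(), false)
            .collect(Collectors.toList());
    }

    // Shorter, but only usable when the caller already holds a List.
    static <T> List<T> collectFromList(List<T> list) {
        return list.stream().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> values = Arrays.asList(1, 2, 3);
        System.out.println(collectFromIterable(values));
        System.out.println(collectFromList(values));
    }
}
```

Both helpers produce an equal list; the difference is only in which input types they accept and how much ceremony the call site needs.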
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16519187#comment-16519187 ] ASF GitHub Bot commented on FLINK-9514: --- Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r197080576 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java --- @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.internal.InternalListState; +import org.apache.flink.util.Preconditions; + +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +/** + * This class wraps list state with TTL logic. 
+ * + * @param The type of key the state is associated to + * @param The type of the namespace + * @param Type of the user entry value of state with TTL + */ +class TtlListState extends + AbstractTtlState, List>, InternalListState>> + implements InternalListState { + TtlListState( + InternalListState> originalState, + TtlConfig config, + TtlTimeProvider timeProvider, + TypeSerializer> valueSerializer) { + super(originalState, config, timeProvider, valueSerializer); + } + + @Override + public void update(List values) throws Exception { + updateInternal(values); + } + + @Override + public void addAll(List values) throws Exception { + Preconditions.checkNotNull(values, "List of values to add cannot be null."); + original.addAll(withTs(values)); + } + + @Override + public Iterable get() throws Exception { + Iterable> ttlValue = original.get(); + ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue; + if (updateTsOnRead) { + List> collected = collect(ttlValue); + ttlValue = collected; + updateTs(collected); + } + final Iterable> finalResult = ttlValue; + return () -> new IteratorWithCleanup(finalResult.iterator()); + } + + private void updateTs(List> ttlValue) throws Exception { + List> unexpiredWithUpdatedTs = ttlValue.stream() + .filter(v -> !expired(v)) + .map(this::rewrapWithNewTs) + .collect(Collectors.toList()); + if (!unexpiredWithUpdatedTs.isEmpty()) { + original.update(unexpiredWithUpdatedTs); + } + } + + @Override + public void add(T value) throws Exception { + Preconditions.checkNotNull(value, "You cannot add null to a ListState."); + original.add(wrapWithTs(value)); + } + + @Override + public void clear() { + original.clear(); + } + + @Override + public void mergeNamespaces(N target, Collection sources) throws Exception { + original.mergeNamespaces(target, sources); + } + + @Override + public List getInternal() throws Exception { + return collect(get()); + } + + private List collect(Iterable iterable) { --- End diff -- If we need to "update on 
read", then this is a bit confusing to me. Currently we attach a TTL to every list item, so the "update on read" should be scoped to the list item, not the whole list. So it seems to me that an `Iterable` parameter for `updateTs` is more reasonable. What do you think?
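The per-item "update on read" idea can be sketched as a single pass over an `Iterable`. This is only an illustration under assumptions: `TimestampedEntry`, `refreshUnexpired`, and the expiry rule (`lastAccessTs + ttlMillis <= now`) are hypothetical stand-ins and do not reproduce the PR's `TtlValue` or `updateTs`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a list item stored together with its timestamp.
class TimestampedEntry<T> {
    final T value;
    final long lastAccessTs;

    TimestampedEntry(T value, long lastAccessTs) {
        this.value = value;
        this.lastAccessTs = lastAccessTs;
    }
}

class UpdateOnReadDemo {
    // One pass over any Iterable: drop expired items, refresh the timestamp of the rest.
    static <T> List<TimestampedEntry<T>> refreshUnexpired(
            Iterable<TimestampedEntry<T>> entries, long ttlMillis, long now) {
        List<TimestampedEntry<T>> refreshed = new ArrayList<>();
        for (TimestampedEntry<T> entry : entries) {
            boolean expired = entry.lastAccessTs + ttlMillis <= now; // assumed expiry rule
            if (!expired) {
                refreshed.add(new TimestampedEntry<>(entry.value, now));
            }
        }
        return refreshed;
    }

    public static void main(String[] args) {
        List<TimestampedEntry<String>> stored = Arrays.asList(
            new TimestampedEntry<>("a", 0L),
            new TimestampedEntry<>("b", 90L));
        for (TimestampedEntry<String> e : refreshUnexpired(stored, 50L, 100L)) {
            System.out.println(e.value + " @ " + e.lastAccessTs);
        }
    }
}
```

Because the helper takes an `Iterable`, the caller does not have to materialize a `List` first, which is the point raised in the comment.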
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16519186#comment-16519186 ] ASF GitHub Bot commented on FLINK-9514: --- Github user azagrebin commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r197079917 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java --- @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; + +/** + * This class wraps user value of state with TTL. + * + * @param Type of the user value of state with TTL + */ +class TtlValue implements Serializable { + private final T userValue; + private final long expirationTimestamp; --- End diff -- I think it is a bit out of scope for this PR. I thought about this concern, but the semantics of processing time are similar to real clock time: it does not stop if the job is stopped. There is also the event time option with more control over time. I would leave it for now. This is more about user migration of checkpoints.
We can add a comment/open question to the design doc about it.

> Create wrapper with TTL logic for value state
> ---------------------------------------------
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
> Issue Type: Sub-task
> Components: State Backends, Checkpointing
> Affects Versions: 1.6.0
> Reporter: Andrey Zagrebin
> Assignee: Andrey Zagrebin
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.6.0
>
> TTL state decorator uses original state with packed TTL and adds TTL logic
> using time provider:
> {code:java}
> TtlValueState<V> implements ValueState<V> {
>   ValueState<TtlValue<V>> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
>     TtlValue<V> valueWithTtl = underlyingState.get();
>     // ttl logic here (e.g. update timestamp)
>     return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ... }
> }
> {code}
> TTL decorators are applied to state produced by normal state binder in its TTL
> wrapper from FLINK-9513

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
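The decorator outlined in the issue description can be illustrated with a small in-memory sketch. Everything here is an assumption made for illustration: `TtlValueSketch`, its `Stamped` holder, the `LongSupplier` clock, and the expiry rule are hypothetical and do not use Flink's `ValueState`, `TtlValue`, or time provider classes.

```java
import java.util.function.LongSupplier;

// Hypothetical, in-memory sketch of a value wrapper with TTL; not Flink code.
class TtlValueSketch<V> {
    // User value packed with its expiration timestamp (assumed layout).
    private static final class Stamped<V> {
        final V value;
        final long expirationTs;

        Stamped(V value, long expirationTs) {
            this.value = value;
            this.expirationTs = expirationTs;
        }
    }

    private final long ttlMillis;
    private final LongSupplier clock;      // stands in for the time provider
    private final boolean updateTsOnRead;
    private Stamped<V> stored;             // stands in for the underlying state

    TtlValueSketch(long ttlMillis, LongSupplier clock, boolean updateTsOnRead) {
        this.ttlMillis = ttlMillis;
        this.clock = clock;
        this.updateTsOnRead = updateTsOnRead;
    }

    V value() {
        if (stored == null) {
            return null;
        }
        long now = clock.getAsLong();
        if (stored.expirationTs <= now) {
            stored = null; // evict the expired value on read
            return null;
        }
        if (updateTsOnRead) {
            // Prolong the lifetime of the value because it was just read.
            stored = new Stamped<>(stored.value, now + ttlMillis);
        }
        return stored.value;
    }

    void update(V value) {
        stored = value == null ? null : new Stamped<>(value, clock.getAsLong() + ttlMillis);
    }

    public static void main(String[] args) {
        long[] now = {0L};
        TtlValueSketch<String> state = new TtlValueSketch<>(10L, () -> now[0], false);
        state.update("x");
        now[0] = 5L;
        System.out.println(state.value()); // read while still within the TTL
        now[0] = 10L;
        System.out.println(state.value()); // read after the TTL has elapsed
    }
}
```

Injecting the clock as a `LongSupplier` keeps the sketch testable without waiting for real time to pass; a real state backend would also have to persist the timestamp alongside the value.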
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16519178#comment-16519178 ] ASF GitHub Bot commented on FLINK-9514: --- Github user azagrebin commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r197077377 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java --- @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.internal.InternalListState; +import org.apache.flink.util.Preconditions; + +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +/** + * This class wraps list state with TTL logic. 
+ * + * @param The type of key the state is associated to + * @param The type of the namespace + * @param Type of the user entry value of state with TTL + */ +class TtlListState extends + AbstractTtlState, List>, InternalListState>> + implements InternalListState { + TtlListState( + InternalListState> originalState, + TtlConfig config, + TtlTimeProvider timeProvider, + TypeSerializer> valueSerializer) { + super(originalState, config, timeProvider, valueSerializer); + } + + @Override + public void update(List values) throws Exception { + updateInternal(values); + } + + @Override + public void addAll(List values) throws Exception { + Preconditions.checkNotNull(values, "List of values to add cannot be null."); + original.addAll(withTs(values)); + } + + @Override + public Iterable get() throws Exception { + Iterable> ttlValue = original.get(); + ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue; + if (updateTsOnRead) { + List> collected = collect(ttlValue); + ttlValue = collected; + updateTs(collected); + } + final Iterable> finalResult = ttlValue; + return () -> new IteratorWithCleanup(finalResult.iterator()); + } + + private void updateTs(List> ttlValue) throws Exception { + List> unexpiredWithUpdatedTs = ttlValue.stream() + .filter(v -> !expired(v)) + .map(this::rewrapWithNewTs) + .collect(Collectors.toList()); + if (!unexpiredWithUpdatedTs.isEmpty()) { + original.update(unexpiredWithUpdatedTs); + } + } + + @Override + public void add(T value) throws Exception { + Preconditions.checkNotNull(value, "You cannot add null to a ListState."); + original.add(wrapWithTs(value)); + } + + @Override + public void clear() { + original.clear(); + } + + @Override + public void mergeNamespaces(N target, Collection sources) throws Exception { + original.mergeNamespaces(target, sources); + } + + @Override + public List getInternal() throws Exception { + return collect(get()); + } + + private List collect(Iterable iterable) { --- End diff -- In general the idea is to 
keep the `get()` method as lazy as possible, return an `Iterable`, and avoid querying the backend if not needed. At the moment list state internally stores a `List` according to the API, not an `Iterable` (something to think about in the future). So `getInternal()` returns a `List`, i.e. the collected `Iterable`. I agree it looks wei
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518896#comment-16518896 ] ASF GitHub Bot commented on FLINK-9514: --- Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r196998472 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java --- @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.internal.InternalListState; +import org.apache.flink.util.Preconditions; + +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +/** + * This class wraps list state with TTL logic. 
+ * + * @param The type of key the state is associated to + * @param The type of the namespace + * @param Type of the user entry value of state with TTL + */ +class TtlListState extends + AbstractTtlState, List>, InternalListState>> + implements InternalListState { + TtlListState( + InternalListState> originalState, + TtlConfig config, + TtlTimeProvider timeProvider, + TypeSerializer> valueSerializer) { + super(originalState, config, timeProvider, valueSerializer); + } + + @Override + public void update(List values) throws Exception { + updateInternal(values); + } + + @Override + public void addAll(List values) throws Exception { + Preconditions.checkNotNull(values, "List of values to add cannot be null."); + original.addAll(withTs(values)); + } + + @Override + public Iterable get() throws Exception { + Iterable> ttlValue = original.get(); + ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue; + if (updateTsOnRead) { + List> collected = collect(ttlValue); + ttlValue = collected; + updateTs(collected); + } + final Iterable> finalResult = ttlValue; + return () -> new IteratorWithCleanup(finalResult.iterator()); + } + + private void updateTs(List> ttlValue) throws Exception { + List> unexpiredWithUpdatedTs = ttlValue.stream() + .filter(v -> !expired(v)) + .map(this::rewrapWithNewTs) + .collect(Collectors.toList()); + if (!unexpiredWithUpdatedTs.isEmpty()) { + original.update(unexpiredWithUpdatedTs); + } + } + + @Override + public void add(T value) throws Exception { + Preconditions.checkNotNull(value, "You cannot add null to a ListState."); + original.add(wrapWithTs(value)); + } + + @Override + public void clear() { + original.clear(); + } + + @Override + public void mergeNamespaces(N target, Collection sources) throws Exception { + original.mergeNamespaces(target, sources); + } + + @Override + public List getInternal() throws Exception { + return collect(get()); + } + + private List collect(Iterable iterable) { --- End diff -- If we've called 
`getInternal()` in `get()`, and made `updateTs()` accept an `Iterable`, then this method could be removed. (Or at least we should check whether the `iterable` is an instance of `List`; if so, we could cast it to `List` and return immediately.)
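The suggested short-circuit could look roughly like the sketch below. `CollectDemo` and its `collect` method are hypothetical names used for illustration; they are not the PR's private `collect` helper.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper illustrating the "return immediately if it is already a List" idea.
class CollectDemo {
    static <E> List<E> collect(Iterable<E> iterable) {
        if (iterable instanceof List) {
            // Already materialized: avoid copying and hand back the same instance.
            return (List<E>) iterable;
        }
        List<E> result = new ArrayList<>();
        iterable.forEach(result::add);
        return result;
    }

    public static void main(String[] args) {
        List<String> asList = Arrays.asList("a", "b");
        Iterable<String> lazy = () -> asList.iterator();
        System.out.println(collect(asList) == asList); // same instance is reused
        System.out.println(collect(lazy));             // copied into a new list
    }
}
```

One caveat of the fast path is aliasing: the caller receives the very same list instance, so later mutations are visible on both sides.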
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518893#comment-16518893 ] ASF GitHub Bot commented on FLINK-9514: --- Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r197001110 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlReducingState.java --- @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.internal.InternalReducingState; + +import java.util.Collection; + +/** + * This class wraps reducing state with TTL logic. 
+ * + * @param The type of key the state is associated to + * @param The type of the namespace + * @param Type of the user value of state with TTL + */ +class TtlReducingState + extends AbstractTtlState, InternalReducingState>> + implements InternalReducingState { + TtlReducingState( + InternalReducingState> originalState, + TtlConfig config, + TtlTimeProvider timeProvider, + TypeSerializer valueSerializer) { + super(originalState, config, timeProvider, valueSerializer); + } + + @Override + public T get() throws Exception { + return getInternal(); + } + + @Override + public void add(T value) throws Exception { + original.add(wrapWithTs(value, Long.MAX_VALUE)); + } + + @Override + public void clear() { + original.clear(); + } + + @Override + public void mergeNamespaces(N target, Collection sources) throws Exception { + original.mergeNamespaces(target, sources); --- End diff -- Again, should we also do the TTL check for `original.mergeNamespaces()`, since we need to query the state when merging namespaces?
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518892#comment-16518892 ] ASF GitHub Bot commented on FLINK-9514: --- Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r196996839 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java --- @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.internal.InternalListState; +import org.apache.flink.util.Preconditions; + +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +/** + * This class wraps list state with TTL logic. 
+ * + * @param The type of key the state is associated to + * @param The type of the namespace + * @param Type of the user entry value of state with TTL + */ +class TtlListState extends + AbstractTtlState, List>, InternalListState>> + implements InternalListState { + TtlListState( + InternalListState> originalState, + TtlConfig config, + TtlTimeProvider timeProvider, + TypeSerializer> valueSerializer) { + super(originalState, config, timeProvider, valueSerializer); + } + + @Override + public void update(List values) throws Exception { + updateInternal(values); + } + + @Override + public void addAll(List values) throws Exception { + Preconditions.checkNotNull(values, "List of values to add cannot be null."); + original.addAll(withTs(values)); + } + + @Override + public Iterable get() throws Exception { + Iterable> ttlValue = original.get(); + ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue; + if (updateTsOnRead) { + List> collected = collect(ttlValue); --- End diff -- In this block, we need to iterate over `ttlValue` twice, once for `collect()` and once for `updateTs()`. If we could make `updateTs()` accept an `Iterable` as its argument, we could avoid the `collect()` here, so we would only need to iterate over `ttlValue` once.
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518898#comment-16518898 ] ASF GitHub Bot commented on FLINK-9514: --- Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r197001339 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlAggregatingState.java --- @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.internal.InternalAggregatingState; + +import java.util.Collection; + +/** + * This class wraps aggregating state with TTL logic. + * + * @param The type of key the state is associated to + * @param The type of the namespace + * @param Type of the value added to the state + * @param The type of the accumulator (intermediate aggregate state). 
+ * @param Type of the value extracted from the state + * + */ +class TtlAggregatingState + extends AbstractTtlState, InternalAggregatingState, OUT>> + implements InternalAggregatingState { + + TtlAggregatingState( + InternalAggregatingState, OUT> originalState, + TtlConfig config, + TtlTimeProvider timeProvider, + TypeSerializer valueSerializer, + TtlAggregateFunction aggregateFunction) { + super(originalState, config, timeProvider, valueSerializer); + aggregateFunction.stateClear = originalState::clear; + aggregateFunction.updater = originalState::updateInternal; + } + + @Override + public OUT get() throws Exception { + return original.get(); + } + + @Override + public void add(IN value) throws Exception { + original.add(value); + } + + @Override + public void clear() { + original.clear(); + } + + @Override + public ACC getInternal() throws Exception { + return getWithTtlCheckAndUpdate(original::getInternal, original::updateInternal); + } + + @Override + public void updateInternal(ACC valueToStore) throws Exception { + original.updateInternal(wrapWithTs(valueToStore)); + } + + @Override + public void mergeNamespaces(N target, Collection sources) throws Exception { + original.mergeNamespaces(target, sources); --- End diff -- Should we also do the TTL check for original.mergeNamespaces()? Since we need to query the state when merging namespaces. 
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518894#comment-16518894 ] ASF GitHub Bot commented on FLINK-9514: --- Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r196995820 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java --- @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.internal.InternalListState; +import org.apache.flink.util.Preconditions; + +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +/** + * This class wraps list state with TTL logic. 
+ * + * @param The type of key the state is associated to + * @param The type of the namespace + * @param Type of the user entry value of state with TTL + */ +class TtlListState extends + AbstractTtlState, List>, InternalListState>> + implements InternalListState { + TtlListState( + InternalListState> originalState, + TtlConfig config, + TtlTimeProvider timeProvider, + TypeSerializer> valueSerializer) { + super(originalState, config, timeProvider, valueSerializer); + } + + @Override + public void update(List values) throws Exception { + updateInternal(values); + } + + @Override + public void addAll(List values) throws Exception { + Preconditions.checkNotNull(values, "List of values to add cannot be null."); + original.addAll(withTs(values)); + } + + @Override + public Iterable get() throws Exception { + Iterable> ttlValue = original.get(); + ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue; + if (updateTsOnRead) { + List> collected = collect(ttlValue); + ttlValue = collected; + updateTs(collected); + } + final Iterable> finalResult = ttlValue; + return () -> new IteratorWithCleanup(finalResult.iterator()); + } + + private void updateTs(List> ttlValue) throws Exception { + List> unexpiredWithUpdatedTs = ttlValue.stream() + .filter(v -> !expired(v)) + .map(this::rewrapWithNewTs) + .collect(Collectors.toList()); + if (!unexpiredWithUpdatedTs.isEmpty()) { + original.update(unexpiredWithUpdatedTs); + } + } + + @Override + public void add(T value) throws Exception { + Preconditions.checkNotNull(value, "You cannot add null to a ListState."); + original.add(wrapWithTs(value)); + } + + @Override + public void clear() { + original.clear(); + } + + @Override + public void mergeNamespaces(N target, Collection sources) throws Exception { + original.mergeNamespaces(target, sources); --- End diff -- Again, should we also do the `TTL` check for `original.mergeNamespaces()`? Since we need to query the state when merging namespaces. 
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
    [ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518897#comment-16518897 ]

ASF GitHub Bot commented on FLINK-9514:
---------------------------------------

Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r197004891

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java ---
    @@ -0,0 +1,47 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.util.Preconditions;
    +
    +import java.io.Serializable;
    +
    +/**
    + * This class wraps user value of state with TTL.
    + *
    + * @param <T> Type of the user value of state with TTL
    + */
    +class TtlValue<T> implements Serializable {
    +    private final T userValue;
    +    private final long expirationTimestamp;
    --- End diff --

    The `expirationTimestamp` is an absolute timestamp. Should we shift the timestamps in `TtlValue` on checkpoint and recovery? For example, suppose a user uses `ProcessTime` as the time characteristic and sets `TTL = 10min`. For some reason, they trigger a savepoint and recover the job from it 11 minutes later. If we don't shift the timestamps, all of the state will be expired.
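The savepoint scenario in the comment above can be checked with plain arithmetic. The following is only a sketch under stated assumptions: `TtlShiftSketch`, `shift`, and the idea of adding the downtime to each stored timestamp are hypothetical illustrations, not part of the pull request or of Flink's restore path. Only the expiry predicate (`expirationTimestamp <= now`) mirrors the `expired()` method quoted elsewhere in this thread.

```java
// Hypothetical names throughout; this is not Flink's TtlValue or its restore logic.
final class TtlShiftSketch {
    /** Same predicate as in the reviewed diff: expired once "now" reaches the timestamp. */
    static boolean expired(long expirationTimestamp, long now) {
        return expirationTimestamp <= now;
    }

    /** Assumed shift: move the absolute timestamp forward by the time the job was down. */
    static long shift(long expirationTimestamp, long savepointTime, long restoreTime) {
        return expirationTimestamp + (restoreTime - savepointTime);
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long expiration = 0L + 10 * minute;           // written at t = 0 with TTL = 10 min
        long savepointTime = 1 * minute;              // savepoint taken one minute later
        long restoreTime = savepointTime + 11 * minute;

        // Without a shift the entry is already expired at restore time.
        System.out.println(expired(expiration, restoreTime)); // true
        // With the assumed shift it keeps its remaining nine minutes.
        System.out.println(expired(shift(expiration, savepointTime, restoreTime), restoreTime)); // false
    }
}
```

Whether such a shift is desirable is exactly the open question in the review: a user may equally expect wall-clock downtime to count against the TTL.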
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518895#comment-16518895 ] ASF GitHub Bot commented on FLINK-9514: --- Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r196996094 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java --- @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.internal.InternalListState; +import org.apache.flink.util.Preconditions; + +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +/** + * This class wraps list state with TTL logic. 
+ * + * @param The type of key the state is associated to + * @param The type of the namespace + * @param Type of the user entry value of state with TTL + */ +class TtlListState extends + AbstractTtlState, List>, InternalListState>> + implements InternalListState { + TtlListState( + InternalListState> originalState, + TtlConfig config, + TtlTimeProvider timeProvider, + TypeSerializer> valueSerializer) { + super(originalState, config, timeProvider, valueSerializer); + } + + @Override + public void update(List values) throws Exception { + updateInternal(values); + } + + @Override + public void addAll(List values) throws Exception { + Preconditions.checkNotNull(values, "List of values to add cannot be null."); + original.addAll(withTs(values)); + } + + @Override + public Iterable get() throws Exception { + Iterable> ttlValue = original.get(); + ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue; + if (updateTsOnRead) { + List> collected = collect(ttlValue); + ttlValue = collected; + updateTs(collected); + } + final Iterable> finalResult = ttlValue; + return () -> new IteratorWithCleanup(finalResult.iterator()); + } + + private void updateTs(List> ttlValue) throws Exception { + List> unexpiredWithUpdatedTs = ttlValue.stream() + .filter(v -> !expired(v)) + .map(this::rewrapWithNewTs) + .collect(Collectors.toList()); + if (!unexpiredWithUpdatedTs.isEmpty()) { + original.update(unexpiredWithUpdatedTs); + } + } + + @Override + public void add(T value) throws Exception { + Preconditions.checkNotNull(value, "You cannot add null to a ListState."); + original.add(wrapWithTs(value)); + } + + @Override + public void clear() { + original.clear(); + } + + @Override + public void mergeNamespaces(N target, Collection sources) throws Exception { + original.mergeNamespaces(target, sources); + } + + @Override + public List getInternal() throws Exception { + return collect(get()); --- End diff -- This looks a bit weird, my gut feeling is that we should call `getInternal()` 
in `get()` (as we call `updateInternal()` in `update()` in this class), but here it is the reverse.
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
    [ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518810#comment-16518810 ]

ASF GitHub Bot commented on FLINK-9514:
---------------------------------------

Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6186#discussion_r196995095

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlConfig.java ---
    @@ -0,0 +1,63 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.api.common.time.Time;
    +import org.apache.flink.util.Preconditions;
    +
    +/**
    + * Configuration of state TTL logic.
    + * TODO: builder
    + */
    +public class TtlConfig {
    +    private final TtlUpdateType ttlUpdateType;
    +    private final TtlStateVisibility stateVisibility;
    +    private final TtlTimeCharacteristic timeCharacteristic;
    +    private final Time ttl;
    +
    +    public TtlConfig(
    +        TtlUpdateType ttlUpdateType,
    +        TtlStateVisibility stateVisibility,
    +        TtlTimeCharacteristic timeCharacteristic,
    +        Time ttl) {
    +        Preconditions.checkNotNull(ttlUpdateType);
    +        Preconditions.checkNotNull(stateVisibility);
    +        Preconditions.checkNotNull(timeCharacteristic);
    --- End diff --

    Maybe we should also check that the `ttl` is greater than 0?
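A minimal sketch of the positivity check suggested above. It is an illustration only: `TtlConfigSketch` is a hypothetical stand-in for the `TtlConfig` in the diff, and it uses `java.time.Duration` and plain JDK checks in place of Flink's `Time` and `Preconditions` so that it compiles without Flink on the classpath.

```java
import java.time.Duration;
import java.util.Objects;

// Hypothetical stand-in for TtlConfig; only the TTL field is modelled.
final class TtlConfigSketch {
    private final Duration ttl;

    TtlConfigSketch(Duration ttl) {
        Objects.requireNonNull(ttl, "ttl must not be null");
        // Reject zero and negative values up front instead of producing
        // entries that are expired the moment they are written.
        if (ttl.isZero() || ttl.isNegative()) {
            throw new IllegalArgumentException("TTL is expected to be positive, got " + ttl);
        }
        this.ttl = ttl;
    }

    Duration getTtl() {
        return ttl;
    }

    public static void main(String[] args) {
        System.out.println(new TtlConfigSketch(Duration.ofMinutes(10)).getTtl().toMillis()); // 600000
        try {
            new TtlConfigSketch(Duration.ZERO);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Failing in the constructor keeps the error next to the misconfiguration rather than surfacing later as state that silently never becomes visible.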
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518274#comment-16518274 ] ASF GitHub Bot commented on FLINK-9514: --- Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r196827863 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java --- @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.internal.InternalListState; +import org.apache.flink.util.Preconditions; + +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +/** + * This class wraps list state with TTL logic. 
    + *
    + * @param <K> The type of key the state is associated to
    + * @param <N> The type of the namespace
    + * @param <T> Type of the user entry value of state with TTL
    + */
    +class TtlListState<K, N, T> extends
    +    AbstractTtlState<K, N, List<T>, List<TtlValue<T>>, InternalListState<K, N, TtlValue<T>>>
    +    implements InternalListState<K, N, T> {
    +    TtlListState(
    +        InternalListState<K, N, TtlValue<T>> originalState,
    +        TtlConfig config,
    +        TtlTimeProvider timeProvider,
    +        TypeSerializer<List<T>> valueSerializer) {
    +        super(originalState, config, timeProvider, valueSerializer);
    +    }
    +
    +    @Override
    +    public void update(List<T> values) throws Exception {
    +        updateInternal(values);
    +    }
    +
    +    @Override
    +    public void addAll(List<T> values) throws Exception {
    +        Preconditions.checkNotNull(values, "List of values to add cannot be null.");
    +        original.addAll(withTs(values));
    +    }
    +
    +    @Override
    +    public Iterable<T> get() throws Exception {
    +        Iterable<TtlValue<T>> ttlValue = original.get();
    +        ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue;
    +        if (updateTsOnRead) {
    +            List<TtlValue<T>> collected = collect(ttlValue);
    +            ttlValue = collected;
    +            updateTs(collected);
    +        }
    +        final Iterable<TtlValue<T>> finalResult = ttlValue;
    --- End diff --

    Oh sorry, my bad, I misunderstood...
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518267#comment-16518267 ] ASF GitHub Bot commented on FLINK-9514: --- Github user azagrebin commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r196826339 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java --- @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.internal.InternalListState; +import org.apache.flink.util.Preconditions; + +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +/** + * This class wraps list state with TTL logic. 
    + *
    + * @param <K> The type of key the state is associated to
    + * @param <N> The type of the namespace
    + * @param <T> Type of the user entry value of state with TTL
    + */
    +class TtlListState<K, N, T> extends
    +    AbstractTtlState<K, N, List<T>, List<TtlValue<T>>, InternalListState<K, N, TtlValue<T>>>
    +    implements InternalListState<K, N, T> {
    +    TtlListState(
    +        InternalListState<K, N, TtlValue<T>> originalState,
    +        TtlConfig config,
    +        TtlTimeProvider timeProvider,
    +        TypeSerializer<List<T>> valueSerializer) {
    +        super(originalState, config, timeProvider, valueSerializer);
    +    }
    +
    +    @Override
    +    public void update(List<T> values) throws Exception {
    +        updateInternal(values);
    +    }
    +
    +    @Override
    +    public void addAll(List<T> values) throws Exception {
    +        Preconditions.checkNotNull(values, "List of values to add cannot be null.");
    +        original.addAll(withTs(values));
    +    }
    +
    +    @Override
    +    public Iterable<T> get() throws Exception {
    +        Iterable<TtlValue<T>> ttlValue = original.get();
    +        ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue;
    +        if (updateTsOnRead) {
    +            List<TtlValue<T>> collected = collect(ttlValue);
    +            ttlValue = collected;
    +            updateTs(collected);
    +        }
    +        final Iterable<TtlValue<T>> finalResult = ttlValue;
    --- End diff --

    `ttlValue` is reassigned before the lambda in the return statement, so it is no longer effectively final and cannot be captured by the lambda. That is why `finalResult` is formally needed to avoid a compilation error.
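The effectively-final rule behind `finalResult` can be reproduced outside Flink. The sketch below is an assumption-labelled illustration: `EffectivelyFinalSketch` and `view` are hypothetical names, and plain strings stand in for the `TtlValue` entries; only the shape (a local that is reassigned, then copied into a `final` local before the lambda) follows the `get()` method in the diff.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration; not code from the pull request.
final class EffectivelyFinalSketch {
    static Iterable<String> view(List<String> stored, boolean copyFirst) {
        List<String> values = stored;            // first assignment
        if (copyFirst) {
            values = new ArrayList<>(stored);    // second assignment: `values` is no longer effectively final
        }
        // Writing `return () -> values.iterator();` here is rejected by javac because
        // locals captured by a lambda must be final or effectively final.
        final List<String> finalResult = values; // assigned exactly once, so it can be captured
        return () -> finalResult.iterator();
    }

    public static void main(String[] args) {
        List<String> stored = new ArrayList<>(Arrays.asList("a", "b"));
        Iterable<String> snapshot = view(stored, true);
        stored.add("c");                         // does not affect the copied snapshot
        for (String s : snapshot) {
            System.out.println(s);               // prints "a" then "b"
        }
    }
}
```

An alternative that avoids the extra local is to compute the value in a helper method and assign the variable once, but the one-line `final` copy keeps the change to the original method minimal.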
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518244#comment-16518244 ] ASF GitHub Bot commented on FLINK-9514: --- Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r196820474 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java --- @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.function.SupplierWithException; +import org.apache.flink.util.function.ThrowingConsumer; +import org.apache.flink.util.function.ThrowingRunnable; + +/** + * Base class for TTL logic wrappers. 
    + *
    + * @param <T> Type of originally wrapped object
    + */
    +abstract class AbstractTtlDecorator<T> {
    +    final T original;
    +    final TtlConfig config;
    +    final TtlTimeProvider timeProvider;
    +    final boolean updateTsOnRead;
    +    final boolean returnExpired;
    +
    +    AbstractTtlDecorator(
    +        T original,
    +        TtlConfig config,
    +        TtlTimeProvider timeProvider) {
    +        Preconditions.checkNotNull(original);
    +        Preconditions.checkNotNull(config);
    +        Preconditions.checkNotNull(timeProvider);
    +        Preconditions.checkArgument(config.getTtlUpdateType() != TtlUpdateType.Disabled,
    +            "State does not need to be wrapped with TTL if it is configured as disabled.");
    +        this.original = original;
    +        this.config = config;
    +        this.timeProvider = timeProvider;
    +        this.updateTsOnRead = config.getTtlUpdateType() == TtlUpdateType.OnReadAndWrite;
    +        this.returnExpired = config.getStateVisibility() == TtlStateVisibility.Relaxed;
    +    }
    +
    +    <V> V getUnexpried(TtlValue<V> ttlValue) {
    +        return ttlValue == null || (expired(ttlValue) && !returnExpired) ? null : ttlValue.getUserValue();
    +    }
    +
    +    <V> boolean expired(TtlValue<V> ttlValue) {
    +        return ttlValue != null && ttlValue.getExpirationTimestamp() <= timeProvider.currentTimestamp();
    +    }
    +
    +    <V> TtlValue<V> wrapWithTs(V value) {
    +        return wrapWithTs(value, newExpirationTimestamp());
    +    }
    +
    +    static <V> TtlValue<V> wrapWithTs(V value, long ts) {
    +        return value == null ? null : new TtlValue<>(value, ts);
    +    }
    +
    +    <V> TtlValue<V> rewrapWithNewTs(TtlValue<V> ttlValue) {
    +        return wrapWithTs(ttlValue.getUserValue());
    +    }
    +
    +    private long newExpirationTimestamp() {
    +        long currentTs = timeProvider.currentTimestamp();
    +        long ttl = config.getTtl().toMilliseconds();
    --- End diff --

    This will be called very often, so does it make sense to introduce a field that caches `config.getTtl().toMilliseconds()`?
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518243#comment-16518243 ] ASF GitHub Bot commented on FLINK-9514: --- Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r196809755 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java --- @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.function.SupplierWithException; +import org.apache.flink.util.function.ThrowingConsumer; +import org.apache.flink.util.function.ThrowingRunnable; + +/** + * Base class for TTL logic wrappers. 
    + *
    + * @param <T> Type of originally wrapped object
    + */
    +abstract class AbstractTtlDecorator<T> {
    +    final T original;
    +    final TtlConfig config;
    +    final TtlTimeProvider timeProvider;
    +    final boolean updateTsOnRead;
    +    final boolean returnExpired;
    +
    +    AbstractTtlDecorator(
    +        T original,
    +        TtlConfig config,
    +        TtlTimeProvider timeProvider) {
    +        Preconditions.checkNotNull(original);
    +        Preconditions.checkNotNull(config);
    +        Preconditions.checkNotNull(timeProvider);
    +        Preconditions.checkArgument(config.getTtlUpdateType() != TtlUpdateType.Disabled,
    +            "State does not need to be wrapped with TTL if it is configured as disabled.");
    +        this.original = original;
    +        this.config = config;
    +        this.timeProvider = timeProvider;
    +        this.updateTsOnRead = config.getTtlUpdateType() == TtlUpdateType.OnReadAndWrite;
    +        this.returnExpired = config.getStateVisibility() == TtlStateVisibility.Relaxed;
    +    }
    +
    +    <V> V getUnexpried(TtlValue<V> ttlValue) {
    +        return ttlValue == null || (expired(ttlValue) && !returnExpired) ? null : ttlValue.getUserValue();
    +    }
    +
    +    <V> boolean expired(TtlValue<V> ttlValue) {
    +        return ttlValue != null && ttlValue.getExpirationTimestamp() <= timeProvider.currentTimestamp();
    +    }
    +
    +    <V> TtlValue<V> wrapWithTs(V value) {
    +        return wrapWithTs(value, newExpirationTimestamp());
    +    }
    +
    +    static <V> TtlValue<V> wrapWithTs(V value, long ts) {
    +        return value == null ? null : new TtlValue<>(value, ts);
    +    }
    +
    +    <V> TtlValue<V> rewrapWithNewTs(TtlValue<V> ttlValue) {
    +        return wrapWithTs(ttlValue.getUserValue());
    +    }
    +
    +    private long newExpirationTimestamp() {
    +        return timeProvider.currentTimestamp() + config.getTtl().toMilliseconds();
    --- End diff --

    This will be called very often, so does it make sense to introduce a field that caches `config.getTtl().toMilliseconds()`?
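The caching suggested in this comment could look roughly like the sketch below. Everything in it is an assumption made for illustration: `CachedTtlSketch` is a hypothetical class, `java.time.Duration` stands in for Flink's `Time`, and `LongSupplier` stands in for `TtlTimeProvider`. The thread does not establish whether the saving is measurable in practice.

```java
import java.time.Duration;
import java.util.function.LongSupplier;

// Hypothetical stand-in for the decorator in the diff; not the PR's code.
final class CachedTtlSketch {
    private final LongSupplier clock; // plays the role of the time provider
    private final long ttlMillis;     // converted once, in the constructor

    CachedTtlSketch(Duration ttl, LongSupplier clock) {
        this.clock = clock;
        this.ttlMillis = ttl.toMillis();
    }

    /** Hot path: one clock read and one addition, no per-call unit conversion. */
    long newExpirationTimestamp() {
        return clock.getAsLong() + ttlMillis;
    }

    public static void main(String[] args) {
        CachedTtlSketch sketch = new CachedTtlSketch(Duration.ofMinutes(10), () -> 1_000L);
        System.out.println(sketch.newExpirationTimestamp()); // 601000
    }
}
```

Caching is safe here only because the configured TTL is immutable for the lifetime of the decorator, which matches the `final` fields shown in the diff.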
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518242#comment-16518242 ] ASF GitHub Bot commented on FLINK-9514: --- Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r196816259 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java --- @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.internal.InternalListState; +import org.apache.flink.util.Preconditions; + +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +/** + * This class wraps list state with TTL logic. 
+ * + * @param The type of key the state is associated to + * @param The type of the namespace + * @param Type of the user entry value of state with TTL + */ +class TtlListState extends + AbstractTtlState, List>, InternalListState>> + implements InternalListState { + TtlListState( + InternalListState> originalState, + TtlConfig config, + TtlTimeProvider timeProvider, + TypeSerializer> valueSerializer) { + super(originalState, config, timeProvider, valueSerializer); + } + + @Override + public void update(List values) throws Exception { + updateInternal(values); + } + + @Override + public void addAll(List values) throws Exception { + Preconditions.checkNotNull(values, "List of values to add cannot be null."); + original.addAll(withTs(values)); + } + + @Override + public Iterable get() throws Exception { + Iterable> ttlValue = original.get(); + ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue; + if (updateTsOnRead) { + List> collected = collect(ttlValue); + ttlValue = collected; + updateTs(collected); + } + final Iterable> finalResult = ttlValue; + return () -> new IteratorWithCleanup(finalResult.iterator()); + } + + private void updateTs(List> ttlValue) throws Exception { + List> unexpiredWithUpdatedTs = ttlValue.stream() + .filter(v -> !expired(v)) + .map(this::rewrapWithNewTs) + .collect(Collectors.toList()); + if (!unexpiredWithUpdatedTs.isEmpty()) { + original.update(unexpiredWithUpdatedTs); + } + } + + @Override + public void add(T value) throws Exception { + Preconditions.checkNotNull(value, "You cannot add null to a ListState."); + original.add(wrapWithTs(value)); + } + + @Override + public void clear() { + original.clear(); + } + + @Override + public void mergeNamespaces(N target, Collection sources) throws Exception { + original.mergeNamespaces(target, sources); + } + + @Override + public List getInternal() throws Exception { + return collect(get()); + } + + private List collect(Iterable iterable) { + return 
StreamSupport.stream(iterable.spliterator(), false).collect(Collectors.toList()); + } + + @Override + public void updateInternal(List valueToStore) throws Exception { + Preconditions.checkNotNull(valueToStore, "List of values to update cannot be null."); + original.addAll(withTs(valu
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518241#comment-16518241 ] ASF GitHub Bot commented on FLINK-9514: --- Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r196817846 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java --- @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.internal.InternalListState; +import org.apache.flink.util.Preconditions; + +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +/** + * This class wraps list state with TTL logic. 
+ * + * @param The type of key the state is associated to + * @param The type of the namespace + * @param Type of the user entry value of state with TTL + */ +class TtlListState extends + AbstractTtlState, List>, InternalListState>> + implements InternalListState { + TtlListState( + InternalListState> originalState, + TtlConfig config, + TtlTimeProvider timeProvider, + TypeSerializer> valueSerializer) { + super(originalState, config, timeProvider, valueSerializer); + } + + @Override + public void update(List values) throws Exception { + updateInternal(values); + } + + @Override + public void addAll(List values) throws Exception { + Preconditions.checkNotNull(values, "List of values to add cannot be null."); + original.addAll(withTs(values)); + } + + @Override + public Iterable get() throws Exception { + Iterable> ttlValue = original.get(); + ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue; + if (updateTsOnRead) { + List> collected = collect(ttlValue); + ttlValue = collected; + updateTs(collected); + } + final Iterable> finalResult = ttlValue; --- End diff -- The variable `finalResult` looks redundant, or am I misunderstanding something? > Create wrapper with TTL logic for value state > - > > Key: FLINK-9514 > URL: https://issues.apache.org/jira/browse/FLINK-9514 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Affects Versions: 1.6.0 >Reporter: Andrey Zagrebin >Assignee: Andrey Zagrebin >Priority: Major > Fix For: 1.6.0 > > > TTL state decorator uses original state with packed TTL and adds TTL logic > using time provider: > {code:java} > TtlValueState implements ValueState { > ValueState> underlyingState; > InternalTimeService timeProvider; > V value() { > TtlValue valueWithTtl = underlyingState.get(); > // ttl logic here (e.g. update timestamp) > return valueWithTtl.getValue(); > } > void update() { ... underlyingState.update(valueWithTtl) ... 
} > } > {code} > TTL decorators are applied to state produced by the normal state binder in its TTL > wrapper from FLINK-9513 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
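One likely reason for `finalResult` in the quoted `get()` method: a Java lambda may only capture local variables that are final or effectively final, and `ttlValue` is reassigned after its declaration, so it cannot be captured directly. The sketch below illustrates that language rule only. The class and method names (`EffectivelyFinalSketch`, `nonNullView`, `toList`) are hypothetical and not part of the PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical demo class; none of these names come from the Flink PR.
class EffectivelyFinalSketch {

    // Mirrors the shape of the quoted get(): a local variable is
    // reassigned, and a lambda later needs to read it.
    static Iterable<String> nonNullView(Iterable<String> source) {
        Iterable<String> values = source;
        // This reassignment means `values` is not effectively final.
        values = values == null ? Collections.<String>emptyList() : values;

        // `return () -> values.iterator();` would be rejected by javac,
        // so the current value is copied into a final local first.
        final Iterable<String> finalResult = values;
        return () -> finalResult.iterator();
    }

    // Small helper to materialize an Iterable for printing and checks.
    static List<String> toList(Iterable<String> iterable) {
        List<String> out = new ArrayList<>();
        for (String s : iterable) {
            out.add(s);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(toList(nonNullView(Arrays.asList("a", "b"))));
        System.out.println(toList(nonNullView(null)));
    }
}
```

An alternative that avoids the extra local would be to never reassign the variable, for example by computing the non-null value in a single expression; whether that reads better is a matter of taste.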
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518240#comment-16518240 ] ASF GitHub Bot commented on FLINK-9514: --- Github user azagrebin commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r196819320 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java --- @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.function.SupplierWithException; +import org.apache.flink.util.function.ThrowingConsumer; +import org.apache.flink.util.function.ThrowingRunnable; + +/** + * Base class for TTL logic wrappers. 
+ * + * @param Type of originally wrapped object + */ +abstract class AbstractTtlDecorator { + final T original; + final TtlConfig config; + final TtlTimeProvider timeProvider; + final boolean updateTsOnRead; + final boolean returnExpired; + + AbstractTtlDecorator( + T original, + TtlConfig config, + TtlTimeProvider timeProvider) { + Preconditions.checkNotNull(original); + Preconditions.checkNotNull(config); + Preconditions.checkNotNull(timeProvider); + Preconditions.checkArgument(config.getTtlUpdateType() != TtlUpdateType.Disabled, + "State does not need to be wrapped with TTL if it is configured as disabled."); + this.original = original; + this.config = config; + this.timeProvider = timeProvider; + this.updateTsOnRead = config.getTtlUpdateType() == TtlUpdateType.OnReadAndWrite; + this.returnExpired = config.getStateVisibility() == TtlStateVisibility.Relaxed; + } + +V getUnexpried(TtlValue ttlValue) { + return ttlValue == null || (expired(ttlValue) && !returnExpired) ? null : ttlValue.getUserValue(); + } + +boolean expired(TtlValue ttlValue) { + return ttlValue != null && ttlValue.getExpirationTimestamp() <= timeProvider.currentTimestamp(); --- End diff -- In general, Flink allows event time to be in the negative range, but overflow for a very large TTL should be checked. A TTL only makes sense if it is non-negative. I have added a fix for it. 
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518200#comment-16518200 ] ASF GitHub Bot commented on FLINK-9514: --- Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r196791555 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlConfig.java --- @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.util.Preconditions; + +/** + * Configuration of state TTL logic. 
+ * TODO: builder + */ +public class TtlConfig { + private final TtlUpdateType ttlUpdateType; + private final TtlStateVisibility stateVisibility; + private final TtlTimeCharacteristic timeCharacteristic; + private final Time ttl; + + public TtlConfig( + TtlUpdateType ttlUpdateType, + TtlStateVisibility stateVisibility, + TtlTimeCharacteristic timeCharacteristic, + Time ttl) { + Preconditions.checkNotNull(ttlUpdateType); + Preconditions.checkNotNull(stateVisibility); + Preconditions.checkNotNull(timeCharacteristic); --- End diff -- Why is `ttl` not checked as well?
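The review question about the unchecked `ttl` argument can be illustrated with a small constructor sketch. This is not the PR's `TtlConfig`: it uses `java.util.Objects.requireNonNull` and `java.time.Duration` as stand-ins for Flink's `Preconditions` and `Time`, and the class name `TtlConfigSketch` is hypothetical. The non-negativity check is an extra assumption, motivated by the remark elsewhere in this thread that a TTL only makes sense when non-negative.

```java
import java.time.Duration;
import java.util.Objects;

// Hypothetical stand-in for a TTL configuration object; not Flink's TtlConfig.
class TtlConfigSketch {
    private final Duration ttl;

    TtlConfigSketch(Duration ttl) {
        // Fail fast at construction time instead of on first state access.
        this.ttl = Objects.requireNonNull(ttl, "ttl must not be null");
        // Assumption for this sketch: negative TTLs are rejected outright.
        if (ttl.isNegative()) {
            throw new IllegalArgumentException("ttl must be non-negative");
        }
    }

    Duration getTtl() {
        return ttl;
    }

    public static void main(String[] args) {
        System.out.println(new TtlConfigSketch(Duration.ofSeconds(10)).getTtl().toMillis());
        try {
            new TtlConfigSketch(null);
        } catch (NullPointerException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Validating every constructor argument in one place keeps a misconfigured TTL from surfacing later as a `NullPointerException` deep inside a state wrapper.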
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518155#comment-16518155 ] ASF GitHub Bot commented on FLINK-9514: --- Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r196786370 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java --- @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.function.SupplierWithException; +import org.apache.flink.util.function.ThrowingConsumer; +import org.apache.flink.util.function.ThrowingRunnable; + +/** + * Base class for TTL logic wrappers. 
+ * + * @param Type of originally wrapped object + */ +abstract class AbstractTtlDecorator { + final T original; + final TtlConfig config; + final TtlTimeProvider timeProvider; + final boolean updateTsOnRead; + final boolean returnExpired; + + AbstractTtlDecorator( + T original, + TtlConfig config, + TtlTimeProvider timeProvider) { + Preconditions.checkNotNull(original); + Preconditions.checkNotNull(config); + Preconditions.checkNotNull(timeProvider); + Preconditions.checkArgument(config.getTtlUpdateType() != TtlUpdateType.Disabled, + "State does not need to be wrapped with TTL if it is configured as disabled."); + this.original = original; + this.config = config; + this.timeProvider = timeProvider; + this.updateTsOnRead = config.getTtlUpdateType() == TtlUpdateType.OnReadAndWrite; + this.returnExpired = config.getStateVisibility() == TtlStateVisibility.Relaxed; + } + +V getUnexpried(TtlValue ttlValue) { + return ttlValue == null || (expired(ttlValue) && !returnExpired) ? null : ttlValue.getUserValue(); + } + +boolean expired(TtlValue ttlValue) { + return ttlValue != null && ttlValue.getExpirationTimestamp() <= timeProvider.currentTimestamp(); --- End diff -- Does it make sense to never expire the value when `ttlValue.getExpirationTimestamp()` returns a negative value?
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518152#comment-16518152 ] ASF GitHub Bot commented on FLINK-9514: --- Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r196784275 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java --- @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.function.SupplierWithException; +import org.apache.flink.util.function.ThrowingConsumer; +import org.apache.flink.util.function.ThrowingRunnable; + +/** + * Base class for TTL logic wrappers. 
+ * + * @param Type of originally wrapped object + */ +abstract class AbstractTtlDecorator { + final T original; + final TtlConfig config; + final TtlTimeProvider timeProvider; + final boolean updateTsOnRead; + final boolean returnExpired; + + AbstractTtlDecorator( + T original, + TtlConfig config, + TtlTimeProvider timeProvider) { + Preconditions.checkNotNull(original); + Preconditions.checkNotNull(config); + Preconditions.checkNotNull(timeProvider); + Preconditions.checkArgument(config.getTtlUpdateType() != TtlUpdateType.Disabled, + "State does not need to be wrapped with TTL if it is configured as disabled."); + this.original = original; + this.config = config; + this.timeProvider = timeProvider; + this.updateTsOnRead = config.getTtlUpdateType() == TtlUpdateType.OnReadAndWrite; + this.returnExpired = config.getStateVisibility() == TtlStateVisibility.Relaxed; + } + +V getUnexpried(TtlValue ttlValue) { + return ttlValue == null || (expired(ttlValue) && !returnExpired) ? null : ttlValue.getUserValue(); + } + +boolean expired(TtlValue ttlValue) { + return ttlValue != null && ttlValue.getExpirationTimestamp() <= timeProvider.currentTimestamp(); --- End diff -- This looks a bit problematic, because `ttlValue.getExpirationTimestamp()` might be negative. E.g., when the user provides `Long.MAX_VALUE` as the TTL value, they expect the value to never expire, but with the current code it expires immediately. 
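The overflow concern can be reproduced with plain `long` arithmetic. The sketch below assumes the expiration timestamp is computed as last-access time plus TTL, which the quoted diff does not show; all names (`TtlOverflowSketch`, `naiveExpiration`, `safeExpiration`) are hypothetical. The clamping variant is one possible guard, not necessarily the fix that went into the PR.

```java
// Hypothetical helper; names are illustrative and not the PR's API.
class TtlOverflowSketch {

    // Naive sum: wraps around to a negative value for a very large TTL.
    static long naiveExpiration(long lastAccessTs, long ttlMillis) {
        return lastAccessTs + ttlMillis;
    }

    // Saturating sum: clamps at Long.MAX_VALUE instead of overflowing.
    static long safeExpiration(long lastAccessTs, long ttlMillis) {
        if (ttlMillis < 0) {
            throw new IllegalArgumentException("TTL must be non-negative");
        }
        // Only a positive timestamp can push the sum past Long.MAX_VALUE.
        long headroom = lastAccessTs > 0 ? Long.MAX_VALUE - lastAccessTs : Long.MAX_VALUE;
        return lastAccessTs + Math.min(headroom, ttlMillis);
    }

    // Same comparison as in the quoted expired(...) method.
    static boolean expired(long expirationTs, long now) {
        return expirationTs <= now;
    }

    public static void main(String[] args) {
        long now = 1_000L;
        long naive = naiveExpiration(now, Long.MAX_VALUE);
        long safe = safeExpiration(now, Long.MAX_VALUE);
        System.out.println("naive expired immediately: " + expired(naive, now));
        System.out.println("safe expired immediately:  " + expired(safe, now));
    }
}
```

Clamping keeps negative event-time timestamps usable, since only the addition is guarded and the `<=` comparison itself is unchanged.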
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518002#comment-16518002 ] ASF GitHub Bot commented on FLINK-9514: --- Github user azagrebin commented on the issue: https://github.com/apache/flink/pull/6186 cc @StefanRRichter
[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state
[ https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518001#comment-16518001 ] ASF GitHub Bot commented on FLINK-9514: --- GitHub user azagrebin opened a pull request: https://github.com/apache/flink/pull/6186 [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrappers

## What is the purpose of the change

This PR introduces TTL logic wrappers for state objects.

## Brief change log

Added
- sketch of TtlConfig
- AbstractTtlWrapper and AbstractTtlState
- concrete TTL wrappers for state objects

## Verifying this change

Unit tests for state objects TTL wrappers

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (yes)
- If yes, how is the feature documented?
(not applicable at this step) You can merge this pull request into a Git repository by running: $ git pull https://github.com/azagrebin/flink FLINK-9515 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6186.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6186 commit 62faa8ee220c21fa824fec690073c27a0a994be5 Author: Andrey Zagrebin Date: 2018-06-04T15:28:40Z [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrappers
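The wrapper idea behind this pull request, a state object that stores each user value together with a timestamp and applies TTL logic on access, can be sketched in a self-contained way. Everything below is a simplified assumption for illustration: `SimpleValueState`, `InMemoryState`, and the `LongSupplier` clock are hypothetical stand-ins, Flink's real state interfaces and time provider differ, and overflow handling is left out.

```java
import java.util.function.LongSupplier;

// Hypothetical, simplified stand-ins; Flink's real state interfaces differ.
class TtlValueStateSketch {

    interface SimpleValueState<V> {
        V value();
        void update(V value);
    }

    // Trivial backing store used only to make the sketch runnable.
    static final class InMemoryState<V> implements SimpleValueState<V> {
        private V stored;
        @Override public V value() { return stored; }
        @Override public void update(V value) { stored = value; }
    }

    // User value packed together with the time it was last written.
    static final class TtlValue<V> {
        final V userValue;
        final long lastAccessTs;

        TtlValue(V userValue, long lastAccessTs) {
            this.userValue = userValue;
            this.lastAccessTs = lastAccessTs;
        }
    }

    // Decorator: same interface as the wrapped state, plus expiry checks.
    static final class TtlValueState<V> implements SimpleValueState<V> {
        private final SimpleValueState<TtlValue<V>> underlying;
        private final long ttlMillis;
        private final LongSupplier clock;

        TtlValueState(SimpleValueState<TtlValue<V>> underlying, long ttlMillis, LongSupplier clock) {
            this.underlying = underlying;
            this.ttlMillis = ttlMillis;
            this.clock = clock;
        }

        @Override
        public V value() {
            TtlValue<V> wrapped = underlying.value();
            if (wrapped == null) {
                return null;
            }
            // Overflow handling is deliberately left out of this sketch.
            if (wrapped.lastAccessTs + ttlMillis <= clock.getAsLong()) {
                underlying.update(null); // drop the expired entry
                return null;
            }
            return wrapped.userValue;
        }

        @Override
        public void update(V value) {
            underlying.update(new TtlValue<>(value, clock.getAsLong()));
        }
    }

    public static void main(String[] args) {
        long[] now = {0L}; // manually advanced test clock
        TtlValueState<String> state =
            new TtlValueState<>(new InMemoryState<TtlValue<String>>(), 100L, () -> now[0]);
        state.update("a");
        System.out.println(state.value());
        now[0] = 100L;
        System.out.println(state.value());
    }
}
```

Injecting the clock as a function keeps the expiry logic testable without sleeping, which is presumably also why the PR passes a time provider into each wrapper.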