[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-07-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16541387#comment-16541387
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user Aitozi commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r201971238
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
 ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * This class wraps list state with TTL logic.
+ *
+ * @param  The type of key the state is associated to
+ * @param  The type of the namespace
+ * @param  Type of the user entry value of state with TTL
+ */
+class TtlListState extends
+   AbstractTtlState, List>, InternalListState>>
+   implements InternalListState {
+   TtlListState(
+   InternalListState> originalState,
+   TtlConfig config,
+   TtlTimeProvider timeProvider,
+   TypeSerializer> valueSerializer) {
+   super(originalState, config, timeProvider, valueSerializer);
+   }
+
+   @Override
+   public void update(List values) throws Exception {
+   updateInternal(values);
+   }
+
+   @Override
+   public void addAll(List values) throws Exception {
+   Preconditions.checkNotNull(values, "List of values to add 
cannot be null.");
+   original.addAll(withTs(values));
+   }
+
+   @Override
+   public Iterable get() throws Exception {
+   Iterable> ttlValue = original.get();
+   ttlValue = ttlValue == null ? Collections.emptyList() : 
ttlValue;
+   if (updateTsOnRead) {
+   List> collected = collect(ttlValue);
+   ttlValue = collected;
+   updateTs(collected);
+   }
+   final Iterable> finalResult = ttlValue;
+   return () -> new IteratorWithCleanup(finalResult.iterator());
+   }
+
+   private void updateTs(List> ttlValue) throws Exception {
+   List> unexpiredWithUpdatedTs = ttlValue.stream()
+   .filter(v -> !expired(v))
+   .map(this::rewrapWithNewTs)
+   .collect(Collectors.toList());
+   if (!unexpiredWithUpdatedTs.isEmpty()) {
+   original.update(unexpiredWithUpdatedTs);
+   }
+   }
+
+   @Override
+   public void add(T value) throws Exception {
+   Preconditions.checkNotNull(value, "You cannot add null to a 
ListState.");
+   original.add(wrapWithTs(value));
+   }
+
+   @Override
+   public void clear() {
+   original.clear();
+   }
+
+   @Override
+   public void mergeNamespaces(N target, Collection sources) throws 
Exception {
+   original.mergeNamespaces(target, sources);
+   }
+
+   @Override
+   public List getInternal() throws Exception {
+   return collect(get());
+   }
+
+   private  List collect(Iterable iterable) {
--- End diff --

Got it, thanks for your explanation.


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State 

[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-07-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16541352#comment-16541352
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user azagrebin commented on the issue:

https://github.com/apache/flink/pull/6186
  
Hi @Aitozi, the main issue FLINK-9510 contains a design doc for this effort 
with roadmap


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> TTL state decorator uses original state with packed TTL and add TTL logic 
> using time provider:
> {code:java}
> TtlValueState implements ValueState {
>   ValueState> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
> TtlValue valueWithTtl = underlyingState.get();
> // ttl logic here (e.g. update timestamp)
> return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are apply to state produced by normal state binder in its TTL 
> wrapper from FLINK-9513



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-07-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16541285#comment-16541285
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user azagrebin commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r201941972
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
 ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * This class wraps list state with TTL logic.
+ *
+ * @param  The type of key the state is associated to
+ * @param  The type of the namespace
+ * @param  Type of the user entry value of state with TTL
+ */
+class TtlListState extends
+   AbstractTtlState, List>, InternalListState>>
+   implements InternalListState {
+   TtlListState(
+   InternalListState> originalState,
+   TtlConfig config,
+   TtlTimeProvider timeProvider,
+   TypeSerializer> valueSerializer) {
+   super(originalState, config, timeProvider, valueSerializer);
+   }
+
+   @Override
+   public void update(List values) throws Exception {
+   updateInternal(values);
+   }
+
+   @Override
+   public void addAll(List values) throws Exception {
+   Preconditions.checkNotNull(values, "List of values to add 
cannot be null.");
+   original.addAll(withTs(values));
+   }
+
+   @Override
+   public Iterable get() throws Exception {
+   Iterable> ttlValue = original.get();
+   ttlValue = ttlValue == null ? Collections.emptyList() : 
ttlValue;
+   if (updateTsOnRead) {
+   List> collected = collect(ttlValue);
+   ttlValue = collected;
+   updateTs(collected);
+   }
+   final Iterable> finalResult = ttlValue;
+   return () -> new IteratorWithCleanup(finalResult.iterator());
+   }
+
+   private void updateTs(List> ttlValue) throws Exception {
+   List> unexpiredWithUpdatedTs = ttlValue.stream()
+   .filter(v -> !expired(v))
+   .map(this::rewrapWithNewTs)
+   .collect(Collectors.toList());
+   if (!unexpiredWithUpdatedTs.isEmpty()) {
+   original.update(unexpiredWithUpdatedTs);
+   }
+   }
+
+   @Override
+   public void add(T value) throws Exception {
+   Preconditions.checkNotNull(value, "You cannot add null to a 
ListState.");
+   original.add(wrapWithTs(value));
+   }
+
+   @Override
+   public void clear() {
+   original.clear();
+   }
+
+   @Override
+   public void mergeNamespaces(N target, Collection sources) throws 
Exception {
+   original.mergeNamespaces(target, sources);
+   }
+
+   @Override
+   public List getInternal() throws Exception {
+   return collect(get());
+   }
+
+   private  List collect(Iterable iterable) {
--- End diff --

Hi @Aitozi, in case of current implementation of list state in Rocksdb you 
are right. But e.g. there was an effort to make lists scalable like maps in 
rocksdb, it could be lazy in this case. 

This TTL implementation does not make any assumptions of underlying state 
backend. The generic state user API returns `Iterabl

[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-07-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16541073#comment-16541073
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/6186
  
Hi, after read the whole implementation,  i found that the state is expired 
when it is accessed, When there is the dirty data store to state and never be 
queried, how does it can be expired. Or is there an undergoing work for this ? 
@azagrebin 


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> TTL state decorator uses original state with packed TTL and add TTL logic 
> using time provider:
> {code:java}
> TtlValueState implements ValueState {
>   ValueState> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
> TtlValue valueWithTtl = underlyingState.get();
> // ttl logic here (e.g. update timestamp)
> return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are apply to state produced by normal state binder in its TTL 
> wrapper from FLINK-9513



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-07-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16541061#comment-16541061
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user Aitozi commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r201895445
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
 ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * This class wraps list state with TTL logic.
+ *
+ * @param  The type of key the state is associated to
+ * @param  The type of the namespace
+ * @param  Type of the user entry value of state with TTL
+ */
+class TtlListState extends
+   AbstractTtlState, List>, InternalListState>>
+   implements InternalListState {
+   TtlListState(
+   InternalListState> originalState,
+   TtlConfig config,
+   TtlTimeProvider timeProvider,
+   TypeSerializer> valueSerializer) {
+   super(originalState, config, timeProvider, valueSerializer);
+   }
+
+   @Override
+   public void update(List values) throws Exception {
+   updateInternal(values);
+   }
+
+   @Override
+   public void addAll(List values) throws Exception {
+   Preconditions.checkNotNull(values, "List of values to add 
cannot be null.");
+   original.addAll(withTs(values));
+   }
+
+   @Override
+   public Iterable get() throws Exception {
+   Iterable> ttlValue = original.get();
+   ttlValue = ttlValue == null ? Collections.emptyList() : 
ttlValue;
+   if (updateTsOnRead) {
+   List> collected = collect(ttlValue);
+   ttlValue = collected;
+   updateTs(collected);
+   }
+   final Iterable> finalResult = ttlValue;
+   return () -> new IteratorWithCleanup(finalResult.iterator());
+   }
+
+   private void updateTs(List> ttlValue) throws Exception {
+   List> unexpiredWithUpdatedTs = ttlValue.stream()
+   .filter(v -> !expired(v))
+   .map(this::rewrapWithNewTs)
+   .collect(Collectors.toList());
+   if (!unexpiredWithUpdatedTs.isEmpty()) {
+   original.update(unexpiredWithUpdatedTs);
+   }
+   }
+
+   @Override
+   public void add(T value) throws Exception {
+   Preconditions.checkNotNull(value, "You cannot add null to a 
ListState.");
+   original.add(wrapWithTs(value));
+   }
+
+   @Override
+   public void clear() {
+   original.clear();
+   }
+
+   @Override
+   public void mergeNamespaces(N target, Collection sources) throws 
Exception {
+   original.mergeNamespaces(target, sources);
+   }
+
+   @Override
+   public List getInternal() throws Exception {
+   return collect(get());
+   }
+
+   private  List collect(Iterable iterable) {
--- End diff --

Hi @azagrebin , little doubt that you say the  

> return Iterable and avoid querying backend if not needed

But when deal with the ListState the `original.get()` has already query the 
original `Iterable` from RocksDB doesn't it ? Is this way just lazy query the 
iterable element in memory?


> Create wrapper with 

[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16527547#comment-16527547
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6186


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> TTL state decorator uses original state with packed TTL and add TTL logic 
> using time provider:
> {code:java}
> TtlValueState implements ValueState {
>   ValueState> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
> TtlValue valueWithTtl = underlyingState.get();
> // ttl logic here (e.g. update timestamp)
> return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are apply to state produced by normal state binder in its TTL 
> wrapper from FLINK-9513



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16527543#comment-16527543
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/6186
  
Merging this.


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> TTL state decorator uses original state with packed TTL and add TTL logic 
> using time provider:
> {code:java}
> TtlValueState implements ValueState {
>   ValueState> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
> TtlValue valueWithTtl = underlyingState.get();
> // ttl logic here (e.g. update timestamp)
> return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are apply to state produced by normal state binder in its TTL 
> wrapper from FLINK-9513



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16527369#comment-16527369
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/6186
  
Had one more minor comment. Besides, this looks good 👍 Nice job!


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> TTL state decorator uses original state with packed TTL and add TTL logic 
> using time provider:
> {code:java}
> TtlValueState implements ValueState {
>   ValueState> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
> TtlValue valueWithTtl = underlyingState.get();
> // ttl logic here (e.g. update timestamp)
> return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are apply to state produced by normal state binder in its TTL 
> wrapper from FLINK-9513



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16527364#comment-16527364
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r199109435
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java 
---
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalMapState;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.AbstractMap;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+/**
+ * This class wraps map state with TTL logic.
+ *
+ * @param  The type of key the state is associated to
+ * @param  The type of the namespace
+ * @param  Type of the user entry key of state with TTL
+ * @param  Type of the user entry value of state with TTL
+ */
+class TtlMapState
+   extends AbstractTtlState, Map>, 
InternalMapState>>
+   implements InternalMapState {
+   TtlMapState(
+   InternalMapState> original,
+   TtlConfig config,
+   TtlTimeProvider timeProvider,
+   TypeSerializer> valueSerializer) {
+   super(original, config, timeProvider, valueSerializer);
+   }
+
+   @Override
+   public UV get(UK key) throws Exception {
+   return getWithTtlCheckAndUpdate(() -> original.get(key), v -> 
original.put(key, v), () -> original.remove(key));
+   }
+
+   @Override
+   public void put(UK key, UV value) throws Exception {
+   original.put(key, wrapWithTs(value));
+   }
+
+   @Override
+   public void putAll(Map map) throws Exception {
+   if (map == null) {
+   return;
+   }
+   Map> ttlMap = new HashMap<>();
--- End diff --

We can already initialize the new map with `map.size()`.


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> TTL state decorator uses original state with packed TTL and add TTL logic 
> using time provider:
> {code:java}
> TtlValueState implements ValueState {
>   ValueState> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
> TtlValue valueWithTtl = underlyingState.get();
> // ttl logic here (e.g. update timestamp)
> return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are apply to state produced by normal state binder in its TTL 
> wrapper from FLINK-9513



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16527302#comment-16527302
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user azagrebin commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r199082008
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
 ---
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * This class wraps list state with TTL logic.
+ *
+ * @param  The type of key the state is associated to
+ * @param  The type of the namespace
+ * @param  Type of the user entry value of state with TTL
+ */
+class TtlListState extends
+   AbstractTtlState, List>, InternalListState>>
+   implements InternalListState {
+   TtlListState(
+   InternalListState> originalState,
+   TtlConfig config,
+   TtlTimeProvider timeProvider,
+   TypeSerializer> valueSerializer) {
+   super(originalState, config, timeProvider, valueSerializer);
+   }
+
+   @Override
+   public void update(List values) throws Exception {
+   updateInternal(values);
+   }
+
+   @Override
+   public void addAll(List values) throws Exception {
+   Preconditions.checkNotNull(values, "List of values to add 
cannot be null.");
+   original.addAll(withTs(values));
+   }
+
+   @Override
+   public Iterable get() throws Exception {
--- End diff --

My idea was to keep this method lazy when possible. Currently this 
implementation does not assume that underlying state returns `List` but rather 
works as if it is `Iterable` and lazily wraps it with `IteratorWithCleanup`. 
Only when it has to update timestamps on read, it materialises `Iterable` to 
`List`.


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> TTL state decorator uses original state with packed TTL and add TTL logic 
> using time provider:
> {code:java}
> TtlValueState implements ValueState {
>   ValueState> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
> TtlValue valueWithTtl = underlyingState.get();
> // ttl logic here (e.g. update timestamp)
> return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are apply to state produced by normal state binder in its TTL 
> wrapper from FLINK-9513



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16526671#comment-16526671
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user azagrebin commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r198947650
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlFoldFunction.java
 ---
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.functions.FoldFunction;
+
+/**
+ * This class wraps folding function with TTL logic.
+ *
+ * @param  Type of the values folded into the state
+ * @param  Type of the value in the state
+ *
+ * @deprecated use {@link TtlAggregateFunction} instead
+ */
+@Deprecated
+class TtlFoldFunction
+   extends AbstractTtlDecorator>
+   implements FoldFunction> {
+   TtlFoldFunction(FoldFunction original, TtlConfig config, 
TtlTimeProvider timeProvider) {
+   super(original, config, timeProvider);
+   }
+
+   @Override
+   public TtlValue fold(TtlValue accumulator, T value) throws 
Exception {
+   return wrapWithTs(original.fold(getUnexpried(accumulator), 
value));
--- End diff --

It should be covered with `updateExpired` in `testExactExpirationOnWrite`


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> TTL state decorator uses original state with packed TTL and add TTL logic 
> using time provider:
> {code:java}
> TtlValueState implements ValueState {
>   ValueState> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
> TtlValue valueWithTtl = underlyingState.get();
> // ttl logic here (e.g. update timestamp)
> return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are apply to state produced by normal state binder in its TTL 
> wrapper from FLINK-9513



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16526115#comment-16526115
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user azagrebin commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r198757244
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlFoldFunction.java
 ---
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.functions.FoldFunction;
+
+/**
+ * This class wraps folding function with TTL logic.
+ *
+ * @param  Type of the values folded into the state
+ * @param  Type of the value in the state
+ *
+ * @deprecated use {@link TtlAggregateFunction} instead
+ */
+@Deprecated
+class TtlFoldFunction
+   extends AbstractTtlDecorator>
+   implements FoldFunction> {
+   TtlFoldFunction(FoldFunction original, TtlConfig config, 
TtlTimeProvider timeProvider) {
+   super(original, config, timeProvider);
+   }
+
+   @Override
+   public TtlValue fold(TtlValue accumulator, T value) throws 
Exception {
+   return wrapWithTs(original.fold(getUnexpried(accumulator), 
value));
--- End diff --

As I understand, the wrapped states should already provide the default 
values. My idea was to wrap the original default value [in TTL 
factory](https://github.com/apache/flink/pull/6196/commits/4dedb9a20244a2addd337617778b17fe8349#diff-13011fbe7c28b56b994783572b461aaeR174)
 with expiration timestamp `Long.MAX_VALUE`, basically never expiring. Good 
point about test cases for it, I will add them for appending states.


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> TTL state decorator uses original state with packed TTL and add TTL logic 
> using time provider:
> {code:java}
> TtlValueState implements ValueState {
>   ValueState> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
> TtlValue valueWithTtl = underlyingState.get();
> // ttl logic here (e.g. update timestamp)
> return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are apply to state produced by normal state binder in its TTL 
> wrapper from FLINK-9513



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16526096#comment-16526096
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user azagrebin commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r198751723
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java 
---
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalMapState;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.AbstractMap;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+/**
+ * This class wraps map state with TTL logic.
+ *
+ * @param  The type of key the state is associated to
+ * @param  The type of the namespace
+ * @param  Type of the user entry key of state with TTL
+ * @param  Type of the user entry value of state with TTL
+ */
+class TtlMapState
+   extends AbstractTtlState, Map>, 
InternalMapState>>
+   implements InternalMapState {
+   TtlMapState(
+   InternalMapState> original,
+   TtlConfig config,
+   TtlTimeProvider timeProvider,
+   TypeSerializer> valueSerializer) {
+   super(original, config, timeProvider, valueSerializer);
+   }
+
+   @Override
+   public UV get(UK key) throws Exception {
+   return getWithTtlCheckAndUpdate(() -> original.get(key), v -> 
original.put(key, v), () -> original.remove(key));
+   }
+
+   @Override
+   public void put(UK key, UV value) throws Exception {
+   original.put(key, wrapWithTs(value));
+   }
+
+   @Override
+   public void putAll(Map map) throws Exception {
+   if (map == null) {
+   return;
+   }
+   Map> ttlMap = map.entrySet().stream()
+   .collect(Collectors.toMap(Map.Entry::getKey, e -> 
wrapWithTs(e.getValue(;
+   original.putAll(ttlMap);
+   }
+
+   @Override
+   public void remove(UK key) throws Exception {
+   original.remove(key);
+   }
+
+   @Override
+   public boolean contains(UK key) throws Exception {
+   return get(key) != null;
+   }
+
+   @Override
+   public Iterable> entries() throws Exception {
+   return entriesStream()::iterator;
+   }
+
+   private Stream> entriesStream() throws Exception {
+   Iterable>> withTs = 
original.entries();
+   withTs = withTs == null ? Collections.emptyList() : withTs;
+   return StreamSupport
+   .stream(withTs.spliterator(), false)
--- End diff --

As I understand, it depends on use case. If it is parallelizable, lazy 
operations over big collection like filter and map over lists, stream will give 
boost over loops but for short collections or non-parallelizable spliterators 
the overhead kills the performance. Though, it might be hard to predict the 
type of used spliterator. I agree the real benchmarking should be done to make 
sure.


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For:

[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16524864#comment-16524864
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user azagrebin commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r198439894
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java
 ---
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.SupplierWithException;
+import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+/**
+ * Base class for TTL logic wrappers.
+ *
+ * @param  Type of originally wrapped object
+ */
+abstract class AbstractTtlDecorator {
--- End diff --

Currently this class is used to wrap any object with TTL logic, not only 
state objects: e.g. aggregating functions. It can be injected as a "TTL time 
service" but then the common member `original` will have to be duplicated in 
function wrappers.


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> TTL state decorator uses original state with packed TTL and add TTL logic 
> using time provider:
> {code:java}
> TtlValueState implements ValueState {
>   ValueState> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
> TtlValue valueWithTtl = underlyingState.get();
> // ttl logic here (e.g. update timestamp)
> return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are apply to state produced by normal state binder in its TTL 
> wrapper from FLINK-9513



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16524863#comment-16524863
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user azagrebin commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r198439148
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateVisibility.java
 ---
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+/**
+ * This option configures whether to return expired user value or not.
+ */
+public enum TtlStateVisibility {
+   /** Return still available expired user value (not yet cleaned up). */
+   Relaxed,
+   /** Hide expired user value and behave as if it does not exist any 
more. */
+   Exact
--- End diff --

I think I will also rename enum entries.


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> TTL state decorator uses original state with packed TTL and add TTL logic 
> using time provider:
> {code:java}
> TtlValueState implements ValueState {
>   ValueState> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
> TtlValue valueWithTtl = underlyingState.get();
> // ttl logic here (e.g. update timestamp)
> return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are apply to state produced by normal state binder in its TTL 
> wrapper from FLINK-9513



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16524855#comment-16524855
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user azagrebin commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r198436490
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlUpdateType.java
 ---
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+/**
+ * This option value configures when to prolong state TTL.
+ */
+public enum TtlUpdateType {
+   /** TTL is disabled. State does not expire. */
+   Disabled,
--- End diff --

This is more for internally named configuration, basically for later use in 
TTL wrapper factory to decide whether to wrap with TTL or not. Alternative 
would be probably to save null'ed, Optional config or negative TTL in state 
descriptor when configured w/o TTL by default. I found it more explicit, 
although, it faces user and I planned that he will rather probably just not 
configure TTL at all than use this option.


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> TTL state decorator uses original state with packed TTL and add TTL logic 
> using time provider:
> {code:java}
> TtlValueState implements ValueState {
>   ValueState> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
> TtlValue valueWithTtl = underlyingState.get();
> // ttl logic here (e.g. update timestamp)
> return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are apply to state produced by normal state binder in its TTL 
> wrapper from FLINK-9513



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16524835#comment-16524835
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user azagrebin commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r198431042
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java
 ---
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.SupplierWithException;
+import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+/**
+ * Base class for TTL logic wrappers.
+ *
+ * @param  Type of originally wrapped object
+ */
+abstract class AbstractTtlDecorator {
+   final T original;
+   final TtlConfig config;
+   final TtlTimeProvider timeProvider;
+   final boolean updateTsOnRead;
+   final boolean returnExpired;
+   final long ttl;
+
+   AbstractTtlDecorator(
+   T original,
+   TtlConfig config,
+   TtlTimeProvider timeProvider) {
+   Preconditions.checkNotNull(original);
+   Preconditions.checkNotNull(config);
+   Preconditions.checkNotNull(timeProvider);
+   Preconditions.checkArgument(config.getTtlUpdateType() != 
TtlUpdateType.Disabled,
+   "State does not need to be wrapped with TTL if it is 
configured as disabled.");
+   this.original = original;
+   this.config = config;
+   this.timeProvider = timeProvider;
+   this.updateTsOnRead = config.getTtlUpdateType() == 
TtlUpdateType.OnReadAndWrite;
+   this.returnExpired = config.getStateVisibility() == 
TtlStateVisibility.Relaxed;
+   this.ttl = config.getTtl().toMilliseconds();
+   }
+
+V getUnexpried(TtlValue ttlValue) {
+   return ttlValue == null || (expired(ttlValue) && 
!returnExpired) ? null : ttlValue.getUserValue();
+   }
+
+boolean expired(TtlValue ttlValue) {
+   return ttlValue != null && ttlValue.getExpirationTimestamp() <= 
timeProvider.currentTimestamp();
+   }
+
+TtlValue wrapWithTs(V value) {
--- End diff --

It actually wraps with expiration timestamp, TTL is not stored with value.


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> TTL state decorator uses original state with packed TTL and add TTL logic 
> using time provider:
> {code:java}
> TtlValueState implements ValueState {
>   ValueState> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
> TtlValue valueWithTtl = underlyingState.get();
> // ttl logic here (e.g. update timestamp)
> return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are apply to state produced by normal state binder in its TTL 
> wrapper from FLINK-9513



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16524834#comment-16524834
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user azagrebin commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r198430741
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlConfig.java 
---
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Configuration of state TTL logic.
+ * TODO: builder
--- End diff --

Yes, config is only sketched. It is planned for later step when TTL will be 
activated in a State Descriptor API.


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> TTL state decorator uses original state with packed TTL and add TTL logic 
> using time provider:
> {code:java}
> TtlValueState implements ValueState {
>   ValueState> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
> TtlValue valueWithTtl = underlyingState.get();
> // ttl logic here (e.g. update timestamp)
> return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are apply to state produced by normal state binder in its TTL 
> wrapper from FLINK-9513



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16522079#comment-16522079
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/6186
  
@StefanRRichter Thanks for you reply! I think that makes sense, there is 
still a workaround for the user to go. `+1` to implement the current approach 
firstly.


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> TTL state decorator uses original state with packed TTL and add TTL logic 
> using time provider:
> {code:java}
> TtlValueState implements ValueState {
>   ValueState> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
> TtlValue valueWithTtl = underlyingState.get();
> // ttl logic here (e.g. update timestamp)
> return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are apply to state produced by normal state binder in its TTL 
> wrapper from FLINK-9513



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16522049#comment-16522049
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/6186
  
@sihuazhou I think your concern and suggestion can clearly make sense for 
some cases. However I don't think it should be a general default as well 
because I can also find cases where this is not what a user might want. For 
example, TTL could be used for compliance with some user data privacy law. In 
this case, the law does not care about technical details like restores but only 
about wall-clock time, e.g. from the last user interaction. 
As this is a new feature, there is also no regression and users can still 
go for a custom timer-based solution as alternative. I agree for some cases it 
makes sense to have this timeshift and also your outlined approach can make 
sense. However, I think this does not have to block this PR, because I would 
consider it a feature/improvement on top of this work. We could still target to 
have your suggested followup before the release.


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> TTL state decorator uses original state with packed TTL and add TTL logic 
> using time provider:
> {code:java}
> TtlValueState implements ValueState {
>   ValueState> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
> TtlValue valueWithTtl = underlyingState.get();
> // ttl logic here (e.g. update timestamp)
> return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are apply to state produced by normal state binder in its TTL 
> wrapper from FLINK-9513



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16522050#comment-16522050
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/6186
  
@azagrebin I think the overall idea is well implemented. I just had a 
couple of comments inline.


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> TTL state decorator uses original state with packed TTL and add TTL logic 
> using time provider:
> {code:java}
> TtlValueState implements ValueState {
>   ValueState> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
> TtlValue valueWithTtl = underlyingState.get();
> // ttl logic here (e.g. update timestamp)
> return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are apply to state produced by normal state binder in its TTL 
> wrapper from FLINK-9513



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16522030#comment-16522030
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197719642
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlListStateTest.java
 ---
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.runtime.state.internal.InternalListState;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/** Test suite for {@link TtlListState}. */
+public class TtlListStateTest extends TtlStateTestBase, List, Iterable> {
+   @Override
+   TtlListState createState() {
+   return new TtlListState<>(new MockInternalListState<>(), 
ttlConfig, timeProvider, null);
+   }
+
+   @Override
+   void initTestValues() {
+   updater = v -> ttlState.addAll(v);
+   getter = () -> 
StreamSupport.stream(ttlState.get().spliterator(), 
false).collect(Collectors.toList());
+   originalGetter = () -> ttlState.original.get();
+
+   emptyValue = Collections.emptyList();
+
+   updateValue1 = Arrays.asList(5, 7, 10);
+   updateValue2 = Arrays.asList(8, 9, 11);
+
+   getValue1 = updateValue1;
+   getValue2 = updateValue2;
+   }
+
+   private static class MockInternalListState
+   extends MockInternalKvState>
+   implements InternalListState {
+
+   MockInternalListState() {
+   value = new ArrayList<>();
+   }
+
+   @Override
+   public void update(List elements) {
+   updateInternal(elements);
+   }
+
+   @Override
+   public void addAll(List elements) {
+   value.addAll(elements);
+   }
+
+   @Override
+   public void mergeNamespaces(N target, Collection sources) {
--- End diff --

Wouldn't it make sense to check that merging namespaces also works 
correctly with the TTL state and to fix validate the contract what has to 
happen to the timestamps in this case?


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> TTL state decorator uses original state with packed TTL and add TTL logic 
> using time provider:
> {code:java}
> TtlValueState implements ValueState {
>   ValueState> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
> TtlValue valueWithTtl = underlyingState.get();
> // ttl logic here (e.g. update timestamp)
> return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are apply to state produced by normal state binder in its TTL 
> wrapper from FLINK-9513



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16522027#comment-16522027
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197719310
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java
 ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.util.function.SupplierWithException;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+abstract class TtlStateTestBase {
--- End diff --

I think it would make sense to extend this general test a bit to consider 
multiple keys and namespaces. Ideally, a test should really test the full 
contract specification of the tested subject. What is mean is, you could 
currently pass this test even if TTL would accidentally clear all states on the 
timeout of one state, or maybe clear all the states in the same namespace. The 
mock states can easily be extended to truly scope values by key and namespace. 
Then the test can, for example, create two keys in the same namespace and two 
keys in a different namespace and check that their timeouts are isolated from 
each other and the interaction works as expected,


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> TTL state decorator uses original state with packed TTL and add TTL logic 
> using time provider:
> {code:java}
> TtlValueState implements ValueState {
>   ValueState> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
> TtlValue valueWithTtl = underlyingState.get();
> // ttl logic here (e.g. update timestamp)
> return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are apply to state produced by normal state binder in its TTL 
> wrapper from FLINK-9513



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16522000#comment-16522000
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197714098
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
 ---
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * This class wraps list state with TTL logic.
+ *
+ * @param  The type of key the state is associated to
+ * @param  The type of the namespace
+ * @param  Type of the user entry value of state with TTL
+ */
+class TtlListState extends
+   AbstractTtlState, List>, InternalListState>>
+   implements InternalListState {
+   TtlListState(
+   InternalListState> originalState,
+   TtlConfig config,
+   TtlTimeProvider timeProvider,
+   TypeSerializer> valueSerializer) {
+   super(originalState, config, timeProvider, valueSerializer);
+   }
+
+   @Override
+   public void update(List values) throws Exception {
+   updateInternal(values);
+   }
+
+   @Override
+   public void addAll(List values) throws Exception {
+   Preconditions.checkNotNull(values, "List of values to add 
cannot be null.");
+   original.addAll(withTs(values));
+   }
+
+   @Override
+   public Iterable get() throws Exception {
--- End diff --

Currently this could return `List` instead of `Iterable` and you 
might get around the instance of check.


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> TTL state decorator uses original state with packed TTL and add TTL logic 
> using time provider:
> {code:java}
> TtlValueState implements ValueState {
>   ValueState> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
> TtlValue valueWithTtl = underlyingState.get();
> // ttl logic here (e.g. update timestamp)
> return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are apply to state produced by normal state binder in its TTL 
> wrapper from FLINK-9513



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16521801#comment-16521801
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/6186
  
Maybe let me elaborate the TTL checking condition in detail, overall the 
checking condition contains two parts and looks like `(current_ts - update_ts) 
- time_shift_offset >= TTL`.

The `time_shift_offset` is the shift offset that we should applied when 
checking the TTL.

- For the records that the `update_ts` > `checkpoint_ts`, we could know 
they were created(or updated) after the last restoring so we don't need to 
apply any shift to it. So that shift offset is `0`.

- For the records that the `update_ts` <= `checkpoint_ts`, we could know 
they were created(or updated) before the last restoring so we need to apply the 
shift to it, the shift offset is `recovery_ts - checkpoint_ts`.

In our current code, we didn't do the time-align works, it equals to a 
special case of the above condition where the `time_shift_offset` is always `0`.


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> TTL state decorator uses original state with packed TTL and add TTL logic 
> using time provider:
> {code:java}
> TtlValueState implements ValueState {
>   ValueState> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
> TtlValue valueWithTtl = underlyingState.get();
> // ttl logic here (e.g. update timestamp)
> return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are apply to state produced by normal state binder in its TTL 
> wrapper from FLINK-9513



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16521498#comment-16521498
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/6186
  
I'm still a bit worried about the time-align problem on recovery(because 
I've met serval case that would become disaster on production if we don't do 
the time-align on recovery. (One instance: we used the DFS to store the 
checkpoint data, and the DFS went into safe-mode because of some problems, we 
took several hours to notice that and also took some times to address the 
issue. After addressing DFS's issue, user's jobs were resumed and begin to run 
correctly. In this case, if we don't do the time-align on recovery, then user's 
state maybe already totally expired(when TTL <= the `system down time`)).

I had a second thought on this problem, and I think maybe we could do that 
without a full scanning of the records, the approach is outlined below.

- 1. We need to remember the timestamp when performing the checkpoint, 
let's say it `checkpoint_ts`.
- 2. We also need to remember the timestamp when recovering from the 
checkpoint, let's say it `recovery_ts`.
- 3. For each record, we remember it's last update timestamp, let's say it 
`update_ts`.
- 5. And the current time stamp is `current_ts`.
- 4. Then we could use the follow condition `checkpoint_ts - update_ts + 
current_s - recovery_ts >= TTL` to check whether the record is expired. If it's 
true then record is expired, otherwise the record is still alive.

What do you think? @azagrebin ,  and @StefanRRichter would be really nice 
to learn your opinion about this problem.


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> TTL state decorator uses original state with packed TTL and add TTL logic 
> using time provider:
> {code:java}
> TtlValueState implements ValueState {
>   ValueState> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
> TtlValue valueWithTtl = underlyingState.get();
> // ttl logic here (e.g. update timestamp)
> return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are apply to state produced by normal state binder in its TTL 
> wrapper from FLINK-9513



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-22 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16520577#comment-16520577
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197503773
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlAggregatingState.java
 ---
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalAggregatingState;
+
+import java.util.Collection;
+
+/**
+ * This class wraps aggregating state with TTL logic.
+ *
+ * @param  The type of key the state is associated to
+ * @param  The type of the namespace
+ * @param  Type of the value added to the state
+ * @param  The type of the accumulator (intermediate aggregate state).
+ * @param  Type of the value extracted from the state
+ *
+ */
+class TtlAggregatingState
+   extends AbstractTtlState, 
InternalAggregatingState, OUT>>
+   implements InternalAggregatingState {
+
+   TtlAggregatingState(
+   InternalAggregatingState, OUT> 
originalState,
+   TtlConfig config,
+   TtlTimeProvider timeProvider,
+   TypeSerializer valueSerializer,
+   TtlAggregateFunction aggregateFunction) {
+   super(originalState, config, timeProvider, valueSerializer);
+   aggregateFunction.stateClear = originalState::clear;
--- End diff --

Maybe it is better to pass a  `TtlAggregateFunctionBuilder` and then supply 
`stateClear` and `updater` before the object is created. I think then they can 
also become immutable. Similar changes in the other states.


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> TTL state decorator uses original state with packed TTL and add TTL logic 
> using time provider:
> {code:java}
> TtlValueState implements ValueState {
>   ValueState> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
> TtlValue valueWithTtl = underlyingState.get();
> // ttl logic here (e.g. update timestamp)
> return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are apply to state produced by normal state binder in its TTL 
> wrapper from FLINK-9513



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-22 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16520570#comment-16520570
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197502165
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlFoldFunction.java
 ---
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.functions.FoldFunction;
+
+/**
+ * This class wraps folding function with TTL logic.
+ *
+ * @param  Type of the values folded into the state
+ * @param  Type of the value in the state
+ *
+ * @deprecated use {@link TtlAggregateFunction} instead
+ */
+@Deprecated
+class TtlFoldFunction
+   extends AbstractTtlDecorator>
+   implements FoldFunction> {
+   TtlFoldFunction(FoldFunction original, TtlConfig config, 
TtlTimeProvider timeProvider) {
+   super(original, config, timeProvider);
+   }
+
+   @Override
+   public TtlValue fold(TtlValue accumulator, T value) throws 
Exception {
+   return wrapWithTs(original.fold(getUnexpried(accumulator), 
value));
--- End diff --

I this is true, that is a valuable case to be tested for all the appending 
states that they work correctly.


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> TTL state decorator uses original state with packed TTL and add TTL logic 
> using time provider:
> {code:java}
> TtlValueState implements ValueState {
>   ValueState> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
> TtlValue valueWithTtl = underlyingState.get();
> // ttl logic here (e.g. update timestamp)
> return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are apply to state produced by normal state binder in its TTL 
> wrapper from FLINK-9513



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-22 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16520566#comment-16520566
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197501902
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlFoldFunction.java
 ---
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.functions.FoldFunction;
+
+/**
+ * This class wraps folding function with TTL logic.
+ *
+ * @param  Type of the values folded into the state
+ * @param  Type of the value in the state
+ *
+ * @deprecated use {@link TtlAggregateFunction} instead
+ */
+@Deprecated
+class TtlFoldFunction
+   extends AbstractTtlDecorator>
+   implements FoldFunction> {
+   TtlFoldFunction(FoldFunction original, TtlConfig config, 
TtlTimeProvider timeProvider) {
+   super(original, config, timeProvider);
+   }
+
+   @Override
+   public TtlValue fold(TtlValue accumulator, T value) throws 
Exception {
+   return wrapWithTs(original.fold(getUnexpried(accumulator), 
value));
--- End diff --

I think that (similar to`TtlAggregationFunction`) you need to intercept 
`null` values of accumulator here and replace them by the default value.


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> TTL state decorator uses original state with packed TTL and add TTL logic 
> using time provider:
> {code:java}
> TtlValueState implements ValueState {
>   ValueState> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
> TtlValue valueWithTtl = underlyingState.get();
> // ttl logic here (e.g. update timestamp)
> return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are apply to state produced by normal state binder in its TTL 
> wrapper from FLINK-9513



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-22 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16520549#comment-16520549
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197498126
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java 
---
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalMapState;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.AbstractMap;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+/**
+ * This class wraps map state with TTL logic.
+ *
+ * @param  The type of key the state is associated to
+ * @param  The type of the namespace
+ * @param  Type of the user entry key of state with TTL
+ * @param  Type of the user entry value of state with TTL
+ */
+class TtlMapState
+   extends AbstractTtlState, Map>, 
InternalMapState>>
+   implements InternalMapState {
+   TtlMapState(
+   InternalMapState> original,
+   TtlConfig config,
+   TtlTimeProvider timeProvider,
+   TypeSerializer> valueSerializer) {
+   super(original, config, timeProvider, valueSerializer);
+   }
+
+   @Override
+   public UV get(UK key) throws Exception {
+   return getWithTtlCheckAndUpdate(() -> original.get(key), v -> 
original.put(key, v), () -> original.remove(key));
+   }
+
+   @Override
+   public void put(UK key, UV value) throws Exception {
+   original.put(key, wrapWithTs(value));
+   }
+
+   @Override
+   public void putAll(Map map) throws Exception {
+   if (map == null) {
+   return;
+   }
+   Map> ttlMap = map.entrySet().stream()
+   .collect(Collectors.toMap(Map.Entry::getKey, e -> 
wrapWithTs(e.getValue(;
+   original.putAll(ttlMap);
+   }
+
+   @Override
+   public void remove(UK key) throws Exception {
+   original.remove(key);
+   }
+
+   @Override
+   public boolean contains(UK key) throws Exception {
+   return get(key) != null;
+   }
+
+   @Override
+   public Iterable> entries() throws Exception {
+   return entriesStream()::iterator;
+   }
+
+   private Stream> entriesStream() throws Exception {
+   Iterable>> withTs = 
original.entries();
+   withTs = withTs == null ? Collections.emptyList() : withTs;
+   return StreamSupport
+   .stream(withTs.spliterator(), false)
+   .filter(this::unexpiredAndUpdateOrCleanup)
+   .map(TtlMapState::dropTs);
+   }
+
+   private boolean unexpiredAndUpdateOrCleanup(Map.Entry> 
e) {
+   UV unexpiredValue;
+   try {
+   unexpiredValue = getWithTtlCheckAndUpdate(
+   e::getValue,
+   v -> original.put(e.getKey(), v),
+   () -> original.remove(e.getKey()));
+   } catch (Exception ex) {
+   throw new FlinkRuntimeException(ex);
+   }
+   return unexpiredValue != null;
+   }
+
+   private static  Map.Entry dropTs(Map.Entry> e) {
--- End diff --

Again, to keep it more clear I would spell out `dropTs` as `unwrapTtlState` 
or something similar.


> Create wrapper with TTL logic for value state
> -

[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-22 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16520540#comment-16520540
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197494851
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java 
---
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalMapState;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.AbstractMap;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+/**
+ * This class wraps map state with TTL logic.
+ *
+ * @param  The type of key the state is associated to
+ * @param  The type of the namespace
+ * @param  Type of the user entry key of state with TTL
+ * @param  Type of the user entry value of state with TTL
+ */
+class TtlMapState
+   extends AbstractTtlState, Map>, 
InternalMapState>>
+   implements InternalMapState {
+   TtlMapState(
+   InternalMapState> original,
+   TtlConfig config,
+   TtlTimeProvider timeProvider,
+   TypeSerializer> valueSerializer) {
+   super(original, config, timeProvider, valueSerializer);
+   }
+
+   @Override
+   public UV get(UK key) throws Exception {
+   return getWithTtlCheckAndUpdate(() -> original.get(key), v -> 
original.put(key, v), () -> original.remove(key));
+   }
+
+   @Override
+   public void put(UK key, UV value) throws Exception {
+   original.put(key, wrapWithTs(value));
+   }
+
+   @Override
+   public void putAll(Map map) throws Exception {
+   if (map == null) {
+   return;
+   }
+   Map> ttlMap = map.entrySet().stream()
+   .collect(Collectors.toMap(Map.Entry::getKey, e -> 
wrapWithTs(e.getValue(;
+   original.putAll(ttlMap);
+   }
+
+   @Override
+   public void remove(UK key) throws Exception {
+   original.remove(key);
+   }
+
+   @Override
+   public boolean contains(UK key) throws Exception {
+   return get(key) != null;
+   }
+
+   @Override
+   public Iterable> entries() throws Exception {
+   return entriesStream()::iterator;
+   }
+
+   private Stream> entriesStream() throws Exception {
+   Iterable>> withTs = 
original.entries();
+   withTs = withTs == null ? Collections.emptyList() : withTs;
+   return StreamSupport
+   .stream(withTs.spliterator(), false)
--- End diff --

I saw that some of the classes make heavy use of streams by getting 
spliterators from collections. While the code is concise, this creates some 
default adapter from iterator to spliterator under the hood. IIRC this can have 
significant performance impact, especially if used with hot code paths. State 
access in Flink can be considered a hot path for some cases. It is hard to 
quantify the impact just from looking at it, but when using this kind of api 
adapters in "low-level" classes, please be aware of the potential impact. We 
might want to have a look at the performance and tune if needed. Should be ok 
for now because there is no regression in existing code, but we might want to 
measure this for heap based state eventually.


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.

[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-22 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16520498#comment-16520498
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197482482
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java
 ---
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.SupplierWithException;
+import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+/**
+ * Base class for TTL logic wrappers.
+ *
+ * @param  Type of originally wrapped object
+ */
+abstract class AbstractTtlDecorator {
+   final T original;
+   final TtlConfig config;
+   final TtlTimeProvider timeProvider;
+   final boolean updateTsOnRead;
+   final boolean returnExpired;
+   final long ttl;
+
+   AbstractTtlDecorator(
+   T original,
+   TtlConfig config,
+   TtlTimeProvider timeProvider) {
+   Preconditions.checkNotNull(original);
+   Preconditions.checkNotNull(config);
+   Preconditions.checkNotNull(timeProvider);
+   Preconditions.checkArgument(config.getTtlUpdateType() != 
TtlUpdateType.Disabled,
+   "State does not need to be wrapped with TTL if it is 
configured as disabled.");
+   this.original = original;
+   this.config = config;
+   this.timeProvider = timeProvider;
+   this.updateTsOnRead = config.getTtlUpdateType() == 
TtlUpdateType.OnReadAndWrite;
+   this.returnExpired = config.getStateVisibility() == 
TtlStateVisibility.Relaxed;
+   this.ttl = config.getTtl().toMilliseconds();
+   }
+
+V getUnexpried(TtlValue ttlValue) {
+   return ttlValue == null || (expired(ttlValue) && 
!returnExpired) ? null : ttlValue.getUserValue();
+   }
+
+boolean expired(TtlValue ttlValue) {
+   return ttlValue != null && ttlValue.getExpirationTimestamp() <= 
timeProvider.currentTimestamp();
+   }
+
+TtlValue wrapWithTs(V value) {
+   return wrapWithTs(value, newExpirationTimestamp());
+   }
+
+   static  TtlValue wrapWithTs(V value, long ts) {
+   return value == null ? null : new TtlValue<>(value, ts);
+   }
+
+TtlValue rewrapWithNewTs(TtlValue ttlValue) {
+   return wrapWithTs(ttlValue.getUserValue());
+   }
+
+   private long newExpirationTimestamp() {
+   long currentTs = timeProvider.currentTimestamp();
+   long ttlWithoutOverflow = currentTs > 0 ? 
Math.min(Long.MAX_VALUE - currentTs, ttl) : ttl;
+   return currentTs + ttlWithoutOverflow;
+   }
+
+
V getWithTtlCheckAndUpdate(
--- End diff --

Seems also like this method almost fits better into `AbstractTtlState`, for 
example you can access `clear()` directly.


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> TTL state decorator uses original state with packed TTL and add TTL logic 
> using time provider:
> {code:java}
> TtlValueState implements ValueState {
>   ValueState> underlyingState;
>   InternalTimeService timeProvider;
>   V 

[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-22 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16520492#comment-16520492
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197480035
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java
 ---
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.SupplierWithException;
+import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+/**
+ * Base class for TTL logic wrappers.
+ *
+ * @param  Type of originally wrapped object
+ */
+abstract class AbstractTtlDecorator {
--- End diff --

I wonder it is better to have this as abstract base class or prefer 
composition over inheritance for `AbstractTtlState`. It has no abstract methods 
and also no methods are overriden by subclasses, it could as well just be a 
reference of `AbstractTtlState` and not abstract.


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> TTL state decorator uses original state with packed TTL and add TTL logic 
> using time provider:
> {code:java}
> TtlValueState implements ValueState {
>   ValueState> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
> TtlValue valueWithTtl = underlyingState.get();
> // ttl logic here (e.g. update timestamp)
> return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are apply to state produced by normal state binder in its TTL 
> wrapper from FLINK-9513



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-22 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16520486#comment-16520486
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197477831
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateVisibility.java
 ---
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+/**
+ * This option configures whether to return expired user value or not.
--- End diff --

"can" be returned. we accept it in relaxed, we don't enforce it.


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> TTL state decorator uses original state with packed TTL and add TTL logic 
> using time provider:
> {code:java}
> TtlValueState implements ValueState {
>   ValueState> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
> TtlValue valueWithTtl = underlyingState.get();
> // ttl logic here (e.g. update timestamp)
> return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are apply to state produced by normal state binder in its TTL 
> wrapper from FLINK-9513



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-22 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16520477#comment-16520477
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197476710
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateVisibility.java
 ---
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+/**
+ * This option configures whether to return expired user value or not.
+ */
+public enum TtlStateVisibility {
+   /** Return still available expired user value (not yet cleaned up). */
+   Relaxed,
--- End diff --

Would explain this different in the comment. It means that expired state 
can be returned if it is not yet cleaned up.


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> TTL state decorator uses original state with packed TTL and add TTL logic 
> using time provider:
> {code:java}
> TtlValueState implements ValueState {
>   ValueState> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
> TtlValue valueWithTtl = underlyingState.get();
> // ttl logic here (e.g. update timestamp)
> return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are apply to state produced by normal state binder in its TTL 
> wrapper from FLINK-9513



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-22 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16520478#comment-16520478
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197476858
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateVisibility.java
 ---
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+/**
+ * This option configures whether to return expired user value or not.
+ */
+public enum TtlStateVisibility {
+   /** Return still available expired user value (not yet cleaned up). */
+   Relaxed,
+   /** Hide expired user value and behave as if it does not exist any 
more. */
+   Exact
--- End diff --

And here: expired state is never returned to the user. I think it does not 
matter if it is just hidden, or deleted.


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> TTL state decorator uses original state with packed TTL and add TTL logic 
> using time provider:
> {code:java}
> TtlValueState implements ValueState {
>   ValueState> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
> TtlValue valueWithTtl = underlyingState.get();
> // ttl logic here (e.g. update timestamp)
> return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are apply to state produced by normal state binder in its TTL 
> wrapper from FLINK-9513



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-22 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16520474#comment-16520474
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197475873
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlUpdateType.java
 ---
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+/**
+ * This option value configures when to prolong state TTL.
+ */
+public enum TtlUpdateType {
+   /** TTL is disabled. State does not expire. */
+   Disabled,
--- End diff --

Seems like this is never needed? Why would somebody register TTL state and 
then declare it disabled?


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> TTL state decorator uses original state with packed TTL and add TTL logic 
> using time provider:
> {code:java}
> TtlValueState implements ValueState {
>   ValueState> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
> TtlValue valueWithTtl = underlyingState.get();
> // ttl logic here (e.g. update timestamp)
> return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are apply to state produced by normal state binder in its TTL 
> wrapper from FLINK-9513



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-22 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16520469#comment-16520469
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197475080
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlUpdateType.java
 ---
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+/**
+ * This option value configures when to prolong state TTL.
+ */
+public enum TtlUpdateType {
--- End diff --

Why not better declare the enums in the `TtlConfig` class where they belong 
to?


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> TTL state decorator uses original state with packed TTL and add TTL logic 
> using time provider:
> {code:java}
> TtlValueState implements ValueState {
>   ValueState> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
> TtlValue valueWithTtl = underlyingState.get();
> // ttl logic here (e.g. update timestamp)
> return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are apply to state produced by normal state binder in its TTL 
> wrapper from FLINK-9513



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-22 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16520471#comment-16520471
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197475245
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlTimeCharacteristic.java
 ---
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+/**
+ * This option configures time scale to use for ttl.
+ */
+public enum TtlTimeCharacteristic {
--- End diff --

Same here.


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> TTL state decorator uses original state with packed TTL and add TTL logic 
> using time provider:
> {code:java}
> TtlValueState implements ValueState {
>   ValueState> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
> TtlValue valueWithTtl = underlyingState.get();
> // ttl logic here (e.g. update timestamp)
> return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are apply to state produced by normal state binder in its TTL 
> wrapper from FLINK-9513



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-22 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16520470#comment-16520470
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197475154
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateVisibility.java
 ---
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+/**
+ * This option configures whether to return expired user value or not.
+ */
+public enum TtlStateVisibility {
--- End diff --

Same here.


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> TTL state decorator uses original state with packed TTL and add TTL logic 
> using time provider:
> {code:java}
> TtlValueState implements ValueState {
>   ValueState> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
> TtlValue valueWithTtl = underlyingState.get();
> // ttl logic here (e.g. update timestamp)
> return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are apply to state produced by normal state binder in its TTL 
> wrapper from FLINK-9513



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-22 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16520464#comment-16520464
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197474567
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlConfig.java 
---
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Configuration of state TTL logic.
+ * TODO: builder
--- End diff --

Was this forgotten or planned for a later commit?


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> TTL state decorator uses original state with packed TTL and add TTL logic 
> using time provider:
> {code:java}
> TtlValueState implements ValueState {
>   ValueState> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
> TtlValue valueWithTtl = underlyingState.get();
> // ttl logic here (e.g. update timestamp)
> return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are apply to state produced by normal state binder in its TTL 
> wrapper from FLINK-9513



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-22 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16520409#comment-16520409
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197460522
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java
 ---
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.SupplierWithException;
+import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+/**
+ * Base class for TTL logic wrappers.
+ *
+ * @param  Type of originally wrapped object
+ */
+abstract class AbstractTtlDecorator {
+   final T original;
+   final TtlConfig config;
+   final TtlTimeProvider timeProvider;
+   final boolean updateTsOnRead;
+   final boolean returnExpired;
+   final long ttl;
+
+   AbstractTtlDecorator(
+   T original,
+   TtlConfig config,
+   TtlTimeProvider timeProvider) {
+   Preconditions.checkNotNull(original);
+   Preconditions.checkNotNull(config);
+   Preconditions.checkNotNull(timeProvider);
+   Preconditions.checkArgument(config.getTtlUpdateType() != 
TtlUpdateType.Disabled,
+   "State does not need to be wrapped with TTL if it is 
configured as disabled.");
+   this.original = original;
+   this.config = config;
+   this.timeProvider = timeProvider;
+   this.updateTsOnRead = config.getTtlUpdateType() == 
TtlUpdateType.OnReadAndWrite;
+   this.returnExpired = config.getStateVisibility() == 
TtlStateVisibility.Relaxed;
+   this.ttl = config.getTtl().toMilliseconds();
+   }
+
+V getUnexpried(TtlValue ttlValue) {
+   return ttlValue == null || (expired(ttlValue) && 
!returnExpired) ? null : ttlValue.getUserValue();
+   }
+
+boolean expired(TtlValue ttlValue) {
+   return ttlValue != null && ttlValue.getExpirationTimestamp() <= 
timeProvider.currentTimestamp();
+   }
+
+TtlValue wrapWithTs(V value) {
--- End diff --

Better `wrapWithTtl`


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> TTL state decorator uses original state with packed TTL and add TTL logic 
> using time provider:
> {code:java}
> TtlValueState implements ValueState {
>   ValueState> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
> TtlValue valueWithTtl = underlyingState.get();
> // ttl logic here (e.g. update timestamp)
> return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are apply to state produced by normal state binder in its TTL 
> wrapper from FLINK-9513



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-22 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16520406#comment-16520406
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197459764
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java
 ---
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.SupplierWithException;
+import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+/**
+ * Base class for TTL logic wrappers.
+ *
+ * @param  Type of originally wrapped object
+ */
+abstract class AbstractTtlDecorator {
+   final T original;
+   final TtlConfig config;
+   final TtlTimeProvider timeProvider;
+   final boolean updateTsOnRead;
+   final boolean returnExpired;
+   final long ttl;
+
+   AbstractTtlDecorator(
+   T original,
+   TtlConfig config,
+   TtlTimeProvider timeProvider) {
+   Preconditions.checkNotNull(original);
+   Preconditions.checkNotNull(config);
+   Preconditions.checkNotNull(timeProvider);
+   Preconditions.checkArgument(config.getTtlUpdateType() != 
TtlUpdateType.Disabled,
+   "State does not need to be wrapped with TTL if it is 
configured as disabled.");
+   this.original = original;
+   this.config = config;
+   this.timeProvider = timeProvider;
+   this.updateTsOnRead = config.getTtlUpdateType() == 
TtlUpdateType.OnReadAndWrite;
+   this.returnExpired = config.getStateVisibility() == 
TtlStateVisibility.Relaxed;
+   this.ttl = config.getTtl().toMilliseconds();
+   }
+
+V getUnexpried(TtlValue ttlValue) {
--- End diff --

This method does two things: checking the ttl and unwrapping the value. I 
would make this as two methods to separate concerns and make it more readable.


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> TTL state decorator uses original state with packed TTL and add TTL logic 
> using time provider:
> {code:java}
> TtlValueState implements ValueState {
>   ValueState> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
> TtlValue valueWithTtl = underlyingState.get();
> // ttl logic here (e.g. update timestamp)
> return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are apply to state produced by normal state binder in its TTL 
> wrapper from FLINK-9513



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-22 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16520401#comment-16520401
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197458219
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java
 ---
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.SupplierWithException;
+import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+/**
+ * Base class for TTL logic wrappers.
+ *
+ * @param  Type of originally wrapped object
+ */
+abstract class AbstractTtlDecorator {
+   final T original;
+   final TtlConfig config;
+   final TtlTimeProvider timeProvider;
+   final boolean updateTsOnRead;
+   final boolean returnExpired;
--- End diff --

A bit more field comments might be good, especially about the exact 
meanings of those boolean flags. For example, it is not super obvious what 
`returnExpired` means for the code.


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> TTL state decorator uses original state with packed TTL and add TTL logic 
> using time provider:
> {code:java}
> TtlValueState implements ValueState {
>   ValueState> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
> TtlValue valueWithTtl = underlyingState.get();
> // ttl logic here (e.g. update timestamp)
> return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are apply to state produced by normal state binder in its TTL 
> wrapper from FLINK-9513



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-22 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16520397#comment-16520397
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197457017
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java
 ---
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.SupplierWithException;
+import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+/**
+ * Base class for TTL logic wrappers.
+ *
+ * @param  Type of originally wrapped object
+ */
+abstract class AbstractTtlDecorator {
+   final T original;
+   final TtlConfig config;
+   final TtlTimeProvider timeProvider;
+   final boolean updateTsOnRead;
+   final boolean returnExpired;
+   final long ttl;
+
+   AbstractTtlDecorator(
+   T original,
+   TtlConfig config,
+   TtlTimeProvider timeProvider) {
+   Preconditions.checkNotNull(original);
+   Preconditions.checkNotNull(config);
+   Preconditions.checkNotNull(timeProvider);
+   Preconditions.checkArgument(config.getTtlUpdateType() != 
TtlUpdateType.Disabled,
+   "State does not need to be wrapped with TTL if it is 
configured as disabled.");
+   this.original = original;
+   this.config = config;
+   this.timeProvider = timeProvider;
+   this.updateTsOnRead = config.getTtlUpdateType() == 
TtlUpdateType.OnReadAndWrite;
+   this.returnExpired = config.getStateVisibility() == 
TtlStateVisibility.Relaxed;
+   this.ttl = config.getTtl().toMilliseconds();
+   }
+
+V getUnexpried(TtlValue ttlValue) {
--- End diff --

typo in method name


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> TTL state decorator uses original state with packed TTL and add TTL logic 
> using time provider:
> {code:java}
> TtlValueState implements ValueState {
>   ValueState> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
> TtlValue valueWithTtl = underlyingState.get();
> // ttl logic here (e.g. update timestamp)
> return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are apply to state produced by normal state binder in its TTL 
> wrapper from FLINK-9513



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-22 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16520159#comment-16520159
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user azagrebin commented on the issue:

https://github.com/apache/flink/pull/6186
  
Thanks for review @sihuazhou, I think now all concerns should have been 
addressed.
CI failure is unrelated, works in [my CI for the same 
commit](https://travis-ci.org/azagrebin/flink/builds/395137069?utm_source=github_status&utm_medium=notification).


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> TTL state decorator uses original state with packed TTL and add TTL logic 
> using time provider:
> {code:java}
> TtlValueState implements ValueState {
>   ValueState> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
> TtlValue valueWithTtl = underlyingState.get();
> // ttl logic here (e.g. update timestamp)
> return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are apply to state produced by normal state binder in its TTL 
> wrapper from FLINK-9513



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-22 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16520168#comment-16520168
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/6186
  
@azagrebin thanks for addressing the concerns, looks good from my side now. 
let's wait for @StefanRRichter 's review.


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> TTL state decorator uses original state with packed TTL and add TTL logic 
> using time provider:
> {code:java}
> TtlValueState implements ValueState {
>   ValueState> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
> TtlValue valueWithTtl = underlyingState.get();
> // ttl logic here (e.g. update timestamp)
> return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are apply to state produced by normal state binder in its TTL 
> wrapper from FLINK-9513



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16519551#comment-16519551
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197192870
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlConfig.java 
---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Configuration of state TTL logic.
+ * TODO: builder
+ */
+public class TtlConfig {
+   private final TtlUpdateType ttlUpdateType;
+   private final TtlStateVisibility stateVisibility;
+   private final TtlTimeCharacteristic timeCharacteristic;
+   private final Time ttl;
+
+   public TtlConfig(
+   TtlUpdateType ttlUpdateType,
+   TtlStateVisibility stateVisibility,
+   TtlTimeCharacteristic timeCharacteristic,
+   Time ttl) {
+   Preconditions.checkNotNull(ttlUpdateType);
+   Preconditions.checkNotNull(stateVisibility);
+   Preconditions.checkNotNull(timeCharacteristic);
+   Preconditions.checkArgument(ttl.toMilliseconds() >= 0,
--- End diff --

Maybe we should pre check the `ttl` is not null, and I wonder does the 
`ttl.toMilliseconds() == 0` would make any sense?


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> TTL state decorator uses original state with packed TTL and add TTL logic 
> using time provider:
> {code:java}
> TtlValueState implements ValueState {
>   ValueState> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
> TtlValue valueWithTtl = underlyingState.get();
> // ttl logic here (e.g. update timestamp)
> return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are apply to state produced by normal state binder in its TTL 
> wrapper from FLINK-9513



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16519539#comment-16519539
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197188305
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java ---
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/**
+ * This class wraps user value of state with TTL.
+ *
+ * @param  Type of the user value of state with TTL
+ */
+class TtlValue implements Serializable {
+   private final T userValue;
+   private final long expirationTimestamp;
--- End diff --

Okay, I see this is tricky, I agree that this should be addressed in 
another PR. We need to figure out a proper way to do that.


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> TTL state decorator uses original state with packed TTL and add TTL logic 
> using time provider:
> {code:java}
> TtlValueState implements ValueState {
>   ValueState> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
> TtlValue valueWithTtl = underlyingState.get();
> // ttl logic here (e.g. update timestamp)
> return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are apply to state produced by normal state binder in its TTL 
> wrapper from FLINK-9513



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16519511#comment-16519511
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user azagrebin commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197179238
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java ---
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/**
+ * This class wraps user value of state with TTL.
+ *
+ * @param  Type of the user value of state with TTL
+ */
+class TtlValue implements Serializable {
+   private final T userValue;
+   private final long expirationTimestamp;
--- End diff --

This operation fits more for checkpoint full scan restoration with custom 
transformation of each state entry where expiration timestamp is optionally 
prolonged for downtime. The same as cleanup of expired state during full scan. 
I think it should be another issue and PR, because how can wrappers distinguish 
between old and new state before and after restart?


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> TTL state decorator uses original state with packed TTL and add TTL logic 
> using time provider:
> {code:java}
> TtlValueState implements ValueState {
>   ValueState> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
> TtlValue valueWithTtl = underlyingState.get();
> // ttl logic here (e.g. update timestamp)
> return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are apply to state produced by normal state binder in its TTL 
> wrapper from FLINK-9513



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16519301#comment-16519301
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197114442
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java ---
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/**
+ * This class wraps user value of state with TTL.
+ *
+ * @param  Type of the user value of state with TTL
+ */
+class TtlValue implements Serializable {
+   private final T userValue;
+   private final long expirationTimestamp;
--- End diff --

Additional, If we won't do the time align works on recovery, then what is 
the safe `TTL` value we should set for the a job? (this is the question that 
the users always ask us when they trying to use the `TTL`(we implemented it in 
a hacking way based on `TtlDB`) to control the state's size)


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> TTL state decorator uses original state with packed TTL and add TTL logic 
> using time provider:
> {code:java}
> TtlValueState implements ValueState {
>   ValueState> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
> TtlValue valueWithTtl = underlyingState.get();
> // ttl logic here (e.g. update timestamp)
> return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are apply to state produced by normal state binder in its TTL 
> wrapper from FLINK-9513



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16519288#comment-16519288
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197111980
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java ---
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/**
+ * This class wraps user value of state with TTL.
+ *
+ * @param  Type of the user value of state with TTL
+ */
+class TtlValue implements Serializable {
+   private final T userValue;
+   private final long expirationTimestamp;
--- End diff --

I'm really not sure whether we should leave it for now. If we leave it for 
now, then it will be a headache problem on practical production.

As a very common situation there is a job, which reading data from kafka, 
and the user set the `TTL = 2hours` because he thinks that the data's latency 
is absolute less than 2 hours, this way they can use the TTL to safely control 
the whole state size, and got a exactly result. But, if he found that the job 
need to scale up, then he need to trigger a savepoint and rescale the job from 
it. but what if there's some problems that stop he recovering the job from the 
savepoint in a very short time, let's say he will took 30min to recover the 
job, then the result become inaccuracy.

Even the user never need to trigger a savepoint for any reason, what if the 
job means some problem(maybe some problem with some machine) and loop in 
"failed-restart-failed-..", after 2 hours we fixed the problem and the job 
automatically resume, but the state has all been expired. I think this is a 
disaster for the user.

Yes, when using the `EventTime` people this problem won't help, but the 
`ProccessTime` is a very common use case(In our production, most of the job's 
TimeCharacter is `ProccessTime`).

I know Flink's TimeService also didn't do the time align works on recovery, 
but state's TTL is a bit different with Timer. When registering a timer, what 
users offer to the API is a absolute time, but when setting the TTL, what users 
offer is just a relative time, it's us that convert the relative time to a 
absolute time to implement the TTL.



> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> TTL state decorator uses original state with packed TTL and add TTL logic 
> using time provider:
> {code:java}
> TtlValueState implements ValueState {
>   ValueState> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
> TtlValue valueWithTtl = underlyingState.get();
> // ttl logic here (e.g. update timestamp)
> return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are apply to state produced by normal state binder in its TTL 
> wrapper from FLINK-9513



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16519279#comment-16519279
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user azagrebin commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197110068
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
 ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * This class wraps list state with TTL logic.
+ *
+ * @param  The type of key the state is associated to
+ * @param  The type of the namespace
+ * @param  Type of the user entry value of state with TTL
+ */
+class TtlListState extends
+   AbstractTtlState, List>, InternalListState>>
+   implements InternalListState {
+   TtlListState(
+   InternalListState> originalState,
+   TtlConfig config,
+   TtlTimeProvider timeProvider,
+   TypeSerializer> valueSerializer) {
+   super(originalState, config, timeProvider, valueSerializer);
+   }
+
+   @Override
+   public void update(List values) throws Exception {
+   updateInternal(values);
+   }
+
+   @Override
+   public void addAll(List values) throws Exception {
+   Preconditions.checkNotNull(values, "List of values to add 
cannot be null.");
+   original.addAll(withTs(values));
+   }
+
+   @Override
+   public Iterable get() throws Exception {
+   Iterable> ttlValue = original.get();
+   ttlValue = ttlValue == null ? Collections.emptyList() : 
ttlValue;
+   if (updateTsOnRead) {
+   List> collected = collect(ttlValue);
+   ttlValue = collected;
+   updateTs(collected);
+   }
+   final Iterable> finalResult = ttlValue;
+   return () -> new IteratorWithCleanup(finalResult.iterator());
+   }
+
+   private void updateTs(List> ttlValue) throws Exception {
+   List> unexpiredWithUpdatedTs = ttlValue.stream()
+   .filter(v -> !expired(v))
+   .map(this::rewrapWithNewTs)
+   .collect(Collectors.toList());
+   if (!unexpiredWithUpdatedTs.isEmpty()) {
+   original.update(unexpiredWithUpdatedTs);
+   }
+   }
+
+   @Override
+   public void add(T value) throws Exception {
+   Preconditions.checkNotNull(value, "You cannot add null to a 
ListState.");
+   original.add(wrapWithTs(value));
+   }
+
+   @Override
+   public void clear() {
+   original.clear();
+   }
+
+   @Override
+   public void mergeNamespaces(N target, Collection sources) throws 
Exception {
+   original.mergeNamespaces(target, sources);
+   }
+
+   @Override
+   public List getInternal() throws Exception {
+   return collect(get());
+   }
+
+   private  List collect(Iterable iterable) {
+   return StreamSupport.stream(iterable.spliterator(), 
false).collect(Collectors.toList());
+   }
+
+   @Override
+   public void updateInternal(List valueToStore) throws Exception {
+   Preconditions.checkNotNull(valueToStore, "List of values to 
update cannot be null.");
+   original.addAll(withTs(valu

[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16519277#comment-16519277
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user azagrebin commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197109577
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java
 ---
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.SupplierWithException;
+import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+/**
+ * Base class for TTL logic wrappers.
+ *
+ * @param  Type of originally wrapped object
+ */
+abstract class AbstractTtlDecorator {
+   final T original;
+   final TtlConfig config;
+   final TtlTimeProvider timeProvider;
+   final boolean updateTsOnRead;
+   final boolean returnExpired;
+
+   AbstractTtlDecorator(
+   T original,
+   TtlConfig config,
+   TtlTimeProvider timeProvider) {
+   Preconditions.checkNotNull(original);
+   Preconditions.checkNotNull(config);
+   Preconditions.checkNotNull(timeProvider);
+   Preconditions.checkArgument(config.getTtlUpdateType() != 
TtlUpdateType.Disabled,
+   "State does not need to be wrapped with TTL if it is 
configured as disabled.");
+   this.original = original;
+   this.config = config;
+   this.timeProvider = timeProvider;
+   this.updateTsOnRead = config.getTtlUpdateType() == 
TtlUpdateType.OnReadAndWrite;
+   this.returnExpired = config.getStateVisibility() == 
TtlStateVisibility.Relaxed;
+   }
+
+V getUnexpried(TtlValue ttlValue) {
+   return ttlValue == null || (expired(ttlValue) && 
!returnExpired) ? null : ttlValue.getUserValue();
+   }
+
+boolean expired(TtlValue ttlValue) {
+   return ttlValue != null && ttlValue.getExpirationTimestamp() <= 
timeProvider.currentTimestamp();
+   }
+
+TtlValue wrapWithTs(V value) {
+   return wrapWithTs(value, newExpirationTimestamp());
+   }
+
+   static  TtlValue wrapWithTs(V value, long ts) {
+   return value == null ? null : new TtlValue<>(value, ts);
+   }
+
+TtlValue rewrapWithNewTs(TtlValue ttlValue) {
+   return wrapWithTs(ttlValue.getUserValue());
+   }
+
+   private long newExpirationTimestamp() {
+   long currentTs = timeProvider.currentTimestamp();
+   long ttl = config.getTtl().toMilliseconds();
--- End diff --

I think it will be optimised by JIT as an only usage, but I will do just in 
case


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> TTL state decorator uses original state with packed TTL and add TTL logic 
> using time provider:
> {code:java}
> TtlValueState implements ValueState {
>   ValueState> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
> TtlValue valueWithTtl = underlyingState.get();
> // ttl logic here (e.g. update timestamp)
> return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are apply to state produced by 

[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16519276#comment-16519276
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user azagrebin commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197108498
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
 ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * This class wraps list state with TTL logic.
+ *
+ * @param  The type of key the state is associated to
+ * @param  The type of the namespace
+ * @param  Type of the user entry value of state with TTL
+ */
+class TtlListState extends
+   AbstractTtlState, List>, InternalListState>>
+   implements InternalListState {
+   TtlListState(
+   InternalListState> originalState,
+   TtlConfig config,
+   TtlTimeProvider timeProvider,
+   TypeSerializer> valueSerializer) {
+   super(originalState, config, timeProvider, valueSerializer);
+   }
+
+   @Override
+   public void update(List values) throws Exception {
+   updateInternal(values);
+   }
+
+   @Override
+   public void addAll(List values) throws Exception {
+   Preconditions.checkNotNull(values, "List of values to add 
cannot be null.");
+   original.addAll(withTs(values));
+   }
+
+   @Override
+   public Iterable get() throws Exception {
+   Iterable> ttlValue = original.get();
+   ttlValue = ttlValue == null ? Collections.emptyList() : 
ttlValue;
+   if (updateTsOnRead) {
+   List> collected = collect(ttlValue);
+   ttlValue = collected;
+   updateTs(collected);
+   }
+   final Iterable> finalResult = ttlValue;
+   return () -> new IteratorWithCleanup(finalResult.iterator());
+   }
+
+   private void updateTs(List> ttlValue) throws Exception {
+   List> unexpiredWithUpdatedTs = ttlValue.stream()
+   .filter(v -> !expired(v))
+   .map(this::rewrapWithNewTs)
+   .collect(Collectors.toList());
+   if (!unexpiredWithUpdatedTs.isEmpty()) {
+   original.update(unexpiredWithUpdatedTs);
+   }
+   }
+
+   @Override
+   public void add(T value) throws Exception {
+   Preconditions.checkNotNull(value, "You cannot add null to a 
ListState.");
+   original.add(wrapWithTs(value));
+   }
+
+   @Override
+   public void clear() {
+   original.clear();
+   }
+
+   @Override
+   public void mergeNamespaces(N target, Collection sources) throws 
Exception {
+   original.mergeNamespaces(target, sources);
--- End diff --

I think this is out of TTL scope now. Merge namespace methods are not part 
of API which immediately face user. It is used only internally for windowing, 
not sure if it needs TTL ever. Anyways read part of API should eventually evict 
expired state, merged or not. For aggregating states, merge already uses 
wrapped reading methods from aggregate functions underneath which should also 
speed up cleanup if ever used with `mergeNamespaces`.


> Create wrapper with TTL logic for value state
> 

[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16519247#comment-16519247
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user azagrebin commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197102118
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
 ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * This class wraps list state with TTL logic.
+ *
+ * @param  The type of key the state is associated to
+ * @param  The type of the namespace
+ * @param  Type of the user entry value of state with TTL
+ */
+class TtlListState extends
+   AbstractTtlState, List>, InternalListState>>
+   implements InternalListState {
+   TtlListState(
+   InternalListState> originalState,
+   TtlConfig config,
+   TtlTimeProvider timeProvider,
+   TypeSerializer> valueSerializer) {
+   super(originalState, config, timeProvider, valueSerializer);
+   }
+
+   @Override
+   public void update(List values) throws Exception {
+   updateInternal(values);
+   }
+
+   @Override
+   public void addAll(List values) throws Exception {
+   Preconditions.checkNotNull(values, "List of values to add 
cannot be null.");
+   original.addAll(withTs(values));
+   }
+
+   @Override
+   public Iterable get() throws Exception {
+   Iterable> ttlValue = original.get();
+   ttlValue = ttlValue == null ? Collections.emptyList() : 
ttlValue;
+   if (updateTsOnRead) {
+   List> collected = collect(ttlValue);
+   ttlValue = collected;
+   updateTs(collected);
+   }
+   final Iterable> finalResult = ttlValue;
+   return () -> new IteratorWithCleanup(finalResult.iterator());
+   }
+
+   private void updateTs(List> ttlValue) throws Exception {
+   List> unexpiredWithUpdatedTs = ttlValue.stream()
+   .filter(v -> !expired(v))
+   .map(this::rewrapWithNewTs)
+   .collect(Collectors.toList());
+   if (!unexpiredWithUpdatedTs.isEmpty()) {
+   original.update(unexpiredWithUpdatedTs);
+   }
+   }
+
+   @Override
+   public void add(T value) throws Exception {
+   Preconditions.checkNotNull(value, "You cannot add null to a 
ListState.");
+   original.add(wrapWithTs(value));
+   }
+
+   @Override
+   public void clear() {
+   original.clear();
+   }
+
+   @Override
+   public void mergeNamespaces(N target, Collection sources) throws 
Exception {
+   original.mergeNamespaces(target, sources);
+   }
+
+   @Override
+   public List getInternal() throws Exception {
+   return collect(get());
+   }
+
+   private  List collect(Iterable iterable) {
--- End diff --

Well, not sure, it matters at this point, it is private and used just once 
with a collected `LIst`. My thinking was that it is less verbose: 
`list.stream()` vs `StreamSupport.stream(iterable.spliterator(), false)`.


> Create wrapper with TTL logic for value state
> -
>
>

[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16519187#comment-16519187
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197080576
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
 ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * This class wraps list state with TTL logic.
+ *
+ * @param  The type of key the state is associated to
+ * @param  The type of the namespace
+ * @param  Type of the user entry value of state with TTL
+ */
+class TtlListState extends
+   AbstractTtlState, List>, InternalListState>>
+   implements InternalListState {
+   TtlListState(
+   InternalListState> originalState,
+   TtlConfig config,
+   TtlTimeProvider timeProvider,
+   TypeSerializer> valueSerializer) {
+   super(originalState, config, timeProvider, valueSerializer);
+   }
+
+   @Override
+   public void update(List values) throws Exception {
+   updateInternal(values);
+   }
+
+   @Override
+   public void addAll(List values) throws Exception {
+   Preconditions.checkNotNull(values, "List of values to add 
cannot be null.");
+   original.addAll(withTs(values));
+   }
+
+   @Override
+   public Iterable get() throws Exception {
+   Iterable> ttlValue = original.get();
+   ttlValue = ttlValue == null ? Collections.emptyList() : 
ttlValue;
+   if (updateTsOnRead) {
+   List> collected = collect(ttlValue);
+   ttlValue = collected;
+   updateTs(collected);
+   }
+   final Iterable> finalResult = ttlValue;
+   return () -> new IteratorWithCleanup(finalResult.iterator());
+   }
+
+   private void updateTs(List> ttlValue) throws Exception {
+   List> unexpiredWithUpdatedTs = ttlValue.stream()
+   .filter(v -> !expired(v))
+   .map(this::rewrapWithNewTs)
+   .collect(Collectors.toList());
+   if (!unexpiredWithUpdatedTs.isEmpty()) {
+   original.update(unexpiredWithUpdatedTs);
+   }
+   }
+
+   @Override
+   public void add(T value) throws Exception {
+   Preconditions.checkNotNull(value, "You cannot add null to a 
ListState.");
+   original.add(wrapWithTs(value));
+   }
+
+   @Override
+   public void clear() {
+   original.clear();
+   }
+
+   @Override
+   public void mergeNamespaces(N target, Collection sources) throws 
Exception {
+   original.mergeNamespaces(target, sources);
+   }
+
+   @Override
+   public List getInternal() throws Exception {
+   return collect(get());
+   }
+
+   private  List collect(Iterable iterable) {
--- End diff --

If we need to "update on read", then here is a bit confusion to me. 
Currently we attach TTL for every list item, so the "update on read" should 
scope to the list item, not the whole list. So, it makes me feel that an 
`iterable` for `updateTs` seems more reasonable. What do you think?


> Create wrapper with TTL logic for valu

[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16519186#comment-16519186
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user azagrebin commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197079917
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java ---
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/**
+ * This class wraps user value of state with TTL.
+ *
+ * @param  Type of the user value of state with TTL
+ */
+class TtlValue implements Serializable {
+   private final T userValue;
+   private final long expirationTimestamp;
--- End diff --

I think it is a bit out of score for this PR. I thought about this concern 
but semantics of processing time is similar of real clock time and it does not 
stop if job is stopped. There is also event time option with more control over 
time. I would leave it for now. This is more about user migration of 
checkpoints. We can add a comment/open question to design doc about it.


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> TTL state decorator uses original state with packed TTL and add TTL logic 
> using time provider:
> {code:java}
> TtlValueState implements ValueState {
>   ValueState> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
> TtlValue valueWithTtl = underlyingState.get();
> // ttl logic here (e.g. update timestamp)
> return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are apply to state produced by normal state binder in its TTL 
> wrapper from FLINK-9513



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16519178#comment-16519178
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user azagrebin commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197077377
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
 ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * This class wraps list state with TTL logic.
+ *
+ * @param  The type of key the state is associated to
+ * @param  The type of the namespace
+ * @param  Type of the user entry value of state with TTL
+ */
+class TtlListState extends
+   AbstractTtlState, List>, InternalListState>>
+   implements InternalListState {
+   TtlListState(
+   InternalListState> originalState,
+   TtlConfig config,
+   TtlTimeProvider timeProvider,
+   TypeSerializer> valueSerializer) {
+   super(originalState, config, timeProvider, valueSerializer);
+   }
+
+   @Override
+   public void update(List values) throws Exception {
+   updateInternal(values);
+   }
+
+   @Override
+   public void addAll(List values) throws Exception {
+   Preconditions.checkNotNull(values, "List of values to add 
cannot be null.");
+   original.addAll(withTs(values));
+   }
+
+   @Override
+   public Iterable get() throws Exception {
+   Iterable> ttlValue = original.get();
+   ttlValue = ttlValue == null ? Collections.emptyList() : 
ttlValue;
+   if (updateTsOnRead) {
+   List> collected = collect(ttlValue);
+   ttlValue = collected;
+   updateTs(collected);
+   }
+   final Iterable> finalResult = ttlValue;
+   return () -> new IteratorWithCleanup(finalResult.iterator());
+   }
+
+   private void updateTs(List> ttlValue) throws Exception {
+   List> unexpiredWithUpdatedTs = ttlValue.stream()
+   .filter(v -> !expired(v))
+   .map(this::rewrapWithNewTs)
+   .collect(Collectors.toList());
+   if (!unexpiredWithUpdatedTs.isEmpty()) {
+   original.update(unexpiredWithUpdatedTs);
+   }
+   }
+
+   @Override
+   public void add(T value) throws Exception {
+   Preconditions.checkNotNull(value, "You cannot add null to a 
ListState.");
+   original.add(wrapWithTs(value));
+   }
+
+   @Override
+   public void clear() {
+   original.clear();
+   }
+
+   @Override
+   public void mergeNamespaces(N target, Collection sources) throws 
Exception {
+   original.mergeNamespaces(target, sources);
+   }
+
+   @Override
+   public List getInternal() throws Exception {
+   return collect(get());
+   }
+
+   private  List collect(Iterable iterable) {
--- End diff --

In general the idea is to keep `get()` method as lazy as possible, return 
`Iterable` and avoid querying backend if not needed. At the moment list state 
internally stores `List` according to API, not `Iterable` (something to think 
about in future). So `getInternal()` returns `List` - collected `Iterable`. I 
agree it looks wei

[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518896#comment-16518896
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r196998472
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
 ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * This class wraps list state with TTL logic.
+ *
+ * @param  The type of key the state is associated to
+ * @param  The type of the namespace
+ * @param  Type of the user entry value of state with TTL
+ */
+class TtlListState extends
+   AbstractTtlState, List>, InternalListState>>
+   implements InternalListState {
+   TtlListState(
+   InternalListState> originalState,
+   TtlConfig config,
+   TtlTimeProvider timeProvider,
+   TypeSerializer> valueSerializer) {
+   super(originalState, config, timeProvider, valueSerializer);
+   }
+
+   @Override
+   public void update(List values) throws Exception {
+   updateInternal(values);
+   }
+
+   @Override
+   public void addAll(List values) throws Exception {
+   Preconditions.checkNotNull(values, "List of values to add 
cannot be null.");
+   original.addAll(withTs(values));
+   }
+
+   @Override
+   public Iterable get() throws Exception {
+   Iterable> ttlValue = original.get();
+   ttlValue = ttlValue == null ? Collections.emptyList() : 
ttlValue;
+   if (updateTsOnRead) {
+   List> collected = collect(ttlValue);
+   ttlValue = collected;
+   updateTs(collected);
+   }
+   final Iterable> finalResult = ttlValue;
+   return () -> new IteratorWithCleanup(finalResult.iterator());
+   }
+
+   private void updateTs(List> ttlValue) throws Exception {
+   List> unexpiredWithUpdatedTs = ttlValue.stream()
+   .filter(v -> !expired(v))
+   .map(this::rewrapWithNewTs)
+   .collect(Collectors.toList());
+   if (!unexpiredWithUpdatedTs.isEmpty()) {
+   original.update(unexpiredWithUpdatedTs);
+   }
+   }
+
+   @Override
+   public void add(T value) throws Exception {
+   Preconditions.checkNotNull(value, "You cannot add null to a 
ListState.");
+   original.add(wrapWithTs(value));
+   }
+
+   @Override
+   public void clear() {
+   original.clear();
+   }
+
+   @Override
+   public void mergeNamespaces(N target, Collection sources) throws 
Exception {
+   original.mergeNamespaces(target, sources);
+   }
+
+   @Override
+   public List getInternal() throws Exception {
+   return collect(get());
+   }
+
+   private  List collect(Iterable iterable) {
--- End diff --

If we've called `getInterval()` in `get()`, and make the `updateTs()` to 
accept `Iterable`, then this method seems could be removed(Or at least, we 
should add a check for if the `iterable` is assignable from `List`, if true we 
could cast it to List and return immediately).


> Create wrapper with TTL logic for value state
> -

[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518893#comment-16518893
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197001110
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlReducingState.java
 ---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalReducingState;
+
+import java.util.Collection;
+
+/**
+ * This class wraps reducing state with TTL logic.
+ *
+ * @param  The type of key the state is associated to
+ * @param  The type of the namespace
+ * @param  Type of the user value of state with TTL
+ */
+class TtlReducingState
+   extends AbstractTtlState, InternalReducingState>>
+   implements InternalReducingState {
+   TtlReducingState(
+   InternalReducingState> originalState,
+   TtlConfig config,
+   TtlTimeProvider timeProvider,
+   TypeSerializer valueSerializer) {
+   super(originalState, config, timeProvider, valueSerializer);
+   }
+
+   @Override
+   public T get() throws Exception {
+   return getInternal();
+   }
+
+   @Override
+   public void add(T value) throws Exception {
+   original.add(wrapWithTs(value, Long.MAX_VALUE));
+   }
+
+   @Override
+   public void clear() {
+   original.clear();
+   }
+
+   @Override
+   public void mergeNamespaces(N target, Collection sources) throws 
Exception {
+   original.mergeNamespaces(target, sources);
--- End diff --

Again, Should we also do the TTL check for original.mergeNamespaces()? 
Since we need to query the state when merging namespaces.


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> TTL state decorator uses original state with packed TTL and add TTL logic 
> using time provider:
> {code:java}
> TtlValueState implements ValueState {
>   ValueState> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
> TtlValue valueWithTtl = underlyingState.get();
> // ttl logic here (e.g. update timestamp)
> return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are apply to state produced by normal state binder in its TTL 
> wrapper from FLINK-9513



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518892#comment-16518892
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r196996839
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
 ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * This class wraps list state with TTL logic.
+ *
+ * @param  The type of key the state is associated to
+ * @param  The type of the namespace
+ * @param  Type of the user entry value of state with TTL
+ */
+class TtlListState extends
+   AbstractTtlState, List>, InternalListState>>
+   implements InternalListState {
+   TtlListState(
+   InternalListState> originalState,
+   TtlConfig config,
+   TtlTimeProvider timeProvider,
+   TypeSerializer> valueSerializer) {
+   super(originalState, config, timeProvider, valueSerializer);
+   }
+
+   @Override
+   public void update(List values) throws Exception {
+   updateInternal(values);
+   }
+
+   @Override
+   public void addAll(List values) throws Exception {
+   Preconditions.checkNotNull(values, "List of values to add 
cannot be null.");
+   original.addAll(withTs(values));
+   }
+
+   @Override
+   public Iterable get() throws Exception {
+   Iterable> ttlValue = original.get();
+   ttlValue = ttlValue == null ? Collections.emptyList() : 
ttlValue;
+   if (updateTsOnRead) {
+   List> collected = collect(ttlValue);
--- End diff --

In this block, we need to iterate the `ttlValue` twice, one for `collect()` 
and one for `updateTs()`. If we could make the updateTs to accept `Iterable` as 
the argument, then we can avoiding the `collect()` here, this way we only need 
to iterate the `ttlValue` once.


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> TTL state decorator uses original state with packed TTL and add TTL logic 
> using time provider:
> {code:java}
> TtlValueState implements ValueState {
>   ValueState> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
> TtlValue valueWithTtl = underlyingState.get();
> // ttl logic here (e.g. update timestamp)
> return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are apply to state produced by normal state binder in its TTL 
> wrapper from FLINK-9513



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518898#comment-16518898
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197001339
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlAggregatingState.java
 ---
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalAggregatingState;
+
+import java.util.Collection;
+
+/**
+ * This class wraps aggregating state with TTL logic.
+ *
+ * @param  The type of key the state is associated to
+ * @param  The type of the namespace
+ * @param  Type of the value added to the state
+ * @param  The type of the accumulator (intermediate aggregate state).
+ * @param  Type of the value extracted from the state
+ *
+ */
+class TtlAggregatingState
+   extends AbstractTtlState, 
InternalAggregatingState, OUT>>
+   implements InternalAggregatingState {
+
+   TtlAggregatingState(
+   InternalAggregatingState, OUT> 
originalState,
+   TtlConfig config,
+   TtlTimeProvider timeProvider,
+   TypeSerializer valueSerializer,
+   TtlAggregateFunction aggregateFunction) {
+   super(originalState, config, timeProvider, valueSerializer);
+   aggregateFunction.stateClear = originalState::clear;
+   aggregateFunction.updater = originalState::updateInternal;
+   }
+
+   @Override
+   public OUT get() throws Exception {
+   return original.get();
+   }
+
+   @Override
+   public void add(IN value) throws Exception {
+   original.add(value);
+   }
+
+   @Override
+   public void clear() {
+   original.clear();
+   }
+
+   @Override
+   public ACC getInternal() throws Exception {
+   return getWithTtlCheckAndUpdate(original::getInternal, 
original::updateInternal);
+   }
+
+   @Override
+   public void updateInternal(ACC valueToStore) throws Exception {
+   original.updateInternal(wrapWithTs(valueToStore));
+   }
+
+   @Override
+   public void mergeNamespaces(N target, Collection sources) throws 
Exception {
+   original.mergeNamespaces(target, sources);
--- End diff --

Should we also do the TTL check for original.mergeNamespaces()? Since we 
need to query the state when merging namespaces.


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> TTL state decorator uses original state with packed TTL and add TTL logic 
> using time provider:
> {code:java}
> TtlValueState implements ValueState {
>   ValueState> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
> TtlValue valueWithTtl = underlyingState.get();
> // ttl logic here (e.g. update timestamp)
> return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are apply to state produced by normal state binder in its TTL 
> wrapper from FLINK-9513



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518894#comment-16518894
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r196995820
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
 ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * This class wraps list state with TTL logic.
+ *
+ * @param  The type of key the state is associated to
+ * @param  The type of the namespace
+ * @param  Type of the user entry value of state with TTL
+ */
+class TtlListState extends
+   AbstractTtlState, List>, InternalListState>>
+   implements InternalListState {
+   TtlListState(
+   InternalListState> originalState,
+   TtlConfig config,
+   TtlTimeProvider timeProvider,
+   TypeSerializer> valueSerializer) {
+   super(originalState, config, timeProvider, valueSerializer);
+   }
+
+   @Override
+   public void update(List values) throws Exception {
+   updateInternal(values);
+   }
+
+   @Override
+   public void addAll(List values) throws Exception {
+   Preconditions.checkNotNull(values, "List of values to add 
cannot be null.");
+   original.addAll(withTs(values));
+   }
+
+   @Override
+   public Iterable get() throws Exception {
+   Iterable> ttlValue = original.get();
+   ttlValue = ttlValue == null ? Collections.emptyList() : 
ttlValue;
+   if (updateTsOnRead) {
+   List> collected = collect(ttlValue);
+   ttlValue = collected;
+   updateTs(collected);
+   }
+   final Iterable> finalResult = ttlValue;
+   return () -> new IteratorWithCleanup(finalResult.iterator());
+   }
+
+   private void updateTs(List> ttlValue) throws Exception {
+   List> unexpiredWithUpdatedTs = ttlValue.stream()
+   .filter(v -> !expired(v))
+   .map(this::rewrapWithNewTs)
+   .collect(Collectors.toList());
+   if (!unexpiredWithUpdatedTs.isEmpty()) {
+   original.update(unexpiredWithUpdatedTs);
+   }
+   }
+
+   @Override
+   public void add(T value) throws Exception {
+   Preconditions.checkNotNull(value, "You cannot add null to a 
ListState.");
+   original.add(wrapWithTs(value));
+   }
+
+   @Override
+   public void clear() {
+   original.clear();
+   }
+
+   @Override
+   public void mergeNamespaces(N target, Collection sources) throws 
Exception {
+   original.mergeNamespaces(target, sources);
--- End diff --

Again, should we also do the `TTL` check for `original.mergeNamespaces()`? 
Since we need to query the state when merging namespaces.


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>

[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518897#comment-16518897
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197004891
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java ---
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/**
+ * This class wraps user value of state with TTL.
+ *
+ * @param  Type of the user value of state with TTL
+ */
+class TtlValue implements Serializable {
+   private final T userValue;
+   private final long expirationTimestamp;
--- End diff --

The `expirationTimestamp` is an absolute timestamp, should we do the 
timestamp shift for `TtlValue` when checkpoint & recovery? For example, when 
user using the `ProcessTime` as the TimeCharacater, and set the `TTL = 10min`. 
For some reason, he triggers a savepoint, and after 11 min he recover the job 
from the savepoint, if we don't do the timestamp shift, then all the state will 
be expired.


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> TTL state decorator uses original state with packed TTL and add TTL logic 
> using time provider:
> {code:java}
> TtlValueState implements ValueState {
>   ValueState> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
> TtlValue valueWithTtl = underlyingState.get();
> // ttl logic here (e.g. update timestamp)
> return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are apply to state produced by normal state binder in its TTL 
> wrapper from FLINK-9513



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518895#comment-16518895
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r196996094
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
 ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * This class wraps list state with TTL logic.
+ *
+ * @param  The type of key the state is associated to
+ * @param  The type of the namespace
+ * @param  Type of the user entry value of state with TTL
+ */
+class TtlListState extends
+   AbstractTtlState, List>, InternalListState>>
+   implements InternalListState {
+   TtlListState(
+   InternalListState> originalState,
+   TtlConfig config,
+   TtlTimeProvider timeProvider,
+   TypeSerializer> valueSerializer) {
+   super(originalState, config, timeProvider, valueSerializer);
+   }
+
+   @Override
+   public void update(List values) throws Exception {
+   updateInternal(values);
+   }
+
+   @Override
+   public void addAll(List values) throws Exception {
+   Preconditions.checkNotNull(values, "List of values to add 
cannot be null.");
+   original.addAll(withTs(values));
+   }
+
+   @Override
+   public Iterable get() throws Exception {
+   Iterable> ttlValue = original.get();
+   ttlValue = ttlValue == null ? Collections.emptyList() : 
ttlValue;
+   if (updateTsOnRead) {
+   List> collected = collect(ttlValue);
+   ttlValue = collected;
+   updateTs(collected);
+   }
+   final Iterable> finalResult = ttlValue;
+   return () -> new IteratorWithCleanup(finalResult.iterator());
+   }
+
+   private void updateTs(List> ttlValue) throws Exception {
+   List> unexpiredWithUpdatedTs = ttlValue.stream()
+   .filter(v -> !expired(v))
+   .map(this::rewrapWithNewTs)
+   .collect(Collectors.toList());
+   if (!unexpiredWithUpdatedTs.isEmpty()) {
+   original.update(unexpiredWithUpdatedTs);
+   }
+   }
+
+   @Override
+   public void add(T value) throws Exception {
+   Preconditions.checkNotNull(value, "You cannot add null to a 
ListState.");
+   original.add(wrapWithTs(value));
+   }
+
+   @Override
+   public void clear() {
+   original.clear();
+   }
+
+   @Override
+   public void mergeNamespaces(N target, Collection sources) throws 
Exception {
+   original.mergeNamespaces(target, sources);
+   }
+
+   @Override
+   public List getInternal() throws Exception {
+   return collect(get());
--- End diff --

This looks a bit weird, my gut feeling is that we should call 
`getInternal()` in `get()`(as we called `updateInternal()` in `update()` in 
this class), but here is reverse.


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Pro

[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518810#comment-16518810
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r196995095
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlConfig.java 
---
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Configuration of state TTL logic.
+ * TODO: builder
+ */
+public class TtlConfig {
+   private final TtlUpdateType ttlUpdateType;
+   private final TtlStateVisibility stateVisibility;
+   private final TtlTimeCharacteristic timeCharacteristic;
+   private final Time ttl;
+
+   public TtlConfig(
+   TtlUpdateType ttlUpdateType,
+   TtlStateVisibility stateVisibility,
+   TtlTimeCharacteristic timeCharacteristic,
+   Time ttl) {
+   Preconditions.checkNotNull(ttlUpdateType);
+   Preconditions.checkNotNull(stateVisibility);
+   Preconditions.checkNotNull(timeCharacteristic);
--- End diff --

Maybe we should also check that the `ttl` is greater than 0?


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> TTL state decorator uses original state with packed TTL and add TTL logic 
> using time provider:
> {code:java}
> TtlValueState implements ValueState {
>   ValueState> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
> TtlValue valueWithTtl = underlyingState.get();
> // ttl logic here (e.g. update timestamp)
> return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are apply to state produced by normal state binder in its TTL 
> wrapper from FLINK-9513



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518274#comment-16518274
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r196827863
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
 ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * This class wraps list state with TTL logic.
+ *
+ * @param  The type of key the state is associated to
+ * @param  The type of the namespace
+ * @param  Type of the user entry value of state with TTL
+ */
+class TtlListState extends
+   AbstractTtlState, List>, InternalListState>>
+   implements InternalListState {
+   TtlListState(
+   InternalListState> originalState,
+   TtlConfig config,
+   TtlTimeProvider timeProvider,
+   TypeSerializer> valueSerializer) {
+   super(originalState, config, timeProvider, valueSerializer);
+   }
+
+   @Override
+   public void update(List values) throws Exception {
+   updateInternal(values);
+   }
+
+   @Override
+   public void addAll(List values) throws Exception {
+   Preconditions.checkNotNull(values, "List of values to add 
cannot be null.");
+   original.addAll(withTs(values));
+   }
+
+   @Override
+   public Iterable get() throws Exception {
+   Iterable> ttlValue = original.get();
+   ttlValue = ttlValue == null ? Collections.emptyList() : 
ttlValue;
+   if (updateTsOnRead) {
+   List> collected = collect(ttlValue);
+   ttlValue = collected;
+   updateTs(collected);
+   }
+   final Iterable> finalResult = ttlValue;
--- End diff --

Oh sorry, my bad, I'm misunderstand...


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
> Fix For: 1.6.0
>
>
> TTL state decorator uses original state with packed TTL and add TTL logic 
> using time provider:
> {code:java}
> TtlValueState implements ValueState {
>   ValueState> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
> TtlValue valueWithTtl = underlyingState.get();
> // ttl logic here (e.g. update timestamp)
> return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are apply to state produced by normal state binder in its TTL 
> wrapper from FLINK-9513



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518267#comment-16518267
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user azagrebin commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r196826339
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
 ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * This class wraps list state with TTL logic.
+ *
+ * @param  The type of key the state is associated to
+ * @param  The type of the namespace
+ * @param  Type of the user entry value of state with TTL
+ */
+class TtlListState extends
+   AbstractTtlState, List>, InternalListState>>
+   implements InternalListState {
+   TtlListState(
+   InternalListState> originalState,
+   TtlConfig config,
+   TtlTimeProvider timeProvider,
+   TypeSerializer> valueSerializer) {
+   super(originalState, config, timeProvider, valueSerializer);
+   }
+
+   @Override
+   public void update(List values) throws Exception {
+   updateInternal(values);
+   }
+
+   @Override
+   public void addAll(List values) throws Exception {
+   Preconditions.checkNotNull(values, "List of values to add 
cannot be null.");
+   original.addAll(withTs(values));
+   }
+
+   @Override
+   public Iterable get() throws Exception {
+   Iterable> ttlValue = original.get();
+   ttlValue = ttlValue == null ? Collections.emptyList() : 
ttlValue;
+   if (updateTsOnRead) {
+   List> collected = collect(ttlValue);
+   ttlValue = collected;
+   updateTs(collected);
+   }
+   final Iterable> finalResult = ttlValue;
--- End diff --

The `ttlValue` is changed before the lambda from return statement so it is 
not effectively immutable any more to be used in lambda, that is why 
`finalResult` is formally needed to avoid compilation error.


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
> Fix For: 1.6.0
>
>
> TTL state decorator uses original state with packed TTL and add TTL logic 
> using time provider:
> {code:java}
> TtlValueState implements ValueState {
>   ValueState> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
> TtlValue valueWithTtl = underlyingState.get();
> // ttl logic here (e.g. update timestamp)
> return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are apply to state produced by normal state binder in its TTL 
> wrapper from FLINK-9513



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518244#comment-16518244
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r196820474
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java
 ---
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.SupplierWithException;
+import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+/**
+ * Base class for TTL logic wrappers.
+ *
+ * @param  Type of originally wrapped object
+ */
+abstract class AbstractTtlDecorator {
+   final T original;
+   final TtlConfig config;
+   final TtlTimeProvider timeProvider;
+   final boolean updateTsOnRead;
+   final boolean returnExpired;
+
+   AbstractTtlDecorator(
+   T original,
+   TtlConfig config,
+   TtlTimeProvider timeProvider) {
+   Preconditions.checkNotNull(original);
+   Preconditions.checkNotNull(config);
+   Preconditions.checkNotNull(timeProvider);
+   Preconditions.checkArgument(config.getTtlUpdateType() != 
TtlUpdateType.Disabled,
+   "State does not need to be wrapped with TTL if it is 
configured as disabled.");
+   this.original = original;
+   this.config = config;
+   this.timeProvider = timeProvider;
+   this.updateTsOnRead = config.getTtlUpdateType() == 
TtlUpdateType.OnReadAndWrite;
+   this.returnExpired = config.getStateVisibility() == 
TtlStateVisibility.Relaxed;
+   }
+
+V getUnexpried(TtlValue ttlValue) {
+   return ttlValue == null || (expired(ttlValue) && 
!returnExpired) ? null : ttlValue.getUserValue();
+   }
+
+boolean expired(TtlValue ttlValue) {
+   return ttlValue != null && ttlValue.getExpirationTimestamp() <= 
timeProvider.currentTimestamp();
+   }
+
+TtlValue wrapWithTs(V value) {
+   return wrapWithTs(value, newExpirationTimestamp());
+   }
+
+   static  TtlValue wrapWithTs(V value, long ts) {
+   return value == null ? null : new TtlValue<>(value, ts);
+   }
+
+TtlValue rewrapWithNewTs(TtlValue ttlValue) {
+   return wrapWithTs(ttlValue.getUserValue());
+   }
+
+   private long newExpirationTimestamp() {
+   long currentTs = timeProvider.currentTimestamp();
+   long ttl = config.getTtl().toMilliseconds();
--- End diff --

This will be called a lot often, so does it make sense to introduce a field 
to remember the `config.getTtl().toMilliseconds()`?


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
> Fix For: 1.6.0
>
>
> TTL state decorator uses original state with packed TTL and add TTL logic 
> using time provider:
> {code:java}
> TtlValueState implements ValueState {
>   ValueState> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
> TtlValue valueWithTtl = underlyingState.get();
> // ttl logic here (e.g. update timestamp)
> return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are apply to state produced by

[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518243#comment-16518243
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r196809755
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java
 ---
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.SupplierWithException;
+import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+/**
+ * Base class for TTL logic wrappers.
+ *
+ * @param  Type of originally wrapped object
+ */
+abstract class AbstractTtlDecorator {
+   final T original;
+   final TtlConfig config;
+   final TtlTimeProvider timeProvider;
+   final boolean updateTsOnRead;
+   final boolean returnExpired;
+
+   AbstractTtlDecorator(
+   T original,
+   TtlConfig config,
+   TtlTimeProvider timeProvider) {
+   Preconditions.checkNotNull(original);
+   Preconditions.checkNotNull(config);
+   Preconditions.checkNotNull(timeProvider);
+   Preconditions.checkArgument(config.getTtlUpdateType() != 
TtlUpdateType.Disabled,
+   "State does not need to be wrapped with TTL if it is 
configured as disabled.");
+   this.original = original;
+   this.config = config;
+   this.timeProvider = timeProvider;
+   this.updateTsOnRead = config.getTtlUpdateType() == 
TtlUpdateType.OnReadAndWrite;
+   this.returnExpired = config.getStateVisibility() == 
TtlStateVisibility.Relaxed;
+   }
+
+V getUnexpried(TtlValue ttlValue) {
+   return ttlValue == null || (expired(ttlValue) && 
!returnExpired) ? null : ttlValue.getUserValue();
+   }
+
+boolean expired(TtlValue ttlValue) {
+   return ttlValue != null && ttlValue.getExpirationTimestamp() <= 
timeProvider.currentTimestamp();
+   }
+
+TtlValue wrapWithTs(V value) {
+   return wrapWithTs(value, newExpirationTimestamp());
+   }
+
+   static  TtlValue wrapWithTs(V value, long ts) {
+   return value == null ? null : new TtlValue<>(value, ts);
+   }
+
+TtlValue rewrapWithNewTs(TtlValue ttlValue) {
+   return wrapWithTs(ttlValue.getUserValue());
+   }
+
+   private long newExpirationTimestamp() {
+   return timeProvider.currentTimestamp() + 
config.getTtl().toMilliseconds();
--- End diff --

This will be called a lot often, so does it make sense to introduce a field 
to remember the `config.getTtl().toMilliseconds()`?


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
> Fix For: 1.6.0
>
>
> TTL state decorator uses original state with packed TTL and add TTL logic 
> using time provider:
> {code:java}
> TtlValueState implements ValueState {
>   ValueState> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
> TtlValue valueWithTtl = underlyingState.get();
> // ttl logic here (e.g. update timestamp)
> return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are apply to state produced by normal state binder in its TTL 
> 

[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518242#comment-16518242
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r196816259
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
 ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * This class wraps list state with TTL logic.
+ *
+ * @param  The type of key the state is associated to
+ * @param  The type of the namespace
+ * @param  Type of the user entry value of state with TTL
+ */
+class TtlListState extends
+   AbstractTtlState, List>, InternalListState>>
+   implements InternalListState {
+   TtlListState(
+   InternalListState> originalState,
+   TtlConfig config,
+   TtlTimeProvider timeProvider,
+   TypeSerializer> valueSerializer) {
+   super(originalState, config, timeProvider, valueSerializer);
+   }
+
+   @Override
+   public void update(List values) throws Exception {
+   updateInternal(values);
+   }
+
+   @Override
+   public void addAll(List values) throws Exception {
+   Preconditions.checkNotNull(values, "List of values to add 
cannot be null.");
+   original.addAll(withTs(values));
+   }
+
+   @Override
+   public Iterable get() throws Exception {
+   Iterable> ttlValue = original.get();
+   ttlValue = ttlValue == null ? Collections.emptyList() : 
ttlValue;
+   if (updateTsOnRead) {
+   List> collected = collect(ttlValue);
+   ttlValue = collected;
+   updateTs(collected);
+   }
+   final Iterable> finalResult = ttlValue;
+   return () -> new IteratorWithCleanup(finalResult.iterator());
+   }
+
+   private void updateTs(List> ttlValue) throws Exception {
+   List> unexpiredWithUpdatedTs = ttlValue.stream()
+   .filter(v -> !expired(v))
+   .map(this::rewrapWithNewTs)
+   .collect(Collectors.toList());
+   if (!unexpiredWithUpdatedTs.isEmpty()) {
+   original.update(unexpiredWithUpdatedTs);
+   }
+   }
+
+   @Override
+   public void add(T value) throws Exception {
+   Preconditions.checkNotNull(value, "You cannot add null to a 
ListState.");
+   original.add(wrapWithTs(value));
+   }
+
+   @Override
+   public void clear() {
+   original.clear();
+   }
+
+   @Override
+   public void mergeNamespaces(N target, Collection sources) throws 
Exception {
+   original.mergeNamespaces(target, sources);
+   }
+
+   @Override
+   public List getInternal() throws Exception {
+   return collect(get());
+   }
+
+   private  List collect(Iterable iterable) {
+   return StreamSupport.stream(iterable.spliterator(), 
false).collect(Collectors.toList());
+   }
+
+   @Override
+   public void updateInternal(List valueToStore) throws Exception {
+   Preconditions.checkNotNull(valueToStore, "List of values to 
update cannot be null.");
+   original.addAll(withTs(valu

[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518241#comment-16518241
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r196817846
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
 ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * This class wraps list state with TTL logic.
+ *
+ * @param  The type of key the state is associated to
+ * @param  The type of the namespace
+ * @param  Type of the user entry value of state with TTL
+ */
+class TtlListState extends
+   AbstractTtlState, List>, InternalListState>>
+   implements InternalListState {
+   TtlListState(
+   InternalListState> originalState,
+   TtlConfig config,
+   TtlTimeProvider timeProvider,
+   TypeSerializer> valueSerializer) {
+   super(originalState, config, timeProvider, valueSerializer);
+   }
+
+   @Override
+   public void update(List values) throws Exception {
+   updateInternal(values);
+   }
+
+   @Override
+   public void addAll(List values) throws Exception {
+   Preconditions.checkNotNull(values, "List of values to add 
cannot be null.");
+   original.addAll(withTs(values));
+   }
+
+   @Override
+   public Iterable get() throws Exception {
+   Iterable> ttlValue = original.get();
+   ttlValue = ttlValue == null ? Collections.emptyList() : 
ttlValue;
+   if (updateTsOnRead) {
+   List> collected = collect(ttlValue);
+   ttlValue = collected;
+   updateTs(collected);
+   }
+   final Iterable> finalResult = ttlValue;
--- End diff --

The var `finalResult` looks like redundant or I'm misunderstand.


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
> Fix For: 1.6.0
>
>
> TTL state decorator uses original state with packed TTL and add TTL logic 
> using time provider:
> {code:java}
> TtlValueState implements ValueState {
>   ValueState> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
> TtlValue valueWithTtl = underlyingState.get();
> // ttl logic here (e.g. update timestamp)
> return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are apply to state produced by normal state binder in its TTL 
> wrapper from FLINK-9513



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518240#comment-16518240
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user azagrebin commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r196819320
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java
 ---
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.SupplierWithException;
+import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+/**
+ * Base class for TTL logic wrappers.
+ *
+ * @param  Type of originally wrapped object
+ */
+abstract class AbstractTtlDecorator {
+   final T original;
+   final TtlConfig config;
+   final TtlTimeProvider timeProvider;
+   final boolean updateTsOnRead;
+   final boolean returnExpired;
+
+   AbstractTtlDecorator(
+   T original,
+   TtlConfig config,
+   TtlTimeProvider timeProvider) {
+   Preconditions.checkNotNull(original);
+   Preconditions.checkNotNull(config);
+   Preconditions.checkNotNull(timeProvider);
+   Preconditions.checkArgument(config.getTtlUpdateType() != 
TtlUpdateType.Disabled,
+   "State does not need to be wrapped with TTL if it is 
configured as disabled.");
+   this.original = original;
+   this.config = config;
+   this.timeProvider = timeProvider;
+   this.updateTsOnRead = config.getTtlUpdateType() == 
TtlUpdateType.OnReadAndWrite;
+   this.returnExpired = config.getStateVisibility() == 
TtlStateVisibility.Relaxed;
+   }
+
+V getUnexpried(TtlValue ttlValue) {
+   return ttlValue == null || (expired(ttlValue) && 
!returnExpired) ? null : ttlValue.getUserValue();
+   }
+
+boolean expired(TtlValue ttlValue) {
+   return ttlValue != null && ttlValue.getExpirationTimestamp() <= 
timeProvider.currentTimestamp();
--- End diff --

In general Flink allows to operate in negative range for event time, but 
the overflow in case of very big TTL should be checked. TTL makes sense only 
non-negative. I have added fix for it.


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
> Fix For: 1.6.0
>
>
> TTL state decorator uses original state with packed TTL and add TTL logic 
> using time provider:
> {code:java}
> TtlValueState implements ValueState {
>   ValueState> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
> TtlValue valueWithTtl = underlyingState.get();
> // ttl logic here (e.g. update timestamp)
> return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are apply to state produced by normal state binder in its TTL 
> wrapper from FLINK-9513



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518200#comment-16518200
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r196791555
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlConfig.java 
---
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Configuration of state TTL logic.
+ * TODO: builder
+ */
+public class TtlConfig {
+   private final TtlUpdateType ttlUpdateType;
+   private final TtlStateVisibility stateVisibility;
+   private final TtlTimeCharacteristic timeCharacteristic;
+   private final Time ttl;
+
+   public TtlConfig(
+   TtlUpdateType ttlUpdateType,
+   TtlStateVisibility stateVisibility,
+   TtlTimeCharacteristic timeCharacteristic,
+   Time ttl) {
+   Preconditions.checkNotNull(ttlUpdateType);
+   Preconditions.checkNotNull(stateVisibility);
+   Preconditions.checkNotNull(timeCharacteristic);
--- End diff --

Why not checking for `ttl`?


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
> Fix For: 1.6.0
>
>
> TTL state decorator uses original state with packed TTL and add TTL logic 
> using time provider:
> {code:java}
> TtlValueState implements ValueState {
>   ValueState> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
> TtlValue valueWithTtl = underlyingState.get();
> // ttl logic here (e.g. update timestamp)
> return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are apply to state produced by normal state binder in its TTL 
> wrapper from FLINK-9513



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518155#comment-16518155
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r196786370
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java
 ---
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.SupplierWithException;
+import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+/**
+ * Base class for TTL logic wrappers.
+ *
+ * @param  Type of originally wrapped object
+ */
+abstract class AbstractTtlDecorator {
+   final T original;
+   final TtlConfig config;
+   final TtlTimeProvider timeProvider;
+   final boolean updateTsOnRead;
+   final boolean returnExpired;
+
+   AbstractTtlDecorator(
+   T original,
+   TtlConfig config,
+   TtlTimeProvider timeProvider) {
+   Preconditions.checkNotNull(original);
+   Preconditions.checkNotNull(config);
+   Preconditions.checkNotNull(timeProvider);
+   Preconditions.checkArgument(config.getTtlUpdateType() != 
TtlUpdateType.Disabled,
+   "State does not need to be wrapped with TTL if it is 
configured as disabled.");
+   this.original = original;
+   this.config = config;
+   this.timeProvider = timeProvider;
+   this.updateTsOnRead = config.getTtlUpdateType() == 
TtlUpdateType.OnReadAndWrite;
+   this.returnExpired = config.getStateVisibility() == 
TtlStateVisibility.Relaxed;
+   }
+
+V getUnexpried(TtlValue ttlValue) {
+   return ttlValue == null || (expired(ttlValue) && 
!returnExpired) ? null : ttlValue.getUserValue();
+   }
+
+boolean expired(TtlValue ttlValue) {
+   return ttlValue != null && ttlValue.getExpirationTimestamp() <= 
timeProvider.currentTimestamp();
--- End diff --

Does it make sense to never expire the value when the 
`ttValue.getExpirationTimestamp()` return negative?


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
> Fix For: 1.6.0
>
>
> TTL state decorator uses original state with packed TTL and add TTL logic 
> using time provider:
> {code:java}
> TtlValueState implements ValueState {
>   ValueState> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
> TtlValue valueWithTtl = underlyingState.get();
> // ttl logic here (e.g. update timestamp)
> return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are apply to state produced by normal state binder in its TTL 
> wrapper from FLINK-9513



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518152#comment-16518152
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r196784275
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java
 ---
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.SupplierWithException;
+import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+/**
+ * Base class for TTL logic wrappers.
+ *
+ * @param  Type of originally wrapped object
+ */
+abstract class AbstractTtlDecorator {
+   final T original;
+   final TtlConfig config;
+   final TtlTimeProvider timeProvider;
+   final boolean updateTsOnRead;
+   final boolean returnExpired;
+
+   AbstractTtlDecorator(
+   T original,
+   TtlConfig config,
+   TtlTimeProvider timeProvider) {
+   Preconditions.checkNotNull(original);
+   Preconditions.checkNotNull(config);
+   Preconditions.checkNotNull(timeProvider);
+   Preconditions.checkArgument(config.getTtlUpdateType() != 
TtlUpdateType.Disabled,
+   "State does not need to be wrapped with TTL if it is 
configured as disabled.");
+   this.original = original;
+   this.config = config;
+   this.timeProvider = timeProvider;
+   this.updateTsOnRead = config.getTtlUpdateType() == 
TtlUpdateType.OnReadAndWrite;
+   this.returnExpired = config.getStateVisibility() == 
TtlStateVisibility.Relaxed;
+   }
+
+V getUnexpried(TtlValue ttlValue) {
+   return ttlValue == null || (expired(ttlValue) && 
!returnExpired) ? null : ttlValue.getUserValue();
+   }
+
+boolean expired(TtlValue ttlValue) {
+   return ttlValue != null && ttlValue.getExpirationTimestamp() <= 
timeProvider.currentTimestamp();
--- End diff --

This looks like a bit problematic, because the 
`ttlValue.getExpirationTimestamp()` might be negative. E.g when the user 
provide `Long.MAX_VALUE` as the TTL value, what he expected is that the value 
should never be expired, but according to the current code, it will immediately 
expired.


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
> Fix For: 1.6.0
>
>
> TTL state decorator uses original state with packed TTL and add TTL logic 
> using time provider:
> {code:java}
> TtlValueState implements ValueState {
>   ValueState> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
> TtlValue valueWithTtl = underlyingState.get();
> // ttl logic here (e.g. update timestamp)
> return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are apply to state produced by normal state binder in its TTL 
> wrapper from FLINK-9513



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518002#comment-16518002
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user azagrebin commented on the issue:

https://github.com/apache/flink/pull/6186
  
cc @StefanRRichter 


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
> Fix For: 1.6.0
>
>
> TTL state decorator uses original state with packed TTL and add TTL logic 
> using time provider:
> {code:java}
> TtlValueState implements ValueState {
>   ValueState> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
> TtlValue valueWithTtl = underlyingState.get();
> // ttl logic here (e.g. update timestamp)
> return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are apply to state produced by normal state binder in its TTL 
> wrapper from FLINK-9513



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518001#comment-16518001
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

GitHub user azagrebin opened a pull request:

https://github.com/apache/flink/pull/6186

[FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrappers

## What is the purpose of the change

This PR introduces TTL logic wrappers for state objects.

## Brief change log

Added
  - sketch of TtlConfig
  - AbstractTtlWrapper and AbstractTtlState
  - concrete TTL wrappers for state objects

## Verifying this change

Unit tests for state objects TTL wrappers

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (not applicable at this step)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/azagrebin/flink FLINK-9515

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6186.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6186


commit 62faa8ee220c21fa824fec690073c27a0a994be5
Author: Andrey Zagrebin 
Date:   2018-06-04T15:28:40Z

[FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrappers




> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
> Fix For: 1.6.0
>
>
> TTL state decorator uses original state with packed TTL and add TTL logic 
> using time provider:
> {code:java}
> TtlValueState implements ValueState {
>   ValueState> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
> TtlValue valueWithTtl = underlyingState.get();
> // ttl logic here (e.g. update timestamp)
> return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are apply to state produced by normal state binder in its TTL 
> wrapper from FLINK-9513



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)