[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic

2018-07-04 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16532509#comment-16532509
 ] 

ASF GitHub Bot commented on FLINK-9513:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6196


> Wrap state binder with TTL logic
>
> Key: FLINK-9513
> URL: https://issues.apache.org/jira/browse/FLINK-9513
> Project: Flink
> Issue Type: Sub-task
> Components: State Backends, Checkpointing
> Affects Versions: 1.6.0
> Reporter: Andrey Zagrebin
> Assignee: Andrey Zagrebin
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.6.0
>
> The main idea is to wrap the user state value in a class holding the value and
> the expiration timestamp (and possibly metadata in the future), and to use the
> new object as the value in the existing implementations:
> {code:java}
> class TtlValue<V> {
>   V value;
>   long expirationTimestamp;
> }
> {code}
> The original state binder factory is wrapped with TtlStateBinder if TTL is
> enabled:
> {code:java}
> state = ttlConfig.updateType == DISABLED ?
>   bind(binder) : bind(new TtlStateBinder(binder, timerService));
> {code}
> TtlStateBinder decorates the states produced by the original binder with TTL
> logic wrappers and adds TtlValue serialisation logic:
> {code:java}
> TtlStateBinder {
>   StateBinder binder;
>   ProcessingTimeProvider timeProvider; // System.currentTimeMillis()
>
>   <V> TtlValueState<V> createValueState(valueDesc) {
>     serializer = new TtlValueSerializer(valueDesc.getSerializer());
>     ttlValueDesc = new ValueDesc(serializer, ...);
>     // or implement custom TypeInfo
>     originalStateWithTtl = binder.createValueState(ttlValueDesc);
>     return new TtlValueState(originalStateWithTtl, timeProvider);
>   }
>   // List, Map, ...
> }
> {code}
> The TTL serializer should add the expiration timestamp.
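
The wrapping idea in the issue description can be sketched in plain Java. This is a minimal illustration, not the code merged in the pull request: `SimpleValueState`, `InMemoryValueState`, the `ttlMillis` parameter, and the use of `LongSupplier` as a time provider are all assumptions made for the example, and Flink's real state interfaces are not used.

```java
import java.util.function.LongSupplier;

/** Value plus the timestamp at which it counts as expired. */
final class TtlValue<V> {
    final V value;
    final long expirationTimestamp;

    TtlValue(V value, long expirationTimestamp) {
        this.value = value;
        this.expirationTimestamp = expirationTimestamp;
    }
}

/** Hypothetical minimal stand-in for a value state (not Flink's interface). */
interface SimpleValueState<T> {
    T value();
    void update(T value);
    void clear();
}

/** Trivial heap-backed state, used only to exercise the wrapper. */
final class InMemoryValueState<T> implements SimpleValueState<T> {
    private T current;

    public T value() { return current; }
    public void update(T value) { current = value; }
    public void clear() { current = null; }
}

/** Decorator: stores TtlValue<V> in the wrapped state and hides expired entries. */
final class TtlValueState<V> implements SimpleValueState<V> {
    private final SimpleValueState<TtlValue<V>> original;
    private final LongSupplier timeProvider;
    private final long ttlMillis;

    TtlValueState(SimpleValueState<TtlValue<V>> original, LongSupplier timeProvider, long ttlMillis) {
        this.original = original;
        this.timeProvider = timeProvider;
        this.ttlMillis = ttlMillis;
    }

    public V value() {
        TtlValue<V> wrapped = original.value();
        if (wrapped == null) {
            return null;
        }
        if (wrapped.expirationTimestamp <= timeProvider.getAsLong()) {
            original.clear(); // drop the expired entry on access
            return null;
        }
        return wrapped.value;
    }

    public void update(V value) {
        original.update(new TtlValue<>(value, timeProvider.getAsLong() + ttlMillis));
    }

    public void clear() { original.clear(); }
}
```

Passing the clock in as a supplier keeps the wrapper testable: a test can advance time by hand instead of sleeping.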



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic

2018-07-04 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16532426#comment-16532426
 ] 

ASF GitHub Bot commented on FLINK-9513:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/6196
  
I found that there is still a small issue with `equals`/`hashCode`, but I
will just fix it before merging.




[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic

2018-07-04 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16532419#comment-16532419
 ] 

ASF GitHub Bot commented on FLINK-9513:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/6196
  
LGTM 👍 merging.




[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531726#comment-16531726
 ] 

ASF GitHub Bot commented on FLINK-9513:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r199894398
  
--- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java ---
@@ -272,4 +254,60 @@ public int getVersion() {
                 previousSerializersAndConfigs.get(index).f0,
                 UnloadableDummyTypeSerializer.class,
                 previousSerializersAndConfigs.get(index).f1,
                 fieldSerializers[index]);
     }
+
+    /** This class holds composite serializer parameters which can be precomputed in advanced for better performance. */
+    protected static class PrecomputedParameters implements Serializable {
+        /** Whether target type is immutable. */
+        final boolean immutableTargetType;
+
+        /** Whether target type and its fields are immutable. */
+        final boolean immutable;
+
+        /** Byte length of target object in serialized form. */
+        private final int length;
+
+        /** Whether any field serializer is stateful. */
+        final boolean stateful;
+
+        final int hashCode;
--- End diff --

I wonder whether this should be `transient` in a serializable class, since
the hash code could be based on object identity.
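
One way to read this comment: a hash that depends on object identity differs between JVMs, so a cached value shipped through Java serialization could be stale on the receiving side. The sketch below shows the general pattern of a `transient` cached hash that is recomputed on demand. `CachedHashParameters` and its `roundTrip` helper are hypothetical names for this example, and string fields are used so that the result is deterministic.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Arrays;

/** Hypothetical parameter holder; only the caching pattern matters here. */
final class CachedHashParameters implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String[] fieldNames;

    /** Skipped by Java serialization, so it is 0 again after deserialization. */
    private transient int cachedHash;

    CachedHashParameters(String... fieldNames) {
        this.fieldNames = fieldNames.clone();
    }

    @Override
    public int hashCode() {
        int h = cachedHash;
        if (h == 0) {
            // Recomputed in the current JVM instead of trusting a shipped value.
            h = Arrays.hashCode(fieldNames);
            cachedHash = h;
        }
        return h;
    }

    @Override
    public boolean equals(Object other) {
        return other instanceof CachedHashParameters
            && Arrays.equals(fieldNames, ((CachedHashParameters) other).fieldNames);
    }

    /** Serializes and deserializes the object in memory. */
    static CachedHashParameters roundTrip(CachedHashParameters original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (CachedHashParameters) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Because the field cannot be `final` and `transient` at the same time without extra deserialization hooks, the sketch gives up `final` and computes the value lazily.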




[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531724#comment-16531724
 ] 

ASF GitHub Bot commented on FLINK-9513:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r199894217
  
--- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java ---
@@ -272,4 +254,60 @@ public int getVersion() {
                 previousSerializersAndConfigs.get(index).f0,
                 UnloadableDummyTypeSerializer.class,
                 previousSerializersAndConfigs.get(index).f1,
                 fieldSerializers[index]);
     }
+
+    /** This class holds composite serializer parameters which can be precomputed in advanced for better performance. */
+    protected static class PrecomputedParameters implements Serializable {
--- End diff --

If this is serializable, we should add a `serialVersionUID`.
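
For readers unfamiliar with the convention, a small sketch of an explicit version UID on a serializable class follows. `PrecomputedFlags` and `SerialVersionCheck` are hypothetical names for this example; `ObjectStreamClass.lookup` is the standard JDK call for reading the UID that serialization will use.

```java
import java.io.ObjectStreamClass;
import java.io.Serializable;

/** Hypothetical serializable holder with an explicitly pinned version UID. */
final class PrecomputedFlags implements Serializable {
    // Without this field the JVM derives a UID from the class shape, so an
    // otherwise harmless change can break deserialization of older data.
    private static final long serialVersionUID = 1L;

    final boolean immutable;
    final boolean stateful;

    PrecomputedFlags(boolean immutable, boolean stateful) {
        this.immutable = immutable;
        this.stateful = stateful;
    }
}

final class SerialVersionCheck {
    /** Returns the UID that Java serialization uses for the given serializable class. */
    static long declaredUid(Class<?> type) {
        return ObjectStreamClass.lookup(type).getSerialVersionUID();
    }
}
```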




[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531285#comment-16531285
 ] 

ASF GitHub Bot commented on FLINK-9513:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/6196
  
I had a few more comments, but they are all basically optimizations. I leave
it up to you whether you still want to address all or some of them. Please let
me know; otherwise, we can merge this. 👍




[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531277#comment-16531277
 ] 

ASF GitHub Bot commented on FLINK-9513:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r199783587
  
--- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java ---
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.IntStream;
+
+/**
+ * Base class for composite serializers.
+ *
+ * This class serializes a composite type using array of its field serializers.
+ * Fields are indexed the same way as their serializers.
+ *
+ * @param <T> type of custom serialized value
+ */
+public abstract class CompositeSerializer<T> extends TypeSerializer<T> {
+    private static final long serialVersionUID = 1L;
+
+    /** Serializers for fields which constitute T. */
+    protected final TypeSerializer[] fieldSerializers;
+
+    /** Whether T is an immutable type. */
+    final boolean immutableTargetType;
+
+    /** Byte length of target object in serialized form. */
+    private final int length;
+
+    /** Whether any field serializer is stateful. */
+    private final boolean stateful;
+
+    private final int hashCode;
+
+    @SuppressWarnings("unchecked")
+    protected CompositeSerializer(boolean immutableTargetType, TypeSerializer ... fieldSerializers) {
+        Preconditions.checkNotNull(fieldSerializers);
+        Preconditions.checkArgument(Arrays.stream(fieldSerializers).allMatch(Objects::nonNull));
+        this.immutableTargetType = immutableTargetType &&
+            Arrays.stream(fieldSerializers).allMatch(TypeSerializer::isImmutableType);
+        this.fieldSerializers = (TypeSerializer[]) fieldSerializers;
+        this.length = calcLength();
+        this.stateful = isStateful();
+        this.hashCode = Arrays.hashCode(fieldSerializers);
+    }
+
+    private boolean isStateful() {
+        TypeSerializer[] duplicatedSerializers = duplicateFieldSerializers();
+        return IntStream.range(0, fieldSerializers.length)
+            .anyMatch(i -> fieldSerializers[i] != duplicatedSerializers[i]);
+    }
+
+    /** Create new instance from its fields.  */
+    public abstract T createInstance(@Nonnull Object ... values);
+
+    /** Modify field of existing instance. Supported only by mutable types. */
+    protected abstract void setField(@Nonnull T value, int index, Object fieldValue);
+
+    /** Get field of existing instance. */
+    protected abstract Object getField(@Nonnull T value, int index);
+
+    /** Factory for concrete serializer. */
+    protected abstract CompositeSerializer createSerializerInstance(TypeSerializer ... originalSerializers);
+
+    @Override
+    public CompositeSerializer duplicate() {
+        return stateful ? createSerializerInstance(duplicateFieldSerializers()) : this;
--- End diff --

Another small point here for `createSerializerInstance(...)`: we have no
(non-public) constructor that takes the boolean flags, the length, and
(maybe) the hash directly. So when we copy the serializer, I guess it always
goes through the whole process again to compute these values, although we
could just copy them from the previous instance.
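
The suggestion can be sketched as a private "copy" constructor that receives the values already derived by the previous instance. This is an illustration under assumptions, not the change that went into the pull request: `FieldCodec` and `CompositeCodec` are hypothetical stand-ins, and the static counter exists only to make the saved work observable.

```java
/** Hypothetical stand-in for a field serializer. */
final class FieldCodec {
    final int length;
    final boolean stateful;

    FieldCodec(int length, boolean stateful) {
        this.length = length;
        this.stateful = stateful;
    }

    /** Stateless codecs can be shared; stateful ones need a fresh copy. */
    FieldCodec duplicate() {
        return stateful ? new FieldCodec(length, true) : this;
    }
}

/** Hypothetical composite that derives its parameters only once. */
final class CompositeCodec {
    /** Counts how often the derived values were computed from scratch. */
    static int fullComputations = 0;

    private final FieldCodec[] fields;
    private final int length;
    private final boolean stateful;

    /** User-facing path: derives length and statefulness from the fields. */
    CompositeCodec(FieldCodec... fields) {
        this.fields = fields.clone();
        int total = 0;
        boolean anyStateful = false;
        for (FieldCodec field : fields) {
            total += field.length;
            anyStateful |= field.stateful;
        }
        this.length = total;
        this.stateful = anyStateful;
        fullComputations++;
    }

    /** Copy path: reuses the values the previous instance already derived. */
    private CompositeCodec(FieldCodec[] fields, int length, boolean stateful) {
        this.fields = fields;
        this.length = length;
        this.stateful = stateful;
    }

    CompositeCodec duplicate() {
        if (!stateful) {
            return this;
        }
        FieldCodec[] copies = new FieldCodec[fields.length];
        for (int i = 0; i < fields.length; i++) {
            copies[i] = fields[i].duplicate();
        }
        return new CompositeCodec(copies, length, stateful);
    }

    int getLength() { return length; }
}
```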



[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531270#comment-16531270
 ] 

ASF GitHub Bot commented on FLINK-9513:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r199782860
  
--- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java ---
@@ -0,0 +1,275 @@
+    private boolean isStateful() {
+        TypeSerializer[] duplicatedSerializers = duplicateFieldSerializers();
--- End diff --

The flag for `isStateful()` is the only one that I suggested as a candidate
for lazy initialization when `duplicate()` is called for the first time. The
reason is that duplicating some types of inner serializers can sometimes be a
bit expensive. But again, I feel that this can also be changed in follow-up
work, if needed.
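
A lazily initialized flag of the kind described here might look like the following sketch. `LazyStatefulFlag` is a hypothetical helper and the `BooleanSupplier` probe stands in for the potentially expensive duplication check; the sketch is not thread-safe, which would need to be addressed if instances were shared across threads.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical holder that runs an expensive check at most once, on first use. */
final class LazyStatefulFlag {
    private final BooleanSupplier expensiveProbe;

    /** null until the probe has run. */
    private Boolean stateful;

    LazyStatefulFlag(BooleanSupplier expensiveProbe) {
        this.expensiveProbe = expensiveProbe;
    }

    boolean isStateful() {
        if (stateful == null) {
            // First call pays for the probe; later calls reuse the answer.
            stateful = expensiveProbe.getAsBoolean();
        }
        return stateful;
    }
}
```

Constructing the holder costs nothing, so code paths that never call `duplicate()` never pay for the check.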



[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531265#comment-16531265
 ] 

ASF GitHub Bot commented on FLINK-9513:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r199782303
  
--- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java ---
@@ -0,0 +1,275 @@
+    @SuppressWarnings("unchecked")
+    protected CompositeSerializer(boolean immutableTargetType, TypeSerializer ... fieldSerializers) {
+        Preconditions.checkNotNull(fieldSerializers);
+        Preconditions.checkArgument(Arrays.stream(fieldSerializers).allMatch(Objects::nonNull));
+        this.immutableTargetType = immutableTargetType &&
+            Arrays.stream(fieldSerializers).allMatch(TypeSerializer::isImmutableType);
+        this.fieldSerializers = (TypeSerializer[]) fieldSerializers;
+        this.length = calcLength();
+        this.stateful = isStateful();
+        this.hashCode = Arrays.hashCode(fieldSerializers);
--- End diff --

I think up to this point, the code is iterating `fieldSerializers` 5 times
(null checks, immutable check, length calc, stateful check, and hash code
computation). It could be done in one iteration, but since this method should
typically not be called in hot loops, this is an optional improvement.
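
A single-pass version of such a computation might look like the sketch below. `FieldInfo` and `FieldSummary` are hypothetical stand-ins rather than Flink classes, the stateful check is left out because it requires duplicating serializers, and the convention that a negative length means "variable length" is an assumption of this example.

```java
import java.util.Objects;

/** Hypothetical stand-in for the per-field facts a serializer exposes. */
final class FieldInfo {
    final int length;        // negative means variable length (assumed convention)
    final boolean immutable;

    FieldInfo(int length, boolean immutable) {
        this.length = length;
        this.immutable = immutable;
    }
}

/** Derived values collected in one loop instead of one loop per value. */
final class FieldSummary {
    final boolean allImmutable;
    final int totalLength;
    final int hash;

    private FieldSummary(boolean allImmutable, int totalLength, int hash) {
        this.allImmutable = allImmutable;
        this.totalLength = totalLength;
        this.hash = hash;
    }

    static FieldSummary of(FieldInfo... fields) {
        Objects.requireNonNull(fields);
        boolean allImmutable = true;
        int totalLength = 0;
        int hash = 1;
        for (FieldInfo field : fields) {
            Objects.requireNonNull(field);                 // null check
            allImmutable &= field.immutable;               // immutability check
            totalLength = (totalLength < 0 || field.length < 0)
                ? -1
                : totalLength + field.length;              // length calculation
            hash = 31 * hash + field.hashCode();           // same scheme as Arrays.hashCode
        }
        return new FieldSummary(allImmutable, totalLength, hash);
    }
}
```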



[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic

2018-07-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531159#comment-16531159
 ] 

ASF GitHub Bot commented on FLINK-9513:
---

Github user azagrebin commented on the issue:

https://github.com/apache/flink/pull/6196
  
@StefanRRichter 
I added more precomputed fields to the `CompositeSerializer` constructor and
included `TtlStateFactory` in the TTL tests.




[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic

2018-07-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16529828#comment-16529828
 ] 

ASF GitHub Bot commented on FLINK-9513:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r199485075
  
--- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java ---
@@ -0,0 +1,258 @@
+public abstract class CompositeSerializer<T> extends TypeSerializer<T> {
+    private static final long serialVersionUID = 1L;
+
+    protected final TypeSerializer[] fieldSerializers;
+    final boolean isImmutableTargetType;
--- End diff --

I think in Java code style, a boolean field name should not be prefixed 
with `is...`; only the getter should carry the `is...` prefix.
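
As a minimal sketch of the naming convention the comment refers to (the class `NamingExample` is a hypothetical stand-in and not part of the pull request), the field drops the `is` prefix while the accessor keeps it:

```java
/** Hypothetical stand-in class, used only to illustrate the naming convention. */
final class NamingExample {
    // Field is named without the "is" prefix.
    private final boolean immutableTargetType;

    NamingExample(boolean immutableTargetType) {
        this.immutableTargetType = immutableTargetType;
    }

    // Only the accessor carries the "is" prefix.
    boolean isImmutableTargetType() {
        return immutableTargetType;
    }
}
```

With this shape, a call site reads `serializer.isImmutableTargetType()` without the field itself repeating the prefix.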


> Wrap state binder with TTL logic
> 
>
> Key: FLINK-9513
> URL: https://issues.apache.org/jira/browse/FLINK-9513
> Project: Flink
> Issue Type: Sub-task
> Components: State Backends, Checkpointing
> Affects Versions: 1.6.0
> Reporter: Andrey Zagrebin
> Assignee: Andrey Zagrebin
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.6.0
>
>
> The main idea is to wrap user state value with a class holding the value and 
> the expiration timestamp (maybe meta data in future) and use the new object 
> as a value in the existing implementations:
> {code:java}
> class TtlValue<V> {
>   V value;
>   long expirationTimestamp;
> }
> {code}
> The original state binder factory is wrapped with TtlStateBinder if TTL is 
> enabled:
> {code:java}
> state = ttlConfig.updateType == DISABLED ?
>  bind(binder) : bind(new TtlStateBinder(binder, timerService));
> {code}
> TtlStateBinder decorates the states produced by the original binder with TTL 
> logic wrappers and adds TtlValue serialisation logic:
> {code:java}
> TtlStateBinder {
>   StateBinder binder;
>   ProcessingTimeProvider timeProvider; // System.currentTimeMillis()
>   TtlValueState createValueState(valueDesc) {
>     serializer = new TtlValueSerializer(valueDesc.getSerializer());
>     ttlValueDesc = new ValueDesc(serializer, ...);
>     // or implement custom TypeInfo
>     originalStateWithTtl = binder.createValueState(ttlValueDesc);
>     return new TtlValueState(originalStateWithTtl, timeProvider);
>   }
>   // List, Map, ...
> }
> {code}
> TTL serializer should add expiration timestamp



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic

2018-07-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16529800#comment-16529800
 ] 

ASF GitHub Bot commented on FLINK-9513:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/6196
  
I had a few more comments, in particular some improvements for the new 
serializer. I think when those are addressed this is good to merge.




[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic

2018-07-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16529797#comment-16529797
 ] 

ASF GitHub Bot commented on FLINK-9513:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r199477483
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
 ---
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.CompositeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.KeyedStateFactory;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * This state factory wraps state objects, produced by backends, with TTL logic.
+ */
+@SuppressWarnings("unchecked")
+public class TtlStateFactory {
--- End diff --

It might make sense to also have a test when we introduce a new class that 
provides some kind of "service", e.g. to check that all the types are correctly 
mapped and also to prevent somebody from breaking the mapping by accident.
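
One way such a check could look is sketched below. It deliberately avoids Flink classes: `StateKind`, `createMapping` and `checkAllKindsMapped` are hypothetical names, the six kinds merely mirror the descriptor imports visible in the diff, and the string values are placeholders for the real wrapper factories.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

final class MappingCheck {
    /** Hypothetical stand-ins for the supported state descriptor types. */
    enum StateKind { VALUE, LIST, MAP, REDUCING, AGGREGATING, FOLDING }

    /** Builds a kind-to-factory mapping, analogous to what a state factory might hold. */
    static Map<StateKind, Supplier<String>> createMapping() {
        Map<StateKind, Supplier<String>> mapping = new HashMap<>();
        mapping.put(StateKind.VALUE, () -> "value");
        mapping.put(StateKind.LIST, () -> "list");
        mapping.put(StateKind.MAP, () -> "map");
        mapping.put(StateKind.REDUCING, () -> "reducing");
        mapping.put(StateKind.AGGREGATING, () -> "aggregating");
        mapping.put(StateKind.FOLDING, () -> "folding");
        return mapping;
    }

    /** Throws if any known kind has no factory, so an accidental removal is caught early. */
    static void checkAllKindsMapped(Map<StateKind, Supplier<String>> mapping) {
        for (StateKind kind : StateKind.values()) {
            if (!mapping.containsKey(kind)) {
                throw new IllegalStateException("No factory mapped for " + kind);
            }
        }
    }
}
```

A unit test would call `checkAllKindsMapped(createMapping())` and could additionally assert that each factory produces the expected wrapper type.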




[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic

2018-07-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16529794#comment-16529794
 ] 

ASF GitHub Bot commented on FLINK-9513:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r199476870
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
 ---
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.CompositeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.KeyedStateFactory;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * This state factory wraps state objects, produced by backends, with TTL logic.
+ */
+@SuppressWarnings("unchecked")
--- End diff --

I would not suppress warnings in the scope of a full class; it is better to 
do it in a more fine-grained way, on individual methods.
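
A small sketch of what narrower suppression could look like (the class `NarrowSuppression` and its methods are hypothetical, not taken from the pull request): the annotation sits on the single helper that performs the unchecked cast, so the rest of the class keeps full compiler checking.

```java
import java.util.List;

final class NarrowSuppression {
    // No class-level @SuppressWarnings: other methods keep all compiler warnings.

    /** The unchecked cast is confined to this one helper. */
    @SuppressWarnings("unchecked")
    static <T> List<T> castList(Object raw) {
        return (List<T>) raw;
    }

    /** Regular code; any new unchecked operation added here would still be reported. */
    static int size(Object raw) {
        List<String> strings = castList(raw);
        return strings.size();
    }
}
```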




[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic

2018-07-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16529780#comment-16529780
 ] 

ASF GitHub Bot commented on FLINK-9513:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r199474873
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java
 ---
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Base class for composite serializers.
+ *
+ * This class serializes a composite type using array of its field serializers.
+ * Fields are indexed the same way as their serializers.
+ *
+ * @param <T> type of custom serialized value
+ */
+public abstract class CompositeSerializer<T> extends TypeSerializer<T> {
+    private static final long serialVersionUID = 1L;
+
+    protected final TypeSerializer[] fieldSerializers;
+    final boolean isImmutableTargetType;
+    private final int length;
+
+    @SuppressWarnings("unchecked")
+    protected CompositeSerializer(boolean isImmutableTargetType, TypeSerializer ... fieldSerializers) {
+        Preconditions.checkNotNull(fieldSerializers);
+        Preconditions.checkArgument(Arrays.stream(fieldSerializers).allMatch(Objects::nonNull));
+        this.isImmutableTargetType = isImmutableTargetType;
+        this.fieldSerializers = (TypeSerializer[]) fieldSerializers;
+        this.length = calcLength();
+    }
+
+    /** Create new instance from its fields.  */
+    public abstract T createInstance(@Nonnull Object ... values);
+
+    /** Modify field of existing instance. Supported only by mutable types. */
+    protected abstract void setField(@Nonnull T value, int index, Object fieldValue);
+
+    /** Get field of existing instance. */
+    protected abstract Object getField(@Nonnull T value, int index);
+
+    /** Factory for concrete serializer. */
+    protected abstract CompositeSerializer createSerializerInstance(TypeSerializer ... originalSerializers);
+
+    @Override
+    public CompositeSerializer duplicate() {
+        TypeSerializer[] duplicatedSerializers = new TypeSerializer[fieldSerializers.length];
+        boolean stateful = false;
+        for (int index = 0; index < fieldSerializers.length; index++) {
+            duplicatedSerializers[index] = fieldSerializers[index].duplicate();
+            if (fieldSerializers[index] != duplicatedSerializers[index]) {
+                stateful = true;
+            }
+        }
+        return stateful ? createSerializerInstance(duplicatedSerializers) : this;
+    }
+
+    @Override
+    public boolean isImmutableType() {
+        for (TypeSerializer fieldSerializer : fieldSerializers) {
--- End diff --

We can compute many things like length, immutability, etc., already in the 
constructor. Statelessness is the one thing that we might want to figure out 
and remember on the first attempt.
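
The "figure out and remember on the first attempt" idea for statelessness might be sketched as follows. This is an illustration under assumptions, not the code from the pull request: `FieldSerializer` is a hypothetical one-method stand-in for `TypeSerializer`, the cached flag is not thread-safe, and a real serializer would also have to decide how the flag behaves under Java serialization.

```java
final class LazyStatelessness {
    /** Hypothetical minimal contract: duplicate() returns the same instance when stateless. */
    interface FieldSerializer {
        FieldSerializer duplicate();
    }

    private final FieldSerializer[] fieldSerializers;
    // null means "not determined yet"; set on the first duplicate() call.
    private Boolean stateless;

    LazyStatelessness(FieldSerializer... fieldSerializers) {
        this.fieldSerializers = fieldSerializers;
    }

    LazyStatelessness duplicate() {
        if (Boolean.TRUE.equals(stateless)) {
            // Known to be stateless: skip duplicating the fields again.
            return this;
        }
        FieldSerializer[] duplicates = new FieldSerializer[fieldSerializers.length];
        boolean allSame = true;
        for (int index = 0; index < fieldSerializers.length; index++) {
            duplicates[index] = fieldSerializers[index].duplicate();
            allSame &= duplicates[index] == fieldSerializers[index];
        }
        stateless = allSame; // remember the outcome of the first attempt
        return allSame ? this : new LazyStatelessness(duplicates);
    }
}
```

After the first call, a stateless composite returns `this` immediately, while a stateful one still duplicates its fields on every call.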



[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic

2018-07-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16529776#comment-16529776
 ] 

ASF GitHub Bot commented on FLINK-9513:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r199474421
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java
 ---
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Base class for composite serializers.
+ *
+ * This class serializes a composite type using array of its field serializers.
+ * Fields are indexed the same way as their serializers.
+ *
+ * @param <T> type of custom serialized value
+ */
+public abstract class CompositeSerializer<T> extends TypeSerializer<T> {
+   private static final long serialVersionUID = 1L;
+
+   protected final TypeSerializer[] fieldSerializers;
+   final boolean isImmutableTargetType;
+   private final int length;
+
+   @SuppressWarnings("unchecked")
+   protected CompositeSerializer(boolean isImmutableTargetType, 
TypeSerializer ... fieldSerializers) {
+   Preconditions.checkNotNull(fieldSerializers);
+   
Preconditions.checkArgument(Arrays.stream(fieldSerializers).allMatch(Objects::nonNull));
+   this.isImmutableTargetType = isImmutableTargetType;
+   this.fieldSerializers = (TypeSerializer[]) 
fieldSerializers;
+   this.length = calcLength();
+   }
+
+   /** Create new instance from its fields.  */
+   public abstract T createInstance(@Nonnull Object ... values);
+
+   /** Modify field of existing instance. Supported only by mutable types. 
*/
+   protected abstract void setField(@Nonnull T value, int index, Object 
fieldValue);
+
+   /** Get field of existing instance. */
+   protected abstract Object getField(@Nonnull T value, int index);
+
+   /** Factory for concrete serializer. */
+   protected abstract CompositeSerializer 
createSerializerInstance(TypeSerializer ... originalSerializers);
+
+   @Override
+   public CompositeSerializer duplicate() {
+   TypeSerializer[] duplicatedSerializers = new 
TypeSerializer[fieldSerializers.length];
+   boolean stateful = false;
+   for (int index = 0; index < fieldSerializers.length; index++) {
+   duplicatedSerializers[index] = 
fieldSerializers[index].duplicate();
+   if (fieldSerializers[index] != 
duplicatedSerializers[index]) {
+   stateful = true;
+   }
+   }
+   return stateful ? 
createSerializerInstance(duplicatedSerializers) : this;
+   }
+
+   @Override
+   public boolean isImmutableType() {
+   for (TypeSerializer fieldSerializer : fieldSerializers) 
{
+   if (!fieldSerializer.isImmutableType()) {
+   return false;
+   }
+   }
+   return isImmutableTargetType;
+   }
+
+   @Override
+   public T createInstance() {
+   Object[] fields = new Object[fieldSerializers.length];
+   for (int index = 0; index < fieldSerializers.length; index++) {
+   fields[index] = 
fieldSerializers[index].createInstance();
+   }
+   return createInstance(fields);
+   }
+
+   @Override
+   public T copy(T from) 

[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic

2018-07-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16529777#comment-16529777
 ] 

ASF GitHub Bot commented on FLINK-9513:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r199474494
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java
 ---
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Base class for composite serializers.
+ *
+ * This class serializes a composite type using array of its field serializers.
+ * Fields are indexed the same way as their serializers.
+ *
+ * @param <T> type of custom serialized value
+ */
+public abstract class CompositeSerializer<T> extends TypeSerializer<T> {
+   private static final long serialVersionUID = 1L;
+
+   protected final TypeSerializer[] fieldSerializers;
+   final boolean isImmutableTargetType;
+   private final int length;
+
+   @SuppressWarnings("unchecked")
+   protected CompositeSerializer(boolean isImmutableTargetType, 
TypeSerializer ... fieldSerializers) {
+   Preconditions.checkNotNull(fieldSerializers);
+   
Preconditions.checkArgument(Arrays.stream(fieldSerializers).allMatch(Objects::nonNull));
+   this.isImmutableTargetType = isImmutableTargetType;
+   this.fieldSerializers = (TypeSerializer[]) 
fieldSerializers;
+   this.length = calcLength();
+   }
+
+   /** Create new instance from its fields.  */
+   public abstract T createInstance(@Nonnull Object ... values);
+
+   /** Modify field of existing instance. Supported only by mutable types. 
*/
+   protected abstract void setField(@Nonnull T value, int index, Object 
fieldValue);
+
+   /** Get field of existing instance. */
+   protected abstract Object getField(@Nonnull T value, int index);
+
+   /** Factory for concrete serializer. */
+   protected abstract CompositeSerializer 
createSerializerInstance(TypeSerializer ... originalSerializers);
+
+   @Override
+   public CompositeSerializer duplicate() {
+   TypeSerializer[] duplicatedSerializers = new 
TypeSerializer[fieldSerializers.length];
+   boolean stateful = false;
+   for (int index = 0; index < fieldSerializers.length; index++) {
+   duplicatedSerializers[index] = 
fieldSerializers[index].duplicate();
+   if (fieldSerializers[index] != 
duplicatedSerializers[index]) {
+   stateful = true;
+   }
+   }
+   return stateful ? 
createSerializerInstance(duplicatedSerializers) : this;
+   }
+
+   @Override
+   public boolean isImmutableType() {
+   for (TypeSerializer fieldSerializer : fieldSerializers) 
{
+   if (!fieldSerializer.isImmutableType()) {
+   return false;
+   }
+   }
+   return isImmutableTargetType;
+   }
+
+   @Override
+   public T createInstance() {
+   Object[] fields = new Object[fieldSerializers.length];
+   for (int index = 0; index < fieldSerializers.length; index++) {
+   fields[index] = 
fieldSerializers[index].createInstance();
+   }
+   return createInstance(fields);
+   }
+
+   @Override
+   public T copy(T from) 

[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic

2018-07-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16529773#comment-16529773
 ] 

ASF GitHub Bot commented on FLINK-9513:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r199474258
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java
 ---
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Base class for composite serializers.
+ *
+ * This class serializes a composite type using array of its field serializers.
+ * Fields are indexed the same way as their serializers.
+ *
+ * @param <T> type of custom serialized value
+ */
+public abstract class CompositeSerializer<T> extends TypeSerializer<T> {
+    private static final long serialVersionUID = 1L;
+
+    protected final TypeSerializer[] fieldSerializers;
+    final boolean isImmutableTargetType;
+    private final int length;
+
+    @SuppressWarnings("unchecked")
+    protected CompositeSerializer(boolean isImmutableTargetType, TypeSerializer ... fieldSerializers) {
+        Preconditions.checkNotNull(fieldSerializers);
+        Preconditions.checkArgument(Arrays.stream(fieldSerializers).allMatch(Objects::nonNull));
+        this.isImmutableTargetType = isImmutableTargetType;
+        this.fieldSerializers = (TypeSerializer[]) fieldSerializers;
+        this.length = calcLength();
+    }
+
+    /** Create new instance from its fields.  */
+    public abstract T createInstance(@Nonnull Object ... values);
+
+    /** Modify field of existing instance. Supported only by mutable types. */
+    protected abstract void setField(@Nonnull T value, int index, Object fieldValue);
+
+    /** Get field of existing instance. */
+    protected abstract Object getField(@Nonnull T value, int index);
+
+    /** Factory for concrete serializer. */
+    protected abstract CompositeSerializer createSerializerInstance(TypeSerializer ... originalSerializers);
+
+    @Override
+    public CompositeSerializer duplicate() {
+        TypeSerializer[] duplicatedSerializers = new TypeSerializer[fieldSerializers.length];
+        boolean stateful = false;
+        for (int index = 0; index < fieldSerializers.length; index++) {
+            duplicatedSerializers[index] = fieldSerializers[index].duplicate();
+            if (fieldSerializers[index] != duplicatedSerializers[index]) {
+                stateful = true;
+            }
+        }
+        return stateful ? createSerializerInstance(duplicatedSerializers) : this;
+    }
+
+    @Override
+    public boolean isImmutableType() {
+        for (TypeSerializer fieldSerializer : fieldSerializers) {
--- End diff --

Why not compute this once in the constructor and remember it in a boolean 
flag?
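
A rough sketch of that suggestion, assuming the field serializers never change after construction (`PrecomputedImmutability` and its nested `FieldSerializer` interface are hypothetical stand-ins, not the classes from the diff):

```java
final class PrecomputedImmutability {
    /** Hypothetical stand-in exposing only the method needed here. */
    interface FieldSerializer {
        boolean isImmutableType();
    }

    // Computed once in the constructor instead of on every isImmutableType() call.
    private final boolean immutable;

    PrecomputedImmutability(boolean immutableTargetType, FieldSerializer... fieldSerializers) {
        boolean allFieldsImmutable = true;
        for (FieldSerializer fieldSerializer : fieldSerializers) {
            allFieldsImmutable &= fieldSerializer.isImmutableType();
        }
        // Same result as the loop in the diff: false if any field is mutable,
        // otherwise the immutability of the target type itself.
        this.immutable = immutableTargetType && allFieldsImmutable;
    }

    boolean isImmutableType() {
        return immutable;
    }
}
```

The getter then becomes a plain field read, which matters if it is called per record.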



[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic

2018-07-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16529772#comment-16529772
 ] 

ASF GitHub Bot commented on FLINK-9513:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r199474152
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java
 ---
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Base class for composite serializers.
+ *
+ * This class serializes a composite type using array of its field serializers.
+ * Fields are indexed the same way as their serializers.
+ *
+ * @param <T> type of custom serialized value
+ */
+public abstract class CompositeSerializer<T> extends TypeSerializer<T> {
+   private static final long serialVersionUID = 1L;
+
+   protected final TypeSerializer[] fieldSerializers;
+   final boolean isImmutableTargetType;
+   private final int length;
+
+   @SuppressWarnings("unchecked")
+   protected CompositeSerializer(boolean isImmutableTargetType, TypeSerializer ... fieldSerializers) {
+       Preconditions.checkNotNull(fieldSerializers);
+       Preconditions.checkArgument(Arrays.stream(fieldSerializers).allMatch(Objects::nonNull));
+       this.isImmutableTargetType = isImmutableTargetType;
+       this.fieldSerializers = (TypeSerializer[]) fieldSerializers;
+       this.length = calcLength();
+   }
+
+   /** Create new instance from its fields.  */
+   public abstract T createInstance(@Nonnull Object ... values);
+
+   /** Modify field of existing instance. Supported only by mutable types. */
+   protected abstract void setField(@Nonnull T value, int index, Object fieldValue);
+
+   /** Get field of existing instance. */
+   protected abstract Object getField(@Nonnull T value, int index);
+
+   /** Factory for concrete serializer. */
+   protected abstract CompositeSerializer createSerializerInstance(TypeSerializer ... originalSerializers);
+
+   @Override
+   public CompositeSerializer duplicate() {
+       TypeSerializer[] duplicatedSerializers = new TypeSerializer[fieldSerializers.length];
+       boolean stateful = false;
+       for (int index = 0; index < fieldSerializers.length; index++) {
+           duplicatedSerializers[index] = fieldSerializers[index].duplicate();
+           if (fieldSerializers[index] != duplicatedSerializers[index]) {
--- End diff --

I wonder whether we need to do these checks every time `duplicate()` is called. 
We could check once, remember whether all field serializers are stateless, and 
from that point on return `this` immediately.
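
A minimal sketch of the suggestion above, not the code from the PR. `FieldSerializer`, `StatelessFieldSerializer`, `StatefulFieldSerializer` and `CachingCompositeSerializer` are hypothetical stand-ins rather than Flink classes, and the sketch assumes a serializer is stateless exactly when `duplicate()` returns the same instance:

```java
/** Hypothetical stand-in for a serializer; only duplicate() matters here. */
interface FieldSerializer {
    FieldSerializer duplicate();
}

/** A stateless serializer can safely return itself. */
final class StatelessFieldSerializer implements FieldSerializer {
    @Override
    public FieldSerializer duplicate() {
        return this;
    }
}

/** A stateful serializer must hand out a fresh copy. */
final class StatefulFieldSerializer implements FieldSerializer {
    @Override
    public FieldSerializer duplicate() {
        return new StatefulFieldSerializer();
    }
}

final class CachingCompositeSerializer implements FieldSerializer {
    private final FieldSerializer[] fieldSerializers;
    /** Computed once: true if no field serializer needs a real copy. */
    private final boolean stateless;

    CachingCompositeSerializer(FieldSerializer... fieldSerializers) {
        this(fieldSerializers.clone(), allStateless(fieldSerializers));
    }

    /** Used by duplicate() so that the probe below is not repeated. */
    private CachingCompositeSerializer(FieldSerializer[] fieldSerializers, boolean stateless) {
        this.fieldSerializers = fieldSerializers;
        this.stateless = stateless;
    }

    /** Probes every field serializer once (assumption: same instance back means stateless). */
    private static boolean allStateless(FieldSerializer[] serializers) {
        for (FieldSerializer serializer : serializers) {
            if (serializer.duplicate() != serializer) {
                return false;
            }
        }
        return true;
    }

    @Override
    public FieldSerializer duplicate() {
        if (stateless) {
            return this; // fast path: nothing to copy
        }
        FieldSerializer[] copies = new FieldSerializer[fieldSerializers.length];
        for (int i = 0; i < fieldSerializers.length; i++) {
            copies[i] = fieldSerializers[i].duplicate();
        }
        return new CachingCompositeSerializer(copies, false);
    }
}
```

The trade-off is one extra `duplicate()` call per field at construction time in exchange for a constant-time `duplicate()` in the all-stateless case.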



[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic

2018-07-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16529696#comment-16529696
 ] 

ASF GitHub Bot commented on FLINK-9513:
---

Github user azagrebin commented on the issue:

https://github.com/apache/flink/pull/6196
  
Regarding CI: the failure seems to be in an unrelated test, 
`YARNSessionCapacitySchedulerITCase`; it [passed in my 
CI](https://travis-ci.org/azagrebin/flink/builds/398352328).


> Wrap state binder with TTL logic
> 
>
> Key: FLINK-9513
> URL: https://issues.apache.org/jira/browse/FLINK-9513
> Project: Flink
> Issue Type: Sub-task
> Components: State Backends, Checkpointing
> Affects Versions: 1.6.0
> Reporter: Andrey Zagrebin
> Assignee: Andrey Zagrebin
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.6.0
>
>
> The main idea is to wrap the user state value with a class holding the value and 
> the expiration timestamp (maybe metadata in the future) and to use the new object 
> as the value in the existing implementations:
> {code:java}
> class TtlValue<V> {
>   V value;
>   long expirationTimestamp;
> }
> {code}
> The original state binder factory is wrapped with TtlStateBinder if TTL is 
> enabled:
> {code:java}
> state = ttlConfig.updateType == DISABLED ?
>   bind(binder) : bind(new TtlStateBinder(binder, timerService));
> {code}
> TtlStateBinder decorates the states produced by the original binder with TTL 
> logic wrappers and adds TtlValue serialisation logic:
> {code:java}
> TtlStateBinder {
>   StateBinder binder;
>   ProcessingTimeProvider timeProvider; // System.currentTimeMillis()
>   TtlValueState createValueState(valueDesc) {
>     serializer = new TtlValueSerializer(valueDesc.getSerializer());
>     ttlValueDesc = new ValueDesc(serializer, ...);
>     // or implement custom TypeInfo
>     originalStateWithTtl = binder.createValueState(ttlValueDesc);
>     return new TtlValueState(originalStateWithTtl, timeProvider);
>   }
>   // List, Map, ...
> }
> {code}
> The TTL serializer should add the expiration timestamp.
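
The wrapping idea in the description can be sketched in plain Java roughly as follows. This is an illustration under stated assumptions, not the PR's implementation: `SimpleValueState`, `InMemoryValueState` and `TtlValueStateSketch` are hypothetical names, the clock is an injected `LongSupplier`, and treating a value as expired once `expirationTimestamp <= now` is an assumed rule:

```java
import java.util.function.LongSupplier;

/** Wrapper from the description: the user value plus its expiration timestamp. */
final class TtlValue<V> {
    final V value;
    final long expirationTimestamp;

    TtlValue(V value, long expirationTimestamp) {
        this.value = value;
        this.expirationTimestamp = expirationTimestamp;
    }
}

/** Hypothetical, simplified value state (not Flink's ValueState interface). */
interface SimpleValueState<V> {
    V value();
    void update(V value);
    void clear();
}

/** Stand-in for a state object produced by a backend. */
final class InMemoryValueState<V> implements SimpleValueState<V> {
    private V current;

    @Override public V value() { return current; }
    @Override public void update(V value) { current = value; }
    @Override public void clear() { current = null; }
}

/** Decorator: stores TtlValue<V> in the original state and hides expired values. */
final class TtlValueStateSketch<V> implements SimpleValueState<V> {
    private final SimpleValueState<TtlValue<V>> original;
    private final long ttlMillis;
    private final LongSupplier clock; // e.g. System::currentTimeMillis

    TtlValueStateSketch(SimpleValueState<TtlValue<V>> original, long ttlMillis, LongSupplier clock) {
        this.original = original;
        this.ttlMillis = ttlMillis;
        this.clock = clock;
    }

    @Override
    public V value() {
        TtlValue<V> wrapped = original.value();
        if (wrapped == null) {
            return null;
        }
        if (wrapped.expirationTimestamp <= clock.getAsLong()) {
            original.clear(); // assumed policy: drop the value when it is found expired
            return null;
        }
        return wrapped.value;
    }

    @Override
    public void update(V value) {
        original.update(new TtlValue<>(value, clock.getAsLong() + ttlMillis));
    }

    @Override
    public void clear() {
        original.clear();
    }
}
```

Injecting the clock keeps the expiration logic testable without waiting for real time to pass.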



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic

2018-06-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16528012#comment-16528012
 ] 

ASF GitHub Bot commented on FLINK-9513:
---

Github user azagrebin commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r199231639
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
 ---
@@ -0,0 +1,207 @@
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.CompositeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.KeyedStateFactory;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * This state factory wraps state objects, produced by backends, with TTL logic.
+ */
+public class TtlStateFactory {
+   public static  IS createStateAndWrapWithTtlIfEnabled(
+       TypeSerializer namespaceSerializer,
+       StateDescriptor stateDesc,
+       KeyedStateFactory originalStateFactory,
+       TtlConfig ttlConfig,
+       TtlTimeProvider timeProvider) throws Exception {
+       return ttlConfig.getTtlUpdateType() == TtlUpdateType.Disabled ?
+           originalStateFactory.createState(namespaceSerializer, stateDesc) :
+           new TtlStateFactory(originalStateFactory, ttlConfig, timeProvider)
+               .createState(namespaceSerializer, stateDesc);
+   }
+
+   private final Map, StateFactory> stateFactories;
+
+   private final KeyedStateFactory originalStateFactory;
+   private final TtlConfig ttlConfig;
+   private final TtlTimeProvider timeProvider;
+
+   private TtlStateFactory(KeyedStateFactory originalStateFactory, TtlConfig ttlConfig, TtlTimeProvider timeProvider) {

I added checks in `createStateAndWrapWithTtlIfEnabled`, the only user of 
this private constructor.
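
For illustration only, the pattern described here (argument checks in the single static entry point, none in the private constructor) might look like the sketch below. `CheckedWrapperFactory` and its `Object`-typed parameters are hypothetical, and `Objects.requireNonNull` stands in for whatever precondition helper the PR actually uses:

```java
import java.util.Objects;

final class CheckedWrapperFactory {
    private final Object originalFactory;
    private final Object config;

    /** Private: reachable only through create(), which has already validated the arguments. */
    private CheckedWrapperFactory(Object originalFactory, Object config) {
        this.originalFactory = originalFactory;
        this.config = config;
    }

    /** Single entry point; all null checks live here. */
    static CheckedWrapperFactory create(Object originalFactory, Object config) {
        Objects.requireNonNull(originalFactory, "originalFactory");
        Objects.requireNonNull(config, "config");
        return new CheckedWrapperFactory(originalFactory, config);
    }
}
```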



[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic

2018-06-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16528009#comment-16528009
 ] 

ASF GitHub Bot commented on FLINK-9513:
---

Github user azagrebin commented on the issue:

https://github.com/apache/flink/pull/6196
  
@StefanRRichter @sihuazhou 
Thanks guys for the helpful review. 
I refactored `CompositeSerializer` to rely on loops rather than streams 
and added tests for it.
Please have a look.

cc @twalthr 
The implementation of `CompositeSerializer` in this PR might also be 
interesting for other complex types, e.g. tuples or POJOs, allowing 
code reuse where that is possible without performance loss.
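
As a rough illustration of "loops rather than streams", a composite type can be written and read field by field with plain indexed loops. The sketch below uses hypothetical types (`FieldCodec`, `LongFieldCodec`, `StringFieldCodec`, `LoopCompositeCodec`) on top of `java.io` and is not the `CompositeSerializer` from this PR:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical per-field codec; fields are passed as Object for brevity. */
interface FieldCodec {
    void write(Object value, DataOutput out) throws IOException;
    Object read(DataInput in) throws IOException;
}

final class LongFieldCodec implements FieldCodec {
    @Override public void write(Object value, DataOutput out) throws IOException { out.writeLong((Long) value); }
    @Override public Object read(DataInput in) throws IOException { return in.readLong(); }
}

final class StringFieldCodec implements FieldCodec {
    @Override public void write(Object value, DataOutput out) throws IOException { out.writeUTF((String) value); }
    @Override public Object read(DataInput in) throws IOException { return in.readUTF(); }
}

/** Fields are indexed the same way as their codecs. */
final class LoopCompositeCodec {
    private final FieldCodec[] fieldCodecs;

    LoopCompositeCodec(FieldCodec... fieldCodecs) {
        this.fieldCodecs = fieldCodecs.clone();
    }

    void write(Object[] fields, DataOutput out) throws IOException {
        if (fields.length != fieldCodecs.length) {
            throw new IllegalArgumentException("field count mismatch");
        }
        for (int i = 0; i < fieldCodecs.length; i++) {
            fieldCodecs[i].write(fields[i], out);
        }
    }

    Object[] read(DataInput in) throws IOException {
        Object[] fields = new Object[fieldCodecs.length];
        for (int i = 0; i < fieldCodecs.length; i++) {
            fields[i] = fieldCodecs[i].read(in);
        }
        return fields;
    }

    /** Convenience for trying the codec out: serialize to bytes and read back. */
    Object[] roundTrip(Object... fields) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            write(fields, new DataOutputStream(bytes));
            return read(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A value-plus-timestamp pair would then be `new LoopCompositeCodec(new StringFieldCodec(), new LongFieldCodec())`, and a tuple-like type would only differ in its list of field codecs.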




[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic

2018-06-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16527993#comment-16527993
 ] 

ASF GitHub Bot commented on FLINK-9513:
---

Github user azagrebin commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r199228508
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
 ---
@@ -0,0 +1,207 @@
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.CompositeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.KeyedStateFactory;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * This state factory wraps state objects, produced by backends, with TTL logic.
+ */
+public class TtlStateFactory {
+   public static  IS createStateAndWrapWithTtlIfEnabled(
+       TypeSerializer namespaceSerializer,
+       StateDescriptor stateDesc,
+       KeyedStateFactory originalStateFactory,
+       TtlConfig ttlConfig,
+       TtlTimeProvider timeProvider) throws Exception {
+       return ttlConfig.getTtlUpdateType() == TtlUpdateType.Disabled ?
+           originalStateFactory.createState(namespaceSerializer, stateDesc) :
+           new TtlStateFactory(originalStateFactory, ttlConfig, timeProvider)
+               .createState(namespaceSerializer, stateDesc);
+   }
+
+   private final Map, StateFactory> stateFactories;
+
+   private final KeyedStateFactory originalStateFactory;
+   private final TtlConfig ttlConfig;
+   private final TtlTimeProvider timeProvider;
+
+   private TtlStateFactory(KeyedStateFactory originalStateFactory, TtlConfig ttlConfig, TtlTimeProvider timeProvider) {
+       this.originalStateFactory = originalStateFactory;
+       this.ttlConfig = ttlConfig;
+       this.timeProvider = timeProvider;
+       this.stateFactories = createStateFactories();
+   }
+
+   private Map, StateFactory> createStateFactories() {
+       return Stream.of(
+           Tuple2.of(ValueStateDescriptor.class, (StateFactory) this::createValueState),
+           Tuple2.of(ListStateDescriptor.class, (StateFactory) this::createListState),
+           Tuple2.of(MapStateDescriptor.class, (StateFactory) this::createMapState),
+           Tuple2.of(ReducingStateDescriptor.class, (StateFactory) this::createReducingState),
+           Tuple2.of(AggregatingStateDescriptor.class, (StateFactory) this::createAggregatingState),
+           Tuple2.of(FoldingStateDescriptor.class, (StateFactory) this::createFoldingState)
+       ).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
+   }
+
+   private interface StateFactory {
+       IS create(
+           TypeSerializer namespaceSerializer,
+           StateDescriptor stateDesc) throws Exception;
+   }
+
+   priv

[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic

2018-06-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16527988#comment-16527988
 ] 

ASF GitHub Bot commented on FLINK-9513:
---

Github user azagrebin commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r199227970
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
 ---

[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic

2018-06-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16527987#comment-16527987
 ] 

ASF GitHub Bot commented on FLINK-9513:
---

Github user azagrebin commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r199227770
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
 ---

[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic

2018-06-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16527982#comment-16527982
 ] 

ASF GitHub Bot commented on FLINK-9513:
---

Github user azagrebin commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r199226982
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
 ---
@@ -0,0 +1,207 @@
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.CompositeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.KeyedStateFactory;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * This state factory wraps state objects, produced by backends, with TTL logic.
+ */
+public class TtlStateFactory {
+   public static  IS createStateAndWrapWithTtlIfEnabled(
+       TypeSerializer namespaceSerializer,
+       StateDescriptor stateDesc,
+       KeyedStateFactory originalStateFactory,
+       TtlConfig ttlConfig,
+       TtlTimeProvider timeProvider) throws Exception {
+       return ttlConfig.getTtlUpdateType() == TtlUpdateType.Disabled ?
+           originalStateFactory.createState(namespaceSerializer, stateDesc) :
+           new TtlStateFactory(originalStateFactory, ttlConfig, timeProvider)
+               .createState(namespaceSerializer, stateDesc);
+   }
+
+   private final Map, StateFactory> stateFactories;
+
+   private final KeyedStateFactory originalStateFactory;
+   private final TtlConfig ttlConfig;
+   private final TtlTimeProvider timeProvider;
+
+   private TtlStateFactory(KeyedStateFactory originalStateFactory, TtlConfig ttlConfig, TtlTimeProvider timeProvider) {
+       this.originalStateFactory = originalStateFactory;
+       this.ttlConfig = ttlConfig;
+       this.timeProvider = timeProvider;
+       this.stateFactories = createStateFactories();
+   }
+
+   private Map, StateFactory> createStateFactories() {
+       return Stream.of(
+           Tuple2.of(ValueStateDescriptor.class, (StateFactory) this::createValueState),
+           Tuple2.of(ListStateDescriptor.class, (StateFactory) this::createListState),
+           Tuple2.of(MapStateDescriptor.class, (StateFactory) this::createMapState),
+           Tuple2.of(ReducingStateDescriptor.class, (StateFactory) this::createReducingState),
+           Tuple2.of(AggregatingStateDescriptor.class, (StateFactory) this::createAggregatingState),
+           Tuple2.of(FoldingStateDescriptor.class, (StateFactory) this::createFoldingState)
+       ).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
+   }
+
+   private interface StateFactory {

Leftover from previous iteration 👍



[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic

2018-06-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16527980#comment-16527980
 ] 

ASF GitHub Bot commented on FLINK-9513:
---

Github user azagrebin commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r199226833
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
 ---
@@ -0,0 +1,207 @@
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.CompositeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.KeyedStateFactory;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * This state factory wraps state objects, produced by backends, with TTL logic.
+ */
+public class TtlStateFactory {
+   public static  IS 
createStateAndWrapWithTtlIfEnabled(
+   TypeSerializer namespaceSerializer,
+   StateDescriptor stateDesc,
+   KeyedStateFactory originalStateFactory,
+   TtlConfig ttlConfig,
+   TtlTimeProvider timeProvider) throws Exception {
+   return ttlConfig.getTtlUpdateType() == TtlUpdateType.Disabled ?
+   originalStateFactory.createState(namespaceSerializer, 
stateDesc) :
+   new TtlStateFactory(originalStateFactory, ttlConfig, 
timeProvider)
+   .createState(namespaceSerializer, stateDesc);
+   }
+
+   private final Map<Class<? extends StateDescriptor>, StateFactory> stateFactories;
--- End diff --

I think creating the map is not a big deal: it happens once on init, so it 
puts no significant pressure on the GC. The map is also more declarative and 
flexible: if a new state type needs to be added, only the map has to be 
modified, not the method. And if needed in the future, it can more easily be 
refactored to build the mapping somewhere else and inject it into the factory.
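
To illustrate the trade-off described above, here is a minimal, self-contained 
sketch of map-based dispatch on the descriptor class. It is not the code from 
the PR: `Descriptor`, `ValueDescriptor`, `ListDescriptor`, and `Creator` are 
hypothetical stand-ins for Flink's state descriptors and the private 
`StateFactory` interface, and the returned strings merely stand in for wrapped 
state objects.

```java
import java.util.HashMap;
import java.util.Map;

final class DescriptorDispatchSketch {
    // Hypothetical stand-in for Flink's StateDescriptor hierarchy.
    interface Descriptor {
        String name();
    }

    static final class ValueDescriptor implements Descriptor {
        public String name() { return "value"; }
    }

    static final class ListDescriptor implements Descriptor {
        public String name() { return "list"; }
    }

    // Hypothetical counterpart of the private StateFactory interface in the diff.
    interface Creator {
        String create(Descriptor descriptor);
    }

    // Built once in the constructor, so the allocation cost is paid on init only.
    private final Map<Class<? extends Descriptor>, Creator> creators = new HashMap<>();

    DescriptorDispatchSketch() {
        creators.put(ValueDescriptor.class, d -> "ttl-wrapped " + d.name() + " state");
        creators.put(ListDescriptor.class, d -> "ttl-wrapped " + d.name() + " state");
    }

    // Looks up the creator registered for the descriptor's concrete class.
    String createState(Descriptor descriptor) {
        Creator creator = creators.get(descriptor.getClass());
        if (creator == null) {
            throw new IllegalArgumentException(
                "Unsupported descriptor: " + descriptor.getClass().getName());
        }
        return creator.create(descriptor);
    }

    public static void main(String[] args) {
        DescriptorDispatchSketch factory = new DescriptorDispatchSketch();
        System.out.println(factory.createState(new ValueDescriptor()));
        System.out.println(factory.createState(new ListDescriptor()));
    }
}
```

In this sketch, supporting another state type means adding one 
`creators.put(...)` line; `createState` itself stays unchanged.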



[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16523640#comment-16523640
 ] 

ASF GitHub Bot commented on FLINK-9513:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/6196
  
Thanks for the nice contribution. I had some comments inline, in particular 
three points about the serializer. I would suggest avoiding the use of raw 
types. I would also suggest avoiding the Java stream API, at least in methods 
that can appear in hot loops (mainly copy and de/serialize), for performance 
reasons. I think the imperative-style code will not even be (much) longer in 
those cases. Last, I suggest always testing new serializers via 
`SerializerTestBase`, because this catches many problems with little effort.




[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16523622#comment-16523622
 ] 

ASF GitHub Bot commented on FLINK-9513:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r198115714
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java
 ---
@@ -0,0 +1,204 @@
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Base class for composite serializers.
+ *
+ * This class serializes a list of objects
+ *
+ * @param  type of custom serialized value
+ */
+@SuppressWarnings("unchecked")
+public abstract class CompositeSerializer extends TypeSerializer {
+   private final List originalSerializers;
+
+   protected CompositeSerializer(List originalSerializers) 
{
+   Preconditions.checkNotNull(originalSerializers);
+   this.originalSerializers = originalSerializers;
+   }
+
+   protected abstract T composeValue(List values);
+
+   protected abstract List decomposeValue(T v);
+
+   protected abstract CompositeSerializer 
createSerializerInstance(List originalSerializers);
+
+   private T composeValueInternal(List values) {
+   Preconditions.checkArgument(values.size() == 
originalSerializers.size());
+   return composeValue(values);
+   }
+
+   private List decomposeValueInternal(T v) {
+   List values = decomposeValue(v);
+   Preconditions.checkArgument(values.size() == 
originalSerializers.size());
+   return values;
+   }
+
+   private CompositeSerializer 
createSerializerInstanceInternal(List originalSerializers) {
+   Preconditions.checkArgument(originalSerializers.size() == 
originalSerializers.size());
+   return createSerializerInstance(originalSerializers);
+   }
+
+   @Override
+   public CompositeSerializer duplicate() {
+   return 
createSerializerInstanceInternal(originalSerializers.stream()
+   .map(TypeSerializer::duplicate)
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public boolean isImmutableType() {
+   return 
originalSerializers.stream().allMatch(TypeSerializer::isImmutableType);
+   }
+
+   @Override
+   public T createInstance() {
+   return composeValueInternal(originalSerializers.stream()
+   .map(TypeSerializer::createInstance)
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public T copy(T from) {
+   List originalValues = decomposeValueInternal(from);
+   return composeValueInternal(
+   IntStream.range(0, originalSerializers.size())
+   .mapToObj(i -> 
originalSerializers.get(i).copy(originalValues.get(i)))
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public T copy(T from, T reuse) {
+   List originalFromValues = decomposeValueInternal(from);
+   List originalReuseValues = decomposeValueInternal(reuse);
+   return composeValueInternal(
+   IntStream.range(0, originalSerializers.size())
+   .mapToObj(i -> 
originalSerializers.get(i).copy(originalFromValues.get(i), 
originalReuseValues.get(i)))
--- End diff --

+1



[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16523623#comment-16523623
 ] 

ASF GitHub Bot commented on FLINK-9513:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r198115772
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java
 ---
@@ -0,0 +1,204 @@
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Base class for composite serializers.
+ *
+ * This class serializes a list of objects
+ *
+ * @param  type of custom serialized value
+ */
+@SuppressWarnings("unchecked")
+public abstract class CompositeSerializer extends TypeSerializer {
+   private final List originalSerializers;
+
+   protected CompositeSerializer(List originalSerializers) 
{
+   Preconditions.checkNotNull(originalSerializers);
+   this.originalSerializers = originalSerializers;
+   }
+
+   protected abstract T composeValue(List values);
+
+   protected abstract List decomposeValue(T v);
+
+   protected abstract CompositeSerializer 
createSerializerInstance(List originalSerializers);
+
+   private T composeValueInternal(List values) {
+   Preconditions.checkArgument(values.size() == 
originalSerializers.size());
+   return composeValue(values);
+   }
+
+   private List decomposeValueInternal(T v) {
+   List values = decomposeValue(v);
+   Preconditions.checkArgument(values.size() == 
originalSerializers.size());
+   return values;
+   }
+
+   private CompositeSerializer 
createSerializerInstanceInternal(List originalSerializers) {
+   Preconditions.checkArgument(originalSerializers.size() == 
originalSerializers.size());
+   return createSerializerInstance(originalSerializers);
+   }
+
+   @Override
+   public CompositeSerializer duplicate() {
+   return 
createSerializerInstanceInternal(originalSerializers.stream()
+   .map(TypeSerializer::duplicate)
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public boolean isImmutableType() {
+   return 
originalSerializers.stream().allMatch(TypeSerializer::isImmutableType);
+   }
+
+   @Override
+   public T createInstance() {
+   return composeValueInternal(originalSerializers.stream()
+   .map(TypeSerializer::createInstance)
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public T copy(T from) {
+   List originalValues = decomposeValueInternal(from);
+   return composeValueInternal(
+   IntStream.range(0, originalSerializers.size())
+   .mapToObj(i -> 
originalSerializers.get(i).copy(originalValues.get(i)))
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public T copy(T from, T reuse) {
+   List originalFromValues = decomposeValueInternal(from);
+   List originalReuseValues = decomposeValueInternal(reuse);
+   return composeValueInternal(
+   IntStream.range(0, originalSerializers.size())
+   .mapToObj(i -> 
originalSerializers.get(i).copy(originalFromValues.get(i), 
originalReuseValues.get(i)))
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public int getLength() {
+   return originalSerializers.stream().allMatch(s -> s.getLength() 
>= 0) ?
--- End diff --

+1



[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16523620#comment-16523620
 ] 

ASF GitHub Bot commented on FLINK-9513:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r198115667
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java
 ---
@@ -0,0 +1,204 @@
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Base class for composite serializers.
+ *
+ * This class serializes a list of objects
+ *
+ * @param  type of custom serialized value
+ */
+@SuppressWarnings("unchecked")
+public abstract class CompositeSerializer extends TypeSerializer {
+   private final List originalSerializers;
+
+   protected CompositeSerializer(List originalSerializers) 
{
+   Preconditions.checkNotNull(originalSerializers);
+   this.originalSerializers = originalSerializers;
+   }
+
+   protected abstract T composeValue(List values);
+
+   protected abstract List decomposeValue(T v);
+
+   protected abstract CompositeSerializer 
createSerializerInstance(List originalSerializers);
+
+   private T composeValueInternal(List values) {
+   Preconditions.checkArgument(values.size() == 
originalSerializers.size());
+   return composeValue(values);
+   }
+
+   private List decomposeValueInternal(T v) {
+   List values = decomposeValue(v);
+   Preconditions.checkArgument(values.size() == 
originalSerializers.size());
+   return values;
+   }
+
+   private CompositeSerializer 
createSerializerInstanceInternal(List originalSerializers) {
+   Preconditions.checkArgument(originalSerializers.size() == 
originalSerializers.size());
+   return createSerializerInstance(originalSerializers);
+   }
+
+   @Override
+   public CompositeSerializer duplicate() {
+   return 
createSerializerInstanceInternal(originalSerializers.stream()
+   .map(TypeSerializer::duplicate)
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public boolean isImmutableType() {
+   return 
originalSerializers.stream().allMatch(TypeSerializer::isImmutableType);
+   }
+
+   @Override
+   public T createInstance() {
+   return composeValueInternal(originalSerializers.stream()
+   .map(TypeSerializer::createInstance)
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public T copy(T from) {
+   List originalValues = decomposeValueInternal(from);
+   return composeValueInternal(
+   IntStream.range(0, originalSerializers.size())
+   .mapToObj(i -> 
originalSerializers.get(i).copy(originalValues.get(i)))
--- End diff --

I would also suggest considering lower-level constructs for potentially hot 
methods like this one, because this API can easily introduce (non-obvious) 
performance regressions.
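
As a rough sketch of what the lower-level variant could look like, the 
following replaces the `IntStream.range(...).mapToObj(...).collect(...)` 
pipeline with an indexed loop. It is self-contained and does not use Flink: 
`UnaryOperator<Object>` is an assumed stand-in for the per-field 
`TypeSerializer.copy`, and `copyFields` is a hypothetical helper name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

final class ImperativeCopySketch {
    // Copies each field with the copier at the same index, using a plain
    // indexed loop instead of a stream pipeline.
    static List<Object> copyFields(List<UnaryOperator<Object>> copiers, List<Object> values) {
        if (copiers.size() != values.size()) {
            throw new IllegalArgumentException(
                "Expected " + copiers.size() + " values, got " + values.size());
        }
        List<Object> copies = new ArrayList<>(values.size());
        for (int i = 0; i < values.size(); i++) {
            copies.add(copiers.get(i).apply(values.get(i)));
        }
        return copies;
    }

    public static void main(String[] args) {
        List<UnaryOperator<Object>> copiers = new ArrayList<>();
        copiers.add(v -> v);                                     // immutable field: share the reference
        copiers.add(v -> new StringBuilder((StringBuilder) v));  // mutable field: deep copy

        List<Object> values = new ArrayList<>();
        values.add(42L);
        values.add(new StringBuilder("user value"));

        System.out.println(copyFields(copiers, values));
    }
}
```

The loop allocates only the result list per call; whether the difference to 
the stream version matters in practice would still need to be measured.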



[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16523618#comment-16523618
 ] 

ASF GitHub Bot commented on FLINK-9513:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r198114853
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java
 ---
@@ -0,0 +1,204 @@
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Base class for composite serializers.
+ *
+ * This class serializes a list of objects
+ *
+ * @param  type of custom serialized value
+ */
+@SuppressWarnings("unchecked")
+public abstract class CompositeSerializer extends TypeSerializer {
--- End diff --

You could simply add tests for all the new serializers by extending 
`SerializerTestBase`, which I would recommend doing to catch mistakes.
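
Running against `SerializerTestBase` requires Flink's test utilities, so the 
following is only a self-contained sketch of the most basic property such a 
test checks: a value survives a serialize/deserialize round trip. 
`TimestampedText` is a hypothetical type modelled on the 
value-plus-expiration-timestamp idea from the issue description; it is not a 
Flink class, and the real base class covers considerably more than this.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

final class RoundTripCheckSketch {
    // Hypothetical value type: a user value plus its expiration timestamp.
    static final class TimestampedText {
        final String value;
        final long expirationTimestamp;

        TimestampedText(String value, long expirationTimestamp) {
            this.value = value;
            this.expirationTimestamp = expirationTimestamp;
        }
    }

    // Writes the user value first, then the timestamp.
    static void serialize(TimestampedText record, DataOutputStream out) throws IOException {
        out.writeUTF(record.value);
        out.writeLong(record.expirationTimestamp);
    }

    // Reads the fields in the same order in which serialize wrote them.
    static TimestampedText deserialize(DataInputStream in) throws IOException {
        String value = in.readUTF();
        long expirationTimestamp = in.readLong();
        return new TimestampedText(value, expirationTimestamp);
    }

    // Serializes to an in-memory buffer and reads the record back.
    static TimestampedText roundTrip(TimestampedText record) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                serialize(record, out);
            }
            try (DataInputStream in =
                     new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return deserialize(in);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        TimestampedText restored = roundTrip(new TimestampedText("user-value", 1530000000000L));
        System.out.println(restored.value + " expires at " + restored.expirationTimestamp);
    }
}
```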




[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16523614#comment-16523614
 ] 

ASF GitHub Bot commented on FLINK-9513:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r198113448
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java
 ---
@@ -0,0 +1,204 @@
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Base class for composite serializers.
+ *
+ * This class serializes a list of objects
+ *
+ * @param  type of custom serialized value
+ */
+@SuppressWarnings("unchecked")
+public abstract class CompositeSerializer extends TypeSerializer {
+   private final List originalSerializers;
+
+   protected CompositeSerializer(List originalSerializers) 
{
+   Preconditions.checkNotNull(originalSerializers);
+   this.originalSerializers = originalSerializers;
+   }
+
+   protected abstract T composeValue(List values);
--- End diff --

Here too, `List` is used as a raw type. I also think the abstract methods 
could use some documentation for people who would like to create new 
subclasses in the future.




[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16523613#comment-16523613
 ] 

ASF GitHub Bot commented on FLINK-9513:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r198113175
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java
 ---
@@ -0,0 +1,204 @@
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Base class for composite serializers.
+ *
+ * This class serializes a list of objects
+ *
+ * @param  type of custom serialized value
+ */
+@SuppressWarnings("unchecked")
+public abstract class CompositeSerializer extends TypeSerializer {
+   private final List originalSerializers;
--- End diff --

Why is this class using a lot of raw types instead of wildcards? E.g. why 
`List<TypeSerializer>` instead of `List<TypeSerializer<?>>`?
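
A minimal sketch of what the wildcard buys, under the assumption that the list 
elements are serializers of differing types. `Codec<T>` is a hypothetical 
stand-in for `TypeSerializer<T>`, and `copyAll` and `copyUnchecked` are 
illustrative names rather than anything from the PR.

```java
import java.util.ArrayList;
import java.util.List;

final class WildcardSketch {
    // Hypothetical stand-in for TypeSerializer<T>.
    interface Codec<T> {
        T copy(T value);
    }

    static final class LongCodec implements Codec<Long> {
        public Long copy(Long value) { return value; }
    }

    static final class TextCodec implements Codec<String> {
        public String copy(String value) { return value; }
    }

    // With List<Codec<?>> the compiler rejects codecs.get(0).copy("x"),
    // whereas a raw List<Codec> would accept it with only an unchecked
    // warning and defer the failure to runtime.
    static List<Object> copyAll(List<Codec<?>> codecs, List<Object> values) {
        if (codecs.size() != values.size()) {
            throw new IllegalArgumentException("Size mismatch");
        }
        List<Object> copies = new ArrayList<>(values.size());
        for (int i = 0; i < codecs.size(); i++) {
            copies.add(copyUnchecked(codecs.get(i), values.get(i)));
        }
        return copies;
    }

    // The one unavoidable unchecked cast is confined to this small helper.
    @SuppressWarnings("unchecked")
    private static <T> T copyUnchecked(Codec<T> codec, Object value) {
        return codec.copy((T) value);
    }

    public static void main(String[] args) {
        List<Codec<?>> codecs = new ArrayList<>();
        codecs.add(new LongCodec());
        codecs.add(new TextCodec());

        List<Object> values = new ArrayList<>();
        values.add(7L);
        values.add("ttl");

        System.out.println(copyAll(codecs, values));
    }
}
```

The suppression is scoped to a single private method instead of the whole 
class, so unchecked warnings elsewhere stay visible.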




[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16523434#comment-16523434
 ] 

ASF GitHub Bot commented on FLINK-9513:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r198062463
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
 ---
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.CompositeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.KeyedStateFactory;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * This state factory wraps state objects, produced by backends, with TTL logic.
+ */
+public class TtlStateFactory {
+   public static  IS 
createStateAndWrapWithTtlIfEnabled(
+   TypeSerializer namespaceSerializer,
+   StateDescriptor stateDesc,
+   KeyedStateFactory originalStateFactory,
+   TtlConfig ttlConfig,
+   TtlTimeProvider timeProvider) throws Exception {
+   return ttlConfig.getTtlUpdateType() == TtlUpdateType.Disabled ?
+   originalStateFactory.createState(namespaceSerializer, 
stateDesc) :
+   new TtlStateFactory(originalStateFactory, ttlConfig, 
timeProvider)
+   .createState(namespaceSerializer, stateDesc);
+   }
+
+   private final Map<Class<? extends StateDescriptor>, StateFactory> stateFactories;
+
+   private final KeyedStateFactory originalStateFactory;
+   private final TtlConfig ttlConfig;
+   private final TtlTimeProvider timeProvider;
+
+   private TtlStateFactory(KeyedStateFactory originalStateFactory, 
TtlConfig ttlConfig, TtlTimeProvider timeProvider) {
+   this.originalStateFactory = originalStateFactory;
+   this.ttlConfig = ttlConfig;
+   this.timeProvider = timeProvider;
+   this.stateFactories = createStateFactories();
+   }
+
+   private Map, StateFactory> 
createStateFactories() {
+   return Stream.of(
+   Tuple2.of(ValueStateDescriptor.class, (StateFactory) 
this::createValueState),
+   Tuple2.of(ListStateDescriptor.class, (StateFactory) 
this::createListState),
+   Tuple2.of(MapStateDescriptor.class, (StateFactory) 
this::createMapState),
+   Tuple2.of(ReducingStateDescriptor.class, (StateFactory) 
this::createReducingState),
+   Tuple2.of(AggregatingStateDescriptor.class, 
(StateFactory) this::createAggregatingState),
+   Tuple2.of(FoldingStateDescriptor.class, (StateFactory) 
this::createFoldingState)
+   ).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
+   }
+
+   private interface StateFactory {
+IS create(
+   TypeSerializer namespaceSerializer,
+   StateDescriptor stateDesc) throws Exception;
+   }
+
+  

[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16523432#comment-16523432
 ] 

ASF GitHub Bot commented on FLINK-9513:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r198061894
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
 ---
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.CompositeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.KeyedStateFactory;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * This state factory wraps state objects, produced by backends, with TTL 
logic.
+ */
+public class TtlStateFactory {
+   public static  IS 
createStateAndWrapWithTtlIfEnabled(
+   TypeSerializer namespaceSerializer,
+   StateDescriptor stateDesc,
+   KeyedStateFactory originalStateFactory,
+   TtlConfig ttlConfig,
+   TtlTimeProvider timeProvider) throws Exception {
+   return ttlConfig.getTtlUpdateType() == TtlUpdateType.Disabled ?
+   originalStateFactory.createState(namespaceSerializer, 
stateDesc) :
+   new TtlStateFactory(originalStateFactory, ttlConfig, 
timeProvider)
+   .createState(namespaceSerializer, stateDesc);
+   }
+
+   private final Map, StateFactory> 
stateFactories;
+
+   private final KeyedStateFactory originalStateFactory;
+   private final TtlConfig ttlConfig;
+   private final TtlTimeProvider timeProvider;
+
+   private TtlStateFactory(KeyedStateFactory originalStateFactory, 
TtlConfig ttlConfig, TtlTimeProvider timeProvider) {
+   this.originalStateFactory = originalStateFactory;
+   this.ttlConfig = ttlConfig;
+   this.timeProvider = timeProvider;
+   this.stateFactories = createStateFactories();
+   }
+
+   private Map, StateFactory> 
createStateFactories() {
+   return Stream.of(
+   Tuple2.of(ValueStateDescriptor.class, (StateFactory) 
this::createValueState),
+   Tuple2.of(ListStateDescriptor.class, (StateFactory) 
this::createListState),
+   Tuple2.of(MapStateDescriptor.class, (StateFactory) 
this::createMapState),
+   Tuple2.of(ReducingStateDescriptor.class, (StateFactory) 
this::createReducingState),
+   Tuple2.of(AggregatingStateDescriptor.class, 
(StateFactory) this::createAggregatingState),
+   Tuple2.of(FoldingStateDescriptor.class, (StateFactory) 
this::createFoldingState)
+   ).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
+   }
+
+   private interface StateFactory {
+IS create(
+   TypeSerializer namespaceSerializer,
+   StateDescriptor stateDesc) throws Exception;
+   }
+
+  

[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16523429#comment-16523429
 ] 

ASF GitHub Bot commented on FLINK-9513:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r198060715
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
 ---
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.CompositeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.KeyedStateFactory;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * This state factory wraps state objects, produced by backends, with TTL 
logic.
+ */
+public class TtlStateFactory {
+   public static  IS 
createStateAndWrapWithTtlIfEnabled(
+   TypeSerializer namespaceSerializer,
+   StateDescriptor stateDesc,
+   KeyedStateFactory originalStateFactory,
+   TtlConfig ttlConfig,
+   TtlTimeProvider timeProvider) throws Exception {
+   return ttlConfig.getTtlUpdateType() == TtlUpdateType.Disabled ?
+   originalStateFactory.createState(namespaceSerializer, 
stateDesc) :
+   new TtlStateFactory(originalStateFactory, ttlConfig, 
timeProvider)
+   .createState(namespaceSerializer, stateDesc);
+   }
+
+   private final Map, StateFactory> 
stateFactories;
+
+   private final KeyedStateFactory originalStateFactory;
+   private final TtlConfig ttlConfig;
+   private final TtlTimeProvider timeProvider;
+
+   private TtlStateFactory(KeyedStateFactory originalStateFactory, 
TtlConfig ttlConfig, TtlTimeProvider timeProvider) {
+   this.originalStateFactory = originalStateFactory;
+   this.ttlConfig = ttlConfig;
+   this.timeProvider = timeProvider;
+   this.stateFactories = createStateFactories();
+   }
+
+   private Map, StateFactory> 
createStateFactories() {
+   return Stream.of(
+   Tuple2.of(ValueStateDescriptor.class, (StateFactory) 
this::createValueState),
+   Tuple2.of(ListStateDescriptor.class, (StateFactory) 
this::createListState),
+   Tuple2.of(MapStateDescriptor.class, (StateFactory) 
this::createMapState),
+   Tuple2.of(ReducingStateDescriptor.class, (StateFactory) 
this::createReducingState),
+   Tuple2.of(AggregatingStateDescriptor.class, 
(StateFactory) this::createAggregatingState),
+   Tuple2.of(FoldingStateDescriptor.class, (StateFactory) 
this::createFoldingState)
+   ).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
+   }
+
+   private interface StateFactory {
--- End diff --

Do we really need this interface? It looks identical to `KeyedStateFactory`, 
so that interface should fit here as well.
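
A minimal sketch of what reusing a single factory interface for the dispatch map could look like. All types here (`Descriptor`, `ValueDescriptor`, `ListDescriptor`, and this simplified `KeyedStateFactory`) are hypothetical stand-ins, not the Flink classes; the real interface also takes a namespace serializer and declares `throws Exception`.

```java
import java.util.HashMap;
import java.util.Map;

public class FactoryReuseSketch {
    // Hypothetical, simplified stand-ins for Flink's state descriptors.
    static class Descriptor {
        final String name;
        Descriptor(String name) { this.name = name; }
    }
    static class ValueDescriptor extends Descriptor {
        ValueDescriptor(String name) { super(name); }
    }
    static class ListDescriptor extends Descriptor {
        ListDescriptor(String name) { super(name); }
    }

    // Hypothetical, simplified shape of the shared factory interface.
    interface KeyedStateFactory {
        String createState(Descriptor desc);
    }

    // The map values use the shared interface directly, so no private twin interface is needed.
    private final Map<Class<? extends Descriptor>, KeyedStateFactory> factories = new HashMap<>();

    FactoryReuseSketch() {
        factories.put(ValueDescriptor.class, this::createValueState);
        factories.put(ListDescriptor.class, this::createListState);
    }

    private String createValueState(Descriptor desc) { return "ttl-value:" + desc.name; }

    private String createListState(Descriptor desc) { return "ttl-list:" + desc.name; }

    // Dispatches on the concrete descriptor class.
    String createState(Descriptor desc) {
        KeyedStateFactory factory = factories.get(desc.getClass());
        if (factory == null) {
            throw new IllegalArgumentException("Unsupported descriptor: " + desc.getClass());
        }
        return factory.createState(desc);
    }

    public static void main(String[] args) {
        FactoryReuseSketch sketch = new FactoryReuseSketch();
        System.out.println(sketch.createState(new ValueDescriptor("count")));
    }
}
```

The method references bind to the shared interface because their signatures match it, which is the property the question above relies on.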


> Wrap state binder wit

[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16523426#comment-16523426
 ] 

ASF GitHub Bot commented on FLINK-9513:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r198058861
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateFactory.java
 ---
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+/** This factory produces concrete state objects in backends. */
+public interface KeyedStateFactory {
--- End diff --

I think this interface can go into the same file as `KeyedStateBackend` because 
it belongs in that context.


> Wrap state binder with TTL logic
> 
>
> Key: FLINK-9513
> URL: https://issues.apache.org/jira/browse/FLINK-9513
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> The main idea is to wrap user state value with a class holding the value and 
> the expiration timestamp (maybe meta data in future) and use the new object 
> as a value in the existing implementations:
> {code:java}
> class TtlValue {
>   V value;
>   long expirationTimestamp;
> }
> {code}
> The original state binder factory is wrapped with TtlStateBinder if TTL is 
> enabled:
> {code:java}
> state = ttlConfig.updateType == DISABLED ?
>  bind(binder) : bind(new TtlStateBinder(binder, timerService));
> {code}
> TtlStateBinder decorates the states produced by the original binder with TTL 
> logic wrappers and adds TtlValue serialisation logic:
> {code:java}
> TtlStateBinder {
> StateBinder binder;
> ProcessingTimeProvider timeProvider; // System.currentTimeMillis()
>  TtlValueState createValueState(valueDesc) {
>  serializer = new TtlValueSerializer(valueDesc.getSerializer());
>  ttlValueDesc = new ValueDesc(serializer, ...);
>  // or implement custom TypeInfo
>  originalStateWithTtl = binder.createValueState(ttlValueDesc);
>      return new TtlValueState(originalStateWithTtl, timeProvider);
> }
>   // List, Map, ...
> }
> {code}
> TTL serializer should add expiration timestamp
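
The wrapping idea from the description can be sketched roughly as follows. This is an illustration under assumptions, not the code from the pull request: the helper names `wrap`, `unwrap` and `isExpired` are hypothetical, and treating a value as expired once the current time reaches `expirationTimestamp` is an assumed convention.

```java
public class TtlValueSketch {
    /** Holds the user value together with its expiration timestamp. */
    static final class TtlValue<V> {
        final V value;
        final long expirationTimestamp;

        TtlValue(V value, long expirationTimestamp) {
            this.value = value;
            this.expirationTimestamp = expirationTimestamp;
        }

        // Assumption: the value counts as expired once 'now' reaches the timestamp.
        boolean isExpired(long now) {
            return now >= expirationTimestamp;
        }
    }

    /** Wraps a user value on write; 'now' would come from the time provider. */
    static <V> TtlValue<V> wrap(V value, long now, long ttlMillis) {
        return new TtlValue<>(value, now + ttlMillis);
    }

    /** Unwraps on read and hides expired or missing values from the user. */
    static <V> V unwrap(TtlValue<V> ttlValue, long now) {
        return (ttlValue == null || ttlValue.isExpired(now)) ? null : ttlValue.value;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        TtlValue<String> wrapped = wrap("user-state", now, 1_000L);
        System.out.println(unwrap(wrapped, now));          // still readable
        System.out.println(unwrap(wrapped, now + 1_000L)); // treated as expired
    }
}
```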





[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic

2018-06-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16523210#comment-16523210
 ] 

ASF GitHub Bot commented on FLINK-9513:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r198015848
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java
 ---
@@ -0,0 +1,204 @@
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Base class for composite serializers.
+ *
+ * This class serializes a list of objects
+ *
+ * @param  type of custom serialized value
+ */
+@SuppressWarnings("unchecked")
+public abstract class CompositeSerializer extends TypeSerializer {
+   private final List originalSerializers;
+
+   protected CompositeSerializer(List originalSerializers) 
{
+   Preconditions.checkNotNull(originalSerializers);
+   this.originalSerializers = originalSerializers;
+   }
+
+   protected abstract T composeValue(List values);
+
+   protected abstract List decomposeValue(T v);
+
+   protected abstract CompositeSerializer 
createSerializerInstance(List originalSerializers);
+
+   private T composeValueInternal(List values) {
+   Preconditions.checkArgument(values.size() == 
originalSerializers.size());
+   return composeValue(values);
+   }
+
+   private List decomposeValueInternal(T v) {
+   List values = decomposeValue(v);
+   Preconditions.checkArgument(values.size() == 
originalSerializers.size());
+   return values;
+   }
+
+   private CompositeSerializer 
createSerializerInstanceInternal(List originalSerializers) {
+   Preconditions.checkArgument(originalSerializers.size() == 
originalSerializers.size());
+   return createSerializerInstance(originalSerializers);
+   }
+
+   @Override
+   public CompositeSerializer duplicate() {
+   return 
createSerializerInstanceInternal(originalSerializers.stream()
+   .map(TypeSerializer::duplicate)
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public boolean isImmutableType() {
+   return 
originalSerializers.stream().allMatch(TypeSerializer::isImmutableType);
+   }
+
+   @Override
+   public T createInstance() {
+   return composeValueInternal(originalSerializers.stream()
+   .map(TypeSerializer::createInstance)
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public T copy(T from) {
+   List originalValues = decomposeValueInternal(from);
+   return composeValueInternal(
+   IntStream.range(0, originalSerializers.size())
+   .mapToObj(i -> 
originalSerializers.get(i).copy(originalValues.get(i)))
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public T copy(T from, T reuse) {
+   List originalFromValues = decomposeValueInternal(from);
+   List originalReuseValues = decomposeValueInternal(reuse);
+   return composeValueInternal(
+   IntStream.range(0, originalSerializers.size())
+   .mapToObj(i -> 
originalSerializers.get(i).copy(originalFromValues.get(i), 
originalReuseValues.get(i)))
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public int getLength() {
+   return originalSerializers.stream().allMatch(s -> s.getLength() 
>= 0) ?
+   
originalSerializers.stream().mapToInt(TypeSerializer::getLength).sum() : -1;
+   }
+
+   @Override
+   public void serialize(T record, DataOutputView target) throws 
IOException {
+   List originalValues = decomposeValueInternal(record);
+   for (int i = 0; i < originalSerializers.size(); i++) {
+   
originalSerializers.get(i).serialize(originalValues.get(i), target);
+   }
+   }
+
+   @Override
+   public T deserialize(DataInputView source) throws IOException {
+   List originalValues = new ArrayList();
+   for (TypeSerializer typeSerializer : originalSer

[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic

2018-06-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16523209#comment-16523209
 ] 

ASF GitHub Bot commented on FLINK-9513:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r198015825
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java
 ---
@@ -0,0 +1,204 @@
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Base class for composite serializers.
+ *
+ * This class serializes a list of objects
+ *
+ * @param  type of custom serialized value
+ */
+@SuppressWarnings("unchecked")
+public abstract class CompositeSerializer extends TypeSerializer {
+   private final List originalSerializers;
+
+   protected CompositeSerializer(List originalSerializers) 
{
+   Preconditions.checkNotNull(originalSerializers);
+   this.originalSerializers = originalSerializers;
+   }
+
+   protected abstract T composeValue(List values);
+
+   protected abstract List decomposeValue(T v);
+
+   protected abstract CompositeSerializer 
createSerializerInstance(List originalSerializers);
+
+   private T composeValueInternal(List values) {
+   Preconditions.checkArgument(values.size() == 
originalSerializers.size());
+   return composeValue(values);
+   }
+
+   private List decomposeValueInternal(T v) {
+   List values = decomposeValue(v);
+   Preconditions.checkArgument(values.size() == 
originalSerializers.size());
+   return values;
+   }
+
+   private CompositeSerializer 
createSerializerInstanceInternal(List originalSerializers) {
+   Preconditions.checkArgument(originalSerializers.size() == 
originalSerializers.size());
+   return createSerializerInstance(originalSerializers);
+   }
+
+   @Override
+   public CompositeSerializer duplicate() {
+   return 
createSerializerInstanceInternal(originalSerializers.stream()
+   .map(TypeSerializer::duplicate)
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public boolean isImmutableType() {
+   return 
originalSerializers.stream().allMatch(TypeSerializer::isImmutableType);
+   }
+
+   @Override
+   public T createInstance() {
+   return composeValueInternal(originalSerializers.stream()
+   .map(TypeSerializer::createInstance)
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public T copy(T from) {
+   List originalValues = decomposeValueInternal(from);
+   return composeValueInternal(
+   IntStream.range(0, originalSerializers.size())
+   .mapToObj(i -> 
originalSerializers.get(i).copy(originalValues.get(i)))
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public T copy(T from, T reuse) {
+   List originalFromValues = decomposeValueInternal(from);
+   List originalReuseValues = decomposeValueInternal(reuse);
+   return composeValueInternal(
+   IntStream.range(0, originalSerializers.size())
+   .mapToObj(i -> 
originalSerializers.get(i).copy(originalFromValues.get(i), 
originalReuseValues.get(i)))
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public int getLength() {
+   return originalSerializers.stream().allMatch(s -> s.getLength() 
>= 0) ?
+   
originalSerializers.stream().mapToInt(TypeSerializer::getLength).sum() : -1;
+   }
+
+   @Override
+   public void serialize(T record, DataOutputView target) throws 
IOException {
+   List originalValues = decomposeValueInternal(record);
+   for (int i = 0; i < originalSerializers.size(); i++) {
+   
originalSerializers.get(i).serialize(originalValues.get(i), target);
+   }
+   }
+
+   @Override
+   public T deserialize(DataInputView source) throws IOException {
+   List originalValues = new ArrayList();
+   for (TypeSerializer typeSerializer : originalSer

[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic

2018-06-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16523205#comment-16523205
 ] 

ASF GitHub Bot commented on FLINK-9513:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r198015385
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java
 ---
@@ -0,0 +1,204 @@
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Base class for composite serializers.
+ *
+ * This class serializes a list of objects
+ *
+ * @param  type of custom serialized value
+ */
+@SuppressWarnings("unchecked")
+public abstract class CompositeSerializer extends TypeSerializer {
+   private final List originalSerializers;
+
+   protected CompositeSerializer(List originalSerializers) 
{
+   Preconditions.checkNotNull(originalSerializers);
+   this.originalSerializers = originalSerializers;
+   }
+
+   protected abstract T composeValue(List values);
+
+   protected abstract List decomposeValue(T v);
+
+   protected abstract CompositeSerializer 
createSerializerInstance(List originalSerializers);
+
+   private T composeValueInternal(List values) {
+   Preconditions.checkArgument(values.size() == 
originalSerializers.size());
+   return composeValue(values);
+   }
+
+   private List decomposeValueInternal(T v) {
+   List values = decomposeValue(v);
+   Preconditions.checkArgument(values.size() == 
originalSerializers.size());
+   return values;
+   }
+
+   private CompositeSerializer 
createSerializerInstanceInternal(List originalSerializers) {
+   Preconditions.checkArgument(originalSerializers.size() == 
originalSerializers.size());
+   return createSerializerInstance(originalSerializers);
+   }
+
+   @Override
+   public CompositeSerializer duplicate() {
+   return 
createSerializerInstanceInternal(originalSerializers.stream()
+   .map(TypeSerializer::duplicate)
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public boolean isImmutableType() {
+   return 
originalSerializers.stream().allMatch(TypeSerializer::isImmutableType);
+   }
+
+   @Override
+   public T createInstance() {
+   return composeValueInternal(originalSerializers.stream()
+   .map(TypeSerializer::createInstance)
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public T copy(T from) {
+   List originalValues = decomposeValueInternal(from);
+   return composeValueInternal(
+   IntStream.range(0, originalSerializers.size())
+   .mapToObj(i -> 
originalSerializers.get(i).copy(originalValues.get(i)))
--- End diff --

Same here: this might result in bad performance because of the random access 
into the list.


> Wrap state binder with TTL logic
> 
>
> Key: FLINK-9513
> URL: https://issues.apache.org/jira/browse/FLINK-9513
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> The main idea is to wrap user state value with a class holding the value and 
> the expiration timestamp (maybe meta data in future) and use the new object 
> as a value in the existing implementations:
> {code:java}
> class TtlValue {
>   V value;
>   long expirationTimestamp;
> }
> {code}
> The original state binder factory is wrapped with TtlStateBinder if TTL is 
> enabled:
> {code:java}
> state = ttlConfig.updateType == DISABLED ?
>  bind(binder) : bind(new TtlStateBinder(binder, timerService));
> {code}
> TtlStateBinder decorates the states produced by the original binder with TTL 
> logic wrappers and adds TtlValue serialisation logic:
> {code:java}
> TtlStateBinder {
> StateBinder binder;
> ProcessingTimeProvider timeProvider; // System.currentTimeMillis()
>  TtlValueState createValueState(valueDesc)

[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic

2018-06-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16523203#comment-16523203
 ] 

ASF GitHub Bot commented on FLINK-9513:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r198015282
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java
 ---
@@ -0,0 +1,204 @@
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Base class for composite serializers.
+ *
+ * This class serializes a list of objects
+ *
+ * @param  type of custom serialized value
+ */
+@SuppressWarnings("unchecked")
+public abstract class CompositeSerializer extends TypeSerializer {
+   private final List originalSerializers;
+
+   protected CompositeSerializer(List originalSerializers) 
{
+   Preconditions.checkNotNull(originalSerializers);
+   this.originalSerializers = originalSerializers;
+   }
+
+   protected abstract T composeValue(List values);
+
+   protected abstract List decomposeValue(T v);
+
+   protected abstract CompositeSerializer 
createSerializerInstance(List originalSerializers);
+
+   private T composeValueInternal(List values) {
+   Preconditions.checkArgument(values.size() == 
originalSerializers.size());
+   return composeValue(values);
+   }
+
+   private List decomposeValueInternal(T v) {
+   List values = decomposeValue(v);
+   Preconditions.checkArgument(values.size() == 
originalSerializers.size());
+   return values;
+   }
+
+   private CompositeSerializer 
createSerializerInstanceInternal(List originalSerializers) {
+   Preconditions.checkArgument(originalSerializers.size() == 
originalSerializers.size());
+   return createSerializerInstance(originalSerializers);
+   }
+
+   @Override
+   public CompositeSerializer duplicate() {
+   return 
createSerializerInstanceInternal(originalSerializers.stream()
+   .map(TypeSerializer::duplicate)
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public boolean isImmutableType() {
+   return 
originalSerializers.stream().allMatch(TypeSerializer::isImmutableType);
+   }
+
+   @Override
+   public T createInstance() {
+   return composeValueInternal(originalSerializers.stream()
+   .map(TypeSerializer::createInstance)
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public T copy(T from) {
+   List originalValues = decomposeValueInternal(from);
+   return composeValueInternal(
+   IntStream.range(0, originalSerializers.size())
+   .mapToObj(i -> 
originalSerializers.get(i).copy(originalValues.get(i)))
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public T copy(T from, T reuse) {
+   List originalFromValues = decomposeValueInternal(from);
+   List originalReuseValues = decomposeValueInternal(reuse);
+   return composeValueInternal(
+   IntStream.range(0, originalSerializers.size())
+   .mapToObj(i -> 
originalSerializers.get(i).copy(originalFromValues.get(i), 
originalReuseValues.get(i)))
--- End diff --

I would suggest using an `Iterator` instead of `xxx.get(i)` here, because 
if `originalSerializers` (or one of the other lists) is a `LinkedList`, the 
performance could be bad.
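
One way the iterator-based traversal suggested above might look, as a sketch only. `copyAll` and the use of `UnaryOperator` in place of a serializer's `copy` method are hypothetical simplifications; the point is that both lists are walked in lockstep without `get(i)`, so the cost stays linear even for a `LinkedList`.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.function.UnaryOperator;

public class LockstepCopySketch {
    /** Applies the i-th copier to the i-th value without indexed access. */
    static List<Object> copyAll(List<UnaryOperator<Object>> copiers, List<Object> values) {
        if (copiers.size() != values.size()) {
            throw new IllegalArgumentException("Expected " + copiers.size() + " values");
        }
        List<Object> result = new ArrayList<>(values.size());
        Iterator<Object> valueIt = values.iterator();
        // The for-each loop drives the first iterator; valueIt advances in step with it.
        for (UnaryOperator<Object> copier : copiers) {
            result.add(copier.apply(valueIt.next()));
        }
        return result;
    }

    public static void main(String[] args) {
        List<UnaryOperator<Object>> copiers = new LinkedList<>();
        copiers.add(v -> v);                           // immutable value: reuse as is
        copiers.add(v -> ((String) v).toUpperCase());  // stand-in for a real copy
        List<Object> values = new LinkedList<>();
        values.add(42L);
        values.add("ttl");
        System.out.println(copyAll(copiers, values));
    }
}
```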


> Wrap state binder with TTL logic
> 
>
> Key: FLINK-9513
> URL: https://issues.apache.org/jira/browse/FLINK-9513
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> The main idea is to wrap user state value with a class holding the value and 
> the expiration timestamp (maybe meta data in future) and use the new object 
> as a value in the existing implemen

[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic

2018-06-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16523200#comment-16523200
 ] 

ASF GitHub Bot commented on FLINK-9513:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r198015105
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java
 ---
@@ -0,0 +1,204 @@
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Base class for composite serializers.
+ *
+ * This class serializes a list of objects
+ *
+ * @param  type of custom serialized value
+ */
+@SuppressWarnings("unchecked")
+public abstract class CompositeSerializer extends TypeSerializer {
+   private final List originalSerializers;
+
+   protected CompositeSerializer(List originalSerializers) 
{
+   Preconditions.checkNotNull(originalSerializers);
+   this.originalSerializers = originalSerializers;
+   }
+
+   protected abstract T composeValue(List values);
+
+   protected abstract List decomposeValue(T v);
+
+   protected abstract CompositeSerializer 
createSerializerInstance(List originalSerializers);
+
+   private T composeValueInternal(List values) {
+   Preconditions.checkArgument(values.size() == 
originalSerializers.size());
+   return composeValue(values);
+   }
+
+   private List decomposeValueInternal(T v) {
+   List values = decomposeValue(v);
+   Preconditions.checkArgument(values.size() == 
originalSerializers.size());
+   return values;
+   }
+
+   private CompositeSerializer 
createSerializerInstanceInternal(List originalSerializers) {
+   Preconditions.checkArgument(originalSerializers.size() == 
originalSerializers.size());
+   return createSerializerInstance(originalSerializers);
+   }
+
+   @Override
+   public CompositeSerializer duplicate() {
+   return 
createSerializerInstanceInternal(originalSerializers.stream()
+   .map(TypeSerializer::duplicate)
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public boolean isImmutableType() {
+   return 
originalSerializers.stream().allMatch(TypeSerializer::isImmutableType);
+   }
+
+   @Override
+   public T createInstance() {
+   return composeValueInternal(originalSerializers.stream()
+   .map(TypeSerializer::createInstance)
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public T copy(T from) {
+   List originalValues = decomposeValueInternal(from);
+   return composeValueInternal(
+   IntStream.range(0, originalSerializers.size())
+   .mapToObj(i -> 
originalSerializers.get(i).copy(originalValues.get(i)))
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public T copy(T from, T reuse) {
+   List originalFromValues = decomposeValueInternal(from);
+   List originalReuseValues = decomposeValueInternal(reuse);
+   return composeValueInternal(
+   IntStream.range(0, originalSerializers.size())
+   .mapToObj(i -> 
originalSerializers.get(i).copy(originalFromValues.get(i), 
originalReuseValues.get(i)))
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public int getLength() {
+   return originalSerializers.stream().allMatch(s -> s.getLength() 
>= 0) ?
--- End diff --

nit: if we avoid the high-level stream API here, we could compute this in a 
single loop, with better performance.
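
A minimal sketch of the single-loop variant suggested above, not the code from the PR. `LengthAware` and `SingleLoopLength` are hypothetical stand-ins so the example compiles on its own; only the `getLength()` convention (a negative value means variable length) is taken from the diff.

```java
import java.util.List;

// Hypothetical stand-in for the one TypeSerializer method used here.
interface LengthAware {
    int getLength(); // a negative value means "variable length"
}

final class SingleLoopLength {
    private SingleLoopLength() {}

    // Sums the fixed lengths in one pass and bails out with -1 as soon as
    // a variable-length serializer is seen, instead of streaming twice.
    static int totalLength(List<? extends LengthAware> serializers) {
        int total = 0;
        for (LengthAware serializer : serializers) {
            int length = serializer.getLength();
            if (length < 0) {
                return -1;
            }
            total += length;
        }
        return total;
    }
}
```

For an empty list this returns 0, which matches what the stream-based version in the diff would return.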



[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic

2018-06-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16523197#comment-16523197
 ] 

ASF GitHub Bot commented on FLINK-9513:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r198014886
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java
 ---
@@ -0,0 +1,204 @@
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Base class for composite serializers.
+ *
+ * This class serializes a list of objects
+ *
+ * @param <T> type of custom serialized value
+ */
+@SuppressWarnings("unchecked")
+public abstract class CompositeSerializer<T> extends TypeSerializer<T> {
+   private final List originalSerializers;
+
+   protected CompositeSerializer(List originalSerializers) 
{
+   Preconditions.checkNotNull(originalSerializers);
+   this.originalSerializers = originalSerializers;
+   }
+
+   protected abstract T composeValue(List values);
+
+   protected abstract List decomposeValue(T v);
+
+   protected abstract CompositeSerializer 
createSerializerInstance(List originalSerializers);
+
+   private T composeValueInternal(List values) {
+   Preconditions.checkArgument(values.size() == 
originalSerializers.size());
+   return composeValue(values);
+   }
+
+   private List decomposeValueInternal(T v) {
+   List values = decomposeValue(v);
+   Preconditions.checkArgument(values.size() == 
originalSerializers.size());
+   return values;
+   }
+
+   private CompositeSerializer 
createSerializerInstanceInternal(List originalSerializers) {
+   Preconditions.checkArgument(originalSerializers.size() == 
originalSerializers.size());
+   return createSerializerInstance(originalSerializers);
+   }
+
+   @Override
+   public CompositeSerializer duplicate() {
+   return 
createSerializerInstanceInternal(originalSerializers.stream()
+   .map(TypeSerializer::duplicate)
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public boolean isImmutableType() {
+   return 
originalSerializers.stream().allMatch(TypeSerializer::isImmutableType);
+   }
+
+   @Override
+   public T createInstance() {
+   return composeValueInternal(originalSerializers.stream()
+   .map(TypeSerializer::createInstance)
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public T copy(T from) {
+   List originalValues = decomposeValueInternal(from);
+   return composeValueInternal(
+   IntStream.range(0, originalSerializers.size())
+   .mapToObj(i -> 
originalSerializers.get(i).copy(originalValues.get(i)))
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public T copy(T from, T reuse) {
+   List originalFromValues = decomposeValueInternal(from);
+   List originalReuseValues = decomposeValueInternal(reuse);
+   return composeValueInternal(
+   IntStream.range(0, originalSerializers.size())
+   .mapToObj(i -> 
originalSerializers.get(i).copy(originalFromValues.get(i), 
originalReuseValues.get(i)))
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public int getLength() {
+   return originalSerializers.stream().allMatch(s -> s.getLength() 
>= 0) ?
+   
originalSerializers.stream().mapToInt(TypeSerializer::getLength).sum() : -1;
+   }
+
+   @Override
+   public void serialize(T record, DataOutputView target) throws 
IOException {
+   List originalValues = decomposeValueInternal(record);
+   for (int i = 0; i < originalSerializers.size(); i++) {
+   
originalSerializers.get(i).serialize(originalValues.get(i), target);
--- End diff --

If `originalSerializers` or `originalValues` is something like a 
`LinkedList`, then `originalSerializers.get(i)` and `originalValues.get(i)` 
will perform very poorly. I think we might 
sho
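
The comment above is cut off in the archive, so the following is only an illustration of the concern and not necessarily the change the reviewer went on to propose. It walks two lists in lockstep with iterators, which stays linear even for a `LinkedList`. `LockstepIteration` and `forEachPair` are hypothetical names.

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.BiConsumer;

final class LockstepIteration {
    private LockstepIteration() {}

    // Visits the i-th element of both lists together without calling get(i),
    // so the cost is O(n) for any List implementation.
    static <A, B> void forEachPair(List<A> left, List<B> right, BiConsumer<A, B> action) {
        if (left.size() != right.size()) {
            throw new IllegalArgumentException("lists must have the same size");
        }
        Iterator<A> leftIt = left.iterator();
        Iterator<B> rightIt = right.iterator();
        while (leftIt.hasNext() && rightIt.hasNext()) {
            action.accept(leftIt.next(), rightIt.next());
        }
    }
}
```

Another option is to copy the lists into arrays or an `ArrayList` once, so that indexed access is cheap afterwards.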

[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic

2018-06-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16523187#comment-16523187
 ] 

ASF GitHub Bot commented on FLINK-9513:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r198014021
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
 ---
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.CompositeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.KeyedStateFactory;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * This state factory wraps state objects, produced by backends, with TTL 
logic.
+ */
+public class TtlStateFactory {
+   public static  IS 
createStateAndWrapWithTtlIfEnabled(
+   TypeSerializer namespaceSerializer,
+   StateDescriptor stateDesc,
+   KeyedStateFactory originalStateFactory,
+   TtlConfig ttlConfig,
+   TtlTimeProvider timeProvider) throws Exception {
+   return ttlConfig.getTtlUpdateType() == TtlUpdateType.Disabled ?
+   originalStateFactory.createState(namespaceSerializer, 
stateDesc) :
+   new TtlStateFactory(originalStateFactory, ttlConfig, 
timeProvider)
+   .createState(namespaceSerializer, stateDesc);
+   }
+
+   private final Map, StateFactory> 
stateFactories;
--- End diff --

I see, but this looks a bit odd. `stateFactories` is only used to look up 
the state factory that corresponds to `stateDesc`, yet it has to be created 
on every call to `createStateAndWrapWithTtlIfEnabled()`. The flow of 
`createStateAndWrapWithTtlIfEnabled()` looks like:

- create a map of state factories (`stateFactories`).
- use `stateFactories` to look up the corresponding state factory for 
`stateDesc`.
- ...

In that case, a `switch case` may be a better way to find the corresponding 
state factory; at least we would not need to create a map of state factories 
on every call. What do you think?
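
A rough sketch of the `switch` idea, under the assumption that the descriptor exposes some enum tag to switch on (Java cannot `switch` on a `Class` object directly). `DescriptorKind` and `SwitchDispatch` are hypothetical, and plain strings stand in for the created state objects.

```java
// Hypothetical tag for the kind of state a descriptor describes.
enum DescriptorKind { VALUE, LIST, MAP }

final class SwitchDispatch {
    private SwitchDispatch() {}

    // Picks the creation method directly; no lookup map is built per call.
    static String createState(DescriptorKind kind) {
        switch (kind) {
            case VALUE:
                return createValueState();
            case LIST:
                return createListState();
            case MAP:
                return createMapState();
            default:
                throw new IllegalArgumentException("Unsupported state kind: " + kind);
        }
    }

    // Strings stand in for the TTL-wrapped state objects.
    private static String createValueState() { return "TtlValueState"; }
    private static String createListState() { return "TtlListState"; }
    private static String createMapState() { return "TtlMapState"; }
}
```

The trade-off is that every new state kind needs a new `case`, whereas the map-based version in the diff keeps the registration in one place.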



[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic

2018-06-22 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16520552#comment-16520552
 ] 

ASF GitHub Bot commented on FLINK-9513:
---

Github user azagrebin commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r197498439
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
 ---
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.CompositeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.KeyedStateFactory;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * This state factory wraps state objects, produced by backends, with TTL 
logic.
+ */
+public class TtlStateFactory {
+   public static  IS 
createStateAndWrapWithTtlIfEnabled(
+   TypeSerializer namespaceSerializer,
+   StateDescriptor stateDesc,
+   KeyedStateFactory originalStateFactory,
+   TtlConfig ttlConfig,
+   TtlTimeProvider timeProvider) throws Exception {
+   return ttlConfig.getTtlUpdateType() == TtlUpdateType.Disabled ?
+   originalStateFactory.createState(namespaceSerializer, 
stateDesc) :
+   new TtlStateFactory(originalStateFactory, ttlConfig, 
timeProvider)
+   .createState(namespaceSerializer, stateDesc);
+   }
+
+   private final Map, StateFactory> 
stateFactories;
--- End diff --

It could be static. The idea was to create a `TtlStateFactory` object for 
each call of `createStateAndWrapWithTtlIfEnabled`. I think state objects are 
created rarely, only at init. The object holds non-static references to 
`originalStateFactory`, `ttlConfig` and `timeProvider`, which keeps the 
`createXxxState` methods less verbose. `stateFactories` holds references to 
this object (set up in `createStateFactories`), so it cannot be static unless 
everything is static, in which case the `createXxxState` methods become more 
verbose and take additional parameters: `originalStateFactory`, `ttlConfig`, 
`timeProvider`.
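
To illustrate the point about bound references, here is a simplified sketch, not the PR code. `BoundFactories` is a hypothetical class in which a single `config` string stands in for `originalStateFactory`, `ttlConfig` and `timeProvider`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

final class BoundFactories {
    // Stand-in for the per-instance collaborators.
    private final String config;

    // Each value is a method reference bound to `this`, so the map captures
    // this instance's state and cannot be a shared static field.
    private final Map<String, Supplier<String>> factories = new HashMap<>();

    BoundFactories(String config) {
        this.config = config;
        factories.put("value", this::createValueState);
        factories.put("list", this::createListState);
    }

    String create(String kind) {
        Supplier<String> factory = factories.get(kind);
        if (factory == null) {
            throw new IllegalArgumentException("Unsupported state kind: " + kind);
        }
        return factory.get();
    }

    // The create methods take no extra parameters because they read the
    // instance fields directly.
    private String createValueState() { return "value[" + config + "]"; }
    private String createListState() { return "list[" + config + "]"; }
}
```

Making the map static would mean turning the create methods into static functions that receive `config` as an argument, which is the extra verbosity described above.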



[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16519962#comment-16519962
 ] 

ASF GitHub Bot commented on FLINK-9513:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r197331145
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
 ---
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.CompositeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.KeyedStateFactory;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * This state factory wraps state objects, produced by backends, with TTL 
logic.
+ */
+public class TtlStateFactory {
+   public static  IS 
createStateAndWrapWithTtlIfEnabled(
+   TypeSerializer namespaceSerializer,
+   StateDescriptor stateDesc,
+   KeyedStateFactory originalStateFactory,
+   TtlConfig ttlConfig,
+   TtlTimeProvider timeProvider) throws Exception {
+   return ttlConfig.getTtlUpdateType() == TtlUpdateType.Disabled ?
+   originalStateFactory.createState(namespaceSerializer, 
stateDesc) :
+   new TtlStateFactory(originalStateFactory, ttlConfig, 
timeProvider)
+   .createState(namespaceSerializer, stateDesc);
+   }
+
+   private final Map, StateFactory> 
stateFactories;
+
+   private final KeyedStateFactory originalStateFactory;
+   private final TtlConfig ttlConfig;
+   private final TtlTimeProvider timeProvider;
+
+   private TtlStateFactory(KeyedStateFactory originalStateFactory, 
TtlConfig ttlConfig, TtlTimeProvider timeProvider) {
--- End diff --

It would be better to check that the args are not null, or simply use the 
`@Nonnull` annotation.
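
A small sketch of the explicit check, using the JDK's `Objects.requireNonNull` so that it compiles without Flink on the classpath; the diff itself uses Flink's `Preconditions.checkNotNull` for the same purpose elsewhere. `NullCheckedFactory` is a hypothetical class and `Object` stands in for the real parameter types.

```java
import java.util.Objects;

final class NullCheckedFactory {
    private final Object originalStateFactory;
    private final Object ttlConfig;
    private final Object timeProvider;

    // Fails fast in the constructor, naming the offending argument, rather
    // than with a NullPointerException at some later point of use.
    NullCheckedFactory(Object originalStateFactory, Object ttlConfig, Object timeProvider) {
        this.originalStateFactory = Objects.requireNonNull(originalStateFactory, "originalStateFactory");
        this.ttlConfig = Objects.requireNonNull(ttlConfig, "ttlConfig");
        this.timeProvider = Objects.requireNonNull(timeProvider, "timeProvider");
    }

    Object getTtlConfig() {
        return ttlConfig;
    }
}
```

Note that an annotation such as `@Nonnull` documents the contract for readers and static analysis tools but performs no check at runtime by itself.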



[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16519961#comment-16519961
 ] 

ASF GitHub Bot commented on FLINK-9513:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r197329533
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java
 ---
@@ -0,0 +1,204 @@
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Base class for composite serializers.
+ *
+ * This class serializes a list of objects
+ *
+ * @param <T> type of custom serialized value
+ */
+@SuppressWarnings("unchecked")
+public abstract class CompositeSerializer<T> extends TypeSerializer<T> {
+   private final List originalSerializers;
+
+   protected CompositeSerializer(List originalSerializers) 
{
+   Preconditions.checkNotNull(originalSerializers);
+   this.originalSerializers = originalSerializers;
+   }
+
+   protected abstract T composeValue(List values);
+
+   protected abstract List decomposeValue(T v);
+
+   protected abstract CompositeSerializer 
createSerializerInstance(List originalSerializers);
+
+   private T composeValueInternal(List values) {
+   Preconditions.checkArgument(values.size() == 
originalSerializers.size());
+   return composeValue(values);
+   }
+
+   private List decomposeValueInternal(T v) {
+   List values = decomposeValue(v);
+   Preconditions.checkArgument(values.size() == 
originalSerializers.size());
+   return values;
+   }
+
+   private CompositeSerializer 
createSerializerInstanceInternal(List originalSerializers) {
+   Preconditions.checkArgument(originalSerializers.size() == 
originalSerializers.size());
--- End diff --

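I think this check looks like a bug: the parameter shadows the field, so 
`originalSerializers.size()` is compared with itself and the check always 
passes.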
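
A reduced sketch of why the check in the diff cannot fail, plus one possible fix (qualifying the field with `this`). `ShadowingCheck` is a hypothetical class and `List<String>` stands in for the list of serializers.

```java
import java.util.List;

final class ShadowingCheck {
    private final List<String> originalSerializers;

    ShadowingCheck(List<String> originalSerializers) {
        this.originalSerializers = originalSerializers;
    }

    // Same shape as the check in the diff: the parameter hides the field,
    // so both sides refer to the parameter and the result is always true.
    boolean sameSizeShadowed(List<String> originalSerializers) {
        return originalSerializers.size() == originalSerializers.size();
    }

    // One possible fix: compare the parameter against the field explicitly.
    boolean sameSizeQualified(List<String> originalSerializers) {
        return originalSerializers.size() == this.originalSerializers.size();
    }
}
```

Renaming the parameter would work equally well and avoids the shadowing altogether.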




[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16519968#comment-16519968
 ] 

ASF GitHub Bot commented on FLINK-9513:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r197331741
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
 ---
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.CompositeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.KeyedStateFactory;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * This state factory wraps state objects, produced by backends, with TTL 
logic.
+ */
+public class TtlStateFactory {
+   public static  IS 
createStateAndWrapWithTtlIfEnabled(
+   TypeSerializer namespaceSerializer,
+   StateDescriptor stateDesc,
+   KeyedStateFactory originalStateFactory,
+   TtlConfig ttlConfig,
+   TtlTimeProvider timeProvider) throws Exception {
+   return ttlConfig.getTtlUpdateType() == TtlUpdateType.Disabled ?
+   originalStateFactory.createState(namespaceSerializer, 
stateDesc) :
+   new TtlStateFactory(originalStateFactory, ttlConfig, 
timeProvider)
+   .createState(namespaceSerializer, stateDesc);
+   }
+
+   private final Map, StateFactory> 
stateFactories;
+
+   private final KeyedStateFactory originalStateFactory;
+   private final TtlConfig ttlConfig;
+   private final TtlTimeProvider timeProvider;
+
+   private TtlStateFactory(KeyedStateFactory originalStateFactory, 
TtlConfig ttlConfig, TtlTimeProvider timeProvider) {
+   this.originalStateFactory = originalStateFactory;
+   this.ttlConfig = ttlConfig;
+   this.timeProvider = timeProvider;
+   this.stateFactories = createStateFactories();
+   }
+
+   private Map, StateFactory> 
createStateFactories() {
+   return Stream.of(
+   Tuple2.of(ValueStateDescriptor.class, (StateFactory) 
this::createValueState),
+   Tuple2.of(ListStateDescriptor.class, (StateFactory) 
this::createListState),
+   Tuple2.of(MapStateDescriptor.class, (StateFactory) 
this::createMapState),
+   Tuple2.of(ReducingStateDescriptor.class, (StateFactory) 
this::createReducingState),
+   Tuple2.of(AggregatingStateDescriptor.class, 
(StateFactory) this::createAggregatingState),
+   Tuple2.of(FoldingStateDescriptor.class, (StateFactory) 
this::createFoldingState)
+   ).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
+   }
+
+   private interface StateFactory {
+IS create(
+   TypeSerializer namespaceSerializer,
+   StateDescriptor stateDesc) throws Exception;
+   }
+
+   priv

[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16519960#comment-16519960
 ] 

ASF GitHub Bot commented on FLINK-9513:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r197330596
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java
 ---
@@ -0,0 +1,204 @@
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Base class for composite serializers.
+ *
+ * This class serializes a list of objects
+ *
+ * @param <T> type of custom serialized value
+ */
+@SuppressWarnings("unchecked")
+public abstract class CompositeSerializer<T> extends TypeSerializer<T> {
+   private final List originalSerializers;
+
+   protected CompositeSerializer(List originalSerializers) 
{
+   Preconditions.checkNotNull(originalSerializers);
+   this.originalSerializers = originalSerializers;
+   }
+
+   protected abstract T composeValue(List values);
+
+   protected abstract List decomposeValue(T v);
+
+   protected abstract CompositeSerializer 
createSerializerInstance(List originalSerializers);
+
+   private T composeValueInternal(List values) {
+   Preconditions.checkArgument(values.size() == 
originalSerializers.size());
+   return composeValue(values);
+   }
+
+   private List decomposeValueInternal(T v) {
+   List values = decomposeValue(v);
+   Preconditions.checkArgument(values.size() == 
originalSerializers.size());
+   return values;
+   }
+
+   private CompositeSerializer 
createSerializerInstanceInternal(List originalSerializers) {
+   Preconditions.checkArgument(originalSerializers.size() == 
originalSerializers.size());
+   return createSerializerInstance(originalSerializers);
+   }
+
+   @Override
+   public CompositeSerializer duplicate() {
+   return 
createSerializerInstanceInternal(originalSerializers.stream()
+   .map(TypeSerializer::duplicate)
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public boolean isImmutableType() {
+   return 
originalSerializers.stream().allMatch(TypeSerializer::isImmutableType);
+   }
+
+   @Override
+   public T createInstance() {
+   return composeValueInternal(originalSerializers.stream()
+   .map(TypeSerializer::createInstance)
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public T copy(T from) {
+   List originalValues = decomposeValueInternal(from);
+   return composeValueInternal(
+   IntStream.range(0, originalSerializers.size())
+   .mapToObj(i -> 
originalSerializers.get(i).copy(originalValues.get(i)))
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public T copy(T from, T reuse) {
+   List originalFromValues = decomposeValueInternal(from);
+   List originalReuseValues = decomposeValueInternal(reuse);
+   return composeValueInternal(
+   IntStream.range(0, originalSerializers.size())
+   .mapToObj(i -> 
originalSerializers.get(i).copy(originalFromValues.get(i), 
originalReuseValues.get(i)))
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public int getLength() {
+   return originalSerializers.stream().allMatch(s -> s.getLength() 
>= 0) ?
+   
originalSerializers.stream().mapToInt(TypeSerializer::getLength).sum() : -1;
+   }
+
+   @Override
+   public void serialize(T record, DataOutputView target) throws 
IOException {
+   List originalValues = decomposeValueInternal(record);
+   for (int i = 0; i < originalSerializers.size(); i++) {
+   
originalSerializers.get(i).serialize(originalValues.get(i), target);
+   }
+   }
+
+   @Override
+   public T deserialize(DataInputView source) throws IOException {
+   List originalValues = new ArrayList();
+   for (TypeSerializer typeSerializer : originalSer

[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16519966#comment-16519966
 ] 

ASF GitHub Bot commented on FLINK-9513:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r197331764
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
 ---
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.CompositeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.KeyedStateFactory;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * This state factory wraps state objects, produced by backends, with TTL 
logic.
+ */
+public class TtlStateFactory {
+   public static  IS 
createStateAndWrapWithTtlIfEnabled(
+   TypeSerializer namespaceSerializer,
+   StateDescriptor stateDesc,
+   KeyedStateFactory originalStateFactory,
+   TtlConfig ttlConfig,
+   TtlTimeProvider timeProvider) throws Exception {
+   return ttlConfig.getTtlUpdateType() == TtlUpdateType.Disabled ?
+   originalStateFactory.createState(namespaceSerializer, 
stateDesc) :
+   new TtlStateFactory(originalStateFactory, ttlConfig, 
timeProvider)
+   .createState(namespaceSerializer, stateDesc);
+   }
+
+   private final Map, StateFactory> 
stateFactories;
+
+   private final KeyedStateFactory originalStateFactory;
+   private final TtlConfig ttlConfig;
+   private final TtlTimeProvider timeProvider;
+
+   private TtlStateFactory(KeyedStateFactory originalStateFactory, 
TtlConfig ttlConfig, TtlTimeProvider timeProvider) {
+   this.originalStateFactory = originalStateFactory;
+   this.ttlConfig = ttlConfig;
+   this.timeProvider = timeProvider;
+   this.stateFactories = createStateFactories();
+   }
+
+   private Map, StateFactory> 
createStateFactories() {
+   return Stream.of(
+   Tuple2.of(ValueStateDescriptor.class, (StateFactory) 
this::createValueState),
+   Tuple2.of(ListStateDescriptor.class, (StateFactory) 
this::createListState),
+   Tuple2.of(MapStateDescriptor.class, (StateFactory) 
this::createMapState),
+   Tuple2.of(ReducingStateDescriptor.class, (StateFactory) 
this::createReducingState),
+   Tuple2.of(AggregatingStateDescriptor.class, 
(StateFactory) this::createAggregatingState),
+   Tuple2.of(FoldingStateDescriptor.class, (StateFactory) 
this::createFoldingState)
+   ).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
+   }
+
+   private interface StateFactory {
+IS create(
+   TypeSerializer namespaceSerializer,
+   StateDescriptor stateDesc) throws Exception;
+   }
+
+   priv

[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16519967#comment-16519967
 ] 

ASF GitHub Bot commented on FLINK-9513:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r197331820
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
 ---
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.CompositeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.KeyedStateFactory;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * This state factory wraps state objects, produced by backends, with TTL 
logic.
+ */
+public class TtlStateFactory {
+   public static  IS 
createStateAndWrapWithTtlIfEnabled(
+   TypeSerializer namespaceSerializer,
+   StateDescriptor stateDesc,
+   KeyedStateFactory originalStateFactory,
+   TtlConfig ttlConfig,
+   TtlTimeProvider timeProvider) throws Exception {
+   return ttlConfig.getTtlUpdateType() == TtlUpdateType.Disabled ?
+   originalStateFactory.createState(namespaceSerializer, 
stateDesc) :
+   new TtlStateFactory(originalStateFactory, ttlConfig, 
timeProvider)
+   .createState(namespaceSerializer, stateDesc);
+   }
+
+   private final Map, StateFactory> 
stateFactories;
+
+   private final KeyedStateFactory originalStateFactory;
+   private final TtlConfig ttlConfig;
+   private final TtlTimeProvider timeProvider;
+
+   private TtlStateFactory(KeyedStateFactory originalStateFactory, 
TtlConfig ttlConfig, TtlTimeProvider timeProvider) {
+   this.originalStateFactory = originalStateFactory;
+   this.ttlConfig = ttlConfig;
+   this.timeProvider = timeProvider;
+   this.stateFactories = createStateFactories();
+   }
+
+   private Map, StateFactory> 
createStateFactories() {
+   return Stream.of(
+   Tuple2.of(ValueStateDescriptor.class, (StateFactory) 
this::createValueState),
+   Tuple2.of(ListStateDescriptor.class, (StateFactory) 
this::createListState),
+   Tuple2.of(MapStateDescriptor.class, (StateFactory) 
this::createMapState),
+   Tuple2.of(ReducingStateDescriptor.class, (StateFactory) 
this::createReducingState),
+   Tuple2.of(AggregatingStateDescriptor.class, 
(StateFactory) this::createAggregatingState),
+   Tuple2.of(FoldingStateDescriptor.class, (StateFactory) 
this::createFoldingState)
+   ).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
+   }
+
+   private interface StateFactory {
+IS create(
+   TypeSerializer namespaceSerializer,
+   StateDescriptor stateDesc) throws Exception;
+   }
+
+   priv

[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16519959#comment-16519959
 ] 

ASF GitHub Bot commented on FLINK-9513:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r197330095
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java
 ---
@@ -0,0 +1,204 @@
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Base class for composite serializers.
+ *
+ * This class serializes a list of objects
+ *
+ * @param  type of custom serialized value
+ */
+@SuppressWarnings("unchecked")
+public abstract class CompositeSerializer extends TypeSerializer {
+   private final List originalSerializers;
+
+   protected CompositeSerializer(List originalSerializers) 
{
+   Preconditions.checkNotNull(originalSerializers);
+   this.originalSerializers = originalSerializers;
+   }
+
+   protected abstract T composeValue(List values);
+
+   protected abstract List decomposeValue(T v);
+
+   protected abstract CompositeSerializer 
createSerializerInstance(List originalSerializers);
+
+   private T composeValueInternal(List values) {
+   Preconditions.checkArgument(values.size() == 
originalSerializers.size());
+   return composeValue(values);
+   }
+
+   private List decomposeValueInternal(T v) {
+   List values = decomposeValue(v);
+   Preconditions.checkArgument(values.size() == 
originalSerializers.size());
+   return values;
+   }
+
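+   private CompositeSerializer createSerializerInstanceInternal(List originalSerializers) {
+   // The parameter shadows the field, so compare against this.originalSerializers;
+   // the unqualified comparison was always true.
+   Preconditions.checkArgument(originalSerializers.size() == this.originalSerializers.size());
+   return createSerializerInstance(originalSerializers);
+   }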
+
+   @Override
+   public CompositeSerializer duplicate() {
+   return 
createSerializerInstanceInternal(originalSerializers.stream()
+   .map(TypeSerializer::duplicate)
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public boolean isImmutableType() {
+   return 
originalSerializers.stream().allMatch(TypeSerializer::isImmutableType);
+   }
+
+   @Override
+   public T createInstance() {
+   return composeValueInternal(originalSerializers.stream()
+   .map(TypeSerializer::createInstance)
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public T copy(T from) {
+   List originalValues = decomposeValueInternal(from);
+   return composeValueInternal(
+   IntStream.range(0, originalSerializers.size())
+   .mapToObj(i -> 
originalSerializers.get(i).copy(originalValues.get(i)))
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public T copy(T from, T reuse) {
+   List originalFromValues = decomposeValueInternal(from);
+   List originalReuseValues = decomposeValueInternal(reuse);
+   return composeValueInternal(
+   IntStream.range(0, originalSerializers.size())
+   .mapToObj(i -> 
originalSerializers.get(i).copy(originalFromValues.get(i), 
originalReuseValues.get(i)))
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public int getLength() {
+   return originalSerializers.stream().allMatch(s -> s.getLength() 
>= 0) ?
+   
originalSerializers.stream().mapToInt(TypeSerializer::getLength).sum() : -1;
+   }
+
+   @Override
+   public void serialize(T record, DataOutputView target) throws 
IOException {
+   List originalValues = decomposeValueInternal(record);
+   for (int i = 0; i < originalSerializers.size(); i++) {
+   
originalSerializers.get(i).serialize(originalValues.get(i), target);
+   }
+   }
+
+   @Override
+   public T deserialize(DataInputView source) throws IOException {
+   List originalValues = new ArrayList();
+   for (TypeSerializer typeSerializer : originalSer

[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16519964#comment-16519964
 ] 

ASF GitHub Bot commented on FLINK-9513:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r197331211
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
 ---
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.CompositeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.KeyedStateFactory;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * This state factory wraps state objects, produced by backends, with TTL logic.
+ */
+public class TtlStateFactory {
+   public static  IS 
createStateAndWrapWithTtlIfEnabled(
+   TypeSerializer namespaceSerializer,
+   StateDescriptor stateDesc,
+   KeyedStateFactory originalStateFactory,
+   TtlConfig ttlConfig,
+   TtlTimeProvider timeProvider) throws Exception {
+   return ttlConfig.getTtlUpdateType() == TtlUpdateType.Disabled ?
+   originalStateFactory.createState(namespaceSerializer, 
stateDesc) :
+   new TtlStateFactory(originalStateFactory, ttlConfig, 
timeProvider)
+   .createState(namespaceSerializer, stateDesc);
+   }
+
+   private final Map, StateFactory> 
stateFactories;
--- End diff --

Why couldn't this be static?
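
One possible reading of the question, sketched under assumptions: the map in the diff is filled with method references bound to `this`, so it depends on the instance. A static map would need entries that take the factory instance as an explicit argument. The class, descriptor types, and string results below are hypothetical stand-ins and not Flink's API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

final class StaticFactoryRegistrySketch {
    /** Hypothetical stand-ins for the descriptor classes used as map keys. */
    static final class ValueDesc {}
    static final class ListDesc {}

    // Shared by all instances: each entry receives the factory instance explicitly
    // instead of capturing it through a bound method reference.
    private static final Map<Class<?>, BiFunction<StaticFactoryRegistrySketch, Object, String>> FACTORIES =
        new HashMap<>();

    static {
        FACTORIES.put(ValueDesc.class, (factory, desc) -> factory.createValueState(desc));
        FACTORIES.put(ListDesc.class, (factory, desc) -> factory.createListState(desc));
    }

    private final String backendName;

    StaticFactoryRegistrySketch(String backendName) {
        this.backendName = backendName;
    }

    /** Looks up the creator by descriptor class and applies it to this instance. */
    String createState(Object desc) {
        BiFunction<StaticFactoryRegistrySketch, Object, String> creator = FACTORIES.get(desc.getClass());
        if (creator == null) {
            throw new IllegalArgumentException("Unsupported descriptor: " + desc.getClass().getName());
        }
        return creator.apply(this, desc);
    }

    // The "state" is just a label here; a real factory would build a wrapped state object.
    private String createValueState(Object desc) {
        return backendName + ":value-with-ttl";
    }

    private String createListState(Object desc) {
        return backendName + ":list-with-ttl";
    }

    public static void main(String[] args) {
        StaticFactoryRegistrySketch factory = new StaticFactoryRegistrySketch("heap");
        System.out.println(factory.createState(new ValueDesc()));
        System.out.println(factory.createState(new ListDesc()));
    }
}
```

The trade-off in this sketch is one shared map versus one map per factory instance, at the cost of passing the instance on every lookup.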



[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16519958#comment-16519958
 ] 

ASF GitHub Bot commented on FLINK-9513:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r197329976
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java
 ---
@@ -0,0 +1,204 @@
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Base class for composite serializers.
+ *
+ * This class serializes a list of objects
+ *
+ * @param  type of custom serialized value
+ */
+@SuppressWarnings("unchecked")
+public abstract class CompositeSerializer extends TypeSerializer {
+   private final List originalSerializers;
+
+   protected CompositeSerializer(List originalSerializers) 
{
+   Preconditions.checkNotNull(originalSerializers);
+   this.originalSerializers = originalSerializers;
+   }
+
+   protected abstract T composeValue(List values);
+
+   protected abstract List decomposeValue(T v);
+
+   protected abstract CompositeSerializer 
createSerializerInstance(List originalSerializers);
+
+   private T composeValueInternal(List values) {
+   Preconditions.checkArgument(values.size() == 
originalSerializers.size());
+   return composeValue(values);
+   }
+
+   private List decomposeValueInternal(T v) {
+   List values = decomposeValue(v);
+   Preconditions.checkArgument(values.size() == 
originalSerializers.size());
+   return values;
+   }
+
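+   private CompositeSerializer createSerializerInstanceInternal(List originalSerializers) {
+   // The parameter shadows the field, so compare against this.originalSerializers;
+   // the unqualified comparison was always true.
+   Preconditions.checkArgument(originalSerializers.size() == this.originalSerializers.size());
+   return createSerializerInstance(originalSerializers);
+   }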
+
+   @Override
+   public CompositeSerializer duplicate() {
+   return 
createSerializerInstanceInternal(originalSerializers.stream()
+   .map(TypeSerializer::duplicate)
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public boolean isImmutableType() {
+   return 
originalSerializers.stream().allMatch(TypeSerializer::isImmutableType);
+   }
+
+   @Override
+   public T createInstance() {
+   return composeValueInternal(originalSerializers.stream()
+   .map(TypeSerializer::createInstance)
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public T copy(T from) {
+   List originalValues = decomposeValueInternal(from);
+   return composeValueInternal(
+   IntStream.range(0, originalSerializers.size())
+   .mapToObj(i -> originalSerializers.get(i).copy(originalValues.get(i)))
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public T copy(T from, T reuse) {
+   List originalFromValues = decomposeValueInternal(from);
+   List originalReuseValues = decomposeValueInternal(reuse);
+   return composeValueInternal(
+   IntStream.range(0, originalSerializers.size())
+   .mapToObj(i -> originalSerializers.get(i).copy(originalFromValues.get(i), originalReuseValues.get(i)))
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public int getLength() {
+   return originalSerializers.stream().allMatch(s -> s.getLength() >= 0) ?
+   originalSerializers.stream().mapToInt(TypeSerializer::getLength).sum() : -1;
+   }
+
+   @Override
+   public void serialize(T record, DataOutputView target) throws IOException {
+   List originalValues = decomposeValueInternal(record);
+   for (int i = 0; i < originalSerializers.size(); i++) {
+   originalSerializers.get(i).serialize(originalValues.get(i), target);
+   }
+   }
+
+   @Override
+   public T deserialize(DataInputView source) throws IOException {
+   List originalValues = new ArrayList();
--- End diff --

I would suggest giving the list an initial size.
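
A minimal sketch of that suggestion, assuming the number of values is known up front because it equals the number of field readers. `PresizedListSketch` and its `Supplier`-based readers are hypothetical stand-ins for the serializer list in the diff.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

final class PresizedListSketch {
    /** Reads one value per reader into a list whose capacity is fixed up front. */
    static List<Object> readAll(List<Supplier<Object>> fieldReaders) {
        // The final size is known, so the backing array never has to grow.
        List<Object> values = new ArrayList<>(fieldReaders.size());
        for (Supplier<Object> reader : fieldReaders) {
            values.add(reader.get());
        }
        return values;
    }

    public static void main(String[] args) {
        List<Supplier<Object>> readers = new ArrayList<>(2);
        readers.add(() -> "user value");
        readers.add(() -> 42L);
        System.out.println(readAll(readers));
    }
}
```

For a list of two or three elements the saving is small, but it avoids allocating the default capacity on each deserialized record.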

[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16519963#comment-16519963
 ] 

ASF GitHub Bot commented on FLINK-9513:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r197330813
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
 ---
@@ -48,7 +48,8 @@
KeyedStateBackend,
Snapshotable, 
Collection>,
Closeable,
-   CheckpointListener {
+   CheckpointListener,
+   KeyedStateFactory{
--- End diff --

I think a space is missing before the `{` here.


> Wrap state binder with TTL logic
> 
>
> Key: FLINK-9513
> URL: https://issues.apache.org/jira/browse/FLINK-9513
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> The main idea is to wrap the user state value in a class holding the value and 
> the expiration timestamp (and possibly metadata in the future), and to use the 
> new object as the value in the existing implementations:
> {code:java}
> class TtlValue {
>   V value;
>   long expirationTimestamp;
> }
> {code}
> The original state binder factory is wrapped with TtlStateBinder if TTL is 
> enabled:
> {code:java}
> state = ttlConfig.updateType == DISABLED ?
>  bind(binder) : bind(new TtlStateBinder(binder, timerService));
> {code}
> TtlStateBinder decorates the states produced by the original binder with TTL 
> logic wrappers and adds TtlValue serialisation logic:
> {code:java}
> TtlStateBinder {
>   StateBinder binder;
>   ProcessingTimeProvider timeProvider; // System.currentTimeMillis()
>   TtlValueState createValueState(valueDesc) {
>     serializer = new TtlValueSerializer(valueDesc.getSerializer());
>     ttlValueDesc = new ValueDesc(serializer, ...);
>     // or implement custom TypeInfo
>     originalStateWithTtl = binder.createValueState(ttlValueDesc);
>     return new TtlValueState(originalStateWithTtl, timeProvider);
>   }
>   // List, Map, ...
> }
> {code}
> The TTL serializer should add the expiration timestamp.
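
The wrapping idea in the description can be sketched in plain Java as follows. This is an illustration under assumptions, not the PR's code: the `wrap`/`unwrap` helpers, the saturating overflow handling, and the rule that a value counts as expired once the current time reaches its timestamp are all choices made for this example.

```java
final class TtlValueSketch {
    /** Holder pairing a user value with the time at which it expires. */
    static final class TtlValue<V> {
        final V value;
        final long expirationTimestamp;

        TtlValue(V value, long expirationTimestamp) {
            this.value = value;
            this.expirationTimestamp = expirationTimestamp;
        }
    }

    /** Wraps a value; the timestamp saturates at Long.MAX_VALUE instead of overflowing. */
    static <V> TtlValue<V> wrap(V value, long nowMillis, long ttlMillis) {
        if (ttlMillis < 0) {
            throw new IllegalArgumentException("TTL must be non-negative");
        }
        long expiration = nowMillis > Long.MAX_VALUE - ttlMillis
            ? Long.MAX_VALUE
            : nowMillis + ttlMillis;
        return new TtlValue<>(value, expiration);
    }

    /** Returns the user value, or null if it is absent or already expired. */
    static <V> V unwrap(TtlValue<V> ttlValue, long nowMillis) {
        if (ttlValue == null || ttlValue.expirationTimestamp <= nowMillis) {
            return null;
        }
        return ttlValue.value;
    }

    public static void main(String[] args) {
        TtlValue<String> wrapped = wrap("user value", 1_000L, 500L);
        System.out.println(unwrap(wrapped, 1_200L)); // still live
        System.out.println(unwrap(wrapped, 1_500L)); // expired
    }
}
```

Passing the current time in as a parameter, rather than reading the clock inside the helper, keeps the expiry logic deterministic to test.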



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16519965#comment-16519965
 ] 

ASF GitHub Bot commented on FLINK-9513:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r197331898
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
 ---
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.CompositeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.KeyedStateFactory;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * This state factory wraps state objects, produced by backends, with TTL 
logic.
+ */
+public class TtlStateFactory {
+   public static  IS 
createStateAndWrapWithTtlIfEnabled(
+   TypeSerializer namespaceSerializer,
+   StateDescriptor stateDesc,
+   KeyedStateFactory originalStateFactory,
+   TtlConfig ttlConfig,
+   TtlTimeProvider timeProvider) throws Exception {
+   return ttlConfig.getTtlUpdateType() == TtlUpdateType.Disabled ?
+   originalStateFactory.createState(namespaceSerializer, 
stateDesc) :
+   new TtlStateFactory(originalStateFactory, ttlConfig, 
timeProvider)
+   .createState(namespaceSerializer, stateDesc);
+   }
+
+   private final Map, StateFactory> 
stateFactories;
+
+   private final KeyedStateFactory originalStateFactory;
+   private final TtlConfig ttlConfig;
+   private final TtlTimeProvider timeProvider;
+
+   private TtlStateFactory(KeyedStateFactory originalStateFactory, 
TtlConfig ttlConfig, TtlTimeProvider timeProvider) {
+   this.originalStateFactory = originalStateFactory;
+   this.ttlConfig = ttlConfig;
+   this.timeProvider = timeProvider;
+   this.stateFactories = createStateFactories();
+   }
+
+   private Map, StateFactory> 
createStateFactories() {
+   return Stream.of(
+   Tuple2.of(ValueStateDescriptor.class, (StateFactory) 
this::createValueState),
+   Tuple2.of(ListStateDescriptor.class, (StateFactory) 
this::createListState),
+   Tuple2.of(MapStateDescriptor.class, (StateFactory) 
this::createMapState),
+   Tuple2.of(ReducingStateDescriptor.class, (StateFactory) 
this::createReducingState),
+   Tuple2.of(AggregatingStateDescriptor.class, 
(StateFactory) this::createAggregatingState),
+   Tuple2.of(FoldingStateDescriptor.class, (StateFactory) 
this::createFoldingState)
+   ).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
+   }
+
+   private interface StateFactory {
+IS create(
+   TypeSerializer namespaceSerializer,
+   StateDescriptor stateDesc) throws Exception;
+   }
+
+   priv

[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16519369#comment-16519369
 ] 

ASF GitHub Bot commented on FLINK-9513:
---

GitHub user azagrebin opened a pull request:

https://github.com/apache/flink/pull/6196

[FLINK-9513] Implement TTL state wrappers factory and serializer for value 
with TTL

## What is the purpose of the change

This PR introduces a state factory that wraps state objects with TTL 
logic, and a serializer for the user value together with its expiration timestamp.
NOTE: This PR is based on #6186; only the last commit differs from it and 
needs review.

## Brief change log

  - abstract state creation in backends with `KeyedStateFactory` interface
  - add `TtlStateFactory`
  - add `CompositeSerializer`
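
As a rough illustration of the composite-serializer idea in the last bullet, the sketch below writes the fields of a record one after another with per-field serializers and reads them back in the same order. It uses `java.io` streams instead of Flink's `DataOutputView`/`DataInputView`, and the `FieldSerializer` interface is a hypothetical stand-in for `TypeSerializer`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class CompositeSerializationSketch {
    /** Hypothetical per-field serializer; stands in for Flink's TypeSerializer. */
    interface FieldSerializer {
        void write(Object value, DataOutputStream out) throws IOException;
        Object read(DataInputStream in) throws IOException;
    }

    static final FieldSerializer STRING = new FieldSerializer() {
        @Override public void write(Object value, DataOutputStream out) throws IOException {
            out.writeUTF((String) value);
        }
        @Override public Object read(DataInputStream in) throws IOException {
            return in.readUTF();
        }
    };

    static final FieldSerializer LONG = new FieldSerializer() {
        @Override public void write(Object value, DataOutputStream out) throws IOException {
            out.writeLong((Long) value);
        }
        @Override public Object read(DataInputStream in) throws IOException {
            return in.readLong();
        }
    };

    /** Writes each field with the serializer at the same index. */
    static byte[] serialize(List<FieldSerializer> serializers, List<Object> fields) {
        if (fields.size() != serializers.size()) {
            throw new IllegalArgumentException("field count does not match serializer count");
        }
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            for (int i = 0; i < serializers.size(); i++) {
                serializers.get(i).write(fields.get(i), out);
            }
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Reads the fields back in the order in which they were written. */
    static List<Object> deserialize(List<FieldSerializer> serializers, byte[] data) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            List<Object> fields = new ArrayList<>(serializers.size());
            for (FieldSerializer serializer : serializers) {
                fields.add(serializer.read(in));
            }
            return fields;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // A (user value, expiration timestamp) pair, as a value with TTL would carry.
        List<FieldSerializer> serializers = Arrays.asList(STRING, LONG);
        byte[] data = serialize(serializers, Arrays.<Object>asList("user value", 1500L));
        System.out.println(deserialize(serializers, data));
    }
}
```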

## Verifying this change

This change is a trivial addition without test coverage in this PR; it 
should be covered, together with TTL feature activation, by the final 
integration and e2e tests.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (yes)
  - The runtime per-record code paths (performance sensitive): (not yet)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (not yet)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (not applicable at the moment)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/azagrebin/flink FLINK-9513

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6196.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6196


commit 62faa8ee220c21fa824fec690073c27a0a994be5
Author: Andrey Zagrebin 
Date:   2018-06-04T15:28:40Z

[FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrappers

commit 74c689e1660d40176b3c131fb0f3f9dcafa33889
Author: Andrey Zagrebin 
Date:   2018-06-20T15:05:28Z

Check overflow in expiration timestamp, allow only non-negative TTL

commit 1164aa2a9c4298461eaa44322ef9cefa00b4f0fe
Author: Andrey Zagrebin 
Date:   2018-06-21T12:24:04Z

small fixes

commit 1d19d4ac2b73ac83290b4b117b82895c99b51865
Author: Andrey Zagrebin 
Date:   2018-06-21T13:13:42Z

Make AbstractTtlState.getSerializedValue() unsupported for now in case of 
queryable state

commit 4dedb9a20244a2addd337617778b17fe8349
Author: Andrey Zagrebin 
Date:   2018-06-11T17:34:47Z

[FLINK-9513] Implement TTL state wrappers factory and serializer for value 
with TTL



