[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic
[ https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16532509#comment-16532509 ]

ASF GitHub Bot commented on FLINK-9513:
---

Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/6196

> Wrap state binder with TTL logic
>
>
>                 Key: FLINK-9513
>                 URL: https://issues.apache.org/jira/browse/FLINK-9513
>             Project: Flink
>          Issue Type: Sub-task
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.6.0
>            Reporter: Andrey Zagrebin
>            Assignee: Andrey Zagrebin
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.6.0
>
>
> The main idea is to wrap the user state value with a class holding the value and
> the expiration timestamp (maybe meta data in future) and use the new object
> as a value in the existing implementations:
> {code:java}
> class TtlValue<V> {
>   V value;
>   long expirationTimestamp;
> }
> {code}
> The original state binder factory is wrapped with TtlStateBinder if TTL is
> enabled:
> {code:java}
> state = ttlConfig.updateType == DISABLED ?
>   bind(binder) : bind(new TtlStateBinder(binder, timerService));
> {code}
> TtlStateBinder decorates the states produced by the original binder with TTL
> logic wrappers and adds TtlValue serialisation logic:
> {code:java}
> TtlStateBinder {
>   StateBinder binder;
>   ProcessingTimeProvider timeProvider; // System.currentTimeMillis()
>   TtlValueState createValueState(valueDesc) {
>     serializer = new TtlValueSerializer(valueDesc.getSerializer());
>     ttlValueDesc = new ValueDesc(serializer, ...);
>     // or implement custom TypeInfo
>     originalStateWithTtl = binder.createValueState(ttlValueDesc);
>     return new TtlValueState(originalStateWithTtl, timeProvider);
>   }
>   // List, Map, ...
> }
> {code}
> The TTL serializer should add the expiration timestamp.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
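As a rough illustration of the wrapping idea in the description above, the sketch below keeps a `TtlValue` inside an ordinary value state and hides the entry once it has expired. `SimpleValueState`, `InMemoryValueState`, `TtlValueStateSketch` and the `ttlMillis` parameter are hypothetical names made up for this example and are not Flink's API. Expiring on read is also an assumption; the issue text does not say when expired entries are dropped.

```java
import java.util.function.LongSupplier;

/** User value plus its expiration timestamp, as outlined in the issue. */
final class TtlValue<V> {
    final V value;
    final long expirationTimestamp;

    TtlValue(V value, long expirationTimestamp) {
        this.value = value;
        this.expirationTimestamp = expirationTimestamp;
    }
}

/** Hypothetical minimal state interface; not Flink's ValueState. */
interface SimpleValueState<V> {
    V value();
    void update(V value);
    void clear();
}

/** Hypothetical in-memory state standing in for a backend-provided one. */
final class InMemoryValueState<V> implements SimpleValueState<V> {
    private V current;

    @Override public V value() { return current; }
    @Override public void update(V value) { current = value; }
    @Override public void clear() { current = null; }
}

/** Decorator: stores TtlValue<V> in the original state and filters expired entries. */
final class TtlValueStateSketch<V> implements SimpleValueState<V> {
    private final SimpleValueState<TtlValue<V>> original;
    private final LongSupplier timeProvider; // e.g. System::currentTimeMillis
    private final long ttlMillis;

    TtlValueStateSketch(
            SimpleValueState<TtlValue<V>> original, LongSupplier timeProvider, long ttlMillis) {
        this.original = original;
        this.timeProvider = timeProvider;
        this.ttlMillis = ttlMillis;
    }

    @Override
    public V value() {
        TtlValue<V> wrapped = original.value();
        if (wrapped == null) {
            return null;
        }
        if (wrapped.expirationTimestamp <= timeProvider.getAsLong()) {
            original.clear(); // assumption: expired entries are dropped when read
            return null;
        }
        return wrapped.value;
    }

    @Override
    public void update(V value) {
        original.update(new TtlValue<>(value, timeProvider.getAsLong() + ttlMillis));
    }

    @Override
    public void clear() {
        original.clear();
    }
}
```

Passing the clock as a `LongSupplier` keeps the wrapper testable with a fake time source, which is presumably the role of the time provider field in the pseudocode.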
[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic
[ https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16532426#comment-16532426 ]

ASF GitHub Bot commented on FLINK-9513:
---

Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/6196

    I found there is still a small issue with the equals/hashCode but will just fix it before merging.
[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic
[ https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16532419#comment-16532419 ]

ASF GitHub Bot commented on FLINK-9513:
---

Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/6196

    LGTM 👍 merging.
[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic
[ https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531726#comment-16531726 ]

ASF GitHub Bot commented on FLINK-9513:
---

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6196#discussion_r199894398

    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java ---
    @@ -272,4 +254,60 @@ public int getVersion() {
                     previousSerializersAndConfigs.get(index).f0, UnloadableDummyTypeSerializer.class,
                     previousSerializersAndConfigs.get(index).f1, fieldSerializers[index]);
         }
    +
    +    /** This class holds composite serializer parameters which can be precomputed in advanced for better performance. */
    +    protected static class PrecomputedParameters implements Serializable {
    +        /** Whether target type is immutable. */
    +        final boolean immutableTargetType;
    +
    +        /** Whether target type and its fields are immutable. */
    +        final boolean immutable;
    +
    +        /** Byte length of target object in serialized form. */
    +        private final int length;
    +
    +        /** Whether any field serializer is stateful. */
    +        final boolean stateful;
    +
    +        final int hashCode;
    --- End diff --

    I wonder if this should be `transient` in a serializable class; the hash code could be based on object identity.
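One way to read the `transient` suggestion: a cached hash that depends on identity-based hash codes is not meaningful in another JVM, so it can be left out of the serialized form and recomputed after deserialization. The sketch below shows that pattern with plain Java serialization. `CachedHashHolder` and its `roundTrip` helper are hypothetical names for this example, and this is not necessarily how the pull request resolved the comment.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Arrays;

/** Hypothetical holder that caches a hash but does not serialize the cached value. */
final class CachedHashHolder implements Serializable {
    private static final long serialVersionUID = 1L;

    private final Object[] parts;

    /** Not written to the stream; recomputed in readObject(). */
    private transient int hashCode;

    CachedHashHolder(Object... parts) {
        this.parts = parts.clone();
        this.hashCode = Arrays.hashCode(this.parts);
    }

    /** Serialization hook: restore the non-transient fields, then rebuild the cache. */
    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        this.hashCode = Arrays.hashCode(parts);
    }

    @Override
    public int hashCode() {
        return hashCode;
    }

    @Override
    public boolean equals(Object other) {
        return other instanceof CachedHashHolder
                && Arrays.equals(parts, ((CachedHashHolder) other).parts);
    }

    /** Serializes and deserializes the given holder in memory. */
    static CachedHashHolder roundTrip(CachedHashHolder original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (CachedHashHolder) in.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Without the `readObject` hook, a `transient int` would simply come back as `0` after deserialization, so the recomputation step is the part that matters.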
[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic
[ https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531724#comment-16531724 ]

ASF GitHub Bot commented on FLINK-9513:
---

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6196#discussion_r199894217

    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java ---
    @@ -272,4 +254,60 @@ public int getVersion() {
                     previousSerializersAndConfigs.get(index).f0, UnloadableDummyTypeSerializer.class,
                     previousSerializersAndConfigs.get(index).f1, fieldSerializers[index]);
         }
    +
    +    /** This class holds composite serializer parameters which can be precomputed in advanced for better performance. */
    +    protected static class PrecomputedParameters implements Serializable {
    --- End diff --

    If this is serializable, we should add a version uid.
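For readers less familiar with Java serialization, the "version uid" here is the `serialVersionUID` constant. A minimal sketch follows; `WithExplicitUid` and `SerialUidCheck` are hypothetical names for this example. When the constant is declared, the JVM uses it as is instead of deriving a value from the class structure, so compatible changes to the class do not silently change the stream version.

```java
import java.io.ObjectStreamClass;
import java.io.Serializable;

/** Hypothetical serializable parameter holder with an explicit version uid. */
final class WithExplicitUid implements Serializable {
    private static final long serialVersionUID = 1L;

    final int length;

    WithExplicitUid(int length) {
        this.length = length;
    }
}

final class SerialUidCheck {
    /** Returns the serialVersionUID the serialization runtime will use for the class. */
    static long uidOf(Class<?> cls) {
        return ObjectStreamClass.lookup(cls).getSerialVersionUID();
    }
}
```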
[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic
[ https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531285#comment-16531285 ]

ASF GitHub Bot commented on FLINK-9513:
---

Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/6196

    Had a few more comments, but they all are basically optimizations. I leave it up to you if you still want to address all or some of them. Please let me know. Otherwise, we can merge this. 👍
[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic
[ https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531277#comment-16531277 ]

ASF GitHub Bot commented on FLINK-9513:
---

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6196#discussion_r199783587

    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java ---
    @@ -0,0 +1,275 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.common.typeutils;
    +
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.util.Preconditions;
    +
    +import javax.annotation.Nonnull;
    +
    +import java.io.IOException;
    +import java.util.Arrays;
    +import java.util.List;
    +import java.util.Objects;
    +import java.util.stream.IntStream;
    +
    +/**
    + * Base class for composite serializers.
    + *
    + * This class serializes a composite type using array of its field serializers.
    + * Fields are indexed the same way as their serializers.
    + *
    + * @param <T> type of custom serialized value
    + */
    +public abstract class CompositeSerializer<T> extends TypeSerializer<T> {
    +    private static final long serialVersionUID = 1L;
    +
    +    /** Serializers for fields which constitute T. */
    +    protected final TypeSerializer[] fieldSerializers;
    +
    +    /** Whether T is an immutable type. */
    +    final boolean immutableTargetType;
    +
    +    /** Byte length of target object in serialized form. */
    +    private final int length;
    +
    +    /** Whether any field serializer is stateful. */
    +    private final boolean stateful;
    +
    +    private final int hashCode;
    +
    +    @SuppressWarnings("unchecked")
    +    protected CompositeSerializer(boolean immutableTargetType, TypeSerializer ... fieldSerializers) {
    +        Preconditions.checkNotNull(fieldSerializers);
    +        Preconditions.checkArgument(Arrays.stream(fieldSerializers).allMatch(Objects::nonNull));
    +        this.immutableTargetType = immutableTargetType &&
    +            Arrays.stream(fieldSerializers).allMatch(TypeSerializer::isImmutableType);
    +        this.fieldSerializers = (TypeSerializer[]) fieldSerializers;
    +        this.length = calcLength();
    +        this.stateful = isStateful();
    +        this.hashCode = Arrays.hashCode(fieldSerializers);
    +    }
    +
    +    private boolean isStateful() {
    +        TypeSerializer[] duplicatedSerializers = duplicateFieldSerializers();
    +        return IntStream.range(0, fieldSerializers.length)
    +            .anyMatch(i -> fieldSerializers[i] != duplicatedSerializers[i]);
    +    }
    +
    +    /** Create new instance from its fields. */
    +    public abstract T createInstance(@Nonnull Object ... values);
    +
    +    /** Modify field of existing instance. Supported only by mutable types. */
    +    protected abstract void setField(@Nonnull T value, int index, Object fieldValue);
    +
    +    /** Get field of existing instance. */
    +    protected abstract Object getField(@Nonnull T value, int index);
    +
    +    /** Factory for concrete serializer. */
    +    protected abstract CompositeSerializer createSerializerInstance(TypeSerializer ... originalSerializers);
    +
    +    @Override
    +    public CompositeSerializer duplicate() {
    +        return stateful ? createSerializerInstance(duplicateFieldSerializers()) : this;
    --- End diff --

    Another small point here for `createSerializerInstance(...)`: we have no (non-public) constructor that can also take all boolean flags, length, and (maybe) hash directly. So if we copy the serializer, I guess it always goes through the whole process again to figure this out, but we could just copy it from the previous instance.
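The suggestion above can be sketched as follows: derive the flags once, keep them in a small parameter object, and hand that object to a private constructor when duplicating. `FieldCodec`, `StatefulCodec`, `CompositeCodec` and the `precomputeCalls` counter are hypothetical stand-ins invented for this example; they are not the Flink classes, and the counter exists only to make the saving visible.

```java
/** Hypothetical stand-in for a field serializer; not Flink's TypeSerializer. */
interface FieldCodec {
    int length();            // fixed byte length, or -1 if variable
    FieldCodec duplicate();  // may return this when the codec is stateless
}

/** Stateful field codec: every duplicate is a fresh instance. */
final class StatefulCodec implements FieldCodec {
    @Override public int length() { return 4; }
    @Override public FieldCodec duplicate() { return new StatefulCodec(); }
}

final class CompositeCodec {
    /** Counts how often the derived values are computed (demo only). */
    static int precomputeCalls = 0;

    /** Values derived from the field codecs that stay the same for every copy. */
    static final class Precomputed {
        final int length;
        final boolean stateful;

        Precomputed(int length, boolean stateful) {
            this.length = length;
            this.stateful = stateful;
        }
    }

    private final FieldCodec[] fields;
    private final Precomputed precomputed;

    CompositeCodec(FieldCodec... fields) {
        this(precompute(fields), fields);
    }

    /** Non-public constructor that takes the already derived values directly. */
    private CompositeCodec(Precomputed precomputed, FieldCodec[] fields) {
        this.precomputed = precomputed;
        this.fields = fields;
    }

    private static Precomputed precompute(FieldCodec[] fields) {
        precomputeCalls++;
        int total = 0;
        boolean stateful = false;
        for (FieldCodec field : fields) {
            total = (total < 0 || field.length() < 0) ? -1 : total + field.length();
            stateful |= field.duplicate() != field;
        }
        return new Precomputed(total, stateful);
    }

    CompositeCodec duplicate() {
        if (!precomputed.stateful) {
            return this;
        }
        FieldCodec[] copies = new FieldCodec[fields.length];
        for (int i = 0; i < fields.length; i++) {
            copies[i] = fields[i].duplicate();
        }
        // Reuse the derived values instead of recomputing them for the copy.
        return new CompositeCodec(precomputed, copies);
    }

    int length() {
        return precomputed.length;
    }
}
```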
[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic
[ https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531270#comment-16531270 ]

ASF GitHub Bot commented on FLINK-9513:
---

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6196#discussion_r199782860

    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java ---
    @@ -0,0 +1,275 @@
    +public abstract class CompositeSerializer<T> extends TypeSerializer<T> {
    +    private static final long serialVersionUID = 1L;
    +
    +    /** Serializers for fields which constitute T. */
    +    protected final TypeSerializer[] fieldSerializers;
    +
    +    /** Whether T is an immutable type. */
    +    final boolean immutableTargetType;
    +
    +    /** Byte length of target object in serialized form. */
    +    private final int length;
    +
    +    /** Whether any field serializer is stateful. */
    +    private final boolean stateful;
    +
    +    private final int hashCode;
    +
    +    @SuppressWarnings("unchecked")
    +    protected CompositeSerializer(boolean immutableTargetType, TypeSerializer ... fieldSerializers) {
    +        Preconditions.checkNotNull(fieldSerializers);
    +        Preconditions.checkArgument(Arrays.stream(fieldSerializers).allMatch(Objects::nonNull));
    +        this.immutableTargetType = immutableTargetType &&
    +            Arrays.stream(fieldSerializers).allMatch(TypeSerializer::isImmutableType);
    +        this.fieldSerializers = (TypeSerializer[]) fieldSerializers;
    +        this.length = calcLength();
    +        this.stateful = isStateful();
    +        this.hashCode = Arrays.hashCode(fieldSerializers);
    +    }
    +
    +    private boolean isStateful() {
    +        TypeSerializer[] duplicatedSerializers = duplicateFieldSerializers();
    --- End diff --

    The flag for `isStateful()` is the only one that I suggested as a candidate for lazy init when `duplicate()` is called for the first time. The reason is that duplicating some types of inner serializers can sometimes be a bit expensive. But again, I feel that this can also be changed in follow-up work, if needed.
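A minimal sketch of the lazy-init idea, under the assumption that the flag is only needed inside `duplicate()`: the field duplication used to detect statefulness is deferred until the first `duplicate()` call and its result is cached. `Copyable`, `CountingField` and `LazyComposite` are hypothetical names for this example, and the code is not thread-safe as written.

```java
/** Hypothetical stand-in for a field serializer; not Flink's TypeSerializer. */
interface Copyable {
    Copyable duplicate();
}

/** Stateless field that counts how often it is asked to duplicate itself. */
final class CountingField implements Copyable {
    static int duplicateCalls = 0;

    @Override
    public Copyable duplicate() {
        duplicateCalls++;
        return this; // stateless: the same instance can be shared
    }
}

final class LazyComposite {
    private final Copyable[] fields;

    /** null until duplicate() is called for the first time. */
    private Boolean stateful;

    LazyComposite(Copyable... fields) {
        this(fields.clone(), null);
    }

    private LazyComposite(Copyable[] fields, Boolean stateful) {
        this.fields = fields;
        this.stateful = stateful;
    }

    LazyComposite duplicate() {
        Copyable[] copies = null;
        if (stateful == null) {
            // First call: duplicate the fields once and remember the outcome.
            copies = duplicateFields();
            boolean anyDiffers = false;
            for (int i = 0; i < fields.length; i++) {
                anyDiffers |= copies[i] != fields[i];
            }
            stateful = anyDiffers;
        }
        if (!stateful) {
            return this;
        }
        return new LazyComposite(copies != null ? copies : duplicateFields(), Boolean.TRUE);
    }

    private Copyable[] duplicateFields() {
        Copyable[] copies = new Copyable[fields.length];
        for (int i = 0; i < fields.length; i++) {
            copies[i] = fields[i].duplicate();
        }
        return copies;
    }
}
```

The constructor no longer touches `duplicate()` on the fields at all, which is the cost the comment is concerned about.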
[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic
[ https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531265#comment-16531265 ]

ASF GitHub Bot commented on FLINK-9513:
---

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6196#discussion_r199782303

    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java ---
    @@ -0,0 +1,275 @@
    +public abstract class CompositeSerializer<T> extends TypeSerializer<T> {
    +    private static final long serialVersionUID = 1L;
    +
    +    /** Serializers for fields which constitute T. */
    +    protected final TypeSerializer[] fieldSerializers;
    +
    +    /** Whether T is an immutable type. */
    +    final boolean immutableTargetType;
    +
    +    /** Byte length of target object in serialized form. */
    +    private final int length;
    +
    +    /** Whether any field serializer is stateful. */
    +    private final boolean stateful;
    +
    +    private final int hashCode;
    +
    +    @SuppressWarnings("unchecked")
    +    protected CompositeSerializer(boolean immutableTargetType, TypeSerializer ... fieldSerializers) {
    +        Preconditions.checkNotNull(fieldSerializers);
    +        Preconditions.checkArgument(Arrays.stream(fieldSerializers).allMatch(Objects::nonNull));
    +        this.immutableTargetType = immutableTargetType &&
    +            Arrays.stream(fieldSerializers).allMatch(TypeSerializer::isImmutableType);
    +        this.fieldSerializers = (TypeSerializer[]) fieldSerializers;
    +        this.length = calcLength();
    +        this.stateful = isStateful();
    +        this.hashCode = Arrays.hashCode(fieldSerializers);
    --- End diff --

    I think up to this point, the code is iterating `fieldSerializers` 5 times (null checks, immutable check, length calc, stateful check, and hash code computation). It could be done in one iteration, but since this method should typically not be called in hot loops, this is an optional improvement.
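To make the "one iteration" remark concrete, here is a sketch that folds the five checks into a single loop. `FieldInfo` and `FieldSummary` are hypothetical types for this example, with plain fields standing in for what the real code would ask each serializer. The hash is accumulated with the same `31 * h + e.hashCode()` recurrence that `Arrays.hashCode(Object[])` uses, so the result matches the original expression.

```java
import java.util.Objects;

/** Hypothetical per-field facts that the real code would query from each serializer. */
final class FieldInfo {
    final boolean immutable;
    final int length;        // fixed byte length, or -1 if variable
    final boolean stateful;

    FieldInfo(boolean immutable, int length, boolean stateful) {
        this.immutable = immutable;
        this.length = length;
        this.stateful = stateful;
    }
}

/** All derived values, computed in one pass over the fields. */
final class FieldSummary {
    final boolean immutable;
    final int length;
    final boolean stateful;
    final int hashCode;

    private FieldSummary(boolean immutable, int length, boolean stateful, int hashCode) {
        this.immutable = immutable;
        this.length = length;
        this.stateful = stateful;
        this.hashCode = hashCode;
    }

    static FieldSummary of(FieldInfo... fields) {
        Objects.requireNonNull(fields);
        boolean immutable = true;
        int length = 0;
        boolean stateful = false;
        int hash = 1; // same seed as Arrays.hashCode(Object[])
        for (FieldInfo field : fields) {
            Objects.requireNonNull(field);                        // null check
            immutable &= field.immutable;                         // immutable check
            length = (length < 0 || field.length < 0)             // length calculation
                    ? -1 : length + field.length;
            stateful |= field.stateful;                           // stateful check
            hash = 31 * hash + field.hashCode();                  // hash code
        }
        return new FieldSummary(immutable, length, stateful, hash);
    }
}
```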
[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic
[ https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531159#comment-16531159 ]

ASF GitHub Bot commented on FLINK-9513:
---

Github user azagrebin commented on the issue:

    https://github.com/apache/flink/pull/6196

    @StefanRRichter I added more precomputed fields to the `CompositeSerializer` constructor and included `TtlStateFactory` in the TTL tests.
[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic
[ https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16529828#comment-16529828 ]

ASF GitHub Bot commented on FLINK-9513:
---

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6196#discussion_r199485075

    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java ---
    @@ -0,0 +1,258 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.common.typeutils;
    +
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.util.Preconditions;
    +
    +import javax.annotation.Nonnull;
    +
    +import java.io.IOException;
    +import java.util.Arrays;
    +import java.util.List;
    +import java.util.Objects;
    +
    +/**
    + * Base class for composite serializers.
    + *
    + * This class serializes a composite type using array of its field serializers.
    + * Fields are indexed the same way as their serializers.
    + *
    + * @param <T> type of custom serialized value
    + */
    +public abstract class CompositeSerializer<T> extends TypeSerializer<T> {
    +    private static final long serialVersionUID = 1L;
    +
    +    protected final TypeSerializer[] fieldSerializers;
    +    final boolean isImmutableTargetType;
    --- End diff --

    I think in Java code style, a boolean field name should not be prefixed with `is...`, only the getter should be prefixed with `is...`
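A tiny illustration of the naming convention mentioned above, with a hypothetical class name chosen for this example: the field carries the plain property name and only the accessor gets the `is` prefix.

```java
/** Hypothetical holder showing the field/getter naming split for booleans. */
final class TargetTypeInfo {
    /** Field: property name without the "is" prefix. */
    private final boolean immutableTargetType;

    TargetTypeInfo(boolean immutableTargetType) {
        this.immutableTargetType = immutableTargetType;
    }

    /** Getter: the "is" prefix belongs here. */
    boolean isImmutableTargetType() {
        return immutableTargetType;
    }
}
```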
[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic
[ https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16529800#comment-16529800 ] ASF GitHub Bot commented on FLINK-9513: --- Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/6196 I had a few more comments, in particular some improvements for the new serializer. I think when those are addressed this is good to merge.
[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic
[ https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16529797#comment-16529797 ] ASF GitHub Bot commented on FLINK-9513: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6196#discussion_r199477483 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java --- @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.state.AggregatingStateDescriptor; +import org.apache.flink.api.common.state.FoldingStateDescriptor; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.CompositeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.state.KeyedStateFactory; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nonnull; + +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * This state factory wraps state objects, produced by backends, with TTL logic. + */ +@SuppressWarnings("unchecked") +public class TtlStateFactory { --- End diff -- It might make sense to also have a test when we introduce a new class that provides some kind of "service", e.g. to check that all the types are correctly mapped and also prevent that somebody breaks the mapping by accident. 
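A test of the kind suggested above might look roughly like the sketch below. It assumes that the factory keeps a map from state descriptor class to a creator function; that map is not visible in the quoted diff, and every type here (`Descriptor`, `WrappedState`, the `Ttl...Wrapper` classes) is a hypothetical stand-in rather than a Flink class. The check walks over all descriptor types and fails if one of them is unmapped or mapped to the wrong wrapper.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Sketch only: all nested types are hypothetical stand-ins, not Flink classes. */
final class FactoryMappingSketch {
    interface Descriptor {}
    static final class ValueDescriptor implements Descriptor {}
    static final class ListDescriptor implements Descriptor {}
    static final class MapDescriptor implements Descriptor {}

    interface WrappedState {}
    static final class TtlValueWrapper implements WrappedState {}
    static final class TtlListWrapper implements WrappedState {}
    static final class TtlMapWrapper implements WrappedState {}

    // Assumed shape of the mapping: descriptor class -> creator of the TTL wrapper.
    private final Map<Class<? extends Descriptor>, Supplier<WrappedState>> creators =
        new LinkedHashMap<>();

    FactoryMappingSketch() {
        creators.put(ValueDescriptor.class, TtlValueWrapper::new);
        creators.put(ListDescriptor.class, TtlListWrapper::new);
        creators.put(MapDescriptor.class, TtlMapWrapper::new);
    }

    WrappedState create(Descriptor descriptor) {
        Supplier<WrappedState> creator = creators.get(descriptor.getClass());
        if (creator == null) {
            throw new IllegalArgumentException(
                "No TTL wrapper registered for " + descriptor.getClass().getSimpleName());
        }
        return creator.get();
    }

    /** Fails if any descriptor type is missing from the mapping or mapped to the wrong wrapper. */
    static void checkMapping() {
        FactoryMappingSketch factory = new FactoryMappingSketch();
        Map<Descriptor, Class<? extends WrappedState>> expected = new LinkedHashMap<>();
        expected.put(new ValueDescriptor(), TtlValueWrapper.class);
        expected.put(new ListDescriptor(), TtlListWrapper.class);
        expected.put(new MapDescriptor(), TtlMapWrapper.class);
        for (Map.Entry<Descriptor, Class<? extends WrappedState>> entry : expected.entrySet()) {
            WrappedState created = factory.create(entry.getKey());
            if (!entry.getValue().isInstance(created)) {
                throw new AssertionError(
                    "Wrong wrapper for " + entry.getKey().getClass().getSimpleName());
            }
        }
    }
}
```

If an entry were removed from the map by accident, `checkMapping()` would fail with the `IllegalArgumentException` thrown by `create`, which is the kind of regression the comment wants to catch.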
[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic
[ https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16529794#comment-16529794 ] ASF GitHub Bot commented on FLINK-9513: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6196#discussion_r199476870 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java --- @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.state.AggregatingStateDescriptor; +import org.apache.flink.api.common.state.FoldingStateDescriptor; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.CompositeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.state.KeyedStateFactory; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nonnull; + +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * This state factory wraps state objects, produced by backends, with TTL logic. + */ +@SuppressWarnings("unchecked") --- End diff -- I would not suppress warnings in the scope of a full class, better more fine grained on methods. 
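To illustrate the narrower scoping suggested above, here is a small sketch. `SuppressScopeSketch` and its methods are hypothetical and unrelated to the actual `TtlStateFactory` code. The annotation sits on the one method, or even the one local variable, that performs the unchecked cast, so unchecked warnings elsewhere in the class stay visible.

```java
import java.util.List;

/** Hypothetical example class; no class-level @SuppressWarnings is used. */
final class SuppressScopeSketch {

    /** Only this method performs an unchecked cast, so only it carries the annotation. */
    @SuppressWarnings("unchecked")
    static <T> List<T> castList(Object raw) {
        return (List<T>) raw;
    }

    /** Narrower still: the annotation can be placed on a single local variable declaration. */
    static <T> T firstElement(Object raw) {
        @SuppressWarnings("unchecked")
        List<T> typed = (List<T>) raw;
        return typed.get(0);
    }
}
```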
[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic
[ https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16529780#comment-16529780 ] ASF GitHub Bot commented on FLINK-9513: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6196#discussion_r199474873 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java --- @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.typeutils; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nonnull; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; + +/** + * Base class for composite serializers. + * + * This class serializes a composite type using array of its field serializers. + * Fields are indexed the same way as their serializers. 
+ * + * @param type of custom serialized value + */ +public abstract class CompositeSerializer extends TypeSerializer { + private static final long serialVersionUID = 1L; + + protected final TypeSerializer[] fieldSerializers; + final boolean isImmutableTargetType; + private final int length; + + @SuppressWarnings("unchecked") + protected CompositeSerializer(boolean isImmutableTargetType, TypeSerializer ... fieldSerializers) { + Preconditions.checkNotNull(fieldSerializers); + Preconditions.checkArgument(Arrays.stream(fieldSerializers).allMatch(Objects::nonNull)); + this.isImmutableTargetType = isImmutableTargetType; + this.fieldSerializers = (TypeSerializer[]) fieldSerializers; + this.length = calcLength(); + } + + /** Create new instance from its fields. */ + public abstract T createInstance(@Nonnull Object ... values); + + /** Modify field of existing instance. Supported only by mutable types. */ + protected abstract void setField(@Nonnull T value, int index, Object fieldValue); + + /** Get field of existing instance. */ + protected abstract Object getField(@Nonnull T value, int index); + + /** Factory for concrete serializer. */ + protected abstract CompositeSerializer createSerializerInstance(TypeSerializer ... originalSerializers); + + @Override + public CompositeSerializer duplicate() { + TypeSerializer[] duplicatedSerializers = new TypeSerializer[fieldSerializers.length]; + boolean stateful = false; + for (int index = 0; index < fieldSerializers.length; index++) { + duplicatedSerializers[index] = fieldSerializers[index].duplicate(); + if (fieldSerializers[index] != duplicatedSerializers[index]) { + stateful = true; + } + } + return stateful ? createSerializerInstance(duplicatedSerializers) : this; + } + + @Override + public boolean isImmutableType() { + for (TypeSerializer fieldSerializer : fieldSerializers) { --- End diff -- We can compute many things like length, immutability, etc already in the constructor. 
Statelessness is the one thing that we might want to figure out and remember on the first attempt.
[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic
[ https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16529776#comment-16529776 ] ASF GitHub Bot commented on FLINK-9513: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6196#discussion_r199474421 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java --- @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.typeutils; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nonnull; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; + +/** + * Base class for composite serializers. + * + * This class serializes a composite type using array of its field serializers. + * Fields are indexed the same way as their serializers. 
+ * + * @param type of custom serialized value + */ +public abstract class CompositeSerializer extends TypeSerializer { + private static final long serialVersionUID = 1L; + + protected final TypeSerializer[] fieldSerializers; + final boolean isImmutableTargetType; + private final int length; + + @SuppressWarnings("unchecked") + protected CompositeSerializer(boolean isImmutableTargetType, TypeSerializer ... fieldSerializers) { + Preconditions.checkNotNull(fieldSerializers); + Preconditions.checkArgument(Arrays.stream(fieldSerializers).allMatch(Objects::nonNull)); + this.isImmutableTargetType = isImmutableTargetType; + this.fieldSerializers = (TypeSerializer[]) fieldSerializers; + this.length = calcLength(); + } + + /** Create new instance from its fields. */ + public abstract T createInstance(@Nonnull Object ... values); + + /** Modify field of existing instance. Supported only by mutable types. */ + protected abstract void setField(@Nonnull T value, int index, Object fieldValue); + + /** Get field of existing instance. */ + protected abstract Object getField(@Nonnull T value, int index); + + /** Factory for concrete serializer. */ + protected abstract CompositeSerializer createSerializerInstance(TypeSerializer ... originalSerializers); + + @Override + public CompositeSerializer duplicate() { + TypeSerializer[] duplicatedSerializers = new TypeSerializer[fieldSerializers.length]; + boolean stateful = false; + for (int index = 0; index < fieldSerializers.length; index++) { + duplicatedSerializers[index] = fieldSerializers[index].duplicate(); + if (fieldSerializers[index] != duplicatedSerializers[index]) { + stateful = true; + } + } + return stateful ? 
createSerializerInstance(duplicatedSerializers) : this; + } + + @Override + public boolean isImmutableType() { + for (TypeSerializer fieldSerializer : fieldSerializers) { + if (!fieldSerializer.isImmutableType()) { + return false; + } + } + return isImmutableTargetType; + } + + @Override + public T createInstance() { + Object[] fields = new Object[fieldSerializers.length]; + for (int index = 0; index < fieldSerializers.length; index++) { + fields[index] = fieldSerializers[index].createInstance(); + } + return createInstance(fields); + } + + @Override + public T copy(T from)
[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic
[ https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16529777#comment-16529777 ] ASF GitHub Bot commented on FLINK-9513: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6196#discussion_r199474494 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java --- @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.typeutils; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nonnull; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; + +/** + * Base class for composite serializers. + * + * This class serializes a composite type using array of its field serializers. + * Fields are indexed the same way as their serializers. 
+ * + * @param type of custom serialized value + */ +public abstract class CompositeSerializer extends TypeSerializer { + private static final long serialVersionUID = 1L; + + protected final TypeSerializer[] fieldSerializers; + final boolean isImmutableTargetType; + private final int length; + + @SuppressWarnings("unchecked") + protected CompositeSerializer(boolean isImmutableTargetType, TypeSerializer ... fieldSerializers) { + Preconditions.checkNotNull(fieldSerializers); + Preconditions.checkArgument(Arrays.stream(fieldSerializers).allMatch(Objects::nonNull)); + this.isImmutableTargetType = isImmutableTargetType; + this.fieldSerializers = (TypeSerializer[]) fieldSerializers; + this.length = calcLength(); + } + + /** Create new instance from its fields. */ + public abstract T createInstance(@Nonnull Object ... values); + + /** Modify field of existing instance. Supported only by mutable types. */ + protected abstract void setField(@Nonnull T value, int index, Object fieldValue); + + /** Get field of existing instance. */ + protected abstract Object getField(@Nonnull T value, int index); + + /** Factory for concrete serializer. */ + protected abstract CompositeSerializer createSerializerInstance(TypeSerializer ... originalSerializers); + + @Override + public CompositeSerializer duplicate() { + TypeSerializer[] duplicatedSerializers = new TypeSerializer[fieldSerializers.length]; + boolean stateful = false; + for (int index = 0; index < fieldSerializers.length; index++) { + duplicatedSerializers[index] = fieldSerializers[index].duplicate(); + if (fieldSerializers[index] != duplicatedSerializers[index]) { + stateful = true; + } + } + return stateful ? 
createSerializerInstance(duplicatedSerializers) : this; + } + + @Override + public boolean isImmutableType() { + for (TypeSerializer fieldSerializer : fieldSerializers) { + if (!fieldSerializer.isImmutableType()) { + return false; + } + } + return isImmutableTargetType; + } + + @Override + public T createInstance() { + Object[] fields = new Object[fieldSerializers.length]; + for (int index = 0; index < fieldSerializers.length; index++) { + fields[index] = fieldSerializers[index].createInstance(); + } + return createInstance(fields); + } + + @Override + public T copy(T from)
[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic
[ https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16529773#comment-16529773 ] ASF GitHub Bot commented on FLINK-9513: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6196#discussion_r199474258 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java --- @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.typeutils; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nonnull; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; + +/** + * Base class for composite serializers. + * + * This class serializes a composite type using array of its field serializers. + * Fields are indexed the same way as their serializers. 
+ * + * @param type of custom serialized value + */ +public abstract class CompositeSerializer extends TypeSerializer { + private static final long serialVersionUID = 1L; + + protected final TypeSerializer[] fieldSerializers; + final boolean isImmutableTargetType; + private final int length; + + @SuppressWarnings("unchecked") + protected CompositeSerializer(boolean isImmutableTargetType, TypeSerializer ... fieldSerializers) { + Preconditions.checkNotNull(fieldSerializers); + Preconditions.checkArgument(Arrays.stream(fieldSerializers).allMatch(Objects::nonNull)); + this.isImmutableTargetType = isImmutableTargetType; + this.fieldSerializers = (TypeSerializer[]) fieldSerializers; + this.length = calcLength(); + } + + /** Create new instance from its fields. */ + public abstract T createInstance(@Nonnull Object ... values); + + /** Modify field of existing instance. Supported only by mutable types. */ + protected abstract void setField(@Nonnull T value, int index, Object fieldValue); + + /** Get field of existing instance. */ + protected abstract Object getField(@Nonnull T value, int index); + + /** Factory for concrete serializer. */ + protected abstract CompositeSerializer createSerializerInstance(TypeSerializer ... originalSerializers); + + @Override + public CompositeSerializer duplicate() { + TypeSerializer[] duplicatedSerializers = new TypeSerializer[fieldSerializers.length]; + boolean stateful = false; + for (int index = 0; index < fieldSerializers.length; index++) { + duplicatedSerializers[index] = fieldSerializers[index].duplicate(); + if (fieldSerializers[index] != duplicatedSerializers[index]) { + stateful = true; + } + } + return stateful ? createSerializerInstance(duplicatedSerializers) : this; + } + + @Override + public boolean isImmutableType() { + for (TypeSerializer fieldSerializer : fieldSerializers) { --- End diff -- Why not compute this once in the constructor and remember in a boolean flag?? 
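The suggestion above could be sketched like this. It is a simplified illustration under stated assumptions, not the code that was merged: `FieldSerializer` is a hypothetical one-method stand-in for the field serializers, and `ImmutabilityFlagSketch` stands in for the composite serializer. The loop from `isImmutableType()` moves into the constructor, and the result is kept in a final boolean flag.

```java
/** Sketch with hypothetical stand-in types; not the actual CompositeSerializer. */
final class ImmutabilityFlagSketch {
    /** Hypothetical minimal view of a field serializer. */
    interface FieldSerializer {
        boolean isImmutableType();
    }

    // Computed once in the constructor and never changed afterwards.
    private final boolean immutable;

    ImmutabilityFlagSketch(boolean immutableTargetType, FieldSerializer... fieldSerializers) {
        boolean allFieldsImmutable = true;
        for (FieldSerializer fieldSerializer : fieldSerializers) {
            if (!fieldSerializer.isImmutableType()) {
                allFieldsImmutable = false;
                break;
            }
        }
        // Same result as the loop in the quoted diff: immutable only if the target
        // type and every field type are immutable.
        this.immutable = immutableTargetType && allFieldsImmutable;
    }

    boolean isImmutableType() {
        return immutable;
    }
}
```

This relies on the field serializers being fixed at construction time, which the `final` array in the quoted diff suggests.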
[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic
[ https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16529772#comment-16529772 ] ASF GitHub Bot commented on FLINK-9513: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6196#discussion_r199474152 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java --- @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.typeutils; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nonnull; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; + +/** + * Base class for composite serializers. + * + * This class serializes a composite type using array of its field serializers. + * Fields are indexed the same way as their serializers. 
+ * + * @param type of custom serialized value + */ +public abstract class CompositeSerializer extends TypeSerializer { + private static final long serialVersionUID = 1L; + + protected final TypeSerializer[] fieldSerializers; + final boolean isImmutableTargetType; + private final int length; + + @SuppressWarnings("unchecked") + protected CompositeSerializer(boolean isImmutableTargetType, TypeSerializer ... fieldSerializers) { + Preconditions.checkNotNull(fieldSerializers); + Preconditions.checkArgument(Arrays.stream(fieldSerializers).allMatch(Objects::nonNull)); + this.isImmutableTargetType = isImmutableTargetType; + this.fieldSerializers = (TypeSerializer[]) fieldSerializers; + this.length = calcLength(); + } + + /** Create new instance from its fields. */ + public abstract T createInstance(@Nonnull Object ... values); + + /** Modify field of existing instance. Supported only by mutable types. */ + protected abstract void setField(@Nonnull T value, int index, Object fieldValue); + + /** Get field of existing instance. */ + protected abstract Object getField(@Nonnull T value, int index); + + /** Factory for concrete serializer. */ + protected abstract CompositeSerializer createSerializerInstance(TypeSerializer ... originalSerializers); + + @Override + public CompositeSerializer duplicate() { + TypeSerializer[] duplicatedSerializers = new TypeSerializer[fieldSerializers.length]; + boolean stateful = false; + for (int index = 0; index < fieldSerializers.length; index++) { + duplicatedSerializers[index] = fieldSerializers[index].duplicate(); + if (fieldSerializers[index] != duplicatedSerializers[index]) { --- End diff -- I wonder if we need to do these checks every time `duplicate()` is called? We could check it once, remember if all field serializer are stateless and from that point return `this` immediately. 
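One way to read the suggestion above is sketched below. The types are hypothetical stand-ins (`FieldSerializer`, `CachedDuplicateSketch`), and the caching strategy is an assumption about what the reviewer has in mind, not the merged implementation. The first `duplicate()` call compares each field serializer with its duplicate; if none of them returned a new instance, that fact is remembered and later calls return `this` immediately.

```java
/** Sketch with hypothetical stand-in types; not the actual CompositeSerializer. */
final class CachedDuplicateSketch {
    /** Hypothetical field serializer: duplicate() returns itself when stateless. */
    interface FieldSerializer {
        FieldSerializer duplicate();
    }

    private final FieldSerializer[] fieldSerializers;

    // null = not determined yet; set by the first duplicate() call.
    // A racy write is harmless here: at worst the check runs more than once.
    private Boolean stateless;

    CachedDuplicateSketch(FieldSerializer... fieldSerializers) {
        this.fieldSerializers = fieldSerializers.clone();
    }

    CachedDuplicateSketch duplicate() {
        if (Boolean.TRUE.equals(stateless)) {
            return this; // fast path: all fields are already known to be stateless
        }
        FieldSerializer[] duplicated = new FieldSerializer[fieldSerializers.length];
        boolean sawNewInstance = false;
        for (int index = 0; index < fieldSerializers.length; index++) {
            duplicated[index] = fieldSerializers[index].duplicate();
            if (duplicated[index] != fieldSerializers[index]) {
                sawNewInstance = true;
            }
        }
        stateless = !sawNewInstance;
        return sawNewInstance ? new CachedDuplicateSketch(duplicated) : this;
    }
}
```

In the stateful case the sketch still duplicates every field on each call, since each caller needs its own copies; only the stateless case benefits from the cached result.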
[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic
[ https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16529696#comment-16529696 ] ASF GitHub Bot commented on FLINK-9513: --- Github user azagrebin commented on the issue: https://github.com/apache/flink/pull/6196 Regarding CI: the failure seems to come from an unrelated test, `YARNSessionCapacitySchedulerITCase`; the build [passed in my CI](https://travis-ci.org/azagrebin/flink/builds/398352328).
[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic
[ https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16528012#comment-16528012 ] ASF GitHub Bot commented on FLINK-9513: --- Github user azagrebin commented on a diff in the pull request: https://github.com/apache/flink/pull/6196#discussion_r199231639 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java --- @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.state.AggregatingStateDescriptor; +import org.apache.flink.api.common.state.FoldingStateDescriptor; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.CompositeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.state.KeyedStateFactory; +import org.apache.flink.util.FlinkRuntimeException; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * This state factory wraps state objects, produced by backends, with TTL logic. + */ +public class TtlStateFactory { + public static IS createStateAndWrapWithTtlIfEnabled( + TypeSerializer namespaceSerializer, + StateDescriptor stateDesc, + KeyedStateFactory originalStateFactory, + TtlConfig ttlConfig, + TtlTimeProvider timeProvider) throws Exception { + return ttlConfig.getTtlUpdateType() == TtlUpdateType.Disabled ? 
+ originalStateFactory.createState(namespaceSerializer, stateDesc) : + new TtlStateFactory(originalStateFactory, ttlConfig, timeProvider) + .createState(namespaceSerializer, stateDesc); + } + + private final Map, StateFactory> stateFactories; + + private final KeyedStateFactory originalStateFactory; + private final TtlConfig ttlConfig; + private final TtlTimeProvider timeProvider; + + private TtlStateFactory(KeyedStateFactory originalStateFactory, TtlConfig ttlConfig, TtlTimeProvider timeProvider) { --- End diff -- I added checks in `createStateAndWrapWithTtlIfEnabled`, which is the only caller of this private constructor.
[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic
[ https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16528009#comment-16528009 ] ASF GitHub Bot commented on FLINK-9513: --- Github user azagrebin commented on the issue: https://github.com/apache/flink/pull/6196 @StefanRRichter @sihuazhou Thanks, guys, for the helpful review. I refactored `CompositeSerializer` to use loops rather than streams and added tests for it. Please have a look. cc @twalthr The `CompositeSerializer` implementation in this PR might also be of interest for other complex types, e.g. tuples or POJOs, allowing code reuse where that is possible without a performance loss.
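To illustrate the loop-based style mentioned in the comment above, here is a rough sketch of a composite that writes and reads its fields in index order with plain `for` loops. It is not the PR's `CompositeSerializer`: `LoopCompositeSketch`, the non-generic `FieldSerializer` interface, and the `STRING` and `LONG` constants are hypothetical, and plain `java.io` streams are assumed in place of Flink's `DataOutputView` and `DataInputView`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public final class LoopCompositeSketch {

    /** Hypothetical minimal field serializer; stands in for Flink's TypeSerializer. */
    public interface FieldSerializer {
        void serialize(Object value, DataOutputStream out) throws IOException;

        Object deserialize(DataInputStream in) throws IOException;
    }

    public static final FieldSerializer STRING = new FieldSerializer() {
        @Override
        public void serialize(Object value, DataOutputStream out) throws IOException {
            out.writeUTF((String) value);
        }

        @Override
        public Object deserialize(DataInputStream in) throws IOException {
            return in.readUTF();
        }
    };

    public static final FieldSerializer LONG = new FieldSerializer() {
        @Override
        public void serialize(Object value, DataOutputStream out) throws IOException {
            out.writeLong((Long) value);
        }

        @Override
        public Object deserialize(DataInputStream in) throws IOException {
            return in.readLong();
        }
    };

    /** Writes each field with the serializer at the same index, using a plain loop. */
    public static byte[] serialize(Object[] fields, FieldSerializer[] serializers) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            for (int i = 0; i < serializers.length; i++) {
                serializers[i].serialize(fields[i], out);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Reads the fields back in the same order they were written. */
    public static Object[] deserialize(byte[] data, FieldSerializer[] serializers) {
        Object[] fields = new Object[serializers.length];
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            for (int i = 0; i < serializers.length; i++) {
                fields[i] = serializers[i].deserialize(in);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return fields;
    }

    public static void main(String[] args) {
        FieldSerializer[] serializers = {STRING, LONG};
        byte[] data = serialize(new Object[] {"abc", 42L}, serializers);
        Object[] back = deserialize(data, serializers);
        System.out.println(back[0] + " / " + back[1]);
    }
}
```

The loops avoid creating stream pipelines and intermediate collections on every call, which is the kind of per-record overhead the review was concerned about.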
[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic
[ https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16527993#comment-16527993 ] ASF GitHub Bot commented on FLINK-9513: --- Github user azagrebin commented on a diff in the pull request: https://github.com/apache/flink/pull/6196#discussion_r199228508 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java --- @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.state.AggregatingStateDescriptor; +import org.apache.flink.api.common.state.FoldingStateDescriptor; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.CompositeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.state.KeyedStateFactory; +import org.apache.flink.util.FlinkRuntimeException; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * This state factory wraps state objects, produced by backends, with TTL logic. + */ +public class TtlStateFactory { + public static IS createStateAndWrapWithTtlIfEnabled( + TypeSerializer namespaceSerializer, + StateDescriptor stateDesc, + KeyedStateFactory originalStateFactory, + TtlConfig ttlConfig, + TtlTimeProvider timeProvider) throws Exception { + return ttlConfig.getTtlUpdateType() == TtlUpdateType.Disabled ? 
+ originalStateFactory.createState(namespaceSerializer, stateDesc) : + new TtlStateFactory(originalStateFactory, ttlConfig, timeProvider) + .createState(namespaceSerializer, stateDesc); + } + + private final Map, StateFactory> stateFactories; + + private final KeyedStateFactory originalStateFactory; + private final TtlConfig ttlConfig; + private final TtlTimeProvider timeProvider; + + private TtlStateFactory(KeyedStateFactory originalStateFactory, TtlConfig ttlConfig, TtlTimeProvider timeProvider) { + this.originalStateFactory = originalStateFactory; + this.ttlConfig = ttlConfig; + this.timeProvider = timeProvider; + this.stateFactories = createStateFactories(); + } + + private Map, StateFactory> createStateFactories() { + return Stream.of( + Tuple2.of(ValueStateDescriptor.class, (StateFactory) this::createValueState), + Tuple2.of(ListStateDescriptor.class, (StateFactory) this::createListState), + Tuple2.of(MapStateDescriptor.class, (StateFactory) this::createMapState), + Tuple2.of(ReducingStateDescriptor.class, (StateFactory) this::createReducingState), + Tuple2.of(AggregatingStateDescriptor.class, (StateFactory) this::createAggregatingState), + Tuple2.of(FoldingStateDescriptor.class, (StateFactory) this::createFoldingState) + ).collect(Collectors.toMap(t -> t.f0, t -> t.f1)); + } + + private interface StateFactory { +IS create( + TypeSerializer namespaceSerializer, + StateDescriptor stateDesc) throws Exception; + } + + priv
[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic
[ https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16527988#comment-16527988 ] ASF GitHub Bot commented on FLINK-9513: --- Github user azagrebin commented on a diff in the pull request: https://github.com/apache/flink/pull/6196#discussion_r199227970 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java --- @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.state.AggregatingStateDescriptor; +import org.apache.flink.api.common.state.FoldingStateDescriptor; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.CompositeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.state.KeyedStateFactory; +import org.apache.flink.util.FlinkRuntimeException; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * This state factory wraps state objects, produced by backends, with TTL logic. + */ +public class TtlStateFactory { + public static IS createStateAndWrapWithTtlIfEnabled( + TypeSerializer namespaceSerializer, + StateDescriptor stateDesc, + KeyedStateFactory originalStateFactory, + TtlConfig ttlConfig, + TtlTimeProvider timeProvider) throws Exception { + return ttlConfig.getTtlUpdateType() == TtlUpdateType.Disabled ? 
+ originalStateFactory.createState(namespaceSerializer, stateDesc) : + new TtlStateFactory(originalStateFactory, ttlConfig, timeProvider) + .createState(namespaceSerializer, stateDesc); + } + + private final Map, StateFactory> stateFactories; + + private final KeyedStateFactory originalStateFactory; + private final TtlConfig ttlConfig; + private final TtlTimeProvider timeProvider; + + private TtlStateFactory(KeyedStateFactory originalStateFactory, TtlConfig ttlConfig, TtlTimeProvider timeProvider) { + this.originalStateFactory = originalStateFactory; + this.ttlConfig = ttlConfig; + this.timeProvider = timeProvider; + this.stateFactories = createStateFactories(); + } + + private Map, StateFactory> createStateFactories() { + return Stream.of( + Tuple2.of(ValueStateDescriptor.class, (StateFactory) this::createValueState), + Tuple2.of(ListStateDescriptor.class, (StateFactory) this::createListState), + Tuple2.of(MapStateDescriptor.class, (StateFactory) this::createMapState), + Tuple2.of(ReducingStateDescriptor.class, (StateFactory) this::createReducingState), + Tuple2.of(AggregatingStateDescriptor.class, (StateFactory) this::createAggregatingState), + Tuple2.of(FoldingStateDescriptor.class, (StateFactory) this::createFoldingState) + ).collect(Collectors.toMap(t -> t.f0, t -> t.f1)); + } + + private interface StateFactory { +IS create( + TypeSerializer namespaceSerializer, + StateDescriptor stateDesc) throws Exception; + } + + priv
[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic
[ https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16527987#comment-16527987 ] ASF GitHub Bot commented on FLINK-9513: --- Github user azagrebin commented on a diff in the pull request: https://github.com/apache/flink/pull/6196#discussion_r199227770 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java --- @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.state.AggregatingStateDescriptor; +import org.apache.flink.api.common.state.FoldingStateDescriptor; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.CompositeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.state.KeyedStateFactory; +import org.apache.flink.util.FlinkRuntimeException; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * This state factory wraps state objects, produced by backends, with TTL logic. + */ +public class TtlStateFactory { + public static IS createStateAndWrapWithTtlIfEnabled( + TypeSerializer namespaceSerializer, + StateDescriptor stateDesc, + KeyedStateFactory originalStateFactory, + TtlConfig ttlConfig, + TtlTimeProvider timeProvider) throws Exception { + return ttlConfig.getTtlUpdateType() == TtlUpdateType.Disabled ? 
+ originalStateFactory.createState(namespaceSerializer, stateDesc) : + new TtlStateFactory(originalStateFactory, ttlConfig, timeProvider) + .createState(namespaceSerializer, stateDesc); + } + + private final Map, StateFactory> stateFactories; + + private final KeyedStateFactory originalStateFactory; + private final TtlConfig ttlConfig; + private final TtlTimeProvider timeProvider; + + private TtlStateFactory(KeyedStateFactory originalStateFactory, TtlConfig ttlConfig, TtlTimeProvider timeProvider) { + this.originalStateFactory = originalStateFactory; + this.ttlConfig = ttlConfig; + this.timeProvider = timeProvider; + this.stateFactories = createStateFactories(); + } + + private Map, StateFactory> createStateFactories() { + return Stream.of( + Tuple2.of(ValueStateDescriptor.class, (StateFactory) this::createValueState), + Tuple2.of(ListStateDescriptor.class, (StateFactory) this::createListState), + Tuple2.of(MapStateDescriptor.class, (StateFactory) this::createMapState), + Tuple2.of(ReducingStateDescriptor.class, (StateFactory) this::createReducingState), + Tuple2.of(AggregatingStateDescriptor.class, (StateFactory) this::createAggregatingState), + Tuple2.of(FoldingStateDescriptor.class, (StateFactory) this::createFoldingState) + ).collect(Collectors.toMap(t -> t.f0, t -> t.f1)); + } + + private interface StateFactory { +IS create( + TypeSerializer namespaceSerializer, + StateDescriptor stateDesc) throws Exception; + } + + priv
[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic
[ https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16527982#comment-16527982 ] ASF GitHub Bot commented on FLINK-9513: --- Github user azagrebin commented on a diff in the pull request: https://github.com/apache/flink/pull/6196#discussion_r199226982 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java --- @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.state.AggregatingStateDescriptor; +import org.apache.flink.api.common.state.FoldingStateDescriptor; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.CompositeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.state.KeyedStateFactory; +import org.apache.flink.util.FlinkRuntimeException; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * This state factory wraps state objects, produced by backends, with TTL logic. + */ +public class TtlStateFactory { + public static IS createStateAndWrapWithTtlIfEnabled( + TypeSerializer namespaceSerializer, + StateDescriptor stateDesc, + KeyedStateFactory originalStateFactory, + TtlConfig ttlConfig, + TtlTimeProvider timeProvider) throws Exception { + return ttlConfig.getTtlUpdateType() == TtlUpdateType.Disabled ? 
+ originalStateFactory.createState(namespaceSerializer, stateDesc) : + new TtlStateFactory(originalStateFactory, ttlConfig, timeProvider) + .createState(namespaceSerializer, stateDesc); + } + + private final Map, StateFactory> stateFactories; + + private final KeyedStateFactory originalStateFactory; + private final TtlConfig ttlConfig; + private final TtlTimeProvider timeProvider; + + private TtlStateFactory(KeyedStateFactory originalStateFactory, TtlConfig ttlConfig, TtlTimeProvider timeProvider) { + this.originalStateFactory = originalStateFactory; + this.ttlConfig = ttlConfig; + this.timeProvider = timeProvider; + this.stateFactories = createStateFactories(); + } + + private Map, StateFactory> createStateFactories() { + return Stream.of( + Tuple2.of(ValueStateDescriptor.class, (StateFactory) this::createValueState), + Tuple2.of(ListStateDescriptor.class, (StateFactory) this::createListState), + Tuple2.of(MapStateDescriptor.class, (StateFactory) this::createMapState), + Tuple2.of(ReducingStateDescriptor.class, (StateFactory) this::createReducingState), + Tuple2.of(AggregatingStateDescriptor.class, (StateFactory) this::createAggregatingState), + Tuple2.of(FoldingStateDescriptor.class, (StateFactory) this::createFoldingState) + ).collect(Collectors.toMap(t -> t.f0, t -> t.f1)); + } + + private interface StateFactory { --- End diff -- Leftover from previous iteration 👍
[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic
[ https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16527980#comment-16527980 ] ASF GitHub Bot commented on FLINK-9513: --- Github user azagrebin commented on a diff in the pull request: https://github.com/apache/flink/pull/6196#discussion_r199226833 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java --- @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.state.AggregatingStateDescriptor; +import org.apache.flink.api.common.state.FoldingStateDescriptor; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.CompositeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.state.KeyedStateFactory; +import org.apache.flink.util.FlinkRuntimeException; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * This state factory wraps state objects, produced by backends, with TTL logic. + */ +public class TtlStateFactory { + public static IS createStateAndWrapWithTtlIfEnabled( + TypeSerializer namespaceSerializer, + StateDescriptor stateDesc, + KeyedStateFactory originalStateFactory, + TtlConfig ttlConfig, + TtlTimeProvider timeProvider) throws Exception { + return ttlConfig.getTtlUpdateType() == TtlUpdateType.Disabled ? + originalStateFactory.createState(namespaceSerializer, stateDesc) : + new TtlStateFactory(originalStateFactory, ttlConfig, timeProvider) + .createState(namespaceSerializer, stateDesc); + } + + private final Map, StateFactory> stateFactories; --- End diff -- I think creating the map is not a big deal because it will happen on init without big pressure on GC. Also map is more declarative and flexible if new state needs to be added then just the map needs to be modified but not the method. 
Or, if needed in the future, it can more easily be refactored to build the mapping somewhere else and inject it into the factory.
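The map-based dispatch argued for in this comment can be sketched roughly as follows. This is only an illustration of the design choice, not the PR's `TtlStateFactory`: `FactoryMapSketch`, the `Descriptor` hierarchy and the string return values are hypothetical placeholders for Flink's state descriptors and state objects.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public final class FactoryMapSketch {

    /** Hypothetical descriptor hierarchy; stands in for Flink's StateDescriptor subclasses. */
    public static class Descriptor {
        public final String name;

        public Descriptor(String name) {
            this.name = name;
        }
    }

    public static final class ValueDescriptor extends Descriptor {
        public ValueDescriptor(String name) {
            super(name);
        }
    }

    public static final class ListDescriptor extends Descriptor {
        public ListDescriptor(String name) {
            super(name);
        }
    }

    /** A descriptor type that is deliberately left unregistered. */
    public static final class OtherDescriptor extends Descriptor {
        public OtherDescriptor(String name) {
            super(name);
        }
    }

    // Built once on construction; supporting a new state type means adding one entry here.
    private final Map<Class<? extends Descriptor>, Function<Descriptor, String>> factories =
        new HashMap<>();

    public FactoryMapSketch() {
        factories.put(ValueDescriptor.class, this::createValueState);
        factories.put(ListDescriptor.class, this::createListState);
    }

    /** Looks up the factory by the descriptor's concrete class and delegates to it. */
    public String createState(Descriptor desc) {
        Function<Descriptor, String> factory = factories.get(desc.getClass());
        if (factory == null) {
            throw new IllegalArgumentException(
                "State type is not supported: " + desc.getClass().getSimpleName());
        }
        return factory.apply(desc);
    }

    private String createValueState(Descriptor desc) {
        return "ttl-value:" + desc.name;
    }

    private String createListState(Descriptor desc) {
        return "ttl-list:" + desc.name;
    }

    public static void main(String[] args) {
        FactoryMapSketch factory = new FactoryMapSketch();
        System.out.println(factory.createState(new ValueDescriptor("counter")));
    }
}
```

Because the mapping is plain data, it could also be built elsewhere and passed into the constructor, which is the later refactoring the comment alludes to.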
[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic
[ https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16523640#comment-16523640 ] ASF GitHub Bot commented on FLINK-9513: --- Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/6196 Thanks for the nice contribution. I left some comments inline, in particular three points about the serializer. First, I would suggest avoiding raw types. Second, I would suggest avoiding the Java stream API, at least in methods that can appear in hot loops (mainly copy and de/serialize), for performance reasons; I think the imperative-style code will not even be (much) longer in those cases. Last, I suggest always testing new serializers via `SerializerTestBase`, because this catches many problems with little effort.
[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic
[ https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16523622#comment-16523622 ] ASF GitHub Bot commented on FLINK-9513: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6196#discussion_r198115714 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java --- @@ -0,0 +1,204 @@ +package org.apache.flink.api.common.typeutils; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * Base class for composite serializers. + * + * This class serializes a list of objects + * + * @param type of custom serialized value + */ +@SuppressWarnings("unchecked") +public abstract class CompositeSerializer extends TypeSerializer { + private final List originalSerializers; + + protected CompositeSerializer(List originalSerializers) { + Preconditions.checkNotNull(originalSerializers); + this.originalSerializers = originalSerializers; + } + + protected abstract T composeValue(List values); + + protected abstract List decomposeValue(T v); + + protected abstract CompositeSerializer createSerializerInstance(List originalSerializers); + + private T composeValueInternal(List values) { + Preconditions.checkArgument(values.size() == originalSerializers.size()); + return composeValue(values); + } + + private List decomposeValueInternal(T v) { + List values = decomposeValue(v); + Preconditions.checkArgument(values.size() == originalSerializers.size()); + return values; + } + + private CompositeSerializer createSerializerInstanceInternal(List originalSerializers) { + 
Preconditions.checkArgument(originalSerializers.size() == originalSerializers.size()); + return createSerializerInstance(originalSerializers); + } + + @Override + public CompositeSerializer duplicate() { + return createSerializerInstanceInternal(originalSerializers.stream() + .map(TypeSerializer::duplicate) + .collect(Collectors.toList())); + } + + @Override + public boolean isImmutableType() { + return originalSerializers.stream().allMatch(TypeSerializer::isImmutableType); + } + + @Override + public T createInstance() { + return composeValueInternal(originalSerializers.stream() + .map(TypeSerializer::createInstance) + .collect(Collectors.toList())); + } + + @Override + public T copy(T from) { + List originalValues = decomposeValueInternal(from); + return composeValueInternal( + IntStream.range(0, originalSerializers.size()) + .mapToObj(i -> originalSerializers.get(i).copy(originalValues.get(i))) + .collect(Collectors.toList())); + } + + @Override + public T copy(T from, T reuse) { + List originalFromValues = decomposeValueInternal(from); + List originalReuseValues = decomposeValueInternal(reuse); + return composeValueInternal( + IntStream.range(0, originalSerializers.size()) + .mapToObj(i -> originalSerializers.get(i).copy(originalFromValues.get(i), originalReuseValues.get(i))) --- End diff -- +1
[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic
[ https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16523623#comment-16523623 ] ASF GitHub Bot commented on FLINK-9513: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6196#discussion_r198115772 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java --- @@ -0,0 +1,204 @@ +package org.apache.flink.api.common.typeutils; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * Base class for composite serializers. + * + * This class serializes a list of objects + * + * @param type of custom serialized value + */ +@SuppressWarnings("unchecked") +public abstract class CompositeSerializer extends TypeSerializer { + private final List originalSerializers; + + protected CompositeSerializer(List originalSerializers) { + Preconditions.checkNotNull(originalSerializers); + this.originalSerializers = originalSerializers; + } + + protected abstract T composeValue(List values); + + protected abstract List decomposeValue(T v); + + protected abstract CompositeSerializer createSerializerInstance(List originalSerializers); + + private T composeValueInternal(List values) { + Preconditions.checkArgument(values.size() == originalSerializers.size()); + return composeValue(values); + } + + private List decomposeValueInternal(T v) { + List values = decomposeValue(v); + Preconditions.checkArgument(values.size() == originalSerializers.size()); + return values; + } + + private CompositeSerializer createSerializerInstanceInternal(List originalSerializers) { + 
Preconditions.checkArgument(originalSerializers.size() == originalSerializers.size()); + return createSerializerInstance(originalSerializers); + } + + @Override + public CompositeSerializer duplicate() { + return createSerializerInstanceInternal(originalSerializers.stream() + .map(TypeSerializer::duplicate) + .collect(Collectors.toList())); + } + + @Override + public boolean isImmutableType() { + return originalSerializers.stream().allMatch(TypeSerializer::isImmutableType); + } + + @Override + public T createInstance() { + return composeValueInternal(originalSerializers.stream() + .map(TypeSerializer::createInstance) + .collect(Collectors.toList())); + } + + @Override + public T copy(T from) { + List originalValues = decomposeValueInternal(from); + return composeValueInternal( + IntStream.range(0, originalSerializers.size()) + .mapToObj(i -> originalSerializers.get(i).copy(originalValues.get(i))) + .collect(Collectors.toList())); + } + + @Override + public T copy(T from, T reuse) { + List originalFromValues = decomposeValueInternal(from); + List originalReuseValues = decomposeValueInternal(reuse); + return composeValueInternal( + IntStream.range(0, originalSerializers.size()) + .mapToObj(i -> originalSerializers.get(i).copy(originalFromValues.get(i), originalReuseValues.get(i))) + .collect(Collectors.toList())); + } + + @Override + public int getLength() { + return originalSerializers.stream().allMatch(s -> s.getLength() >= 0) ? --- End diff -- +1
[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic
[ https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16523620#comment-16523620 ] ASF GitHub Bot commented on FLINK-9513: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6196#discussion_r198115667 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java --- @@ -0,0 +1,204 @@ +package org.apache.flink.api.common.typeutils; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * Base class for composite serializers. + * + * This class serializes a list of objects + * + * @param type of custom serialized value + */ +@SuppressWarnings("unchecked") +public abstract class CompositeSerializer extends TypeSerializer { + private final List originalSerializers; + + protected CompositeSerializer(List originalSerializers) { + Preconditions.checkNotNull(originalSerializers); + this.originalSerializers = originalSerializers; + } + + protected abstract T composeValue(List values); + + protected abstract List decomposeValue(T v); + + protected abstract CompositeSerializer createSerializerInstance(List originalSerializers); + + private T composeValueInternal(List values) { + Preconditions.checkArgument(values.size() == originalSerializers.size()); + return composeValue(values); + } + + private List decomposeValueInternal(T v) { + List values = decomposeValue(v); + Preconditions.checkArgument(values.size() == originalSerializers.size()); + return values; + } + + private CompositeSerializer createSerializerInstanceInternal(List originalSerializers) { + 
Preconditions.checkArgument(originalSerializers.size() == originalSerializers.size()); + return createSerializerInstance(originalSerializers); + } + + @Override + public CompositeSerializer duplicate() { + return createSerializerInstanceInternal(originalSerializers.stream() + .map(TypeSerializer::duplicate) + .collect(Collectors.toList())); + } + + @Override + public boolean isImmutableType() { + return originalSerializers.stream().allMatch(TypeSerializer::isImmutableType); + } + + @Override + public T createInstance() { + return composeValueInternal(originalSerializers.stream() + .map(TypeSerializer::createInstance) + .collect(Collectors.toList())); + } + + @Override + public T copy(T from) { + List originalValues = decomposeValueInternal(from); + return composeValueInternal( + IntStream.range(0, originalSerializers.size()) + .mapToObj(i -> originalSerializers.get(i).copy(originalValues.get(i))) --- End diff -- I would also suggest considering lower-level constructs for potentially hot methods like this one, because this API can easily introduce (non-obvious) performance regressions.
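To illustrate the suggestion about lower-level constructs in hot methods, here is a minimal sketch of a per-field copy written as a plain indexed loop rather than an `IntStream` pipeline. It is not code from the pull request: `FieldCopier` and `CompositeCopySketch` are hypothetical names, and `FieldCopier` is only a stand-in for a per-field serializer so that the example compiles without Flink.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a per-field serializer; this is not Flink's TypeSerializer. */
interface FieldCopier<T> {
    T copy(T from);
}

final class CompositeCopySketch {
    private CompositeCopySketch() {}

    /**
     * Copies every field with the copier at the same index.
     * A plain loop avoids the stream, lambda and collector objects
     * that a stream pipeline would create on every call.
     */
    static List<Object> copyFields(List<FieldCopier<Object>> copiers, List<Object> values) {
        if (copiers.size() != values.size()) {
            throw new IllegalArgumentException("field count mismatch");
        }
        List<Object> copied = new ArrayList<>(values.size());
        for (int i = 0; i < values.size(); i++) {
            copied.add(copiers.get(i).copy(values.get(i)));
        }
        return copied;
    }
}
```

The loop is about as long as the stream version, which matches the reviewer's remark that imperative code need not be much longer here.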
[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic
[ https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16523618#comment-16523618 ] ASF GitHub Bot commented on FLINK-9513: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6196#discussion_r198114853 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java --- @@ -0,0 +1,204 @@ +package org.apache.flink.api.common.typeutils; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * Base class for composite serializers. + * + * This class serializes a list of objects + * + * @param type of custom serialized value + */ +@SuppressWarnings("unchecked") +public abstract class CompositeSerializer extends TypeSerializer { --- End diff -- You could simply test all the new serializers by extending `SerializerTestBase`, which I would recommend doing to catch mistakes.
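As a rough illustration of the kind of mistake a serializer test is meant to catch, the sketch below round-trips a value through a byte buffer and compares the result with the original. It does not use `SerializerTestBase` and is not Flink code: `TimestampedValue` and `RoundTripSketch` are hypothetical names, loosely modelled on the value-plus-expiration-timestamp idea from the issue description, and the wire format is an assumption made only for this example.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical value type: a user value plus its expiration timestamp. */
final class TimestampedValue {
    final String value;
    final long expirationTimestamp;

    TimestampedValue(String value, long expirationTimestamp) {
        this.value = value;
        this.expirationTimestamp = expirationTimestamp;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof TimestampedValue)) {
            return false;
        }
        TimestampedValue other = (TimestampedValue) o;
        return value.equals(other.value) && expirationTimestamp == other.expirationTimestamp;
    }

    @Override
    public int hashCode() {
        return 31 * value.hashCode() + Long.hashCode(expirationTimestamp);
    }
}

final class RoundTripSketch {
    private RoundTripSketch() {}

    /** Writes the user value first, then the timestamp (assumed layout). */
    static void serialize(TimestampedValue v, DataOutputStream out) throws IOException {
        out.writeUTF(v.value);
        out.writeLong(v.expirationTimestamp);
    }

    /** Reads the fields back in the same order they were written. */
    static TimestampedValue deserialize(DataInputStream in) throws IOException {
        String value = in.readUTF();
        long expirationTimestamp = in.readLong();
        return new TimestampedValue(value, expirationTimestamp);
    }

    /** Serializes to an in-memory buffer and deserializes the bytes again. */
    static TimestampedValue roundTrip(TimestampedValue original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                serialize(original, out);
            }
            try (DataInputStream in =
                    new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return deserialize(in);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A field-order mismatch between `serialize` and `deserialize` would make the round-trip comparison fail, which is the sort of error the reviewer expects a shared test base to surface with little effort.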
[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic
[ https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16523614#comment-16523614 ] ASF GitHub Bot commented on FLINK-9513: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6196#discussion_r198113448 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java --- @@ -0,0 +1,204 @@ +package org.apache.flink.api.common.typeutils; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * Base class for composite serializers. + * + * This class serializes a list of objects + * + * @param type of custom serialized value + */ +@SuppressWarnings("unchecked") +public abstract class CompositeSerializer extends TypeSerializer { + private final List originalSerializers; + + protected CompositeSerializer(List originalSerializers) { + Preconditions.checkNotNull(originalSerializers); + this.originalSerializers = originalSerializers; + } + + protected abstract T composeValue(List values); --- End diff -- Also here, list is just used as raw type. I think also the abstract methods could use some documentation for people that would like to create new subclasses in the future. 
[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic
[ https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16523613#comment-16523613 ] ASF GitHub Bot commented on FLINK-9513: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6196#discussion_r198113175 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java --- @@ -0,0 +1,204 @@ +package org.apache.flink.api.common.typeutils; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * Base class for composite serializers. + * + * This class serializes a list of objects + * + * @param type of custom serialized value + */ +@SuppressWarnings("unchecked") +public abstract class CompositeSerializer extends TypeSerializer { + private final List originalSerializers; --- End diff -- Why is this class using a lot of raw types instead of wildcards? E.g. 
why `List<TypeSerializer>` instead of `List<TypeSerializer<?>>`?
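The raw-type question can be illustrated with a small sketch in which a list of differently typed serializers is declared with a wildcard element type, so that no raw types and no `@SuppressWarnings("unchecked")` are needed. This is not code from the pull request: `Codec`, `LongCodec`, `BufferCodec` and `WildcardSketch` are hypothetical names, and `Codec` merely stands in for a generic serializer class.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical generic serializer type; stands in for a class like TypeSerializer. */
abstract class Codec<T> {
    abstract Codec<T> duplicate();

    abstract boolean isImmutableType();
}

/** Stateless codec, so sharing the same instance is safe. */
final class LongCodec extends Codec<Long> {
    @Override
    Codec<Long> duplicate() {
        return this;
    }

    @Override
    boolean isImmutableType() {
        return true;
    }
}

/** Codec for a mutable type; duplicate() returns a fresh instance. */
final class BufferCodec extends Codec<StringBuilder> {
    @Override
    Codec<StringBuilder> duplicate() {
        return new BufferCodec();
    }

    @Override
    boolean isImmutableType() {
        return false;
    }
}

final class WildcardSketch {
    private WildcardSketch() {}

    /** The element type Codec<?> admits codecs of any value type without raw types. */
    static List<Codec<?>> duplicateAll(List<Codec<?>> codecs) {
        List<Codec<?>> result = new ArrayList<>(codecs.size());
        for (Codec<?> codec : codecs) {
            result.add(codec.duplicate());
        }
        return result;
    }

    /** Returns true only if every codec handles an immutable type. */
    static boolean allImmutable(List<Codec<?>> codecs) {
        for (Codec<?> codec : codecs) {
            if (!codec.isImmutableType()) {
                return false;
            }
        }
        return true;
    }
}
```

With the wildcard, the compiler still checks everything that does not depend on the concrete value type; only the places that really need a specific type parameter require a cast.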
[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic
[ https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16523434#comment-16523434 ] ASF GitHub Bot commented on FLINK-9513: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6196#discussion_r198062463 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java --- @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.state.AggregatingStateDescriptor; +import org.apache.flink.api.common.state.FoldingStateDescriptor; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.CompositeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.state.KeyedStateFactory; +import org.apache.flink.util.FlinkRuntimeException; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * This state factory wraps state objects, produced by backends, with TTL logic. + */ +public class TtlStateFactory { + public static IS createStateAndWrapWithTtlIfEnabled( + TypeSerializer namespaceSerializer, + StateDescriptor stateDesc, + KeyedStateFactory originalStateFactory, + TtlConfig ttlConfig, + TtlTimeProvider timeProvider) throws Exception { + return ttlConfig.getTtlUpdateType() == TtlUpdateType.Disabled ? 
+ originalStateFactory.createState(namespaceSerializer, stateDesc) : + new TtlStateFactory(originalStateFactory, ttlConfig, timeProvider) + .createState(namespaceSerializer, stateDesc); + } + + private final Map, StateFactory> stateFactories; + + private final KeyedStateFactory originalStateFactory; + private final TtlConfig ttlConfig; + private final TtlTimeProvider timeProvider; + + private TtlStateFactory(KeyedStateFactory originalStateFactory, TtlConfig ttlConfig, TtlTimeProvider timeProvider) { + this.originalStateFactory = originalStateFactory; + this.ttlConfig = ttlConfig; + this.timeProvider = timeProvider; + this.stateFactories = createStateFactories(); + } + + private Map, StateFactory> createStateFactories() { + return Stream.of( + Tuple2.of(ValueStateDescriptor.class, (StateFactory) this::createValueState), + Tuple2.of(ListStateDescriptor.class, (StateFactory) this::createListState), + Tuple2.of(MapStateDescriptor.class, (StateFactory) this::createMapState), + Tuple2.of(ReducingStateDescriptor.class, (StateFactory) this::createReducingState), + Tuple2.of(AggregatingStateDescriptor.class, (StateFactory) this::createAggregatingState), + Tuple2.of(FoldingStateDescriptor.class, (StateFactory) this::createFoldingState) + ).collect(Collectors.toMap(t -> t.f0, t -> t.f1)); + } + + private interface StateFactory { +IS create( + TypeSerializer namespaceSerializer, + StateDescriptor stateDesc) throws Exception; + } + +
[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic
[ https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16523432#comment-16523432 ] ASF GitHub Bot commented on FLINK-9513: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6196#discussion_r198061894 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java --- @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.state.AggregatingStateDescriptor; +import org.apache.flink.api.common.state.FoldingStateDescriptor; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.CompositeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.state.KeyedStateFactory; +import org.apache.flink.util.FlinkRuntimeException; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * This state factory wraps state objects, produced by backends, with TTL logic. + */ +public class TtlStateFactory { + public static IS createStateAndWrapWithTtlIfEnabled( + TypeSerializer namespaceSerializer, + StateDescriptor stateDesc, + KeyedStateFactory originalStateFactory, + TtlConfig ttlConfig, + TtlTimeProvider timeProvider) throws Exception { + return ttlConfig.getTtlUpdateType() == TtlUpdateType.Disabled ? 
+ originalStateFactory.createState(namespaceSerializer, stateDesc) : + new TtlStateFactory(originalStateFactory, ttlConfig, timeProvider) + .createState(namespaceSerializer, stateDesc); + } + + private final Map, StateFactory> stateFactories; + + private final KeyedStateFactory originalStateFactory; + private final TtlConfig ttlConfig; + private final TtlTimeProvider timeProvider; + + private TtlStateFactory(KeyedStateFactory originalStateFactory, TtlConfig ttlConfig, TtlTimeProvider timeProvider) { + this.originalStateFactory = originalStateFactory; + this.ttlConfig = ttlConfig; + this.timeProvider = timeProvider; + this.stateFactories = createStateFactories(); + } + + private Map, StateFactory> createStateFactories() { + return Stream.of( + Tuple2.of(ValueStateDescriptor.class, (StateFactory) this::createValueState), + Tuple2.of(ListStateDescriptor.class, (StateFactory) this::createListState), + Tuple2.of(MapStateDescriptor.class, (StateFactory) this::createMapState), + Tuple2.of(ReducingStateDescriptor.class, (StateFactory) this::createReducingState), + Tuple2.of(AggregatingStateDescriptor.class, (StateFactory) this::createAggregatingState), + Tuple2.of(FoldingStateDescriptor.class, (StateFactory) this::createFoldingState) + ).collect(Collectors.toMap(t -> t.f0, t -> t.f1)); + } + + private interface StateFactory { +IS create( + TypeSerializer namespaceSerializer, + StateDescriptor stateDesc) throws Exception; + } + +
[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic
[ https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16523429#comment-16523429 ] ASF GitHub Bot commented on FLINK-9513: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6196#discussion_r198060715 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java --- @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.state.AggregatingStateDescriptor; +import org.apache.flink.api.common.state.FoldingStateDescriptor; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.CompositeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.state.KeyedStateFactory; +import org.apache.flink.util.FlinkRuntimeException; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * This state factory wraps state objects, produced by backends, with TTL logic. + */ +public class TtlStateFactory { + public static IS createStateAndWrapWithTtlIfEnabled( + TypeSerializer namespaceSerializer, + StateDescriptor stateDesc, + KeyedStateFactory originalStateFactory, + TtlConfig ttlConfig, + TtlTimeProvider timeProvider) throws Exception { + return ttlConfig.getTtlUpdateType() == TtlUpdateType.Disabled ? 
+ originalStateFactory.createState(namespaceSerializer, stateDesc) : + new TtlStateFactory(originalStateFactory, ttlConfig, timeProvider) + .createState(namespaceSerializer, stateDesc); + } + + private final Map, StateFactory> stateFactories; + + private final KeyedStateFactory originalStateFactory; + private final TtlConfig ttlConfig; + private final TtlTimeProvider timeProvider; + + private TtlStateFactory(KeyedStateFactory originalStateFactory, TtlConfig ttlConfig, TtlTimeProvider timeProvider) { + this.originalStateFactory = originalStateFactory; + this.ttlConfig = ttlConfig; + this.timeProvider = timeProvider; + this.stateFactories = createStateFactories(); + } + + private Map, StateFactory> createStateFactories() { + return Stream.of( + Tuple2.of(ValueStateDescriptor.class, (StateFactory) this::createValueState), + Tuple2.of(ListStateDescriptor.class, (StateFactory) this::createListState), + Tuple2.of(MapStateDescriptor.class, (StateFactory) this::createMapState), + Tuple2.of(ReducingStateDescriptor.class, (StateFactory) this::createReducingState), + Tuple2.of(AggregatingStateDescriptor.class, (StateFactory) this::createAggregatingState), + Tuple2.of(FoldingStateDescriptor.class, (StateFactory) this::createFoldingState) + ).collect(Collectors.toMap(t -> t.f0, t -> t.f1)); + } + + private interface StateFactory { --- End diff -- Do we really need this interface? It looks identical to `KeyedStateFactory` and seems that interface would fit here as well? > Wrap state binder wit
[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic
[ https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16523426#comment-16523426 ] ASF GitHub Bot commented on FLINK-9513: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6196#discussion_r198058861 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateFactory.java --- @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; + +/** This factory produces concrete state objects in backends. */ +public interface KeyedStateFactory { --- End diff -- I think this interface can go into the file of `KeyedStateBackend` because it belongs into that context. 
> Wrap state binder with TTL logic > > > Key: FLINK-9513 > URL: https://issues.apache.org/jira/browse/FLINK-9513 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Affects Versions: 1.6.0 >Reporter: Andrey Zagrebin >Assignee: Andrey Zagrebin >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > The main idea is to wrap user state value with a class holding the value and > the expiration timestamp (maybe meta data in future) and use the new object > as a value in the existing implementations: > {code:java} > class TtlValue { > V value; > long expirationTimestamp; > } > {code} > The original state binder factory is wrapped with TtlStateBinder if TTL is > enabled: > {code:java} > state = ttlConfig.updateType == DISABLED ? > bind(binder) : bind(new TtlStateBinder(binder, timerService)); > {code} > TtlStateBinder decorates the states produced by the original binder with TTL > logic wrappers and adds TtlValue serialisation logic: > {code:java} > TtlStateBinder { > StateBinder binder; > ProcessingTimeProvier timeProvider; // System.currentTimeMillis() > TtlValueState createValueState(valueDesc) { > serializer = new TtlValueSerializer(valueDesc.getSerializer); > ttlValueDesc = new ValueDesc(serializer, ...); > // or implement custom TypeInfo > originalStateWithTtl = binder.createValueState(valueDesc); > return new TtlValueState(originalStateWithTtl, timeProvider); > } > // List, Map, ... > } > {code} > TTL serializer should add expiration timestamp -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic
[ https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16523210#comment-16523210 ] ASF GitHub Bot commented on FLINK-9513: --- Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6196#discussion_r198015848 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java --- @@ -0,0 +1,204 @@ +package org.apache.flink.api.common.typeutils; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * Base class for composite serializers. + * + * This class serializes a list of objects + * + * @param type of custom serialized value + */ +@SuppressWarnings("unchecked") +public abstract class CompositeSerializer extends TypeSerializer { + private final List originalSerializers; + + protected CompositeSerializer(List originalSerializers) { + Preconditions.checkNotNull(originalSerializers); + this.originalSerializers = originalSerializers; + } + + protected abstract T composeValue(List values); + + protected abstract List decomposeValue(T v); + + protected abstract CompositeSerializer createSerializerInstance(List originalSerializers); + + private T composeValueInternal(List values) { + Preconditions.checkArgument(values.size() == originalSerializers.size()); + return composeValue(values); + } + + private List decomposeValueInternal(T v) { + List values = decomposeValue(v); + Preconditions.checkArgument(values.size() == originalSerializers.size()); + return values; + } + + private CompositeSerializer createSerializerInstanceInternal(List originalSerializers) { + 
Preconditions.checkArgument(originalSerializers.size() == originalSerializers.size()); + return createSerializerInstance(originalSerializers); + } + + @Override + public CompositeSerializer duplicate() { + return createSerializerInstanceInternal(originalSerializers.stream() + .map(TypeSerializer::duplicate) + .collect(Collectors.toList())); + } + + @Override + public boolean isImmutableType() { + return originalSerializers.stream().allMatch(TypeSerializer::isImmutableType); + } + + @Override + public T createInstance() { + return composeValueInternal(originalSerializers.stream() + .map(TypeSerializer::createInstance) + .collect(Collectors.toList())); + } + + @Override + public T copy(T from) { + List originalValues = decomposeValueInternal(from); + return composeValueInternal( + IntStream.range(0, originalSerializers.size()) + .mapToObj(i -> originalSerializers.get(i).copy(originalValues.get(i))) + .collect(Collectors.toList())); + } + + @Override + public T copy(T from, T reuse) { + List originalFromValues = decomposeValueInternal(from); + List originalReuseValues = decomposeValueInternal(reuse); + return composeValueInternal( + IntStream.range(0, originalSerializers.size()) + .mapToObj(i -> originalSerializers.get(i).copy(originalFromValues.get(i), originalReuseValues.get(i))) + .collect(Collectors.toList())); + } + + @Override + public int getLength() { + return originalSerializers.stream().allMatch(s -> s.getLength() >= 0) ? + originalSerializers.stream().mapToInt(TypeSerializer::getLength).sum() : -1; + } + + @Override + public void serialize(T record, DataOutputView target) throws IOException { + List originalValues = decomposeValueInternal(record); + for (int i = 0; i < originalSerializers.size(); i++) { + originalSerializers.get(i).serialize(originalValues.get(i), target); + } + } + + @Override + public T deserialize(DataInputView source) throws IOException { + List originalValues = new ArrayList(); + for (TypeSerializer typeSerializer : originalSer
[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic
[ https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16523209#comment-16523209 ] ASF GitHub Bot commented on FLINK-9513: --- Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6196#discussion_r198015825 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java --- @@ -0,0 +1,204 @@ +package org.apache.flink.api.common.typeutils; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * Base class for composite serializers. + * + * This class serializes a list of objects + * + * @param type of custom serialized value + */ +@SuppressWarnings("unchecked") +public abstract class CompositeSerializer extends TypeSerializer { + private final List originalSerializers; + + protected CompositeSerializer(List originalSerializers) { + Preconditions.checkNotNull(originalSerializers); + this.originalSerializers = originalSerializers; + } + + protected abstract T composeValue(List values); + + protected abstract List decomposeValue(T v); + + protected abstract CompositeSerializer createSerializerInstance(List originalSerializers); + + private T composeValueInternal(List values) { + Preconditions.checkArgument(values.size() == originalSerializers.size()); + return composeValue(values); + } + + private List decomposeValueInternal(T v) { + List values = decomposeValue(v); + Preconditions.checkArgument(values.size() == originalSerializers.size()); + return values; + } + + private CompositeSerializer createSerializerInstanceInternal(List originalSerializers) { + 
Preconditions.checkArgument(originalSerializers.size() == originalSerializers.size()); + return createSerializerInstance(originalSerializers); + } + + @Override + public CompositeSerializer duplicate() { + return createSerializerInstanceInternal(originalSerializers.stream() + .map(TypeSerializer::duplicate) + .collect(Collectors.toList())); + } + + @Override + public boolean isImmutableType() { + return originalSerializers.stream().allMatch(TypeSerializer::isImmutableType); + } + + @Override + public T createInstance() { + return composeValueInternal(originalSerializers.stream() + .map(TypeSerializer::createInstance) + .collect(Collectors.toList())); + } + + @Override + public T copy(T from) { + List originalValues = decomposeValueInternal(from); + return composeValueInternal( + IntStream.range(0, originalSerializers.size()) + .mapToObj(i -> originalSerializers.get(i).copy(originalValues.get(i))) + .collect(Collectors.toList())); + } + + @Override + public T copy(T from, T reuse) { + List originalFromValues = decomposeValueInternal(from); + List originalReuseValues = decomposeValueInternal(reuse); + return composeValueInternal( + IntStream.range(0, originalSerializers.size()) + .mapToObj(i -> originalSerializers.get(i).copy(originalFromValues.get(i), originalReuseValues.get(i))) + .collect(Collectors.toList())); + } + + @Override + public int getLength() { + return originalSerializers.stream().allMatch(s -> s.getLength() >= 0) ? + originalSerializers.stream().mapToInt(TypeSerializer::getLength).sum() : -1; + } + + @Override + public void serialize(T record, DataOutputView target) throws IOException { + List originalValues = decomposeValueInternal(record); + for (int i = 0; i < originalSerializers.size(); i++) { + originalSerializers.get(i).serialize(originalValues.get(i), target); + } + } + + @Override + public T deserialize(DataInputView source) throws IOException { + List originalValues = new ArrayList(); + for (TypeSerializer typeSerializer : originalSer
[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic
[ https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16523205#comment-16523205 ] ASF GitHub Bot commented on FLINK-9513: --- Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6196#discussion_r198015385 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java --- @@ -0,0 +1,204 @@ +package org.apache.flink.api.common.typeutils; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * Base class for composite serializers. + * + * This class serializes a list of objects + * + * @param type of custom serialized value + */ +@SuppressWarnings("unchecked") +public abstract class CompositeSerializer extends TypeSerializer { + private final List originalSerializers; + + protected CompositeSerializer(List originalSerializers) { + Preconditions.checkNotNull(originalSerializers); + this.originalSerializers = originalSerializers; + } + + protected abstract T composeValue(List values); + + protected abstract List decomposeValue(T v); + + protected abstract CompositeSerializer createSerializerInstance(List originalSerializers); + + private T composeValueInternal(List values) { + Preconditions.checkArgument(values.size() == originalSerializers.size()); + return composeValue(values); + } + + private List decomposeValueInternal(T v) { + List values = decomposeValue(v); + Preconditions.checkArgument(values.size() == originalSerializers.size()); + return values; + } + + private CompositeSerializer createSerializerInstanceInternal(List originalSerializers) { + 
Preconditions.checkArgument(originalSerializers.size() == originalSerializers.size()); + return createSerializerInstance(originalSerializers); + } + + @Override + public CompositeSerializer duplicate() { + return createSerializerInstanceInternal(originalSerializers.stream() + .map(TypeSerializer::duplicate) + .collect(Collectors.toList())); + } + + @Override + public boolean isImmutableType() { + return originalSerializers.stream().allMatch(TypeSerializer::isImmutableType); + } + + @Override + public T createInstance() { + return composeValueInternal(originalSerializers.stream() + .map(TypeSerializer::createInstance) + .collect(Collectors.toList())); + } + + @Override + public T copy(T from) { + List originalValues = decomposeValueInternal(from); + return composeValueInternal( + IntStream.range(0, originalSerializers.size()) + .mapToObj(i -> originalSerializers.get(i).copy(originalValues.get(i))) --- End diff -- Same here might result in a bad performance because of the random access of the list. > Wrap state binder with TTL logic > > > Key: FLINK-9513 > URL: https://issues.apache.org/jira/browse/FLINK-9513 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Affects Versions: 1.6.0 >Reporter: Andrey Zagrebin >Assignee: Andrey Zagrebin >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > The main idea is to wrap user state value with a class holding the value and > the expiration timestamp (maybe meta data in future) and use the new object > as a value in the existing implementations: > {code:java} > class TtlValue { > V value; > long expirationTimestamp; > } > {code} > The original state binder factory is wrapped with TtlStateBinder if TTL is > enabled: > {code:java} > state = ttlConfig.updateType == DISABLED ? 
> bind(binder) : bind(new TtlStateBinder(binder, timerService)); > {code} > TtlStateBinder decorates the states produced by the original binder with TTL > logic wrappers and adds TtlValue serialisation logic: > {code:java} > TtlStateBinder { > StateBinder binder; > ProcessingTimeProvier timeProvider; // System.currentTimeMillis() > TtlValueState createValueState(valueDesc)
[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic
[ https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16523203#comment-16523203 ] ASF GitHub Bot commented on FLINK-9513: --- Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6196#discussion_r198015282 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java --- @@ -0,0 +1,204 @@ +package org.apache.flink.api.common.typeutils; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * Base class for composite serializers. + * + * This class serializes a list of objects + * + * @param type of custom serialized value + */ +@SuppressWarnings("unchecked") +public abstract class CompositeSerializer extends TypeSerializer { + private final List originalSerializers; + + protected CompositeSerializer(List originalSerializers) { + Preconditions.checkNotNull(originalSerializers); + this.originalSerializers = originalSerializers; + } + + protected abstract T composeValue(List values); + + protected abstract List decomposeValue(T v); + + protected abstract CompositeSerializer createSerializerInstance(List originalSerializers); + + private T composeValueInternal(List values) { + Preconditions.checkArgument(values.size() == originalSerializers.size()); + return composeValue(values); + } + + private List decomposeValueInternal(T v) { + List values = decomposeValue(v); + Preconditions.checkArgument(values.size() == originalSerializers.size()); + return values; + } + + private CompositeSerializer createSerializerInstanceInternal(List originalSerializers) { + 
Preconditions.checkArgument(originalSerializers.size() == originalSerializers.size()); + return createSerializerInstance(originalSerializers); + } + + @Override + public CompositeSerializer duplicate() { + return createSerializerInstanceInternal(originalSerializers.stream() + .map(TypeSerializer::duplicate) + .collect(Collectors.toList())); + } + + @Override + public boolean isImmutableType() { + return originalSerializers.stream().allMatch(TypeSerializer::isImmutableType); + } + + @Override + public T createInstance() { + return composeValueInternal(originalSerializers.stream() + .map(TypeSerializer::createInstance) + .collect(Collectors.toList())); + } + + @Override + public T copy(T from) { + List originalValues = decomposeValueInternal(from); + return composeValueInternal( + IntStream.range(0, originalSerializers.size()) + .mapToObj(i -> originalSerializers.get(i).copy(originalValues.get(i))) + .collect(Collectors.toList())); + } + + @Override + public T copy(T from, T reuse) { + List originalFromValues = decomposeValueInternal(from); + List originalReuseValues = decomposeValueInternal(reuse); + return composeValueInternal( + IntStream.range(0, originalSerializers.size()) + .mapToObj(i -> originalSerializers.get(i).copy(originalFromValues.get(i), originalReuseValues.get(i))) --- End diff -- I would suggest using an `Iterator` instead of `xxx.get(i)` here, because if `originalSerializers` (or one of the other lists) is a `LinkedList`, the performance could be bad.
> Wrap state binder with TTL logic > > > Key: FLINK-9513 > URL: https://issues.apache.org/jira/browse/FLINK-9513 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Affects Versions: 1.6.0 >Reporter: Andrey Zagrebin >Assignee: Andrey Zagrebin >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > The main idea is to wrap user state value with a class holding the value and > the expiration timestamp (maybe meta data in future) and use the new object > as a value in the existing implemen
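The iterator suggestion in the review comment above could look roughly like the following. This is only a minimal sketch, not code from the pull request: `LockstepCopy`, `copyAll` and the use of `UnaryOperator<Object>` as a stand-in for a serializer's `copy` are hypothetical names chosen so the example compiles without Flink on the classpath.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical helper; UnaryOperator<Object> stands in for TypeSerializer#copy.
final class LockstepCopy {

    /** Applies the i-th copier to the i-th value while walking both lists with iterators. */
    static List<Object> copyAll(List<UnaryOperator<Object>> copiers, List<Object> values) {
        if (copiers.size() != values.size()) {
            throw new IllegalArgumentException("copiers and values must have the same size");
        }
        List<Object> result = new ArrayList<>(values.size());
        Iterator<UnaryOperator<Object>> copierIt = copiers.iterator();
        Iterator<Object> valueIt = values.iterator();
        // One sequential pass; no get(i), so a LinkedList costs O(n) overall instead of O(n^2).
        while (copierIt.hasNext()) {
            result.add(copierIt.next().apply(valueIt.next()));
        }
        return result;
    }
}
```

With `ArrayList` inputs the indexed version is already constant time per access, so the difference only matters when a caller hands in a list without fast random access.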
[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic
[ https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16523200#comment-16523200 ] ASF GitHub Bot commented on FLINK-9513: --- Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6196#discussion_r198015105 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java --- @@ -0,0 +1,204 @@ +package org.apache.flink.api.common.typeutils; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * Base class for composite serializers. + * + * This class serializes a list of objects + * + * @param type of custom serialized value + */ +@SuppressWarnings("unchecked") +public abstract class CompositeSerializer extends TypeSerializer { + private final List originalSerializers; + + protected CompositeSerializer(List originalSerializers) { + Preconditions.checkNotNull(originalSerializers); + this.originalSerializers = originalSerializers; + } + + protected abstract T composeValue(List values); + + protected abstract List decomposeValue(T v); + + protected abstract CompositeSerializer createSerializerInstance(List originalSerializers); + + private T composeValueInternal(List values) { + Preconditions.checkArgument(values.size() == originalSerializers.size()); + return composeValue(values); + } + + private List decomposeValueInternal(T v) { + List values = decomposeValue(v); + Preconditions.checkArgument(values.size() == originalSerializers.size()); + return values; + } + + private CompositeSerializer createSerializerInstanceInternal(List originalSerializers) { + 
Preconditions.checkArgument(originalSerializers.size() == originalSerializers.size()); + return createSerializerInstance(originalSerializers); + } + + @Override + public CompositeSerializer duplicate() { + return createSerializerInstanceInternal(originalSerializers.stream() + .map(TypeSerializer::duplicate) + .collect(Collectors.toList())); + } + + @Override + public boolean isImmutableType() { + return originalSerializers.stream().allMatch(TypeSerializer::isImmutableType); + } + + @Override + public T createInstance() { + return composeValueInternal(originalSerializers.stream() + .map(TypeSerializer::createInstance) + .collect(Collectors.toList())); + } + + @Override + public T copy(T from) { + List originalValues = decomposeValueInternal(from); + return composeValueInternal( + IntStream.range(0, originalSerializers.size()) + .mapToObj(i -> originalSerializers.get(i).copy(originalValues.get(i))) + .collect(Collectors.toList())); + } + + @Override + public T copy(T from, T reuse) { + List originalFromValues = decomposeValueInternal(from); + List originalReuseValues = decomposeValueInternal(reuse); + return composeValueInternal( + IntStream.range(0, originalSerializers.size()) + .mapToObj(i -> originalSerializers.get(i).copy(originalFromValues.get(i), originalReuseValues.get(i))) + .collect(Collectors.toList())); + } + + @Override + public int getLength() { + return originalSerializers.stream().allMatch(s -> s.getLength() >= 0) ? --- End diff -- nit: if we don't use the high level API, we could do this in a single loop with a better performance. > Wrap state binder with TTL logic > > > Key: FLINK-9513 > URL: https://issues.apache.org/jira/browse/FLINK-9513 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Affects Versions: 1.6.0 >Reporter: Andrey Zagrebin >Assignee: Andrey Zagrebin >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > The main idea is to wrap user state value with a class h
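As an illustration of the single-loop idea in the nit above, here is a minimal sketch. `CompositeLength`, `HasLength` and `totalLength` are hypothetical names; `HasLength` only stands in for the `getLength()` part of a serializer so that the example is self-contained.

```java
import java.util.List;

// Hypothetical helper, not part of the pull request.
final class CompositeLength {

    /** Stand-in for the single method of a serializer that this example needs. */
    interface HasLength {
        int getLength();
    }

    /** Returns the summed length, or -1 as soon as any part reports a variable (negative) length. */
    static int totalLength(List<? extends HasLength> parts) {
        int total = 0;
        for (HasLength part : parts) {
            int length = part.getLength();
            if (length < 0) {
                return -1; // variable-length part: the composite is variable-length too
            }
            total += length;
        }
        return total;
    }
}
```

This gives the same result as the `allMatch` plus `sum` version in the diff, but visits each element at most once and stops at the first variable-length part.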
[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic
[ https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16523197#comment-16523197 ] ASF GitHub Bot commented on FLINK-9513: --- Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6196#discussion_r198014886 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java --- @@ -0,0 +1,204 @@ +package org.apache.flink.api.common.typeutils; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * Base class for composite serializers. + * + * This class serializes a list of objects + * + * @param type of custom serialized value + */ +@SuppressWarnings("unchecked") +public abstract class CompositeSerializer extends TypeSerializer { + private final List originalSerializers; + + protected CompositeSerializer(List originalSerializers) { + Preconditions.checkNotNull(originalSerializers); + this.originalSerializers = originalSerializers; + } + + protected abstract T composeValue(List values); + + protected abstract List decomposeValue(T v); + + protected abstract CompositeSerializer createSerializerInstance(List originalSerializers); + + private T composeValueInternal(List values) { + Preconditions.checkArgument(values.size() == originalSerializers.size()); + return composeValue(values); + } + + private List decomposeValueInternal(T v) { + List values = decomposeValue(v); + Preconditions.checkArgument(values.size() == originalSerializers.size()); + return values; + } + + private CompositeSerializer createSerializerInstanceInternal(List originalSerializers) { + 
Preconditions.checkArgument(originalSerializers.size() == originalSerializers.size()); + return createSerializerInstance(originalSerializers); + } + + @Override + public CompositeSerializer duplicate() { + return createSerializerInstanceInternal(originalSerializers.stream() + .map(TypeSerializer::duplicate) + .collect(Collectors.toList())); + } + + @Override + public boolean isImmutableType() { + return originalSerializers.stream().allMatch(TypeSerializer::isImmutableType); + } + + @Override + public T createInstance() { + return composeValueInternal(originalSerializers.stream() + .map(TypeSerializer::createInstance) + .collect(Collectors.toList())); + } + + @Override + public T copy(T from) { + List originalValues = decomposeValueInternal(from); + return composeValueInternal( + IntStream.range(0, originalSerializers.size()) + .mapToObj(i -> originalSerializers.get(i).copy(originalValues.get(i))) + .collect(Collectors.toList())); + } + + @Override + public T copy(T from, T reuse) { + List originalFromValues = decomposeValueInternal(from); + List originalReuseValues = decomposeValueInternal(reuse); + return composeValueInternal( + IntStream.range(0, originalSerializers.size()) + .mapToObj(i -> originalSerializers.get(i).copy(originalFromValues.get(i), originalReuseValues.get(i))) + .collect(Collectors.toList())); + } + + @Override + public int getLength() { + return originalSerializers.stream().allMatch(s -> s.getLength() >= 0) ? 
+ originalSerializers.stream().mapToInt(TypeSerializer::getLength).sum() : -1; + } + + @Override + public void serialize(T record, DataOutputView target) throws IOException { + List originalValues = decomposeValueInternal(record); + for (int i = 0; i < originalSerializers.size(); i++) { + originalSerializers.get(i).serialize(originalValues.get(i), target); --- End diff -- If `originalSerializers` or `originalValues` is something like a `LinkedList`, then `originalSerializers.get(i)` and `originalValues.get(i)` will perform very poorly. I think we might sho
[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic
[ https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16523187#comment-16523187 ] ASF GitHub Bot commented on FLINK-9513: --- Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6196#discussion_r198014021 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java --- @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.state.AggregatingStateDescriptor; +import org.apache.flink.api.common.state.FoldingStateDescriptor; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.CompositeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.state.KeyedStateFactory; +import org.apache.flink.util.FlinkRuntimeException; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * This state factory wraps state objects, produced by backends, with TTL logic. + */ +public class TtlStateFactory { + public static IS createStateAndWrapWithTtlIfEnabled( + TypeSerializer namespaceSerializer, + StateDescriptor stateDesc, + KeyedStateFactory originalStateFactory, + TtlConfig ttlConfig, + TtlTimeProvider timeProvider) throws Exception { + return ttlConfig.getTtlUpdateType() == TtlUpdateType.Disabled ? 
+ originalStateFactory.createState(namespaceSerializer, stateDesc) : + new TtlStateFactory(originalStateFactory, ttlConfig, timeProvider) + .createState(namespaceSerializer, stateDesc); + } + + private final Map, StateFactory> stateFactories; --- End diff -- I see, but this looks a bit weird. It looks like `stateFactories` is only used for looking up the corresponding state factory for the `stateDesc`, and we need to create it first every time `createStateAndWrapWithTtlIfEnabled()` is called. The flow of `createStateAndWrapWithTtlIfEnabled()` looks like: - create a map of `state factory` (stateFactories). - use the stateFactories to look up the corresponding state factory for the `stateDesc`. - ... In that case, maybe using a `switch case` to find the corresponding state factory is better; at least that way we don't need to create a map of `state factory` first for every call. What do you think? > Wrap state binder with TTL logic > > > Key: FLINK-9513 > URL: https://issues.apache.org/jira/browse/FLINK-9513 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Affects Versions: 1.6.0 >Reporter: Andrey Zagrebin >Assignee: Andrey Zagrebin >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > The main idea is to wrap user state value with a class holding the value and > the expiration timestamp (maybe meta data in future) and use the new object > as a value in the existing implementations: > {code:java} > class TtlValue { > V value; > long expirationTimestamp; > } > {code} > The original state binder fac
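A `switch`-based lookup of the kind proposed in the comment above might be sketched as follows. Everything here is an assumption made for illustration: the `Kind` enum is a hypothetical type tag for a descriptor, and the returned strings merely stand in for the wrapped TTL state objects.

```java
// Hypothetical sketch; Kind and the returned strings are placeholders, not Flink types.
final class SwitchDispatch {

    /** Stand-in for whatever tag identifies the kind of state a descriptor describes. */
    enum Kind { VALUE, LIST, MAP }

    /** Picks the creation path directly, without building a lookup map on each call. */
    static String createState(Kind kind) {
        switch (kind) {
            case VALUE:
                return "ttl-value-state";
            case LIST:
                return "ttl-list-state";
            case MAP:
                return "ttl-map-state";
            default:
                throw new IllegalArgumentException("Unsupported state kind: " + kind);
        }
    }
}
```

The trade-off is that the `switch` needs some enum-like tag to dispatch on, whereas the map in the diff is keyed by descriptor class.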
[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic
[ https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16520552#comment-16520552 ] ASF GitHub Bot commented on FLINK-9513: --- Github user azagrebin commented on a diff in the pull request: https://github.com/apache/flink/pull/6196#discussion_r197498439 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java --- @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.CompositeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.KeyedStateFactory;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * This state factory wraps state objects, produced by backends, with TTL logic.
+ */
+public class TtlStateFactory {
+  public static IS createStateAndWrapWithTtlIfEnabled(
+    TypeSerializer namespaceSerializer,
+    StateDescriptor stateDesc,
+    KeyedStateFactory originalStateFactory,
+    TtlConfig ttlConfig,
+    TtlTimeProvider timeProvider) throws Exception {
+    return ttlConfig.getTtlUpdateType() == TtlUpdateType.Disabled ?
+      originalStateFactory.createState(namespaceSerializer, stateDesc) :
+      new TtlStateFactory(originalStateFactory, ttlConfig, timeProvider)
+        .createState(namespaceSerializer, stateDesc);
+  }
+
+  private final Map, StateFactory> stateFactories;
--- End diff --

It could be static. The idea was to create a `TtlStateFactory` object for each call of `createStateAndWrapWithTtlIfEnabled`. I think state objects are created rarely, only at initialization.

The object holds non-static references to `originalStateFactory`, `ttlConfig` and `timeProvider`, which keeps the `createXxxState` methods less verbose. `stateFactories` captures a reference to this object in `createStateFactories`, so it cannot be static unless everything becomes static. In that case the `createXxxState` methods get more verbose, because they have to accept the additional parameters `originalStateFactory`, `ttlConfig` and `timeProvider`.
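To make the trade-off described in this reply concrete, here is a minimal sketch of the fully static variant. All names (`StaticTableSketch`, `TtlContext`, `Desc`, `ValueDesc`, `ListDesc`) are hypothetical stand-ins rather than Flink classes, and the "state" is just a `String`. The lookup table is built once, but every creation method now has to take the context explicitly.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

/** Hypothetical static variant: one shared lookup table, context passed on every call. */
final class StaticTableSketch {

    /** Bundles what the instance fields would otherwise hold (stand-in types). */
    static final class TtlContext {
        final String backendName;
        final long ttlMillis;

        TtlContext(String backendName, long ttlMillis) {
            this.backendName = backendName;
            this.ttlMillis = ttlMillis;
        }
    }

    /** Stand-in for a state descriptor. */
    abstract static class Desc {
        final String name;

        Desc(String name) {
            this.name = name;
        }
    }

    static final class ValueDesc extends Desc {
        ValueDesc(String name) {
            super(name);
        }
    }

    static final class ListDesc extends Desc {
        ListDesc(String name) {
            super(name);
        }
    }

    // Built once at class initialization. The entries capture no instance,
    // which is why each one needs the context as an explicit argument.
    private static final Map<Class<? extends Desc>, BiFunction<TtlContext, Desc, String>> FACTORIES =
        new HashMap<>();

    static {
        FACTORIES.put(ValueDesc.class, StaticTableSketch::createValueState);
        FACTORIES.put(ListDesc.class, StaticTableSketch::createListState);
    }

    static String createState(TtlContext ctx, Desc desc) {
        BiFunction<TtlContext, Desc, String> factory = FACTORIES.get(desc.getClass());
        if (factory == null) {
            throw new IllegalArgumentException("Unsupported descriptor: " + desc.getClass().getName());
        }
        return factory.apply(ctx, desc);
    }

    private static String createValueState(TtlContext ctx, Desc desc) {
        return "value:" + desc.name + "@" + ctx.backendName + "/" + ctx.ttlMillis;
    }

    private static String createListState(TtlContext ctx, Desc desc) {
        return "list:" + desc.name + "@" + ctx.backendName + "/" + ctx.ttlMillis;
    }

    private StaticTableSketch() {
    }
}
```

Whether the extra parameter on every method is worth saving one small map per state creation is exactly the judgment call discussed in this thread.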
[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic
[ https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16519962#comment-16519962 ] ASF GitHub Bot commented on FLINK-9513: --- Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6196#discussion_r197331145 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java --- @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.state.AggregatingStateDescriptor; +import org.apache.flink.api.common.state.FoldingStateDescriptor; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.CompositeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.state.KeyedStateFactory; +import org.apache.flink.util.FlinkRuntimeException; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * This state factory wraps state objects, produced by backends, with TTL logic. + */ +public class TtlStateFactory { + public static IS createStateAndWrapWithTtlIfEnabled( + TypeSerializer namespaceSerializer, + StateDescriptor stateDesc, + KeyedStateFactory originalStateFactory, + TtlConfig ttlConfig, + TtlTimeProvider timeProvider) throws Exception { + return ttlConfig.getTtlUpdateType() == TtlUpdateType.Disabled ? 
+      originalStateFactory.createState(namespaceSerializer, stateDesc) :
+      new TtlStateFactory(originalStateFactory, ttlConfig, timeProvider)
+        .createState(namespaceSerializer, stateDesc);
+  }
+
+  private final Map, StateFactory> stateFactories;
+
+  private final KeyedStateFactory originalStateFactory;
+  private final TtlConfig ttlConfig;
+  private final TtlTimeProvider timeProvider;
+
+  private TtlStateFactory(KeyedStateFactory originalStateFactory, TtlConfig ttlConfig, TtlTimeProvider timeProvider) {
--- End diff --

It would be better to check that the arguments are not null, or simply to annotate them with `@Nonnull`.
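A minimal sketch of the explicit null check suggested here. It uses `java.util.Objects.requireNonNull` so that it compiles without Flink on the classpath; in Flink code the equivalent would presumably be `Preconditions.checkNotNull`, which the `CompositeSerializer` in this pull request already calls. The class name and the `Object` parameter types are hypothetical stand-ins.

```java
import java.util.Objects;

/** Hypothetical constructor that fails fast on null arguments. */
final class NullCheckedFactorySketch {
    private final Object originalStateFactory;
    private final Object ttlConfig;
    private final Object timeProvider;

    NullCheckedFactorySketch(Object originalStateFactory, Object ttlConfig, Object timeProvider) {
        // Each check throws a NullPointerException carrying the argument name.
        this.originalStateFactory = Objects.requireNonNull(originalStateFactory, "originalStateFactory");
        this.ttlConfig = Objects.requireNonNull(ttlConfig, "ttlConfig");
        this.timeProvider = Objects.requireNonNull(timeProvider, "timeProvider");
    }
}
```

An annotation such as `@Nonnull` documents the same contract for readers and static analysis, but on its own it does not add a runtime check.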
[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic
[ https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16519961#comment-16519961 ] ASF GitHub Bot commented on FLINK-9513: --- Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6196#discussion_r197329533 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java --- @@ -0,0 +1,204 @@ +package org.apache.flink.api.common.typeutils; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * Base class for composite serializers. + * + * This class serializes a list of objects + * + * @param type of custom serialized value + */ +@SuppressWarnings("unchecked") +public abstract class CompositeSerializer extends TypeSerializer { + private final List originalSerializers; + + protected CompositeSerializer(List originalSerializers) { + Preconditions.checkNotNull(originalSerializers); + this.originalSerializers = originalSerializers; + } + + protected abstract T composeValue(List values); + + protected abstract List decomposeValue(T v); + + protected abstract CompositeSerializer createSerializerInstance(List originalSerializers); + + private T composeValueInternal(List values) { + Preconditions.checkArgument(values.size() == originalSerializers.size()); + return composeValue(values); + } + + private List decomposeValueInternal(T v) { + List values = decomposeValue(v); + Preconditions.checkArgument(values.size() == originalSerializers.size()); + return values; + } + + private CompositeSerializer createSerializerInstanceInternal(List originalSerializers) { + 
Preconditions.checkArgument(originalSerializers.size() == originalSerializers.size());
--- End diff --

I think this check is a bug: both sides of the comparison are the same expression, so it always passes.
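The problem with the reviewed check can be reproduced in isolation. In the sketch below the method parameter shadows the field of the same name, as in `createSerializerInstanceInternal`, so the unqualified comparison tests a size against itself. The class, the `String` element type and the "intended" variant are assumptions for illustration; the pull request does not state what the check was meant to compare.

```java
import java.util.List;

/** Hypothetical reproduction of the self-comparing size check. */
final class SizeCheckSketch {
    private final List<String> originalSerializers;

    SizeCheckSketch(List<String> originalSerializers) {
        this.originalSerializers = originalSerializers;
    }

    /** Mirrors the reviewed check: both operands refer to the parameter, so this is always true. */
    boolean vacuousCheck(List<String> originalSerializers) {
        return originalSerializers.size() == originalSerializers.size();
    }

    /** One possible fix (an assumption about intent): compare the new list with the field. */
    boolean intendedCheck(List<String> originalSerializers) {
        return originalSerializers.size() == this.originalSerializers.size();
    }
}
```

Qualifying the field with `this.` or renaming the parameter would make the comparison meaningful.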
[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic
[ https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16519968#comment-16519968 ] ASF GitHub Bot commented on FLINK-9513: --- Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6196#discussion_r197331741 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java --- @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.state.AggregatingStateDescriptor; +import org.apache.flink.api.common.state.FoldingStateDescriptor; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.CompositeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.state.KeyedStateFactory; +import org.apache.flink.util.FlinkRuntimeException; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * This state factory wraps state objects, produced by backends, with TTL logic. + */ +public class TtlStateFactory { + public static IS createStateAndWrapWithTtlIfEnabled( + TypeSerializer namespaceSerializer, + StateDescriptor stateDesc, + KeyedStateFactory originalStateFactory, + TtlConfig ttlConfig, + TtlTimeProvider timeProvider) throws Exception { + return ttlConfig.getTtlUpdateType() == TtlUpdateType.Disabled ? 
+ originalStateFactory.createState(namespaceSerializer, stateDesc) : + new TtlStateFactory(originalStateFactory, ttlConfig, timeProvider) + .createState(namespaceSerializer, stateDesc); + } + + private final Map, StateFactory> stateFactories; + + private final KeyedStateFactory originalStateFactory; + private final TtlConfig ttlConfig; + private final TtlTimeProvider timeProvider; + + private TtlStateFactory(KeyedStateFactory originalStateFactory, TtlConfig ttlConfig, TtlTimeProvider timeProvider) { + this.originalStateFactory = originalStateFactory; + this.ttlConfig = ttlConfig; + this.timeProvider = timeProvider; + this.stateFactories = createStateFactories(); + } + + private Map, StateFactory> createStateFactories() { + return Stream.of( + Tuple2.of(ValueStateDescriptor.class, (StateFactory) this::createValueState), + Tuple2.of(ListStateDescriptor.class, (StateFactory) this::createListState), + Tuple2.of(MapStateDescriptor.class, (StateFactory) this::createMapState), + Tuple2.of(ReducingStateDescriptor.class, (StateFactory) this::createReducingState), + Tuple2.of(AggregatingStateDescriptor.class, (StateFactory) this::createAggregatingState), + Tuple2.of(FoldingStateDescriptor.class, (StateFactory) this::createFoldingState) + ).collect(Collectors.toMap(t -> t.f0, t -> t.f1)); + } + + private interface StateFactory { +IS create( + TypeSerializer namespaceSerializer, + StateDescriptor stateDesc) throws Exception; + } + + priv
[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic
[ https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16519960#comment-16519960 ] ASF GitHub Bot commented on FLINK-9513: --- Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6196#discussion_r197330596 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java --- @@ -0,0 +1,204 @@ +package org.apache.flink.api.common.typeutils; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * Base class for composite serializers. + * + * This class serializes a list of objects + * + * @param type of custom serialized value + */ +@SuppressWarnings("unchecked") +public abstract class CompositeSerializer extends TypeSerializer { + private final List originalSerializers; + + protected CompositeSerializer(List originalSerializers) { + Preconditions.checkNotNull(originalSerializers); + this.originalSerializers = originalSerializers; + } + + protected abstract T composeValue(List values); + + protected abstract List decomposeValue(T v); + + protected abstract CompositeSerializer createSerializerInstance(List originalSerializers); + + private T composeValueInternal(List values) { + Preconditions.checkArgument(values.size() == originalSerializers.size()); + return composeValue(values); + } + + private List decomposeValueInternal(T v) { + List values = decomposeValue(v); + Preconditions.checkArgument(values.size() == originalSerializers.size()); + return values; + } + + private CompositeSerializer createSerializerInstanceInternal(List originalSerializers) { + 
Preconditions.checkArgument(originalSerializers.size() == originalSerializers.size()); + return createSerializerInstance(originalSerializers); + } + + @Override + public CompositeSerializer duplicate() { + return createSerializerInstanceInternal(originalSerializers.stream() + .map(TypeSerializer::duplicate) + .collect(Collectors.toList())); + } + + @Override + public boolean isImmutableType() { + return originalSerializers.stream().allMatch(TypeSerializer::isImmutableType); + } + + @Override + public T createInstance() { + return composeValueInternal(originalSerializers.stream() + .map(TypeSerializer::createInstance) + .collect(Collectors.toList())); + } + + @Override + public T copy(T from) { + List originalValues = decomposeValueInternal(from); + return composeValueInternal( + IntStream.range(0, originalSerializers.size()) + .mapToObj(i -> originalSerializers.get(i).copy(originalValues.get(i))) + .collect(Collectors.toList())); + } + + @Override + public T copy(T from, T reuse) { + List originalFromValues = decomposeValueInternal(from); + List originalReuseValues = decomposeValueInternal(reuse); + return composeValueInternal( + IntStream.range(0, originalSerializers.size()) + .mapToObj(i -> originalSerializers.get(i).copy(originalFromValues.get(i), originalReuseValues.get(i))) + .collect(Collectors.toList())); + } + + @Override + public int getLength() { + return originalSerializers.stream().allMatch(s -> s.getLength() >= 0) ? + originalSerializers.stream().mapToInt(TypeSerializer::getLength).sum() : -1; + } + + @Override + public void serialize(T record, DataOutputView target) throws IOException { + List originalValues = decomposeValueInternal(record); + for (int i = 0; i < originalSerializers.size(); i++) { + originalSerializers.get(i).serialize(originalValues.get(i), target); + } + } + + @Override + public T deserialize(DataInputView source) throws IOException { + List originalValues = new ArrayList(); + for (TypeSerializer typeSerializer : originalSer
[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic
[ https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16519966#comment-16519966 ] ASF GitHub Bot commented on FLINK-9513: --- Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6196#discussion_r197331764 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java --- @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.state.AggregatingStateDescriptor; +import org.apache.flink.api.common.state.FoldingStateDescriptor; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.CompositeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.state.KeyedStateFactory; +import org.apache.flink.util.FlinkRuntimeException; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * This state factory wraps state objects, produced by backends, with TTL logic. + */ +public class TtlStateFactory { + public static IS createStateAndWrapWithTtlIfEnabled( + TypeSerializer namespaceSerializer, + StateDescriptor stateDesc, + KeyedStateFactory originalStateFactory, + TtlConfig ttlConfig, + TtlTimeProvider timeProvider) throws Exception { + return ttlConfig.getTtlUpdateType() == TtlUpdateType.Disabled ? 
+ originalStateFactory.createState(namespaceSerializer, stateDesc) : + new TtlStateFactory(originalStateFactory, ttlConfig, timeProvider) + .createState(namespaceSerializer, stateDesc); + } + + private final Map, StateFactory> stateFactories; + + private final KeyedStateFactory originalStateFactory; + private final TtlConfig ttlConfig; + private final TtlTimeProvider timeProvider; + + private TtlStateFactory(KeyedStateFactory originalStateFactory, TtlConfig ttlConfig, TtlTimeProvider timeProvider) { + this.originalStateFactory = originalStateFactory; + this.ttlConfig = ttlConfig; + this.timeProvider = timeProvider; + this.stateFactories = createStateFactories(); + } + + private Map, StateFactory> createStateFactories() { + return Stream.of( + Tuple2.of(ValueStateDescriptor.class, (StateFactory) this::createValueState), + Tuple2.of(ListStateDescriptor.class, (StateFactory) this::createListState), + Tuple2.of(MapStateDescriptor.class, (StateFactory) this::createMapState), + Tuple2.of(ReducingStateDescriptor.class, (StateFactory) this::createReducingState), + Tuple2.of(AggregatingStateDescriptor.class, (StateFactory) this::createAggregatingState), + Tuple2.of(FoldingStateDescriptor.class, (StateFactory) this::createFoldingState) + ).collect(Collectors.toMap(t -> t.f0, t -> t.f1)); + } + + private interface StateFactory { +IS create( + TypeSerializer namespaceSerializer, + StateDescriptor stateDesc) throws Exception; + } + + priv
[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic
[ https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16519967#comment-16519967 ] ASF GitHub Bot commented on FLINK-9513: --- Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6196#discussion_r197331820 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java --- @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.state.AggregatingStateDescriptor; +import org.apache.flink.api.common.state.FoldingStateDescriptor; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.CompositeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.state.KeyedStateFactory; +import org.apache.flink.util.FlinkRuntimeException; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * This state factory wraps state objects, produced by backends, with TTL logic. + */ +public class TtlStateFactory { + public static IS createStateAndWrapWithTtlIfEnabled( + TypeSerializer namespaceSerializer, + StateDescriptor stateDesc, + KeyedStateFactory originalStateFactory, + TtlConfig ttlConfig, + TtlTimeProvider timeProvider) throws Exception { + return ttlConfig.getTtlUpdateType() == TtlUpdateType.Disabled ? 
+ originalStateFactory.createState(namespaceSerializer, stateDesc) : + new TtlStateFactory(originalStateFactory, ttlConfig, timeProvider) + .createState(namespaceSerializer, stateDesc); + } + + private final Map, StateFactory> stateFactories; + + private final KeyedStateFactory originalStateFactory; + private final TtlConfig ttlConfig; + private final TtlTimeProvider timeProvider; + + private TtlStateFactory(KeyedStateFactory originalStateFactory, TtlConfig ttlConfig, TtlTimeProvider timeProvider) { + this.originalStateFactory = originalStateFactory; + this.ttlConfig = ttlConfig; + this.timeProvider = timeProvider; + this.stateFactories = createStateFactories(); + } + + private Map, StateFactory> createStateFactories() { + return Stream.of( + Tuple2.of(ValueStateDescriptor.class, (StateFactory) this::createValueState), + Tuple2.of(ListStateDescriptor.class, (StateFactory) this::createListState), + Tuple2.of(MapStateDescriptor.class, (StateFactory) this::createMapState), + Tuple2.of(ReducingStateDescriptor.class, (StateFactory) this::createReducingState), + Tuple2.of(AggregatingStateDescriptor.class, (StateFactory) this::createAggregatingState), + Tuple2.of(FoldingStateDescriptor.class, (StateFactory) this::createFoldingState) + ).collect(Collectors.toMap(t -> t.f0, t -> t.f1)); + } + + private interface StateFactory { +IS create( + TypeSerializer namespaceSerializer, + StateDescriptor stateDesc) throws Exception; + } + + priv
[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic
[ https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16519959#comment-16519959 ] ASF GitHub Bot commented on FLINK-9513: --- Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6196#discussion_r197330095 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java --- @@ -0,0 +1,204 @@ +package org.apache.flink.api.common.typeutils; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * Base class for composite serializers. + * + * This class serializes a list of objects + * + * @param type of custom serialized value + */ +@SuppressWarnings("unchecked") +public abstract class CompositeSerializer extends TypeSerializer { + private final List originalSerializers; + + protected CompositeSerializer(List originalSerializers) { + Preconditions.checkNotNull(originalSerializers); + this.originalSerializers = originalSerializers; + } + + protected abstract T composeValue(List values); + + protected abstract List decomposeValue(T v); + + protected abstract CompositeSerializer createSerializerInstance(List originalSerializers); + + private T composeValueInternal(List values) { + Preconditions.checkArgument(values.size() == originalSerializers.size()); + return composeValue(values); + } + + private List decomposeValueInternal(T v) { + List values = decomposeValue(v); + Preconditions.checkArgument(values.size() == originalSerializers.size()); + return values; + } + + private CompositeSerializer createSerializerInstanceInternal(List originalSerializers) { + 
Preconditions.checkArgument(originalSerializers.size() == originalSerializers.size()); + return createSerializerInstance(originalSerializers); + } + + @Override + public CompositeSerializer duplicate() { + return createSerializerInstanceInternal(originalSerializers.stream() + .map(TypeSerializer::duplicate) + .collect(Collectors.toList())); + } + + @Override + public boolean isImmutableType() { + return originalSerializers.stream().allMatch(TypeSerializer::isImmutableType); + } + + @Override + public T createInstance() { + return composeValueInternal(originalSerializers.stream() + .map(TypeSerializer::createInstance) + .collect(Collectors.toList())); + } + + @Override + public T copy(T from) { + List originalValues = decomposeValueInternal(from); + return composeValueInternal( + IntStream.range(0, originalSerializers.size()) + .mapToObj(i -> originalSerializers.get(i).copy(originalValues.get(i))) + .collect(Collectors.toList())); + } + + @Override + public T copy(T from, T reuse) { + List originalFromValues = decomposeValueInternal(from); + List originalReuseValues = decomposeValueInternal(reuse); + return composeValueInternal( + IntStream.range(0, originalSerializers.size()) + .mapToObj(i -> originalSerializers.get(i).copy(originalFromValues.get(i), originalReuseValues.get(i))) + .collect(Collectors.toList())); + } + + @Override + public int getLength() { + return originalSerializers.stream().allMatch(s -> s.getLength() >= 0) ? + originalSerializers.stream().mapToInt(TypeSerializer::getLength).sum() : -1; + } + + @Override + public void serialize(T record, DataOutputView target) throws IOException { + List originalValues = decomposeValueInternal(record); + for (int i = 0; i < originalSerializers.size(); i++) { + originalSerializers.get(i).serialize(originalValues.get(i), target); + } + } + + @Override + public T deserialize(DataInputView source) throws IOException { + List originalValues = new ArrayList(); + for (TypeSerializer typeSerializer : originalSer
[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic
[ https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16519964#comment-16519964 ]

ASF GitHub Bot commented on FLINK-9513:
---------------------------------------

Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6196#discussion_r197331211

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java ---
    @@ -0,0 +1,207 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.api.common.state.AggregatingStateDescriptor;
    +import org.apache.flink.api.common.state.FoldingStateDescriptor;
    +import org.apache.flink.api.common.state.ListStateDescriptor;
    +import org.apache.flink.api.common.state.MapStateDescriptor;
    +import org.apache.flink.api.common.state.ReducingStateDescriptor;
    +import org.apache.flink.api.common.state.State;
    +import org.apache.flink.api.common.state.StateDescriptor;
    +import org.apache.flink.api.common.state.ValueStateDescriptor;
    +import org.apache.flink.api.common.typeutils.CompositeSerializer;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.common.typeutils.base.LongSerializer;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.runtime.state.KeyedStateFactory;
    +import org.apache.flink.util.FlinkRuntimeException;
    +
    +import java.util.Arrays;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.stream.Collectors;
    +import java.util.stream.Stream;
    +
    +/**
    + * This state factory wraps state objects, produced by backends, with TTL logic.
    + */
    +public class TtlStateFactory {
    +    public static IS createStateAndWrapWithTtlIfEnabled(
    +        TypeSerializer namespaceSerializer,
    +        StateDescriptor stateDesc,
    +        KeyedStateFactory originalStateFactory,
    +        TtlConfig ttlConfig,
    +        TtlTimeProvider timeProvider) throws Exception {
    +        return ttlConfig.getTtlUpdateType() == TtlUpdateType.Disabled ?
    +            originalStateFactory.createState(namespaceSerializer, stateDesc) :
    +            new TtlStateFactory(originalStateFactory, ttlConfig, timeProvider)
    +                .createState(namespaceSerializer, stateDesc);
    +    }
    +
    +    private final Map, StateFactory> stateFactories;
    --- End diff --

    Why couldn't this be static?
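One way to read the question above: the `stateFactories` map in the diff is rebuilt for every `TtlStateFactory` instance because its entries are bound to `this`. A minimal sketch of a static alternative follows, assuming the map stores functions that take the factory instance as an argument. `StaticRegistrySketch`, `ValueDesc`, `ListDesc`, and the `String` return values are hypothetical stand-ins and not Flink classes; how the PR actually resolved the comment is not shown in this thread.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical stand-in for a factory with a per-descriptor dispatch table. */
class StaticRegistrySketch {
    // Hypothetical descriptor types standing in for ValueStateDescriptor etc.
    static class ValueDesc {}
    static class ListDesc {}

    // Built once at class-load time and shared by all instances.
    // Each entry is an unbound method reference that receives the instance later.
    private static final Map<Class<?>, Function<StaticRegistrySketch, String>> FACTORIES =
        new HashMap<>();

    static {
        FACTORIES.put(ValueDesc.class, StaticRegistrySketch::createValueState);
        FACTORIES.put(ListDesc.class, StaticRegistrySketch::createListState);
    }

    private final String name;

    StaticRegistrySketch(String name) {
        this.name = name;
    }

    private String createValueState() {
        return name + ":value";
    }

    private String createListState() {
        return name + ":list";
    }

    /** Looks up the creator by descriptor class and applies it to this instance. */
    String createState(Object descriptor) {
        Function<StaticRegistrySketch, String> creator = FACTORIES.get(descriptor.getClass());
        if (creator == null) {
            throw new IllegalArgumentException(
                "Unsupported descriptor: " + descriptor.getClass().getName());
        }
        return creator.apply(this);
    }
}
```

The trade-off is that each creator needs an extra parameter for the instance, whereas `this::createValueState` captures it implicitly at the cost of one map per factory object.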
[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic
[ https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16519958#comment-16519958 ]

ASF GitHub Bot commented on FLINK-9513:
---------------------------------------

Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6196#discussion_r197329976

    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java ---
    @@ -0,0 +1,204 @@
    +package org.apache.flink.api.common.typeutils;
    +
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.stream.Collectors;
    +import java.util.stream.IntStream;
    +
    +/**
    + * Base class for composite serializers.
    + *
    + * This class serializes a list of objects
    + *
    + * @param type of custom serialized value
    + */
    +@SuppressWarnings("unchecked")
    +public abstract class CompositeSerializer extends TypeSerializer {
    +    private final List originalSerializers;
    +
    +    protected CompositeSerializer(List originalSerializers) {
    +        Preconditions.checkNotNull(originalSerializers);
    +        this.originalSerializers = originalSerializers;
    +    }
    +
    +    protected abstract T composeValue(List values);
    +
    +    protected abstract List decomposeValue(T v);
    +
    +    protected abstract CompositeSerializer createSerializerInstance(List originalSerializers);
    +
    +    private T composeValueInternal(List values) {
    +        Preconditions.checkArgument(values.size() == originalSerializers.size());
    +        return composeValue(values);
    +    }
    +
    +    private List decomposeValueInternal(T v) {
    +        List values = decomposeValue(v);
    +        Preconditions.checkArgument(values.size() == originalSerializers.size());
    +        return values;
    +    }
    +
    +    private CompositeSerializer createSerializerInstanceInternal(List originalSerializers) {
    +        Preconditions.checkArgument(originalSerializers.size() == originalSerializers.size());
    +        return createSerializerInstance(originalSerializers);
    +    }
    +
    +    @Override
    +    public CompositeSerializer duplicate() {
    +        return createSerializerInstanceInternal(originalSerializers.stream()
    +            .map(TypeSerializer::duplicate)
    +            .collect(Collectors.toList()));
    +    }
    +
    +    @Override
    +    public boolean isImmutableType() {
    +        return originalSerializers.stream().allMatch(TypeSerializer::isImmutableType);
    +    }
    +
    +    @Override
    +    public T createInstance() {
    +        return composeValueInternal(originalSerializers.stream()
    +            .map(TypeSerializer::createInstance)
    +            .collect(Collectors.toList()));
    +    }
    +
    +    @Override
    +    public T copy(T from) {
    +        List originalValues = decomposeValueInternal(from);
    +        return composeValueInternal(
    +            IntStream.range(0, originalSerializers.size())
    +                .mapToObj(i -> originalSerializers.get(i).copy(originalValues.get(i)))
    +                .collect(Collectors.toList()));
    +    }
    +
    +    @Override
    +    public T copy(T from, T reuse) {
    +        List originalFromValues = decomposeValueInternal(from);
    +        List originalReuseValues = decomposeValueInternal(reuse);
    +        return composeValueInternal(
    +            IntStream.range(0, originalSerializers.size())
    +                .mapToObj(i -> originalSerializers.get(i).copy(originalFromValues.get(i), originalReuseValues.get(i)))
    +                .collect(Collectors.toList()));
    +    }
    +
    +    @Override
    +    public int getLength() {
    +        return originalSerializers.stream().allMatch(s -> s.getLength() >= 0) ?
    +            originalSerializers.stream().mapToInt(TypeSerializer::getLength).sum() : -1;
    +    }
    +
    +    @Override
    +    public void serialize(T record, DataOutputView target) throws IOException {
    +        List originalValues = decomposeValueInternal(record);
    +        for (int i = 0; i < originalSerializers.size(); i++) {
    +            originalSerializers.get(i).serialize(originalValues.get(i), target);
    +        }
    +    }
    +
    +    @Override
    +    public T deserialize(DataInputView source) throws IOException {
    +        List originalValues = new ArrayList();
    --- End diff --

    I would suggest giving this list an initial size.
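The suggestion above can be illustrated with a small sketch. The number of values read in `deserialize` is known in advance (one per nested serializer), so the list can be allocated with exactly that capacity and never has to grow. `PresizedListSketch` and `readAll` are hypothetical names, and `Supplier` stands in for a nested serializer's `deserialize` call; this is not the code that was merged.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical stand-in for the deserialize loop; not Flink code. */
class PresizedListSketch {
    /**
     * Reads one value per field reader into a list whose capacity is
     * set up front to the number of readers.
     */
    static List<Object> readAll(List<Supplier<Object>> fieldReaders) {
        // Capacity equals the known element count, so no resize occurs.
        List<Object> values = new ArrayList<>(fieldReaders.size());
        for (Supplier<Object> reader : fieldReaders) {
            values.add(reader.get());
        }
        return values;
    }

    public static void main(String[] args) {
        List<Supplier<Object>> readers = new ArrayList<>(2);
        readers.add(() -> "user-value"); // stands in for the user value
        readers.add(() -> 42L);          // stands in for the expiration timestamp
        System.out.println(readAll(readers));
    }
}
```

For a TTL value the list holds only two elements, so the saving is small per call, but `deserialize` sits on a per-record path where avoiding the default-capacity allocation can still be worthwhile.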
[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic
[ https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16519963#comment-16519963 ] ASF GitHub Bot commented on FLINK-9513: --- Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6196#discussion_r197330813 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java --- @@ -48,7 +48,8 @@ KeyedStateBackend, Snapshotable, Collection>, Closeable, - CheckpointListener { + CheckpointListener, + KeyedStateFactory{ --- End diff -- I think this seems to miss a space ` `.
[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic
[ https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16519965#comment-16519965 ]

ASF GitHub Bot commented on FLINK-9513:
---------------------------------------

Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6196#discussion_r197331898

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java ---
    @@ -0,0 +1,207 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.api.common.state.AggregatingStateDescriptor;
    +import org.apache.flink.api.common.state.FoldingStateDescriptor;
    +import org.apache.flink.api.common.state.ListStateDescriptor;
    +import org.apache.flink.api.common.state.MapStateDescriptor;
    +import org.apache.flink.api.common.state.ReducingStateDescriptor;
    +import org.apache.flink.api.common.state.State;
    +import org.apache.flink.api.common.state.StateDescriptor;
    +import org.apache.flink.api.common.state.ValueStateDescriptor;
    +import org.apache.flink.api.common.typeutils.CompositeSerializer;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.common.typeutils.base.LongSerializer;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.runtime.state.KeyedStateFactory;
    +import org.apache.flink.util.FlinkRuntimeException;
    +
    +import java.util.Arrays;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.stream.Collectors;
    +import java.util.stream.Stream;
    +
    +/**
    + * This state factory wraps state objects, produced by backends, with TTL logic.
    + */
    +public class TtlStateFactory {
    +    public static IS createStateAndWrapWithTtlIfEnabled(
    +        TypeSerializer namespaceSerializer,
    +        StateDescriptor stateDesc,
    +        KeyedStateFactory originalStateFactory,
    +        TtlConfig ttlConfig,
    +        TtlTimeProvider timeProvider) throws Exception {
    +        return ttlConfig.getTtlUpdateType() == TtlUpdateType.Disabled ?
    +            originalStateFactory.createState(namespaceSerializer, stateDesc) :
    +            new TtlStateFactory(originalStateFactory, ttlConfig, timeProvider)
    +                .createState(namespaceSerializer, stateDesc);
    +    }
    +
    +    private final Map, StateFactory> stateFactories;
    +
    +    private final KeyedStateFactory originalStateFactory;
    +    private final TtlConfig ttlConfig;
    +    private final TtlTimeProvider timeProvider;
    +
    +    private TtlStateFactory(KeyedStateFactory originalStateFactory, TtlConfig ttlConfig, TtlTimeProvider timeProvider) {
    +        this.originalStateFactory = originalStateFactory;
    +        this.ttlConfig = ttlConfig;
    +        this.timeProvider = timeProvider;
    +        this.stateFactories = createStateFactories();
    +    }
    +
    +    private Map, StateFactory> createStateFactories() {
    +        return Stream.of(
    +            Tuple2.of(ValueStateDescriptor.class, (StateFactory) this::createValueState),
    +            Tuple2.of(ListStateDescriptor.class, (StateFactory) this::createListState),
    +            Tuple2.of(MapStateDescriptor.class, (StateFactory) this::createMapState),
    +            Tuple2.of(ReducingStateDescriptor.class, (StateFactory) this::createReducingState),
    +            Tuple2.of(AggregatingStateDescriptor.class, (StateFactory) this::createAggregatingState),
    +            Tuple2.of(FoldingStateDescriptor.class, (StateFactory) this::createFoldingState)
    +        ).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
    +    }
    +
    +    private interface StateFactory {
    +        IS create(
    +            TypeSerializer namespaceSerializer,
    +            StateDescriptor stateDesc) throws Exception;
    +    }
    +
    +    priv
[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic
[ https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16519369#comment-16519369 ]

ASF GitHub Bot commented on FLINK-9513:
---------------------------------------

GitHub user azagrebin opened a pull request:

    https://github.com/apache/flink/pull/6196

    [FLINK-9513] Implement TTL state wrappers factory and serializer for value with TTL

    ## What is the purpose of the change

    This PR introduces a state factory for wrapping state objects with TTL logic, and a serializer for a user value with an expiration timestamp.

    NOTE: This PR is based on #6186; only the last commit differs from it and needs review.

    ## Brief change log

    - abstract state creation in backends with `KeyedStateFactory` interface
    - add `TtlStateFactory`
    - add `CompositeSerializer`

    ## Verifying this change

    This change is a trivial addition without any test coverage in this PR and should be covered together with TTL feature activation by final integration and e2e tests.

    ## Does this pull request potentially affect one of the following parts:

    - Dependencies (does it add or upgrade a dependency): (no)
    - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
    - The serializers: (yes)
    - The runtime per-record code paths (performance sensitive): (not yet)
    - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (not yet)
    - The S3 file system connector: (no)

    ## Documentation

    - Does this pull request introduce a new feature? (yes)
    - If yes, how is the feature documented? (not applicable at the moment)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/azagrebin/flink FLINK-9513

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/6196.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #6196

----
commit 62faa8ee220c21fa824fec690073c27a0a994be5
Author: Andrey Zagrebin
Date:   2018-06-04T15:28:40Z

    [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrappers

commit 74c689e1660d40176b3c131fb0f3f9dcafa33889
Author: Andrey Zagrebin
Date:   2018-06-20T15:05:28Z

    Check overflow in expiration timestamp, allow only non-negative TTL

commit 1164aa2a9c4298461eaa44322ef9cefa00b4f0fe
Author: Andrey Zagrebin
Date:   2018-06-21T12:24:04Z

    small fixes

commit 1d19d4ac2b73ac83290b4b117b82895c99b51865
Author: Andrey Zagrebin
Date:   2018-06-21T13:13:42Z

    Make AbstractTtlState.getSerializedValue() unsupported for now in case of queryable state

commit 4dedb9a20244a2addd337617778b17fe8349
Author: Andrey Zagrebin
Date:   2018-06-11T17:34:47Z

    [FLINK-9513] Implement TTL state wrappers factory and serializer for value with TTL

----
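The pull request above pairs a user value with an expiration timestamp, and one of its commits mentions an overflow check and a non-negative TTL. A minimal sketch of that idea follows. `TtlValueSketch`, `wrap`, `isExpired`, and `getOrNull` are hypothetical names, and two behaviors are assumptions rather than facts from the thread: overflow is clamped to `Long.MAX_VALUE`, and a value counts as expired once the current time reaches its timestamp.

```java
/** Hypothetical value-plus-timestamp holder; not the Flink TtlValue class. */
class TtlValueSketch<V> {
    final V value;
    final long expirationTimestamp;

    TtlValueSketch(V value, long expirationTimestamp) {
        this.value = value;
        this.expirationTimestamp = expirationTimestamp;
    }

    /** Computes now + ttl, rejecting negative TTLs and clamping on overflow (assumed policy). */
    static long expirationTimestamp(long now, long ttl) {
        if (ttl < 0) {
            throw new IllegalArgumentException("TTL must be non-negative");
        }
        long timestamp = now + ttl;
        // With ttl >= 0 the sum can only be smaller than now if it wrapped around.
        return timestamp < now ? Long.MAX_VALUE : timestamp;
    }

    /** Wraps a user value with the timestamp at which it expires. */
    static <V> TtlValueSketch<V> wrap(V value, long now, long ttl) {
        return new TtlValueSketch<>(value, expirationTimestamp(now, ttl));
    }

    /** Assumed boundary rule: expired once now reaches the timestamp. */
    boolean isExpired(long now) {
        return expirationTimestamp <= now;
    }

    /** Returns the user value, or null if it has expired. */
    V getOrNull(long now) {
        return isExpired(now) ? null : value;
    }
}
```

A wrapper state would call `wrap` on every update and `getOrNull` on every read, passing the current processing time from its time provider.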