[ https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16527987#comment-16527987 ]
ASF GitHub Bot commented on FLINK-9513: --------------------------------------- Github user azagrebin commented on a diff in the pull request: https://github.com/apache/flink/pull/6196#discussion_r199227770 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java --- @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.state.AggregatingStateDescriptor; +import org.apache.flink.api.common.state.FoldingStateDescriptor; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.CompositeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.state.KeyedStateFactory; +import org.apache.flink.util.FlinkRuntimeException; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * This state factory wraps state objects, produced by backends, with TTL logic. + */ +public class TtlStateFactory { + public static <N, SV, S extends State, IS extends S> IS createStateAndWrapWithTtlIfEnabled( + TypeSerializer<N> namespaceSerializer, + StateDescriptor<S, SV> stateDesc, + KeyedStateFactory originalStateFactory, + TtlConfig ttlConfig, + TtlTimeProvider timeProvider) throws Exception { + return ttlConfig.getTtlUpdateType() == TtlUpdateType.Disabled ? + originalStateFactory.createState(namespaceSerializer, stateDesc) : + new TtlStateFactory(originalStateFactory, ttlConfig, timeProvider) + .createState(namespaceSerializer, stateDesc); + } + + private final Map<Class<? extends StateDescriptor>, StateFactory> stateFactories; + + private final KeyedStateFactory originalStateFactory; + private final TtlConfig ttlConfig; + private final TtlTimeProvider timeProvider; + + private TtlStateFactory(KeyedStateFactory originalStateFactory, TtlConfig ttlConfig, TtlTimeProvider timeProvider) { + this.originalStateFactory = originalStateFactory; + this.ttlConfig = ttlConfig; + this.timeProvider = timeProvider; + this.stateFactories = createStateFactories(); + } + + private Map<Class<? extends StateDescriptor>, StateFactory> createStateFactories() { + return Stream.of( + Tuple2.of(ValueStateDescriptor.class, (StateFactory) this::createValueState), + Tuple2.of(ListStateDescriptor.class, (StateFactory) this::createListState), + Tuple2.of(MapStateDescriptor.class, (StateFactory) this::createMapState), + Tuple2.of(ReducingStateDescriptor.class, (StateFactory) this::createReducingState), + Tuple2.of(AggregatingStateDescriptor.class, (StateFactory) this::createAggregatingState), + Tuple2.of(FoldingStateDescriptor.class, (StateFactory) this::createFoldingState) + ).collect(Collectors.toMap(t -> t.f0, t -> t.f1)); + } + + private interface StateFactory { + <N, SV, S extends State, IS extends S> IS create( + TypeSerializer<N> namespaceSerializer, + StateDescriptor<S, SV> stateDesc) throws Exception; + } + + private <N, SV, S extends State, IS extends S> IS createState( + TypeSerializer<N> namespaceSerializer, + StateDescriptor<S, SV> stateDesc) throws Exception { + StateFactory stateFactory = stateFactories.get(stateDesc.getClass()); + if (stateFactory == null) { + String message = String.format("State %s is not supported by %s", + stateDesc.getClass(), TtlStateFactory.class); + throw new FlinkRuntimeException(message); + } + return stateFactory.create(namespaceSerializer, stateDesc); + } + + @SuppressWarnings("unchecked") + private <N, SV, S extends State, IS extends S> IS createValueState( + TypeSerializer<N> namespaceSerializer, + StateDescriptor<S, SV> stateDesc) throws Exception { + SV defVal = stateDesc.getDefaultValue(); + TtlValue<SV> ttlDefVal = defVal == null ? null : new TtlValue<>(defVal, Long.MAX_VALUE); + ValueStateDescriptor<TtlValue<SV>> ttlDescriptor = new ValueStateDescriptor<>( --- End diff -- agree, I think we may actually not expose default value for value state with TTL or new descriptor builders in general > Wrap state binder with TTL logic > -------------------------------- > > Key: FLINK-9513 > URL: https://issues.apache.org/jira/browse/FLINK-9513 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing > Affects Versions: 1.6.0 > Reporter: Andrey Zagrebin > Assignee: Andrey Zagrebin > Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > The main idea is to wrap user state value with a class holding the value and > the expiration timestamp (maybe meta data in future) and use the new object > as a value in the existing implementations: > {code:java} > class TtlValue<V> { > V value; > long expirationTimestamp; > } > {code} > The original state binder factory is wrapped with TtlStateBinder if TTL is > enabled: > {code:java} > state = ttlConfig.updateType == DISABLED ? > bind(binder) : bind(new TtlStateBinder(binder, timerService)); > {code} > TtlStateBinder decorates the states produced by the original binder with TTL > logic wrappers and adds TtlValue serialisation logic: > {code:java} > TtlStateBinder { > StateBinder binder; > ProcessingTimeProvier timeProvider; // System.currentTimeMillis() > <V> TtlValueState<V> createValueState(valueDesc) { > serializer = new TtlValueSerializer(valueDesc.getSerializer); > ttlValueDesc = new ValueDesc(serializer, ...); > // or implement custom TypeInfo > originalStateWithTtl = binder.createValueState(valueDesc); > return new TtlValueState(originalStateWithTtl, timeProvider); > } > // List, Map, ... > } > {code} > TTL serializer should add expiration timestamp -- This message was sent by Atlassian JIRA (v7.6.3#76005)