[ https://issues.apache.org/jira/browse/FLINK-6775?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Till Rohrmann closed FLINK-6775. -------------------------------- Resolution: Fixed Fix Version/s: 1.4.0 1.3.1 1.2.2 1.4.0: 88ffad272eea5865cc43bf44a8980754d8711178 1.3.1: d0e417e51fb7f29adfbb8779ceee7c01a9cdc7c7 1.2.2: 6f482aeb36f79a8059be1a2350e6d049cf2020e5 > StateDescriptor cannot be shared by multiple subtasks > ----------------------------------------------------- > > Key: FLINK-6775 > URL: https://issues.apache.org/jira/browse/FLINK-6775 > Project: Flink > Issue Type: Bug > Components: DataStream API > Affects Versions: 1.0.3, 1.1.4, 1.3.0, 1.2.1, 1.4.0 > Reporter: Till Rohrmann > Assignee: Till Rohrmann > Priority: Blocker > Fix For: 1.2.2, 1.3.1, 1.4.0 > > > The {{StateDescriptor}} contains the {{TypeSerializer}} which is used to > serialize the state. The serializer instance won't be duplicated when it is > accessed. Therefore, the {{StateDescriptor}} cannot be shared if the > {{TypeSerializer}} is stateful as in the case of the {{KryoSerializer}}. > This problem can easily arise when a user defines a stateful operator which > defines the {{StateDescriptor}} statically. The work around is to not define > a static {{StateDescriptor}}. However, I would still make it a blocker, > because it is extremely hard to debug for the user if things fail because the > {{TypeSerializer}} is used concurrently. > The following operator produces the problem: > {code} > private static final class StatefulMapper extends > RichMapFunction<Tuple2<Long,Long>, Tuple2<Long, Long>> implements > CheckpointedFunction { > private static final long serialVersionUID = -1175717056869107847L; > private static final ValueStateDescriptor<PojoType> POJO_VALUE_STATE > = new ValueStateDescriptor<PojoType>("pojoType", PojoType.class); > private transient ValueState<PojoType> valueState; > public StatefulMapper() { > valueState = null; > } > @Override > public Tuple2<Long, Long> map(Tuple2<Long, Long> tuple) throws > Exception { > PojoType pojoType = new PojoType(1, 1.0, "1.0", new NestedPojo(2, > 2.0)); > valueState.update(pojoType); > return tuple; > } > @Override > public void snapshotState(FunctionSnapshotContext > functionSnapshotContext) throws Exception {} > @Override > public void initializeState(FunctionInitializationContext > functionInitializationContext) throws Exception { > valueState = > functionInitializationContext.getKeyedStateStore().getState(POJO_VALUE_STATE); > } > } > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)