[ 
https://issues.apache.org/jira/browse/FLINK-6775?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-6775.
--------------------------------
       Resolution: Fixed
    Fix Version/s: 1.4.0
                   1.3.1
                   1.2.2

1.4.0: 88ffad272eea5865cc43bf44a8980754d8711178
1.3.1: d0e417e51fb7f29adfbb8779ceee7c01a9cdc7c7
1.2.2: 6f482aeb36f79a8059be1a2350e6d049cf2020e5

> StateDescriptor cannot be shared by multiple subtasks
> -----------------------------------------------------
>
>                 Key: FLINK-6775
>                 URL: https://issues.apache.org/jira/browse/FLINK-6775
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.0.3, 1.1.4, 1.3.0, 1.2.1, 1.4.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>            Priority: Blocker
>             Fix For: 1.2.2, 1.3.1, 1.4.0
>
>
> The {{StateDescriptor}} holds the {{TypeSerializer}} that is used to
> serialize the state. The serializer instance is not duplicated when it is
> accessed, so the {{StateDescriptor}} cannot be shared by multiple subtasks
> if the {{TypeSerializer}} is stateful, as is the case for the
> {{KryoSerializer}}.
> This problem can easily arise when a user writes a stateful operator that
> defines its {{StateDescriptor}} statically. The workaround is to not define
> the {{StateDescriptor}} as a static field. However, I would still make this
> a blocker, because it is extremely hard for the user to debug failures that
> occur because the {{TypeSerializer}} is used concurrently.
> The following operator reproduces the problem:
> {code}
> private static final class StatefulMapper
>         extends RichMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
>         implements CheckpointedFunction {
>
>     private static final long serialVersionUID = -1175717056869107847L;
>
>     // Static descriptor: one instance, and therefore one TypeSerializer,
>     // is shared by all subtasks that load this class in the same JVM.
>     private static final ValueStateDescriptor<PojoType> POJO_VALUE_STATE =
>             new ValueStateDescriptor<PojoType>("pojoType", PojoType.class);
>
>     private transient ValueState<PojoType> valueState;
>
>     public StatefulMapper() {
>         valueState = null;
>     }
>
>     @Override
>     public Tuple2<Long, Long> map(Tuple2<Long, Long> tuple) throws Exception {
>         PojoType pojoType = new PojoType(1, 1.0, "1.0", new NestedPojo(2, 2.0));
>         // Updating the state serializes the POJO with the shared serializer.
>         valueState.update(pojoType);
>         return tuple;
>     }
>
>     @Override
>     public void snapshotState(FunctionSnapshotContext functionSnapshotContext)
>             throws Exception {}
>
>     @Override
>     public void initializeState(FunctionInitializationContext functionInitializationContext)
>             throws Exception {
>         valueState = functionInitializationContext
>                 .getKeyedStateStore()
>                 .getState(POJO_VALUE_STATE);
>     }
> }
> {code}
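
To illustrate the description above, here is a minimal, self-contained sketch. It is not Flink code: `StatefulSerializer`, `SharingDescriptor`, and `DuplicatingDescriptor` are hypothetical stand-ins introduced only for this example. It contrasts a descriptor that hands out the same stateful serializer instance on every access with one that hands out a duplicate, which is one way the sharing problem could be avoided.

```java
public class DescriptorSketch {

    /** Hypothetical stand-in for a stateful serializer; the buffer is mutable scratch state. */
    static final class StatefulSerializer {
        private final StringBuilder buffer = new StringBuilder();

        // Not thread-safe: concurrent callers would overwrite each other's buffer contents.
        String serialize(Object value) {
            buffer.setLength(0);
            buffer.append(value);
            return buffer.toString();
        }

        // Returns an independent copy with its own scratch buffer.
        StatefulSerializer duplicate() {
            return new StatefulSerializer();
        }
    }

    /** Hypothetical descriptor that returns the same serializer instance on every access. */
    static final class SharingDescriptor {
        private final StatefulSerializer serializer = new StatefulSerializer();

        StatefulSerializer getSerializer() {
            return serializer;
        }
    }

    /** Hypothetical descriptor that duplicates the serializer on every access. */
    static final class DuplicatingDescriptor {
        private final StatefulSerializer serializer = new StatefulSerializer();

        StatefulSerializer getSerializer() {
            return serializer.duplicate();
        }
    }

    public static void main(String[] args) {
        SharingDescriptor shared = new SharingDescriptor();
        DuplicatingDescriptor duplicating = new DuplicatingDescriptor();

        // Two accesses stand in for two subtasks asking a shared descriptor for its serializer.
        System.out.println(shared.getSerializer() == shared.getSerializer());           // prints "true"
        System.out.println(duplicating.getSerializer() == duplicating.getSerializer()); // prints "false"
    }
}
```

With the sharing variant, every caller ends up working on the same mutable buffer, which is the situation the issue describes for a static {{StateDescriptor}}. With the duplicating variant, each caller gets its own instance, so a descriptor held in a static field would no longer expose shared serializer state.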



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
