[ 
https://issues.apache.org/jira/browse/FLINK-8090?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Kloudas reassigned FLINK-8090:
-------------------------------------

    Assignee:     (was: Kostas Kloudas)

> An operator should not be able to register two states with the same name.
> -------------------------------------------------------------------------
>
>                 Key: FLINK-8090
>                 URL: https://issues.apache.org/jira/browse/FLINK-8090
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.4.0
>            Reporter: Kostas Kloudas
>
> Currently a {{ProcessFunction}} like this is a valid job:
> {code}
> final MapStateDescriptor<Integer, Tuple2<Integer, Long>> 
> firstMapStateDescriptor = new MapStateDescriptor<>(
>                                       "timon-one",
>                                       BasicTypeInfo.INT_TYPE_INFO,
>                                       source.getType());
> final MapStateDescriptor<Integer, Tuple2<Integer, Long>> 
> secondMapStateDescriptor = new MapStateDescriptor<>(
>                                       "timon-one",
>                                       BasicTypeInfo.INT_TYPE_INFO,
>                                       source.getType());
> new ProcessFunction<Tuple2<Integer, Long>, Object>() {
>                               private static final long serialVersionUID = 
> -805125545438296619L;
>                               private transient MapState<Integer, 
> Tuple2<Integer, Long>> firstMapState;
>                               private transient MapState<Integer, 
> Tuple2<Integer, Long>> secondMapState;
>                               @Override
>                               public void open(Configuration parameters) 
> throws Exception {
>                                       super.open(parameters);
>                                       firstMapState = 
> getRuntimeContext().getMapState(firstMapStateDescriptor);
>                                       secondMapState = 
> getRuntimeContext().getMapState(secondMapStateDescriptor);
>                               }
>                               @Override
>                               public void processElement(Tuple2<Integer, 
> Long> value, Context ctx, Collector<Object> out) throws Exception {
>                                       Tuple2<Integer, Long> v = 
> firstMapState.get(value.f0);
>                                       if (v == null) {
>                                               v = new Tuple2<>(value.f0, 0L);
>                                       }
>                                       System.out.println(value);
>                                       firstMapState.put(value.f0, new 
> Tuple2<>(v.f0, v.f1 + value.f1));
>                               }
>                       }
> {code}
> This should not be the case, and the job should fail with an adequate message.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to