[ https://issues.apache.org/jira/browse/FLINK-8090?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Flink Jira Bot updated FLINK-8090: ---------------------------------- Labels: auto-deprioritized-major auto-deprioritized-minor auto-unassigned pull-request-available (was: auto-deprioritized-major auto-unassigned pull-request-available stale-minor) Priority: Not a Priority (was: Minor) This issue was labeled "stale-minor" 7 days ago and has not received any updates so it is being deprioritized. If this ticket is actually Minor, please raise the priority and ask a committer to assign you the issue or revive the public discussion. > Improve error message when registering different states under the same name. > ---------------------------------------------------------------------------- > > Key: FLINK-8090 > URL: https://issues.apache.org/jira/browse/FLINK-8090 > Project: Flink > Issue Type: Bug > Components: API / DataStream > Affects Versions: 1.4.0 > Reporter: Kostas Kloudas > Priority: Not a Priority > Labels: auto-deprioritized-major, auto-deprioritized-minor, > auto-unassigned, pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > Currently a {{ProcessFunction}} like this: > {code} > final MapStateDescriptor<Integer, Tuple2<Integer, Long>> > firstMapStateDescriptor = new MapStateDescriptor<>( > "timon-one", > BasicTypeInfo.INT_TYPE_INFO, > source.getType()); > final ListStateDescriptor<Integer> secondListStateDescriptor = new > ListStateDescriptor<Integer>( > "timon-one", > BasicTypeInfo.INT_TYPE_INFO); > new ProcessFunction<Tuple2<Integer, Long>, Object>() { > private static final long serialVersionUID = > -805125545438296619L; > private transient MapState<Integer, > Tuple2<Integer, Long>> firstMapState; > private transient ListState<Integer> > secondListState; > @Override > public void open(Configuration parameters) > throws Exception { > super.open(parameters); > firstMapState = > getRuntimeContext().getMapState(firstMapStateDescriptor); > secondListState = > getRuntimeContext().getListState(secondListStateDescriptor); > } > @Override > public void processElement(Tuple2<Integer, > Long> value, Context ctx, Collector<Object> out) throws Exception { > Tuple2<Integer, Long> v = > firstMapState.get(value.f0); > if (v == null) { > v = new Tuple2<>(value.f0, 0L); > } > firstMapState.put(value.f0, new > Tuple2<>(v.f0, v.f1 + value.f1)); > } > } > {code} > fails with: > {code} > java.lang.RuntimeException: Error while getting state > at > org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:74) > at > org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getListState(StreamingRuntimeContext.java:127) > at > org.apache.flink.queryablestate.itcases.AbstractQueryableStateTestBase$2.open(AbstractQueryableStateTestBase.java:327) > at > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) > at > org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:58) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:381) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.ClassCastException: > org.apache.flink.runtime.state.heap.HeapMapState cannot be cast to > org.apache.flink.api.common.state.ListState > at > org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:71) > ... 9 more > {code} > Which is cryptic, as it does not explain the reason for the problem. The > error message should be something along the line of "Duplicate state name". -- This message was sent by Atlassian Jira (v8.20.1#820001)