Hi Jacob,

The `ListState` abstraction is what we call partitioned (key-value) state:
each key has its own list. As such, it can only be used on a keyed stream.
This means that you have to call `keyBy` on the result of the `connect`
call, before you apply your `RichCoFlatMapFunction`.
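
A minimal sketch of what that could look like, assuming the Flink 1.x
DataStream API (where `open` takes a `Configuration`) and assuming both
inputs are `Tuple2<String, Integer>` keyed on the String field. The class
names `KeyedConnectSketch`, `ByName` and `Merge` and the sample elements are
made up for illustration; only the state descriptor is taken from your code.

```java
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

public class KeyedConnectSketch {

    // Hypothetical key selector: partitions an input by its String field.
    static class ByName implements KeySelector<Tuple2<String, Integer>, String> {
        @Override
        public String getKey(Tuple2<String, Integer> value) {
            return value.f0;
        }
    }

    // Hypothetical co-flat-map: input 1 is buffered per key, input 2 emits the buffer.
    static class Merge extends RichCoFlatMapFunction<
            Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>> {

        private transient ListState<Tuple2<String, Integer>> deltaPositions;

        @Override
        public void open(Configuration parameters) throws Exception {
            ListStateDescriptor<Tuple2<String, Integer>> descriptor =
                    new ListStateDescriptor<>(
                            "deltaPositions", // the state name
                            TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));
            // Works only because the operator runs on keyed connected streams.
            deltaPositions = getRuntimeContext().getListState(descriptor);
        }

        @Override
        public void flatMap1(Tuple2<String, Integer> value,
                             Collector<Tuple2<String, Integer>> out) throws Exception {
            // Appends to the list that belongs to the key of the current element.
            deltaPositions.add(value);
        }

        @Override
        public void flatMap2(Tuple2<String, Integer> value,
                             Collector<Tuple2<String, Integer>> out) throws Exception {
            // Emits everything stored so far under the current element's key.
            Iterable<Tuple2<String, Integer>> stored = deltaPositions.get();
            if (stored != null) {
                for (Tuple2<String, Integer> element : stored) {
                    out.collect(element);
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Made-up sample inputs.
        DataStream<Tuple2<String, Integer>> first =
                env.fromElements(new Tuple2<>("a", 1), new Tuple2<>("b", 2));
        DataStream<Tuple2<String, Integer>> second =
                env.fromElements(new Tuple2<>("a", 10));

        first.connect(second)
                .keyBy(new ByName(), new ByName()) // one key selector per input
                .flatMap(new Merge())
                .print();

        env.execute("keyed connect sketch");
    }
}
```

Both key selectors should produce the same key type, so that matching
elements from the two inputs end up in the same partition and see the same
state.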

Cheers,
Till

On Wed, Jun 22, 2016 at 9:17 PM, Jacob Bay Larsen <m...@jacobbay.dk> wrote:

> Hi,
>
> I am trying to use a ListState in a RichCoFlatMapFunction, but when
> calling getRuntimeContext().getListState(descriptor) in the open function
> I am getting a "State key serializer has not .." exception. I am not sure
> what I can do to avoid this exception. Are any of you able to provide some
> help?
>
> Best regards
> Jacob
>
>
>   private ListState<Tuple2<String, Integer>> deltaPositions;
>
>   @Override
>   public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
>     // Create state variable
>     ListStateDescriptor<Tuple2<String, Integer>> descriptor =
>         new ListStateDescriptor<>(
>             "deltaPositions", // the state name
>             TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));
>
>     deltaPositions = getRuntimeContext().getListState(descriptor);
>   }
>
>
>
> 2016-06-22 20:41:38,813 INFO  org.apache.flink.runtime.taskmanager.Task - Stream of Items with collection of meadian times (1/1) switched to FAILED with exception.
> java.lang.RuntimeException: Error while getting state
>     at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getListState(StreamingRuntimeContext.java:131)
>     at crisplant.bigdata.dataanalysis.baggagemonitor.streaming.liveitemeventsstoring.LiveItemEventsStoring$MergeMedianTimesFlatMapFunction.open(LiveItemEventsStoring.java:83)
>     at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
>     at org.apache.flink.streaming.api.operators.co.CoStreamFlatMap.open(CoStreamFlatMap.java:49)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:317)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:215)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.Exception: State key serializer has not been configured in the config. This operation cannot use partitioned state.
>     at org.apache.flink.runtime.state.AbstractStateBackend.getPartitionedState(AbstractStateBackend.java:199)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.getPartitionedState(AbstractStreamOperator.java:260)
>     at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getListState(StreamingRuntimeContext.java:129)
>     ... 8 more
> 2016-06-22 20:41:38,815 INFO  org.apache.flink.runtime.taskmanager.Task - Freeing ta
>
> --
> Jacob Bay Larsen
>
> Phone: +45 6133 1108
> E-mail: m...@jacobbay.dk
>
