Hi,

I am trying to use a ListState in a RichCoFlatMapFunction, but when I call
getRuntimeContext().getListState(descriptor) in the open() method I get a
"State key serializer has not been configured in the config" exception. I am
not sure how to avoid this exception. Can any of you help?

Best regards
Jacob


    private ListState<Tuple2<String, Integer>> deltaPositions;

    @Override
    public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
      // Create state variable
      ListStateDescriptor<Tuple2<String, Integer>> descriptor =
              new ListStateDescriptor<>(
                      "deltaPositions", // the state name
                      TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {
                      }));

      deltaPositions = getRuntimeContext().getListState(descriptor);
    }



2016-06-22 20:41:38,813 INFO  org.apache.flink.runtime.taskmanager.Task - Stream of Items with collection of meadian times (1/1) switched to FAILED with exception.
java.lang.RuntimeException: Error while getting state
    at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getListState(StreamingRuntimeContext.java:131)
    at crisplant.bigdata.dataanalysis.baggagemonitor.streaming.liveitemeventsstoring.LiveItemEventsStoring$MergeMedianTimesFlatMapFunction.open(LiveItemEventsStoring.java:83)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
    at org.apache.flink.streaming.api.operators.co.CoStreamFlatMap.open(CoStreamFlatMap.java:49)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:317)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:215)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Exception: State key serializer has not been configured in the config. This operation cannot use partitioned state.
    at org.apache.flink.runtime.state.AbstractStateBackend.getPartitionedState(AbstractStateBackend.java:199)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.getPartitionedState(AbstractStreamOperator.java:260)
    at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getListState(StreamingRuntimeContext.java:129)
    ... 8 more
2016-06-22 20:41:38,815 INFO  org.apache.flink.runtime.taskmanager.Task - Freeing ta

-- 
Jacob Bay Larsen

Phone: +45 6133 1108
E-mail: m...@jacobbay.dk
