This only works on keyed streams: partitioned state such as ValueState is only available after keyBy().
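
To illustrate why the key matters: a ValueState is scoped to the key of the element currently being processed, so without keyBy() there is no key to look the state up under. Here is a rough plain-Java model of that idea. It is not Flink code; KeyedDiffSketch and diff are made-up names, and I'm assuming the first sample per key is diffed against a default of 0.0, as in your descriptor.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of keyed state. This is NOT Flink API, only an illustration.
final class KeyedDiffSketch {
    // One "previous value" slot per key, which is roughly what keyed state provides.
    private final Map<String, Double> prevByKey = new HashMap<>();

    // Returns x[t] - x[t-1] for the given key.
    // Assumption: the first sample of a key is diffed against a default of 0.0.
    double diff(String key, double value) {
        double prev = prevByKey.getOrDefault(key, 0.0);
        prevByKey.put(key, value);
        return value - prev;
    }

    public static void main(String[] args) {
        KeyedDiffSketch sketch = new KeyedDiffSketch();
        System.out.println(sketch.diff("topicA", 5.0)); // first sample for topicA
        System.out.println(sketch.diff("topicA", 7.5)); // diffed against 5.0
        System.out.println(sketch.diff("topicB", 1.0)); // separate slot for topicB
    }
}
```

In your job the equivalent would be to call keyBy() on the stream before the flatMap, so that getState() has a key to work with.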

If you don't want to key the stream, you can use the Checkpointed interface instead
(https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state.html#checkpointing-instance-fields).
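
A minimal sketch of that second approach: keep the previous sample in a plain instance field and hand it to the checkpointing mechanism via snapshot/restore methods. So that it compiles without Flink on the classpath, it uses a local stand-in interface (CheckpointedStandIn, a made-up name) that is meant to mirror the shape of Flink's Checkpointed interface as I understand it; please check the linked docs for the real signatures. DiffWithFieldState is also hypothetical.

```java
import java.io.Serializable;

// Local stand-in, NOT the Flink interface. It only mirrors the general shape
// (snapshot on checkpoint, restore on recovery) so this sketch is self-contained.
interface CheckpointedStandIn<T extends Serializable> {
    T snapshotState(long checkpointId, long checkpointTimestamp) throws Exception;
    void restoreState(T state) throws Exception;
}

// Keeps the previous sample in an instance field instead of keyed ValueState.
final class DiffWithFieldState implements CheckpointedStandIn<Double> {
    // Assumption: the first sample is diffed against a default of 0.0.
    private double prev = 0.0;

    // Returns x[t] - x[t-1] and remembers x[t] for the next call.
    double diff(double value) {
        double result = value - prev;
        prev = value;
        return result;
    }

    @Override
    public Double snapshotState(long checkpointId, long checkpointTimestamp) {
        return prev; // the field is the entire state of this function
    }

    @Override
    public void restoreState(Double state) {
        prev = state; // continue from the value saved at the last checkpoint
    }
}
```

In the actual job the function would implement the Flink interface rather than the stand-in, and the diff logic would live in flatMap().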

On Thu, Aug 11, 2016 at 3:35 PM, Ramanan, Buvana (Nokia - US)
<buvana.rama...@nokia-bell-labs.com> wrote:
> Hi Kostas,
>
>
>
> Here is my code. All I am trying to compute is (x[t] – x[t-1]), where x[t]
> is the current value of the incoming sample and x[t-1] is the previous value
> of the incoming sample. I store the current value in state store
> (‘prev_tuple’) so that I can use it for computation in next cycle. As you
> may observe, I am not using keyBy. I am simply printing out the resultant
> tuple.
>
>
>
> It appears from the error message that I have to set the key serializer (and
> possibly value serializer) for the state store. I am not sure how to do
> that…
>
>
>
> Thanks for your interest in helping,
>
>
>
>
>
> Regards,
>
> Buvana
>
>
>
> public class stateful {
>     private static String INPUT_KAFKA_TOPIC = null;
>     private static int TIME_WINDOW = 0;
>
>     public static void main(String[] args) throws Exception {
>
>         if (args.length < 2) {
>             throw new IllegalArgumentException("The application needs two arguments. The first is the name of the kafka topic from which it has to \n"
>                     + "fetch the data. The second argument is the size of the window, in seconds, to which the aggregation function must be applied. \n");
>         }
>
>         INPUT_KAFKA_TOPIC = args[0];
>         TIME_WINDOW = Integer.parseInt(args[1]);
>
>         Properties properties = null;
>
>         properties = new Properties();
>         properties.setProperty("bootstrap.servers", "localhost:9092");
>         properties.setProperty("zookeeper.connect", "localhost:2181");
>         properties.setProperty("group.id", "test");
>
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         //env.setStateBackend(new FsStateBackend("file://home/buvana/flink/checkpoints"));
>
>         DataStreamSource<String> stream = env
>                 .addSource(new FlinkKafkaConsumer09<>(INPUT_KAFKA_TOPIC, new SimpleStringSchema(), properties));
>
>         // maps the data into Flink tuples
>         DataStream<Tuple2<String,Double>> streamTuples = stream.flatMap(new Rec2Tuple2());
>
>         // write the result to the console or in a Kafka topic
>         streamTuples.print();
>
>         env.execute("plus one");
>     }
>
>     public static class Rec2Tuple2 extends RichFlatMapFunction<String, Tuple2<String,Double> > {
>         private transient ValueState<Tuple2<String, Double>> prev_tuple;
>
>         @Override
>         public void flatMap(String incString, Collector<Tuple2<String, Double>> out) throws Exception {
>             try {
>                 Double value = Double.parseDouble(incString);
>                 System.out.println("value = " + value);
>                 Tuple2<String, Double> prev_stored_tp = prev_tuple.value();
>                 System.out.println(prev_stored_tp);
>
>                 Double value2 = value - prev_stored_tp.f1;
>                 prev_stored_tp.f1 = value;
>                 prev_stored_tp.f0 = INPUT_KAFKA_TOPIC;
>                 prev_tuple.update(prev_stored_tp);
>
>                 Tuple2<String, Double> tp = new Tuple2<String, Double>();
>                 tp.setField(INPUT_KAFKA_TOPIC, 0);
>                 tp.setField(value2, 1);
>                 out.collect(tp);
>
>             } catch (NumberFormatException e) {
>                 System.out.println("Could not convert to Float" + incString);
>                 System.err.println("Could not convert to Float" + incString);
>             }
>         }
>
>         @Override
>         public void open(Configuration config) {
>             ValueStateDescriptor<Tuple2<String, Double>> descriptor =
>                     new ValueStateDescriptor<>(
>                             "previous input value", // the state name
>                             TypeInformation.of(new TypeHint<Tuple2<String, Double>>() {}), // type information
>                             Tuple2.of("test topic", 0.0)); // default value of the state, if nothing was set
>             prev_tuple = getRuntimeContext().getState(descriptor);
>         }
>     }
> }
>
>
>
> From: Kostas Kloudas [mailto:k.klou...@data-artisans.com]
> Sent: Thursday, August 11, 2016 5:45 AM
> To: user@flink.apache.org
> Subject: Re: flink - Working with State example
>
>
>
> Hello Buvana,
>
>
>
> Can you share a bit more details on your operator and how you are using it?
>
> For example, are you using keyBy before using your custom operator?
>
>
>
> Thanks a lot,
>
> Kostas
>
>
>
> On Aug 10, 2016, at 10:03 PM, Ramanan, Buvana (Nokia - US)
> <buvana.rama...@nokia-bell-labs.com> wrote:
>
>
>
> Hello,
>
>
>
> I am utilizing the code snippet in:
> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state.html
> and particularly ‘open’ function in my code:
>
>     @Override
>     public void open(Configuration config) {
>         ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
>                 new ValueStateDescriptor<>(
>                         "average", // the state name
>                         TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
>                         Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
>         sum = getRuntimeContext().getState(descriptor);
>     }
>
>
>
> When I run, I get the following error:
>
> Caused by: java.lang.RuntimeException: Error while getting state
>     at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:120)
>     at wikiedits.stateful$Rec2Tuple2.open(stateful.java:103)
>     at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
>     at org.apache.flink.streaming.api.operators.StreamFlatMap.open(StreamFlatMap.java:41)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:314)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:214)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.Exception: State key serializer has not been configured in the config. This operation cannot use partitioned state.
>     at org.apache.flink.runtime.state.AbstractStateBackend.getPartitionedState(AbstractStateBackend.java:199)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.getPartitionedState(AbstractStreamOperator.java:260)
>     at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:118)
>     ... 8 more
>
>
>
> Where do I define the key & value serializer for state?
>
>
>
> Thanks,
>
> Buvana
>
>
