Hi Buvana, 

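At first glance, your snapshotState() should return a Double, not Serializable: the class implements Checkpointed<Double>, so the return type has to match that type parameter.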
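
For illustration, here is a minimal sketch of the corrected signature. It does not depend on Flink: the Checkpointed interface below is a local stand-in that I am assuming mirrors the shape of org.apache.flink.streaming.api.checkpoint.Checkpointed, and SnapshotSketch, DiffState and next() are hypothetical names, not part of your job. In your MapStateful, the only change should be declaring snapshotState() as returning Double.

```java
import java.io.Serializable;

public class SnapshotSketch {

    // Local stand-in (assumption) for Flink's Checkpointed interface, declared
    // here so the sketch compiles without Flink on the classpath.
    interface Checkpointed<T extends Serializable> {
        T snapshotState(long checkpointId, long checkpointTimestamp) throws Exception;
        void restoreState(T state) throws Exception;
    }

    // Hypothetical, Flink-free counterpart of MapStateful: remembers the
    // previous sample and returns the difference to the current one.
    static class DiffState implements Checkpointed<Double> {
        private Double prev = 0.0;

        double next(double value) {
            double diff = value - prev;
            prev = value;
            return diff;
        }

        @Override
        public Double snapshotState(long checkpointId, long checkpointTimestamp) {
            // The return type is Double, matching Checkpointed<Double>.
            return prev;
        }

        @Override
        public void restoreState(Double state) {
            prev = state;
        }
    }

    public static void main(String[] args) throws Exception {
        DiffState original = new DiffState();
        original.next(3.0);
        original.next(5.5);

        // Take a snapshot and hand it to a fresh instance, as a restore would.
        Double snapshot = original.snapshotState(1L, System.currentTimeMillis());
        DiffState restored = new DiffState();
        restored.restoreState(snapshot);

        System.out.println(restored.next(6.0)); // prints 0.5
    }
}
```

Declaring the method as returning Serializable instead of Double in DiffState would reproduce the "return type java.io.Serializable is not compatible with java.lang.Double" error.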

Kostas

> On Aug 11, 2016, at 4:57 PM, Ramanan, Buvana (Nokia - US) 
> <buvana.rama...@nokia-bell-labs.com> wrote:
> 
> Thank you Kostas & Ufuk. I get the following compilation error when I 
> use the Checkpointed interface. The code and error message are pasted below.
> 
> Is the Serializable definition supposed to be from java.io.Serializable or 
> somewhere else?
> 
> Thanks again,
> Buvana
> 
> ================================================================================================================
> Code:
> 
> import org.apache.flink.api.common.functions.FlatMapFunction;
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.streaming.api.checkpoint.Checkpointed;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.common.typeinfo.TypeHint;
> import org.apache.flink.api.common.functions.RichFlatMapFunction;
> 
> import java.io.Serializable;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
> import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
> import org.apache.flink.util.Collector;
> 
> import java.util.Properties;
> 
> /**
> * Created by buvana on 8/9/16.
> */
> public class stateful {
>    private static String INPUT_KAFKA_TOPIC = null;
> --- skipping the main as it’s the same as before except for class name change ---
>       public static class MapStateful extends RichFlatMapFunction<String, 
> Tuple2<String, Double>>
>            implements Checkpointed<Double> {
> 
>        private Double prev_tuple = null;
> 
>        @Override
>        public void flatMap(String incString, Collector<Tuple2<String, 
> Double>> out) {
>            try {
>                Double value = Double.parseDouble(incString);
>                System.out.println("value = " + value);
>                System.out.println(prev_tuple);
> 
>                Double value2 = value - prev_tuple;
>                prev_tuple = value;
> 
>                Tuple2<String, Double> tp = new Tuple2<String, Double>();
>                tp.setField(INPUT_KAFKA_TOPIC, 0);
>                tp.setField(value2, 1);
>                out.collect(tp);
>            } catch (NumberFormatException e) {
>                System.out.println("Could not convert to Float" + incString);
>                System.err.println("Could not convert to Float" + incString);
>            }
>        }
>        @Override
>        public void open(Configuration config) {
>            if (prev_tuple == null) {
>                // only recreate if null
>                // restoreState will be called before open()
>                // so this will already set the sum to the restored value
>                prev_tuple = new Double("0.0");
>            }
>        }
> 
>        @Override
>        public Serializable snapshotState(
>                long checkpointId,
>                long checkpointTimestamp) throws Exception {
>            return prev_tuple;
>        }
> 
> 
>        @Override
>        public void restoreState(Double state) {
>            prev_tuple = state;
>        }
>    }
> }
> ===============================================================================================================
> ERROR message while building:
> 
> $ mvn clean package
> [INFO] Scanning for projects...
> [INFO]
> [INFO] ------------------------------------------------------------------------
> [INFO] Building Flink Quickstart Job 0.1
> [INFO] ------------------------------------------------------------------------
> [WARNING] The artifact org.apache.commons:commons-io:jar:1.3.2 has been relocated to commons-io:commons-io:jar:1.3.2
> [INFO]
> [INFO] --- maven-clean-plugin:2.5:clean (default-clean) @ wiki-edits ---
> [INFO] Deleting /home/buvana/flink/flink-1.1.0/wiki-edits/target
> [INFO]
> [INFO] --- maven-resources-plugin:2.3:resources (default-resources) @ wiki-edits ---
> [INFO] Using 'UTF-8' encoding to copy filtered resources.
> [INFO] Copying 1 resource
> [INFO]
> [INFO] --- maven-compiler-plugin:3.1:compile (default-compile) @ wiki-edits ---
> [INFO] Changes detected - recompiling the module!
> [INFO] Compiling 7 source files to /home/buvana/flink/flink-1.1.0/wiki-edits/target/classes
> [INFO] -------------------------------------------------------------
> [ERROR] COMPILATION ERROR :
> [INFO] -------------------------------------------------------------
> [ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[116,19] wikiedits.stateful.MapStateful is not abstract and does not override abstract method snapshotState(long,long) in org.apache.flink.streaming.api.checkpoint.Checkpointed
> [ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[151,29] snapshotState(long,long) in wikiedits.stateful.MapStateful cannot implement snapshotState(long,long) in org.apache.flink.streaming.api.checkpoint.Checkpointed
>   return type java.io.Serializable is not compatible with java.lang.Double
> [ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[150,9] method does not override or implement a method from a supertype
> [INFO] 3 errors
> [INFO] -------------------------------------------------------------
> [INFO] ------------------------------------------------------------------------
> [INFO] BUILD FAILURE
> [INFO] ------------------------------------------------------------------------
> [INFO] Total time: 2.171s
> [INFO] Finished at: Thu Aug 11 10:47:07 EDT 2016
> [INFO] Final Memory: 26M/660M
> [INFO] ------------------------------------------------------------------------
> [ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.1:compile (default-compile) on project wiki-edits: Compilation failure: Compilation failure:
> [ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[116,19] wikiedits.stateful.MapStateful is not abstract and does not override abstract method snapshotState(long,long) in org.apache.flink.streaming.api.checkpoint.Checkpointed
> [ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[151,29] snapshotState(long,long) in wikiedits.stateful.MapStateful cannot implement snapshotState(long,long) in org.apache.flink.streaming.api.checkpoint.Checkpointed
> [ERROR] return type java.io.Serializable is not compatible with java.lang.Double
> [ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[150,9] method does not override or implement a method from a supertype
> [ERROR] -> [Help 1]
> [ERROR]
> [ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
> [ERROR] Re-run Maven using the -X switch to enable full debug logging.
> [ERROR]
> [ERROR] For more information about the errors and possible solutions, please read the following articles:
> [ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException
> ================================================================================================================
> 
> -----Original Message-----
> From: Kostas Kloudas [mailto:k.klou...@data-artisans.com] 
> Sent: Thursday, August 11, 2016 10:34 AM
> To: user@flink.apache.org
> Subject: Re: flink - Working with State example
> 
> Exactly as Ufuk suggested, if you are not grouping your stream by key, you 
> should use the Checkpointed interface.
> 
> The reason I asked before if you are using the keyBy() is because this is the 
> one that implicitly sets the keySerializer and scopes your (keyed) state to a 
> specific key.
> 
> If there is no keying, then keyed state cannot be used and the Checkpointed 
> interface should be used instead. 
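
As a rough plain-Java analogy for the distinction above (this is not how Flink implements state): keyed state behaves like one value per key, so it needs a current key to look up, while Checkpointed-style state is a single field per operator instance. KeyedVsOperatorState, PerKeyDiff and SingleDiff are hypothetical names used only for this sketch.

```java
import java.util.HashMap;
import java.util.Map;

public class KeyedVsOperatorState {

    // Analogy for keyed state: one "previous value" per key.
    // Without a key there is nothing to look the value up by.
    static class PerKeyDiff {
        private final Map<String, Double> prevByKey = new HashMap<>();

        double next(String key, double value) {
            double prev = prevByKey.getOrDefault(key, 0.0);
            prevByKey.put(key, value);
            return value - prev;
        }
    }

    // Analogy for non-keyed (Checkpointed-style) state: a single field
    // shared by every element the instance sees.
    static class SingleDiff {
        private double prev = 0.0;

        double next(double value) {
            double diff = value - prev;
            prev = value;
            return diff;
        }
    }

    public static void main(String[] args) {
        PerKeyDiff keyed = new PerKeyDiff();
        System.out.println(keyed.next("a", 3.0));  // prints 3.0
        System.out.println(keyed.next("b", 10.0)); // prints 10.0
        System.out.println(keyed.next("a", 5.0));  // prints 2.0 (previous value for "a" was 3.0)

        SingleDiff single = new SingleDiff();
        single.next(3.0);
        single.next(10.0);
        System.out.println(single.next(5.0));      // prints -5.0 (previous value was 10.0)
    }
}
```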
> 
> Let us know if you need anything else.
> 
> Kostas
> 
>> On Aug 11, 2016, at 4:10 PM, Ufuk Celebi <u...@apache.org> wrote:
>> 
>> This only works for keyed streams; you have to use keyBy().
>> 
>> You can use the Checkpointed interface instead 
>> (https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state.html#checkpointing-instance-fields).
>> 
>> On Thu, Aug 11, 2016 at 3:35 PM, Ramanan, Buvana (Nokia - US) 
>> <buvana.rama...@nokia-bell-labs.com> wrote:
>>> Hi Kostas,
>>> 
>>> Here is my code. All I am trying to compute is (x[t] – x[t-1]), where 
>>> x[t] is the current value of the incoming sample and x[t-1] is the 
>>> previous value of the incoming sample. I store the current value in 
>>> state store (‘prev_tuple’) so that I can use it for computation in next 
>>> cycle. As you may observe, I am not using keyBy. I am simply printing 
>>> out the resultant tuple.
>>> 
>>> It appears from the error message that I have to set the key 
>>> serializer (and possibly value serializer) for the state store. I am 
>>> not sure how to do that…
>>> 
>>> Thanks for your interest in helping,
>>> 
>>> Regards,
>>> Buvana
>>> 
>>> public class stateful {
>>>   private static String INPUT_KAFKA_TOPIC = null;
>>>   private static int TIME_WINDOW = 0;
>>> 
>>>   public static void main(String[] args) throws Exception {
>>> 
>>>       if (args.length < 2) {
>>>           throw new IllegalArgumentException("The application needs two arguments. The first is the name of the kafka topic from which it has to \n"
>>>                   + "fetch the data. The second argument is the size of the window, in seconds, to which the aggregation function must be applied.\n");
>>>       }
>>> 
>>>       INPUT_KAFKA_TOPIC = args[0];
>>>       TIME_WINDOW = Integer.parseInt(args[1]);
>>> 
>>>       Properties properties = null;
>>> 
>>>       properties = new Properties();
>>>       properties.setProperty("bootstrap.servers", "localhost:9092");
>>>       properties.setProperty("zookeeper.connect", "localhost:2181");
>>>       properties.setProperty("group.id", "test");
>>> 
>>>       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>       //env.setStateBackend(new FsStateBackend("file://home/buvana/flink/checkpoints"));
>>> 
>>>       DataStreamSource<String> stream = env
>>>               .addSource(new FlinkKafkaConsumer09<>(INPUT_KAFKA_TOPIC, new SimpleStringSchema(), properties));
>>> 
>>>       // maps the data into Flink tuples
>>>       DataStream<Tuple2<String,Double>> streamTuples = stream.flatMap(new Rec2Tuple2());
>>> 
>>>       // write the result to the console or in a Kafka topic
>>>       streamTuples.print();
>>> 
>>>       env.execute("plus one");
>>>   }
>>> 
>>>   public static class Rec2Tuple2 extends RichFlatMapFunction<String, Tuple2<String,Double> > {
>>>       private transient ValueState<Tuple2<String, Double>> prev_tuple;
>>> 
>>>       @Override
>>>       public void flatMap(String incString, Collector<Tuple2<String, Double>> out) throws Exception {
>>>           try {
>>>               Double value = Double.parseDouble(incString);
>>>               System.out.println("value = " + value);
>>>               Tuple2<String, Double> prev_stored_tp = prev_tuple.value();
>>>               System.out.println(prev_stored_tp);
>>> 
>>>               Double value2 = value - prev_stored_tp.f1;
>>>               prev_stored_tp.f1 = value;
>>>               prev_stored_tp.f0 = INPUT_KAFKA_TOPIC;
>>>               prev_tuple.update(prev_stored_tp);
>>> 
>>>               Tuple2<String, Double> tp = new Tuple2<String, Double>();
>>>               tp.setField(INPUT_KAFKA_TOPIC, 0);
>>>               tp.setField(value2, 1);
>>>               out.collect(tp);
>>> 
>>>           } catch (NumberFormatException e) {
>>>               System.out.println("Could not convert to Float" + incString);
>>>               System.err.println("Could not convert to Float" + incString);
>>>           }
>>>       }
>>> 
>>>       @Override
>>>       public void open(Configuration config) {
>>>           ValueStateDescriptor<Tuple2<String, Double>> descriptor =
>>>                   new ValueStateDescriptor<>(
>>>                           "previous input value", // the state name
>>>                           TypeInformation.of(new TypeHint<Tuple2<String, Double>>() {}), // type information
>>>                           Tuple2.of("test topic", 0.0)); // default value of the state, if nothing was set
>>>           prev_tuple = getRuntimeContext().getState(descriptor);
>>>       }
>>>   }
>>> }
>>> 
>>> 
>>> 
>>> From: Kostas Kloudas [mailto:k.klou...@data-artisans.com]
>>> Sent: Thursday, August 11, 2016 5:45 AM
>>> To: user@flink.apache.org
>>> Subject: Re: flink - Working with State example
>>> 
>>> 
>>> 
>>> Hello Buvana,
>>> 
>>> Can you share a few more details on your operator and how you are using it?
>>> For example, are you using keyBy before using your custom operator?
>>> 
>>> Thanks a lot,
>>> Kostas
>>> 
>>> 
>>> 
>>> On Aug 10, 2016, at 10:03 PM, Ramanan, Buvana (Nokia - US) 
>>> <buvana.rama...@nokia-bell-labs.com> wrote:
>>> 
>>> 
>>> 
>>> Hello,
>>> 
>>> 
>>> 
>>> I am utilizing the code snippet in:
>>> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state.html
>>> and particularly ‘open’ function in my code:
>>> 
>>> @Override
>>>   public void open(Configuration config) {
>>>       ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
>>>               new ValueStateDescriptor<>(
>>>                       "average", // the state name
>>>                       TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
>>>                       Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
>>>       sum = getRuntimeContext().getState(descriptor);
>>>   }
>>> 
>>> 
>>> 
>>> When I run, I get the following error:
>>> 
>>> Caused by: java.lang.RuntimeException: Error while getting state
>>>     at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:120)
>>>     at wikiedits.stateful$Rec2Tuple2.open(stateful.java:103)
>>>     at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
>>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
>>>     at org.apache.flink.streaming.api.operators.StreamFlatMap.open(StreamFlatMap.java:41)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:314)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:214)
>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>     at java.lang.Thread.run(Thread.java:745)
>>> Caused by: java.lang.Exception: State key serializer has not been configured in the config. This operation cannot use partitioned state.
>>>     at org.apache.flink.runtime.state.AbstractStateBackend.getPartitionedState(AbstractStateBackend.java:199)
>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.getPartitionedState(AbstractStreamOperator.java:260)
>>>     at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:118)
>>>     ... 8 more
>>> 
>>> 
>>> 
>>> Where do I define the key & value serializer for state?
>>> 
>>> 
>>> 
>>> Thanks,
>>> 
>>> Buvana
>>> 
>>> 
> 
