Sorry for the missed information On recovery the value is coming as false instead of true, state.backend has been configured in flink-conf.yaml along the the path for checkpointing and savepoint.
On Tue, Mar 3, 2020 at 3:34 PM Puneet Kinra < puneet.ki...@customercentria.com> wrote: > Hi > > Stuck with the simple program regarding the checkpointing Flink version I > am using 1.10.0 > > *Here I have created DummySource for testing* > > *DummySource* > package com.nudge.stateful; > > import org.apache.flink.api.java.tuple.Tuple2; > import org.apache.flink.streaming.api.functions.source.SourceFunction; > > public class BeaconSource implements SourceFunction<Tuple2<Long,String>>{ > > /** > * > */ > private static final long serialVersionUID = 1L; > private Boolean isRunning=true; > > > public BeaconSource() { > super(); > // TODO Auto-generated constructor stub > } > > > > public void cancel() { > // TODO Auto-generated method stub > > this.isRunning=false; > > } > > public void run(SourceContext<Tuple2<Long,String>> arg0) throws Exception { > // TODO Auto-generated method stub > while(isRunning) { > Thread.sleep(30000L); > arg0.collect(new Tuple2<Long,String>(100000L,"AMQSource")); > } > } > > } > > > > --------------------------------------------------------------------------------------- > *KeyedProcessFunction (to register the timer and update the status to true > so that only one-time trigger should)* > > > package com.nudge.stateful; > > import org.apache.flink.api.common.functions.IterationRuntimeContext; > import org.apache.flink.api.common.functions.RuntimeContext; > import org.apache.flink.api.common.state.ListState; > import org.apache.flink.api.common.state.ListStateDescriptor; > import org.apache.flink.api.common.state.ValueState; > import org.apache.flink.api.common.state.ValueStateDescriptor; > import org.apache.flink.api.common.typeinfo.TypeHint; > import org.apache.flink.api.common.typeinfo.TypeInformation; > import org.apache.flink.api.java.tuple.Tuple; > import org.apache.flink.api.java.tuple.Tuple2; > import org.apache.flink.api.java.tuple.Tuple3; > import org.apache.flink.configuration.Configuration; > import org.apache.flink.runtime.state.FunctionInitializationContext; > import org.apache.flink.runtime.state.FunctionSnapshotContext; > import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; > import org.apache.flink.streaming.api.functions.KeyedProcessFunction; > import org.apache.flink.streaming.api.functions.ProcessFunction; > import org.apache.flink.util.Collector; > > import com.google.gson.JsonObject; > import com.google.gson.JsonParser; > > import scala.collection.mutable.LinkedHashMap; > > > > import java.util.HashMap; > import java.util.Map; > import java.util.Map.Entry; > import java.util.Set; > > public class TimeProcessTrigger extends > KeyedProcessFunction<Tuple,Tuple2<Long,String>,String>{ > > /** > * > */ > private static final long serialVersionUID = 1L; > /** > * > */ > > private transient ValueState<Boolean> contacthistory; > private static final Long ONE_MINUTE=60000L; > > > > > > > > > > > @Override > public void onTimer(long timestamp, KeyedProcessFunction<Tuple, > Tuple2<Long, String>, String>.OnTimerContext ctx, > Collector<String> out) throws Exception { > // TODO Auto-generated method stub > super.onTimer(timestamp, ctx, out); > System.out.println("Timer has fired for the key"+ctx.getCurrentKey()); > } > > > > > > > @Override > public void open(Configuration parameters) throws Exception { > // TODO Auto-generated method stub > super.open(parameters); > > > ValueStateDescriptor<Boolean> descriptor = new > ValueStateDescriptor<Boolean>( > "contact-history", // the state name > Boolean.class); // type information > > this.contacthistory=getRuntimeContext().getState(descriptor); > } > > > > > > > @Override > public void processElement(Tuple2<Long, String> input, > KeyedProcessFunction<Tuple, Tuple2<Long, String>, String>.Context ctx, > Collector<String> collect) > throws Exception { > // TODO Auto-generated method stub > > > System.out.println(this.contacthistory.value()); > Boolean value = this.contacthistory.value(); > if(value==null) { > Long currentTime = ctx.timerService().currentProcessingTime(); > Long regTimer=currentTime+ONE_MINUTE; > System.out.println("Updating the flag and registering the timer > @:"+regTimer); > this.contacthistory.update(true); > ctx.timerService().registerProcessingTimeTimer(regTimer); > > }else { > System.out.println("Timer has already register for this key"); > } > } > > } > > > ------------------------------------------------- > *Main App* > > package com.nudge.stateful; > > import org.apache.flink.api.java.functions.KeySelector; > import org.apache.flink.api.java.tuple.Tuple; > import org.apache.flink.api.java.tuple.Tuple2; > import org.apache.flink.api.java.utils.ParameterTool; > import org.apache.flink.runtime.state.filesystem.FsStateBackend; > import org.apache.flink.streaming.api.CheckpointingMode; > import org.apache.flink.streaming.api.TimeCharacteristic; > import org.apache.flink.streaming.api.datastream.DataStreamSource; > import org.apache.flink.streaming.api.datastream.KeyedStream; > import > org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; > import > org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup; > import > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; > > import com.google.gson.JsonObject; > import com.google.gson.JsonParser; > import com.indiabulls.nudge.stateful.*; > > public class App > { > public static void main( String[] args ) throws Exception > { > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.enableCheckpointing(30000); > env.setParallelism(1); > // // advanced options: > // // set mode to exactly-once (this is the default) > > env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); > // // make sure 500 ms of progress happen between checkpoints > env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10000); > // // checkpoints have to complete within one minute, or are discarded > env.getCheckpointConfig().setCheckpointTimeout(60000); > // // allow only one checkpoint to be in progress at the same time > env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); > // // enable externalized checkpoints which are retained after job > cancellation > > env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); > // // allow job recovery fallback to checkpoint when there is a more > recent savepoint > env.getCheckpointConfig().setPreferCheckpointForRecovery(true); > > SingleOutputStreamOperator<Tuple2<Long, String>> AMQSource = > env.addSource(new BeaconSource()) > .name("AMQSource"); > env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); > env.setParallelism(1); > KeyedStream<Tuple2<Long, String>, Tuple> keyedValues = AMQSource.keyBy(0); > SingleOutputStreamOperator<String> processedStream = > keyedValues.process(new TimeProcessTrigger()).setParallelism(1); > processedStream.print(); > env.execute(); > } > } > -- > *Cheers * > > *Puneet Kinra* > > *Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com > <puneet.ki...@customercentria.com>* > > *e-mail :puneet.ki...@customercentria.com > <puneet.ki...@customercentria.com>* > > > -- *Cheers * *Puneet Kinra* *Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com <puneet.ki...@customercentria.com>* *e-mail :puneet.ki...@customercentria.com <puneet.ki...@customercentria.com>*