I killed the task manager and job manager forcefully with the kill -9 command, and while recovering I am checking the flag returned by the isRestored method in the initializeState function. Anyway, I figured out the issue and fixed it. Thanks for the support.
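For reference, the quoted message below mentions that state.backend and the checkpoint and savepoint paths were set in flink-conf.yaml. A minimal sketch of such a configuration follows. The `filesystem` backend is only an assumption (the posted App class imports FsStateBackend), and both directory paths are placeholders rather than the values from the actual setup.

```yaml
# flink-conf.yaml (sketch only; backend choice and paths are assumptions)
state.backend: filesystem
# Placeholder directory for checkpoint data and metadata
state.checkpoints.dir: file:///tmp/flink/checkpoints
# Placeholder default directory for savepoints
state.savepoints.dir: file:///tmp/flink/savepoints
```

In a real cluster these directories would normally need to be reachable from both the job manager and the task managers.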
On Tue, Mar 3, 2020 at 7:24 PM Gary Yao <g...@apache.org> wrote:

> Hi Puneet,
>
> Can you describe how you validated that the state is not restored
> properly? Specifically, how did you introduce faults to the cluster?
>
> Best,
> Gary
>
> On Tue, Mar 3, 2020 at 11:08 AM Puneet Kinra <
> puneet.ki...@customercentria.com> wrote:
>
>> Sorry for the missed information
>>
>> On recovery, the value comes back as false instead of true. state.backend
>> has been configured in flink-conf.yaml along with
>> the paths for checkpoints and savepoints.
>>
>> On Tue, Mar 3, 2020 at 3:34 PM Puneet Kinra <
>> puneet.ki...@customercentria.com> wrote:
>>
>>> Hi
>>>
>>> I am stuck with a simple program regarding checkpointing. The Flink
>>> version I am using is 1.10.0.
>>>
>>> *Here I have created DummySource for testing*
>>>
>>> *DummySource*
>>> package com.nudge.stateful;
>>>
>>> import org.apache.flink.api.java.tuple.Tuple2;
>>> import org.apache.flink.streaming.api.functions.source.SourceFunction;
>>>
>>> public class BeaconSource implements SourceFunction<Tuple2<Long,String>>{
>>>
>>>     /**
>>>      *
>>>      */
>>>     private static final long serialVersionUID = 1L;
>>>     private Boolean isRunning=true;
>>>
>>>     public BeaconSource() {
>>>         super();
>>>         // TODO Auto-generated constructor stub
>>>     }
>>>
>>>     public void cancel() {
>>>         // TODO Auto-generated method stub
>>>
>>>         this.isRunning=false;
>>>
>>>     }
>>>
>>>     public void run(SourceContext<Tuple2<Long,String>> arg0) throws Exception {
>>>         // TODO Auto-generated method stub
>>>         while(isRunning) {
>>>             Thread.sleep(30000L);
>>>             arg0.collect(new Tuple2<Long,String>(100000L,"AMQSource"));
>>>         }
>>>     }
>>>
>>> }
>>>
>>> ---------------------------------------------------------------------------------------
>>> *KeyedProcessFunction (registers the timer and updates the status to
>>> true so that it triggers only once)*
>>>
>>> package com.nudge.stateful;
>>>
>>> import org.apache.flink.api.common.functions.IterationRuntimeContext;
>>> import org.apache.flink.api.common.functions.RuntimeContext;
>>> import org.apache.flink.api.common.state.ListState;
>>> import org.apache.flink.api.common.state.ListStateDescriptor;
>>> import org.apache.flink.api.common.state.ValueState;
>>> import org.apache.flink.api.common.state.ValueStateDescriptor;
>>> import org.apache.flink.api.common.typeinfo.TypeHint;
>>> import org.apache.flink.api.common.typeinfo.TypeInformation;
>>> import org.apache.flink.api.java.tuple.Tuple;
>>> import org.apache.flink.api.java.tuple.Tuple2;
>>> import org.apache.flink.api.java.tuple.Tuple3;
>>> import org.apache.flink.configuration.Configuration;
>>> import org.apache.flink.runtime.state.FunctionInitializationContext;
>>> import org.apache.flink.runtime.state.FunctionSnapshotContext;
>>> import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
>>> import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
>>> import org.apache.flink.streaming.api.functions.ProcessFunction;
>>> import org.apache.flink.util.Collector;
>>>
>>> import com.google.gson.JsonObject;
>>> import com.google.gson.JsonParser;
>>>
>>> import scala.collection.mutable.LinkedHashMap;
>>>
>>> import java.util.HashMap;
>>> import java.util.Map;
>>> import java.util.Map.Entry;
>>> import java.util.Set;
>>>
>>> public class TimeProcessTrigger extends
>>>         KeyedProcessFunction<Tuple,Tuple2<Long,String>,String>{
>>>
>>>     /**
>>>      *
>>>      */
>>>     private static final long serialVersionUID = 1L;
>>>     /**
>>>      *
>>>      */
>>>
>>>     private transient ValueState<Boolean> contacthistory;
>>>     private static final Long ONE_MINUTE=60000L;
>>>
>>>     @Override
>>>     public void onTimer(long timestamp, KeyedProcessFunction<Tuple,
>>>             Tuple2<Long, String>, String>.OnTimerContext ctx,
>>>             Collector<String> out) throws Exception {
>>>         // TODO Auto-generated method stub
>>>         super.onTimer(timestamp, ctx, out);
>>>         System.out.println("Timer has fired for the key"+ctx.getCurrentKey());
>>>     }
>>>
>>>     @Override
>>>     public void open(Configuration parameters) throws Exception {
>>>         // TODO Auto-generated method stub
>>>         super.open(parameters);
>>>
>>>         ValueStateDescriptor<Boolean> descriptor = new
>>>                 ValueStateDescriptor<Boolean>(
>>>                 "contact-history", // the state name
>>>                 Boolean.class); // type information
>>>
>>>         this.contacthistory=getRuntimeContext().getState(descriptor);
>>>     }
>>>
>>>     @Override
>>>     public void processElement(Tuple2<Long, String> input,
>>>             KeyedProcessFunction<Tuple, Tuple2<Long, String>, String>.Context ctx,
>>>             Collector<String> collect)
>>>             throws Exception {
>>>         // TODO Auto-generated method stub
>>>
>>>         System.out.println(this.contacthistory.value());
>>>         Boolean value = this.contacthistory.value();
>>>         if(value==null) {
>>>             Long currentTime = ctx.timerService().currentProcessingTime();
>>>             Long regTimer=currentTime+ONE_MINUTE;
>>>             System.out.println("Updating the flag and registering the timer @:"+regTimer);
>>>             this.contacthistory.update(true);
>>>             ctx.timerService().registerProcessingTimeTimer(regTimer);
>>>
>>>         }else {
>>>             System.out.println("Timer has already register for this key");
>>>         }
>>>     }
>>>
>>> }
>>>
>>> -------------------------------------------------
>>> *Main App*
>>>
>>> package com.nudge.stateful;
>>>
>>> import org.apache.flink.api.java.functions.KeySelector;
>>> import org.apache.flink.api.java.tuple.Tuple;
>>> import org.apache.flink.api.java.tuple.Tuple2;
>>> import org.apache.flink.api.java.utils.ParameterTool;
>>> import org.apache.flink.runtime.state.filesystem.FsStateBackend;
>>> import org.apache.flink.streaming.api.CheckpointingMode;
>>> import org.apache.flink.streaming.api.TimeCharacteristic;
>>> import org.apache.flink.streaming.api.datastream.DataStreamSource;
>>> import org.apache.flink.streaming.api.datastream.KeyedStream;
>>> import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
>>> import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>
>>> import com.google.gson.JsonObject;
>>> import com.google.gson.JsonParser;
>>> import com.indiabulls.nudge.stateful.*;
>>>
>>> public class App
>>> {
>>>     public static void main( String[] args ) throws Exception
>>>     {
>>>         StreamExecutionEnvironment env =
>>>                 StreamExecutionEnvironment.getExecutionEnvironment();
>>>         env.enableCheckpointing(30000);
>>>         env.setParallelism(1);
>>>         // advanced options:
>>>         // set mode to exactly-once (this is the default)
>>>         env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>>>         // make sure 10 s of progress happen between checkpoints
>>>         env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10000);
>>>         // checkpoints have to complete within one minute, or are discarded
>>>         env.getCheckpointConfig().setCheckpointTimeout(60000);
>>>         // allow only one checkpoint to be in progress at the same time
>>>         env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
>>>         // enable externalized checkpoints which are retained after job cancellation
>>>         env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>>>         // allow job recovery fallback to checkpoint when there is a more recent savepoint
>>>         env.getCheckpointConfig().setPreferCheckpointForRecovery(true);
>>>
>>>         SingleOutputStreamOperator<Tuple2<Long, String>> AMQSource =
>>>                 env.addSource(new BeaconSource())
>>>                 .name("AMQSource");
>>>         env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
>>>         env.setParallelism(1);
>>>         KeyedStream<Tuple2<Long, String>, Tuple> keyedValues =
>>>                 AMQSource.keyBy(0);
>>>         SingleOutputStreamOperator<String> processedStream =
>>>                 keyedValues.process(new TimeProcessTrigger()).setParallelism(1);
>>>         processedStream.print();
>>>         env.execute();
>>>     }
>>> }
>>> --
>>> *Cheers *
>>>
>>> *Puneet Kinra*
>>>
>>> *Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com
>>> <puneet.ki...@customercentria.com>*
>>>
>>> *e-mail :puneet.ki...@customercentria.com
>>> <puneet.ki...@customercentria.com>*
>>>
>>
>> --
>> *Cheers *
>>
>> *Puneet Kinra*
>>
>> *Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com
>> <puneet.ki...@customercentria.com>*
>>
>> *e-mail :puneet.ki...@customercentria.com
>> <puneet.ki...@customercentria.com>*
>>

--
*Cheers *

*Puneet Kinra*

*Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com <puneet.ki...@customercentria.com>*

*e-mail :puneet.ki...@customercentria.com <puneet.ki...@customercentria.com>*