I killed the task manager and job manager forcefully with the kill -9 command,
and while recovering
I am checking the flag returned by the isRestored method in the
initializeState function.
Anyway, I figured out the issue and fixed it. Thanks for the support.

On Tue, Mar 3, 2020 at 7:24 PM Gary Yao <g...@apache.org> wrote:

> Hi Puneet,
>
> Can you describe how you validated that the state is not restored
> properly? Specifically, how did you introduce faults to the cluster?
>
> Best,
> Gary
>
> On Tue, Mar 3, 2020 at 11:08 AM Puneet Kinra <
> puneet.ki...@customercentria.com> wrote:
>
>> Sorry for the missing information.
>>
>> On recovery the value is coming as false instead of true. state.backend
>> has been configured in flink-conf.yaml along with
>> the paths for checkpointing and savepoints.
>>
>> On Tue, Mar 3, 2020 at 3:34 PM Puneet Kinra <
>> puneet.ki...@customercentria.com> wrote:
>>
>>> Hi
>>>
>>> I am stuck with a simple program regarding checkpointing. The Flink version
>>> I am using is 1.10.0.
>>>
>>> *Here I have created DummySource for testing*
>>>
>>> *DummySource*
>>> package com.nudge.stateful;
>>>
>>> import org.apache.flink.api.java.tuple.Tuple2;
>>> import org.apache.flink.streaming.api.functions.source.SourceFunction;
>>>
>>> public class BeaconSource implements SourceFunction<Tuple2<Long,String>>{
>>>
>>> /**
>>> *
>>> */
>>> private static final long serialVersionUID = 1L;
>>> private Boolean isRunning=true;
>>>
>>>
>>> public BeaconSource() {
>>> super();
>>> // TODO Auto-generated constructor stub
>>> }
>>>
>>>
>>>
>>> public void cancel() {
>>> // TODO Auto-generated method stub
>>>
>>> this.isRunning=false;
>>>
>>> }
>>>
>>> public void run(SourceContext<Tuple2<Long,String>> arg0) throws
>>> Exception {
>>> // TODO Auto-generated method stub
>>> while(isRunning) {
>>> Thread.sleep(30000L);
>>> arg0.collect(new Tuple2<Long,String>(100000L,"AMQSource"));
>>> }
>>> }
>>>
>>> }
>>>
>>>
>>>
>>> ---------------------------------------------------------------------------------------
>>> *KeyedProcessFunction (to register the timer and update the status to
>>> true so that the timer is triggered only once)*
>>>
>>>
>>> package com.nudge.stateful;
>>>
>>> import org.apache.flink.api.common.functions.IterationRuntimeContext;
>>> import org.apache.flink.api.common.functions.RuntimeContext;
>>> import org.apache.flink.api.common.state.ListState;
>>> import org.apache.flink.api.common.state.ListStateDescriptor;
>>> import org.apache.flink.api.common.state.ValueState;
>>> import org.apache.flink.api.common.state.ValueStateDescriptor;
>>> import org.apache.flink.api.common.typeinfo.TypeHint;
>>> import org.apache.flink.api.common.typeinfo.TypeInformation;
>>> import org.apache.flink.api.java.tuple.Tuple;
>>> import org.apache.flink.api.java.tuple.Tuple2;
>>> import org.apache.flink.api.java.tuple.Tuple3;
>>> import org.apache.flink.configuration.Configuration;
>>> import org.apache.flink.runtime.state.FunctionInitializationContext;
>>> import org.apache.flink.runtime.state.FunctionSnapshotContext;
>>> import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
>>> import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
>>> import org.apache.flink.streaming.api.functions.ProcessFunction;
>>> import org.apache.flink.util.Collector;
>>>
>>> import com.google.gson.JsonObject;
>>> import com.google.gson.JsonParser;
>>>
>>> import scala.collection.mutable.LinkedHashMap;
>>>
>>>
>>>
>>> import java.util.HashMap;
>>> import java.util.Map;
>>> import java.util.Map.Entry;
>>> import java.util.Set;
>>>
>>> public class TimeProcessTrigger extends
>>> KeyedProcessFunction<Tuple,Tuple2<Long,String>,String>{
>>>
>>> /**
>>> *
>>> */
>>> private static final long serialVersionUID = 1L;
>>> /**
>>> *
>>> */
>>>
>>> private transient ValueState<Boolean> contacthistory;
>>> private static final  Long  ONE_MINUTE=60000L;
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> @Override
>>> public void onTimer(long timestamp, KeyedProcessFunction<Tuple,
>>> Tuple2<Long, String>, String>.OnTimerContext ctx,
>>> Collector<String> out) throws Exception {
>>> // TODO Auto-generated method stub
>>> super.onTimer(timestamp, ctx, out);
>>> System.out.println("Timer has fired for the key"+ctx.getCurrentKey());
>>> }
>>>
>>>
>>>
>>>
>>>
>>>
>>> @Override
>>> public void open(Configuration parameters) throws Exception {
>>> // TODO Auto-generated method stub
>>> super.open(parameters);
>>>
>>>
>>> ValueStateDescriptor<Boolean> descriptor = new
>>> ValueStateDescriptor<Boolean>(
>>> "contact-history", // the state name
>>> Boolean.class); // type information
>>>
>>> this.contacthistory=getRuntimeContext().getState(descriptor);
>>> }
>>>
>>>
>>>
>>>
>>>
>>>
>>> @Override
>>> public void processElement(Tuple2<Long, String> input,
>>> KeyedProcessFunction<Tuple, Tuple2<Long, String>, String>.Context ctx,
>>> Collector<String> collect)
>>> throws Exception {
>>> // TODO Auto-generated method stub
>>>
>>>
>>> System.out.println(this.contacthistory.value());
>>> Boolean value = this.contacthistory.value();
>>> if(value==null) {
>>> Long currentTime = ctx.timerService().currentProcessingTime();
>>> Long regTimer=currentTime+ONE_MINUTE;
>>> System.out.println("Updating the flag and registering the timer
>>> @:"+regTimer);
>>> this.contacthistory.update(true);
>>> ctx.timerService().registerProcessingTimeTimer(regTimer);
>>>
>>> }else {
>>> System.out.println("Timer has already register for this key");
>>> }
>>> }
>>>
>>> }
>>>
>>>
>>> -------------------------------------------------
>>> *Main App*
>>>
>>> package com.nudge.stateful;
>>>
>>> import org.apache.flink.api.java.functions.KeySelector;
>>> import org.apache.flink.api.java.tuple.Tuple;
>>> import org.apache.flink.api.java.tuple.Tuple2;
>>> import org.apache.flink.api.java.utils.ParameterTool;
>>> import org.apache.flink.runtime.state.filesystem.FsStateBackend;
>>> import org.apache.flink.streaming.api.CheckpointingMode;
>>> import org.apache.flink.streaming.api.TimeCharacteristic;
>>> import org.apache.flink.streaming.api.datastream.DataStreamSource;
>>> import org.apache.flink.streaming.api.datastream.KeyedStream;
>>> import
>>> org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
>>> import
>>> org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
>>> import
>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>
>>> import com.google.gson.JsonObject;
>>> import com.google.gson.JsonParser;
>>> import com.indiabulls.nudge.stateful.*;
>>>
>>> public class App
>>> {
>>> public static void main( String[] args ) throws Exception
>>> {
>>> StreamExecutionEnvironment env =
>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>> env.enableCheckpointing(30000);
>>> env.setParallelism(1);
>>> // // advanced options:
>>> // // set mode to exactly-once (this is the default)
>>>
>>> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>>> // // make sure 10 seconds (10000 ms) of progress happen between checkpoints
>>> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10000);
>>> // // checkpoints have to complete within one minute, or are discarded
>>> env.getCheckpointConfig().setCheckpointTimeout(60000);
>>> // // allow only one checkpoint to be in progress at the same time
>>> env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
>>> // // enable externalized checkpoints which are retained after job
>>> cancellation
>>>
>>> env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>>> // // allow job recovery fallback to checkpoint when there is a more
>>> recent savepoint
>>> env.getCheckpointConfig().setPreferCheckpointForRecovery(true);
>>>
>>> SingleOutputStreamOperator<Tuple2<Long, String>> AMQSource =
>>> env.addSource(new BeaconSource())
>>> .name("AMQSource");
>>> env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
>>> env.setParallelism(1);
>>> KeyedStream<Tuple2<Long, String>, Tuple> keyedValues =
>>> AMQSource.keyBy(0);
>>> SingleOutputStreamOperator<String> processedStream =
>>> keyedValues.process(new TimeProcessTrigger()).setParallelism(1);
>>> processedStream.print();
>>> env.execute();
>>> }
>>> }
>>> --
>>> *Cheers *
>>>
>>> *Puneet Kinra*
>>>
>>> *Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com
>>> <puneet.ki...@customercentria.com>*
>>>
>>> *e-mail :puneet.ki...@customercentria.com
>>> <puneet.ki...@customercentria.com>*
>>>
>>>
>>>
>>
>> --
>> *Cheers *
>>
>> *Puneet Kinra*
>>
>> *Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com
>> <puneet.ki...@customercentria.com>*
>>
>> *e-mail :puneet.ki...@customercentria.com
>> <puneet.ki...@customercentria.com>*
>>
>>
>>

-- 
*Cheers *

*Puneet Kinra*

*Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com
<puneet.ki...@customercentria.com>*

*e-mail :puneet.ki...@customercentria.com
<puneet.ki...@customercentria.com>*

Reply via email to