Sorry for the missed information

On recovery the value is coming as false instead of true, state.backend has
been configured in flink-conf.yaml  along the
the path for checkpointing and savepoint.

On Tue, Mar 3, 2020 at 3:34 PM Puneet Kinra <
puneet.ki...@customercentria.com> wrote:

> Hi
>
> Stuck with the simple program regarding the checkpointing Flink version I
> am using 1.10.0
>
> *Here I have created DummySource for testing*
>
> *DummySource*
> package com.nudge.stateful;
>
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.streaming.api.functions.source.SourceFunction;
>
> public class BeaconSource implements SourceFunction<Tuple2<Long,String>>{
>
> /**
> *
> */
> private static final long serialVersionUID = 1L;
> private Boolean isRunning=true;
>
>
> public BeaconSource() {
> super();
> // TODO Auto-generated constructor stub
> }
>
>
>
> public void cancel() {
> // TODO Auto-generated method stub
>
> this.isRunning=false;
>
> }
>
> public void run(SourceContext<Tuple2<Long,String>> arg0) throws Exception {
> // TODO Auto-generated method stub
> while(isRunning) {
> Thread.sleep(30000L);
> arg0.collect(new Tuple2<Long,String>(100000L,"AMQSource"));
> }
> }
>
> }
>
>
>
> ---------------------------------------------------------------------------------------
> *KeyedProcessFunction (to register the timer and update the status to true
> so that only one-time trigger should)*
>
>
> package com.nudge.stateful;
>
> import org.apache.flink.api.common.functions.IterationRuntimeContext;
> import org.apache.flink.api.common.functions.RuntimeContext;
> import org.apache.flink.api.common.state.ListState;
> import org.apache.flink.api.common.state.ListStateDescriptor;
> import org.apache.flink.api.common.state.ValueState;
> import org.apache.flink.api.common.state.ValueStateDescriptor;
> import org.apache.flink.api.common.typeinfo.TypeHint;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.java.tuple.Tuple;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.api.java.tuple.Tuple3;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.runtime.state.FunctionInitializationContext;
> import org.apache.flink.runtime.state.FunctionSnapshotContext;
> import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
> import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
> import org.apache.flink.streaming.api.functions.ProcessFunction;
> import org.apache.flink.util.Collector;
>
> import com.google.gson.JsonObject;
> import com.google.gson.JsonParser;
>
> import scala.collection.mutable.LinkedHashMap;
>
>
>
> import java.util.HashMap;
> import java.util.Map;
> import java.util.Map.Entry;
> import java.util.Set;
>
> public class TimeProcessTrigger extends
> KeyedProcessFunction<Tuple,Tuple2<Long,String>,String>{
>
> /**
> *
> */
> private static final long serialVersionUID = 1L;
> /**
> *
> */
>
> private transient ValueState<Boolean> contacthistory;
> private static final  Long  ONE_MINUTE=60000L;
>
>
>
>
>
>
>
>
>
>
> @Override
> public void onTimer(long timestamp, KeyedProcessFunction<Tuple,
> Tuple2<Long, String>, String>.OnTimerContext ctx,
> Collector<String> out) throws Exception {
> // TODO Auto-generated method stub
> super.onTimer(timestamp, ctx, out);
> System.out.println("Timer has fired for the key"+ctx.getCurrentKey());
> }
>
>
>
>
>
>
> @Override
> public void open(Configuration parameters) throws Exception {
> // TODO Auto-generated method stub
> super.open(parameters);
>
>
> ValueStateDescriptor<Boolean> descriptor = new
> ValueStateDescriptor<Boolean>(
> "contact-history", // the state name
> Boolean.class); // type information
>
> this.contacthistory=getRuntimeContext().getState(descriptor);
> }
>
>
>
>
>
>
> @Override
> public void processElement(Tuple2<Long, String> input,
> KeyedProcessFunction<Tuple, Tuple2<Long, String>, String>.Context ctx,
> Collector<String> collect)
> throws Exception {
> // TODO Auto-generated method stub
>
>
> System.out.println(this.contacthistory.value());
> Boolean value = this.contacthistory.value();
> if(value==null) {
> Long currentTime = ctx.timerService().currentProcessingTime();
> Long regTimer=currentTime+ONE_MINUTE;
> System.out.println("Updating the flag and registering the timer
> @:"+regTimer);
> this.contacthistory.update(true);
> ctx.timerService().registerProcessingTimeTimer(regTimer);
>
> }else {
> System.out.println("Timer has already register for this key");
> }
> }
>
> }
>
>
> -------------------------------------------------
> *Main App*
>
> package com.nudge.stateful;
>
> import org.apache.flink.api.java.functions.KeySelector;
> import org.apache.flink.api.java.tuple.Tuple;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.api.java.utils.ParameterTool;
> import org.apache.flink.runtime.state.filesystem.FsStateBackend;
> import org.apache.flink.streaming.api.CheckpointingMode;
> import org.apache.flink.streaming.api.TimeCharacteristic;
> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> import org.apache.flink.streaming.api.datastream.KeyedStream;
> import
> org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
> import
> org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
> import
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>
> import com.google.gson.JsonObject;
> import com.google.gson.JsonParser;
> import com.indiabulls.nudge.stateful.*;
>
> public class App
> {
> public static void main( String[] args ) throws Exception
> {
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.enableCheckpointing(30000);
> env.setParallelism(1);
> // // advanced options:
> // // set mode to exactly-once (this is the default)
>
> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> // // make sure 500 ms of progress happen between checkpoints
> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10000);
> // // checkpoints have to complete within one minute, or are discarded
> env.getCheckpointConfig().setCheckpointTimeout(60000);
> // // allow only one checkpoint to be in progress at the same time
> env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
> // // enable externalized checkpoints which are retained after job
> cancellation
>
> env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> // // allow job recovery fallback to checkpoint when there is a more
> recent savepoint
> env.getCheckpointConfig().setPreferCheckpointForRecovery(true);
>
> SingleOutputStreamOperator<Tuple2<Long, String>> AMQSource =
> env.addSource(new BeaconSource())
> .name("AMQSource");
> env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
> env.setParallelism(1);
> KeyedStream<Tuple2<Long, String>, Tuple> keyedValues = AMQSource.keyBy(0);
> SingleOutputStreamOperator<String> processedStream =
> keyedValues.process(new TimeProcessTrigger()).setParallelism(1);
> processedStream.print();
> env.execute();
> }
> }
> --
> *Cheers *
>
> *Puneet Kinra*
>
> *Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com
> <puneet.ki...@customercentria.com>*
>
> *e-mail :puneet.ki...@customercentria.com
> <puneet.ki...@customercentria.com>*
>
>
>

-- 
*Cheers *

*Puneet Kinra*

*Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com
<puneet.ki...@customercentria.com>*

*e-mail :puneet.ki...@customercentria.com
<puneet.ki...@customercentria.com>*

Reply via email to