The picture in the first e-mail shows that the job completed in 93 ms, i.e., it finished before the first checkpoint (your interval is 1000 ms) could even be triggered, so nothing shows up under triggered or completed checkpoints.
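As far as I know, Flink only triggers checkpoints while the job's tasks are still running, and readTextFile over a small file is a bounded input that finishes almost immediately. If you want to see checkpoints in the UI, keep the job alive with an unbounded source. A minimal sketch (the SocketWordCount class name and localhost:9999 are illustrative, not from your code; feed the socket with something like nc -lk 9999), reusing the Tokenizer from the program quoted below:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SocketWordCount {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // same interval as in your program: trigger a checkpoint every 1000 ms
        env.enableCheckpointing(1000);

        // unbounded source: the job keeps running, so the checkpoint
        // coordinator gets the chance to trigger checkpoints every second
        DataStream<String> text = env.socketTextStream("localhost", 9999);

        text.flatMap(new StreamingJob.Tokenizer())
                .keyBy(value -> value.f0)
                .sum(1)
                .print();

        env.execute("Checkpointing with an unbounded source");
    }
}

With this running, the Checkpoints tab in the web UI should start listing completed checkpoints after a second or so.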
________________________________
From: Abdullah bin Omar <abdullahbinoma...@gmail.com>
Sent: Monday, March 8, 2021 3:53 PM
To: user@flink.apache.org <user@flink.apache.org>
Subject: Re: Trigger and completed Checkpointing do not appeared

Hi,

Please read the previous email (and also this email) before answering.

In the attached pic, the checkpoint interval shows 1 s, and the job finished in 1939 ms. According to the code in the previous email, at least one checkpoint should have been triggered and completed. However, the Apache Flink UI shows no triggered or completed checkpoints (see the pic attached to the first email).

What is the problem? Why does checkpointing not complete here?

Thank you

On Mon, Mar 8, 2021 at 3:07 PM Abdullah bin Omar <abdullahbinoma...@gmail.com> wrote:

Hi,

I ran a sample word count code. The input is just some text, and the output counts the words. In the code I added all the lines necessary to enable checkpointing. However, I did not see any triggered or completed checkpoints (in the attached pic), even though the word count itself works. The code is below:

package org.apache.flink.flink_quickstart_java;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
//import org.apache.flink.streaming.examples.wordcount.util.WordCountData;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;

public class StreamingJob {

    // *************************************************************************
    // PROGRAM
    // *************************************************************************

    public static void main(String[] args) throws Exception {

        // Checking input parameters
        final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);

        // set up the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // start a checkpoint every 1000 ms
        env.enableCheckpointing(1000);

        // make sure at least 500 ms of progress happen between checkpoints
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);

        // checkpoints have to complete within 10000 ms, or are discarded
        env.getCheckpointConfig().setCheckpointTimeout(10000);

        // set mode to exactly-once (this is the default); AT_LEAST_ONCE is the alternative
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        // allow only one checkpoint to be in progress at the same time
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

        // enable externalized checkpoints which are retained after job cancellation
        // (the alternative is DELETE_ON_CANCELLATION)
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // number of restart attempts, delay between restarts
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100));

        // make parameters available in the web interface
        env.getConfig().setGlobalJobParameters(params);

        // get input data
        DataStream<String> text = null;
        if (params.has("input")) {
            // union all the inputs from text files
            for (String input : params.getMultiParameterRequired("input")) {
                if (text == null) {
                    text = env.readTextFile(input);
                } else {
                    text = text.union(env.readTextFile(input));
                }
            }
            Preconditions.checkNotNull(text, "Input DataStream should not be null.");
        } else {
            System.out.println("Executing WordCount example with default input data set.");
            System.out.println("Use --input to specify file input.");
            // get default test text data
            //text = env.fromElements(WordCountData.WORDS);
        }

        DataStream<Tuple2<String, Integer>> counts =
                // split up the lines in pairs (2-tuples) containing: (word,1)
                text.flatMap(new Tokenizer())
                        // group by the tuple field "0" and sum up tuple field "1"
                        .keyBy(value -> value.f0)
                        .sum(1);

        // emit result
        if (params.has("output")) {
            counts.writeAsText(params.get("output"));
        } else {
            System.out.println("Printing result to stdout. Use --output to specify output path.");
            counts.print();
        }

        // execute program
        env.execute("Checkpointing");
    }

    // *************************************************************************
    // USER FUNCTIONS
    // *************************************************************************

    /**
     * Implements the string tokenizer that splits sentences into words as a user-defined
     * FlatMapFunction. The function takes a line (String) and splits it into multiple pairs in the
     * form of "(word,1)" ({@code Tuple2<String, Integer>}).
     */
    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // normalize and split the line
            String[] tokens = value.toLowerCase().split("\\W+");

            // emit the pairs
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }
}
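One more thing worth checking: ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION only leaves usable checkpoints behind if checkpoint data is written to a durable location; by default the JobManager keeps it in memory. A minimal sketch of pointing checkpoints at a filesystem path (the class name and the file:///tmp/flink-checkpoints URI are my assumptions; equivalently, set state.checkpoints.dir in flink-conf.yaml):

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointStorageExample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(1000);

        // write checkpoint data to disk instead of JobManager memory so that
        // retained checkpoints survive cancellation and can be restored from;
        // any file://, hdfs:// or s3:// URI works here
        env.setStateBackend(new FsStateBackend("file:///tmp/flink-checkpoints"));

        // trivial always-on pipeline just to have something to checkpoint
        env.socketTextStream("localhost", 9999).print();

        env.execute("Retained checkpoints example");
    }
}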