The picture in the first e-mail shows that the job completed in 93 ms.

________________________________
From: Abdullah bin Omar <abdullahbinoma...@gmail.com>
Sent: Monday, March 8, 2021 3:53 PM
To: user@flink.apache.org <user@flink.apache.org>
Subject: Re: Trigger and completed Checkpointing do not appeared

Hi,

Please see the previous email (and this one) for context.

In the attached picture, the checkpoint interval is shown as 1 s, and the job finished in 1939 ms.

Given the code in the previous email, at least one checkpoint should have been 
triggered and completed.

However, the Apache Flink UI shows no triggered or completed checkpoints 
(see the picture attached to the first email).

What is the problem? Why are no checkpoints completing here?

Thank you




On Mon, Mar 8, 2021 at 3:07 PM Abdullah bin Omar 
<abdullahbinoma...@gmail.com<mailto:abdullahbinoma...@gmail.com>> wrote:
Hi,

I ran a sample word-count program. The input is some text, and the output 
counts the words. In the code I added all the lines needed to enable 
checkpointing. However, I do not see any triggered or completed checkpoints 
(see the attached picture), even though the word count itself works.

The code is below:


package org.apache.flink.flink_quickstart_java;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
//import org.apache.flink.streaming.examples.wordcount.util.WordCountData;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;

public class StreamingJob {

    // *************************************************************************
    // PROGRAM
    // *************************************************************************

    public static void main(String[] args) throws Exception {

        // check input parameters
        final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);

        // set up the execution environment
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // start a checkpoint every 1000 ms
        env.enableCheckpointing(1000);

        // minimum pause between the end of one checkpoint and the start of the next
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);

        // checkpoints have to complete within 10000 ms, or they are discarded
        env.getCheckpointConfig().setCheckpointTimeout(10000);

        // set mode to exactly-once (this is the default); AT_LEAST_ONCE is the alternative
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        // allow only one checkpoint to be in progress at the same time
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

        // retain externalized checkpoints after job cancellation
        // (DELETE_ON_CANCELLATION is the alternative)
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // restart strategy: 3 restart attempts, 100 ms delay between attempts
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100));

        // make parameters available in the web interface
        env.getConfig().setGlobalJobParameters(params);

        // get input data
        DataStream<String> text = null;
        if (params.has("input")) {
            // union all the inputs from text files
            for (String input : params.getMultiParameterRequired("input")) {
                if (text == null) {
                    text = env.readTextFile(input);
                } else {
                    text = text.union(env.readTextFile(input));
                }
            }
            Preconditions.checkNotNull(text, "Input DataStream should not be null.");
        } else {
            System.out.println("Executing WordCount example with default input data set.");
            System.out.println("Use --input to specify file input.");
            // get default test text data
            //text = env.fromElements(WordCountData.WORDS);
            // NOTE: with the WordCountData line commented out, 'text' stays null
            // here and the flatMap call below throws a NullPointerException; a
            // small inline default keeps the job runnable without that class:
            text = env.fromElements("To be, or not to be, that is the question");
        }

        DataStream<Tuple2<String, Integer>> counts =
                // split up the lines into pairs (2-tuples) containing: (word, 1)
                text.flatMap(new Tokenizer())
                        // group by the tuple field "0" and sum up tuple field "1"
                        .keyBy(value -> value.f0)
                        .sum(1);

        // emit result
        if (params.has("output")) {
            counts.writeAsText(params.get("output"));
        } else {
            System.out.println("Printing result to stdout. Use --output to specify output path.");
            counts.print();
        }

        // execute program
        env.execute("Checkpointing");
    }

    // *************************************************************************
    // USER FUNCTIONS
    // *************************************************************************

    /**
     * Implements the string tokenizer that splits sentences into words as a
     * user-defined FlatMapFunction. The function takes a line (String) and
     * splits it into multiple pairs in the form of "(word,1)"
     * ({@code Tuple2<String, Integer>}).
     */
    public static final class Tokenizer
            implements FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // normalize and split the line
            String[] tokens = value.toLowerCase().split("\\W+");

            // emit the pairs
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }
}
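For what it is worth, the tokenize-and-count step can be checked in isolation with plain Java, no Flink cluster needed. The sketch below (a hypothetical helper class, not part of the job above) mirrors what Tokenizer plus the keyed sum compute for a single line:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class TokenizerCheck {

    // Mirrors Tokenizer.flatMap + the keyed sum from the job above:
    // lower-case the line, split on non-word characters, and count
    // each non-empty token.
    public static Map<String, Integer> wordCount(String line) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String token : line.toLowerCase().split("\\W+")) {
            if (token.length() > 0) {
                counts.merge(token, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // prints {to=2, be=2, or=1, not=1}
        System.out.println(wordCount("To be, or not to be"));
    }
}
```

This checks the counting logic, but not the checkpointing behaviour, which only shows up when the job runs on a Flink runtime.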
