Hi,

I've been looking into how to initialise large state, and in particular
watched this presentation by Lyft, which has also been referenced in this
group:
https://www.youtube.com/watch?v=WdMcyN5QZZQ

In our use case we would like to load roughly 4 billion entries into this
state. I believe that loading the data from S3, creating a savepoint, and
then restarting in streaming mode from that savepoint would work very well.
From the presentation I got the impression that I could read from S3 and,
once everything has been read (without any custom termination detector
etc.), simply create a savepoint by calling the REST API from the app.
However, I've noticed that when I read data from files the job terminates
automatically once all data has been read, and the job appears not to be
running even if I add a sleep in the main program (very simple app attached
below).
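
For reference, the savepoint call I have in mind would look roughly like
the sketch below. It assumes Flink's REST endpoint
POST /jobs/:jobid/savepoints with a JSON body containing "target-directory"
and "cancel-job"; the class name SavepointTrigger and the command-line
arguments (REST base URL, job id, target directory) are placeholders of my
own, not something taken from the presentation. As far as I understand, the
call can only succeed while the job is still running, which is exactly the
part I'm stuck on.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class SavepointTrigger {

    // Builds the JSON request body; "cancel-job": false leaves the job running.
    static String savepointBody(String targetDirectory) {
        String escaped = targetDirectory.replace("\\", "\\\\").replace("\"", "\\\"");
        return "{\"target-directory\":\"" + escaped + "\",\"cancel-job\":false}";
    }

    // Builds (but does not send) the POST request that triggers a savepoint.
    static HttpRequest buildRequest(String restBase, String jobId, String targetDirectory) {
        return HttpRequest.newBuilder(URI.create(restBase + "/jobs/" + jobId + "/savepoints"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(savepointBody(targetDirectory)))
                .build();
    }

    public static void main(String[] args) throws Exception {
        if (args.length < 3) {
            System.out.println("usage: SavepointTrigger <restBase> <jobId> <targetDirectory>");
            return;
        }
        HttpRequest request = buildRequest(args[0], args[1], args[2]);
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        // Print the status code and raw response body for inspection.
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```

It would be invoked with something like
"http://localhost:8081 <jobId> s3://my-bucket/savepoints", where all three
values are placeholders.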

I could use FileProcessingMode.PROCESS_CONTINUOUSLY to prevent the job from
terminating and create the savepoint from outside the app, but that would
require termination detection etc. and would make the solution less clean.

Does anyone have more details on how I could accomplish this?

Br,
Henkka

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StreamingJob {

   public static void main(String[] args) throws Exception {
      // default to a local init file when no arguments are given
      if (args.length == 0) {
         args = "--initFile init.csv".split(" ");
      }

      // set up the streaming execution environment
      final StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

      ParameterTool params = ParameterTool.fromArgs(args);

      String initFile = params.get("initFile");
      if (initFile != null) {
         env.readTextFile(initFile)
            // parse each CSV line into a tuple of its first four fields
            .map(new MapFunction<String, Tuple4<String, String, String, String>>() {
               @Override
               public Tuple4<String, String, String, String> map(String s)
                     throws Exception {
                  String[] data = s.split(",");
                  return new Tuple4<>(data[0], data[1], data[2], data[3]);
               }
            })
            // key by the first two fields; ProfileInitMapper is not shown here
            .keyBy(0, 1)
            .map(new ProfileInitMapper());
      }

      // execute program
      env.execute("Flink Streaming Java API Skeleton");

      // when all data read, save the state
      Thread.sleep(10000);
   }
}
