Hi, I've been looking into how to initialise large state, and in particular I watched this presentation by Lyft that was referenced in this group as well: https://www.youtube.com/watch?v=WdMcyN5QZZQ
In our use case we would like to load roughly 4 billion entries into this state, and I believe that loading the data from S3, creating a savepoint, and then restarting in streaming mode from that savepoint would work very well. From the presentation I got the impression that I could read from S3 and, once everything is done (without any custom termination detector etc.), simply create a savepoint by calling the REST API from the app. However, I've noticed that if I read the data from files, the job auto-terminates when all data has been read, and the job no longer appears to be running even if I add a sleep in the main program (a very simple app is attached below). I could use FileProcessingMode.PROCESS_CONTINUOUSLY to prevent the job from terminating and create the savepoint from outside the app, but that would require termination detection etc. and would make the solution less clean.

Does anyone have more details on how I could accomplish this?

Br,
Henkka

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StreamingJob {

    public static void main(String[] args) throws Exception {
        if (args.length == 0) {
            args = "--initFile init.csv".split(" ");
        }

        // set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        ParameterTool params = ParameterTool.fromArgs(args);
        String initFile = params.get("initFile");
        if (initFile != null) {
            env.readTextFile(initFile)
                .map(new MapFunction<String, Tuple4<String, String, String, String>>() {
                    @Override
                    public Tuple4<String, String, String, String> map(String s) throws Exception {
                        String[] data = s.split(",");
                        return new Tuple4<>(data[0], data[1], data[2], data[3]);
                    }
                })
                .keyBy(0, 1)
                .map(new ProfileInitMapper());
        }

        // execute program
        env.execute("Flink Streaming Java API Skeleton");

        // when all data has been read, save the state
        Thread.sleep(10000);
    }
}
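For the "trigger a savepoint from outside the app" path, a minimal sketch of what the REST call could look like, assuming a Flink version (1.5+) whose monitoring API exposes POST /jobs/:jobid/savepoints, and with the host, job id, and target directory as placeholder values. The snippet only builds the request with the JDK's java.net.http client, so it can be inspected without a running cluster; actually sending it would use HttpClient.send:

```java
import java.net.URI;
import java.net.http.HttpRequest;

public class SavepointTrigger {

    // Build the REST request that asks the JobManager to trigger a savepoint.
    // host, jobId and targetDir are hypothetical example values, not from the original post.
    static HttpRequest buildSavepointRequest(String host, String jobId, String targetDir) {
        String body = String.format(
                "{\"target-directory\": \"%s\", \"cancel-job\": false}", targetDir);
        return HttpRequest.newBuilder()
                .uri(URI.create(String.format("http://%s/jobs/%s/savepoints", host, jobId)))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }

    public static void main(String[] args) {
        HttpRequest req = buildSavepointRequest(
                "localhost:8081", "abc123", "s3://my-bucket/savepoints");
        // prints: POST http://localhost:8081/jobs/abc123/savepoints
        System.out.println(req.method() + " " + req.uri());
        // To actually send it: HttpClient.newHttpClient().send(req, HttpResponse.BodyHandlers.ofString());
    }
}
```

The catch, as noted above, is that this only works while the job is still running, which is exactly what the bounded file source prevents once it has read all input.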