Hi,

Thanks. Just to clarify, where would you then invoke the savepoint creation? I basically need to know when all data has been read, create a savepoint and then exit. I think I could just as well use FileProcessingMode.PROCESS_CONTINUOUSLY, monitor the stream from the outside and then use the REST API to cancel with a savepoint.
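For that route, I was thinking of something roughly like the snippet below (untested; the endpoint is the POST /jobs/:jobid/savepoints call from the 1.5+ REST API as I understand it, and the job manager address, job id and target directory are placeholders):

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class CancelWithSavepoint {

    public static void main(String[] args) throws Exception {
        // Placeholders: replace with the real job manager address, job id
        // and savepoint target directory.
        String jobManager = "http://localhost:8081";
        String jobId = "00000000000000000000000000000000";
        String body = "{\"target-directory\": \"s3://my-bucket/savepoints\", \"cancel-job\": true}";

        // POST /jobs/:jobid/savepoints triggers an asynchronous savepoint and,
        // with cancel-job=true, cancels the job once the savepoint completes.
        URL url = new URL(jobManager + "/jobs/" + jobId + "/savepoints");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("POST");
        conn.setRequestProperty("Content-Type", "application/json");
        conn.setDoOutput(true);
        try (OutputStream out = conn.getOutputStream()) {
            out.write(body.getBytes(StandardCharsets.UTF_8));
        }
        // The response carries a trigger id that can be polled at
        // GET /jobs/:jobid/savepoints/:triggerid until the savepoint is done.
        System.out.println("HTTP " + conn.getResponseCode());
    }
}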
Any plans for a feature where I could have Flink create a savepoint on job exit? I am also keen to hear other ideas on how to bootstrap the state. I was initially thinking of just reading the data from Cassandra if no state is available. If I end up going with a custom source, I have sketched what I had in mind below the quoted thread.

Br,
Henkka

On Thu, Jul 19, 2018 at 3:15 PM vino yang <yanghua1...@gmail.com> wrote:

> Hi Henkka,
>
> The behavior of the text file source meets expectations. Flink will not
> keep your source task thread alive after it exits from its invoke method.
> That means you need to keep your source task alive yourself, so you should
> customize a text file source (implement the SourceFunction interface).
>
> For your requirement, you can check for a "no more data" idle timeout; when
> it expires, exit, and the job will then stop.
>
> You can also refer to the implementations of the other source connectors.
>
> Thanks, vino.
>
> 2018-07-19 19:52 GMT+08:00 Henri Heiskanen <henri.heiska...@gmail.com>:
>
>> Hi,
>>
>> I've been looking into how to initialise large state and especially
>> checked this presentation by Lyft referenced in this group as well:
>> https://www.youtube.com/watch?v=WdMcyN5QZZQ
>>
>> In our use case we would like to load roughly 4 billion entries into this
>> state, and I believe loading this data from S3, creating a savepoint and
>> then restarting in streaming mode from the savepoint would work very well.
>> From the presentation I got the impression that I could read from S3 and,
>> when all is done (without any custom termination detector etc.), just make
>> a savepoint by calling the REST API from the app. However, I've noticed
>> that if I read data from files, the job auto-terminates when all data has
>> been read and no longer appears to be running, even if I add a sleep in
>> the main program (very simple app attached below).
>>
>> I could use FileProcessingMode.PROCESS_CONTINUOUSLY to prevent the job
>> from terminating and create the savepoint from outside the app, but that
>> would require termination detection etc. and would make the solution less
>> clean.
>>
>> Does anyone have more details on how I could accomplish this?
>>
>> Br,
>> Henkka
>>
>> public class StreamingJob {
>>
>>     public static void main(String[] args) throws Exception {
>>         if (args.length == 0) {
>>             args = "--initFile init.csv".split(" ");
>>         }
>>
>>         // set up the streaming execution environment
>>         final StreamExecutionEnvironment env =
>>             StreamExecutionEnvironment.getExecutionEnvironment();
>>
>>         ParameterTool params = ParameterTool.fromArgs(args);
>>
>>         String initFile = params.get("initFile");
>>         if (initFile != null) {
>>             env.readTextFile(initFile)
>>                 .map(new MapFunction<String, Tuple4<String, String, String, String>>() {
>>                     @Override
>>                     public Tuple4<String, String, String, String> map(String s) throws Exception {
>>                         String[] data = s.split(",");
>>                         return new Tuple4<String, String, String, String>(data[0], data[1], data[2], data[3]);
>>                     }
>>                 })
>>                 .keyBy(0, 1)
>>                 .map(new ProfileInitMapper());
>>         }
>>
>>         // execute program
>>         env.execute("Flink Streaming Java API Skeleton");
>>
>>         // when all data has been read, save the state
>>         Thread.sleep(10000);
>>     }
>> }
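PS: here is the rough, untested sketch of the custom source I mentioned above. The class name, the idle-timeout handling and the plain file reading are just placeholder choices for illustration; the point is only that the source keeps running after emitting all records, so the job stays alive until a savepoint has been taken and the job is cancelled from the outside.

import java.io.BufferedReader;
import java.io.FileReader;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class BoundedTextFileSource implements SourceFunction<String> {

    private final String path;
    private final long idleTimeoutMs;
    private volatile boolean running = true;

    public BoundedTextFileSource(String path, long idleTimeoutMs) {
        this.path = path;
        this.idleTimeoutMs = idleTimeoutMs;
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        // Emit the whole file once, holding the checkpoint lock per record so
        // checkpoints/savepoints stay consistent with the emitted data.
        try (BufferedReader reader = new BufferedReader(new FileReader(path))) {
            String line;
            while (running && (line = reader.readLine()) != null) {
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(line);
                }
            }
        }

        // All data emitted: keep the source (and therefore the job) alive until
        // either the idle timeout expires or the job is cancelled externally,
        // e.g. via cancel-with-savepoint.
        long deadline = System.currentTimeMillis() + idleTimeoutMs;
        while (running && System.currentTimeMillis() < deadline) {
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}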