Hi Henkka,

You might want to consider implementing a dedicated job for state bootstrapping that uses the same operator UIDs and state names. That might be easier than integrating the logic into your regular job.
I think you have to use the monitoring file source because, AFAIK, it won't be possible to take a savepoint once a task has finished: Flink is not able to inject a checkpoint/savepoint barrier into finished tasks. Detecting that all data was read is of course tricky, but you could monitor the processed-records count metrics and take a savepoint once they don't change anymore.

Best, Fabian

2018-07-23 8:24 GMT+02:00 Henri Heiskanen <henri.heiska...@gmail.com>:

> Hi,
>
> By state bootstrapping I mean loading the state with initial data before
> starting the actual job. For example, in our case I would like to load
> information like the registration date of our users (>5 years of data) so
> that I can enrich our event data in streaming (5 days retention).
>
> Before watching the presentation by Lyft, I was loading this data per key
> from Cassandra in the mapper if the state was not found.
>
> Br,
> Henkka
>
> On Fri, Jul 20, 2018 at 7:03 PM Vino yang <yanghua1...@gmail.com> wrote:
>
>> Hi Henkka,
>>
>> If you want to customize the DataStream text source for your purpose,
>> you can use a read counter: if the value of the counter does not change
>> within an interval, you can assume all the data has been read. That is
>> just one idea; you can choose another solution.
>>
>> Creating a savepoint automatically when a job exits sounds like a good
>> idea. I do not know of any plan for this; I will try to submit the idea
>> to the community.
>>
>> And about "how to bootstrap a state", what does that mean? Can you
>> explain?
>>
>> Thanks, vino
>>
>> On 2018-07-20 20:00, Henri Heiskanen <henri.heiska...@gmail.com> wrote:
>>
>> Hi,
>>
>> Thanks. Just to clarify, where would you then invoke the savepoint
>> creation? I basically need to know when all data is read, create a
>> savepoint and then exit. I think I could just as well use
>> PROCESS_CONTINUOUSLY, monitor the stream outside and then use the REST
>> API to cancel with savepoint.
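Fabian's suggestion above — watching the processed-records counter until it stops changing, then taking the savepoint — can be sketched in plain Java. This is only a sketch: the `LongSupplier` is a stand-in for however you fetch the metric (e.g. polling Flink's REST metrics endpoint), and the polling parameters are made up.

```java
import java.util.function.LongSupplier;

/**
 * Polls a record counter and reports "stable" once the value has not
 * changed for a given number of consecutive polls. The counter supplier
 * is assumed to read e.g. numRecordsOut via Flink's REST API.
 */
public class MetricStabilityDetector {
    private final LongSupplier counter;
    private final int requiredStablePolls;

    private long lastValue = -1;
    private int stablePolls = 0;

    public MetricStabilityDetector(LongSupplier counter, int requiredStablePolls) {
        this.counter = counter;
        this.requiredStablePolls = requiredStablePolls;
    }

    /**
     * Call once per polling interval; returns true when the counter has
     * been unchanged for the required number of consecutive polls.
     */
    public boolean poll() {
        long value = counter.getAsLong();
        if (value == lastValue) {
            stablePolls++;
        } else {
            stablePolls = 0;
            lastValue = value;
        }
        return stablePolls >= requiredStablePolls;
    }
}
```

Once `poll()` returns true, the driving loop would trigger the savepoint, e.g. via the REST API or `flink cancel -s <targetDirectory> <jobId>`.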
>>
>> Are there any plans for a feature where I could have Flink make a
>> savepoint when the job exits? I am also keen to hear other ideas on how
>> to bootstrap state. I was initially thinking of just reading the data
>> from Cassandra if no state is available.
>>
>> Br,
>> Henkka
>>
>> On Thu, Jul 19, 2018 at 3:15 PM vino yang <yanghua1...@gmail.com> wrote:
>>
>>> Hi Henkka,
>>>
>>> The behavior of the text file source meets expectations. Flink will not
>>> keep your source task thread alive once it exits from its invoke
>>> method. That means you have to keep the source task alive yourself, so
>>> to implement this you should customize the text file source (implement
>>> the SourceFunction interface).
>>>
>>> For your requirement, you can check for a no-more-data idle timeout: if
>>> it expires, exit, and the job will then stop.
>>>
>>> You can also refer to the implementations of other source connectors.
>>>
>>> Thanks, vino.
>>>
>>> 2018-07-19 19:52 GMT+08:00 Henri Heiskanen <henri.heiska...@gmail.com>:
>>>
>>>> Hi,
>>>>
>>>> I've been looking into how to initialise large state, and in
>>>> particular checked this presentation by Lyft referenced in this group
>>>> as well: https://www.youtube.com/watch?v=WdMcyN5QZZQ
>>>>
>>>> In our use case we would like to load roughly 4 billion entries into
>>>> this state, and I believe loading this data from S3, creating a
>>>> savepoint and then restarting in streaming mode from that savepoint
>>>> would work very well. From the presentation I got the impression that
>>>> I could read from S3 and, when all done (without any custom
>>>> termination detector etc.), just make a savepoint by calling the REST
>>>> API from the app. However, I've noticed that if I read data from
>>>> files, the job auto-terminates when all data is read, and the job
>>>> appears not to be running even if I add a sleep in the main program
>>>> (very simple app attached below).
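The idle-timeout check vino describes above — a custom source that keeps running and exits only after no record has arrived for some interval — can be sketched as a small helper in plain Java. The Flink SourceFunction wiring is omitted; the class name and the injectable clock (so the logic is testable) are assumptions, and a real source would pass `System::currentTimeMillis`.

```java
import java.util.function.LongSupplier;

/**
 * Tracks "no more data" idleness for a custom source. The source's run()
 * loop calls recordSeen() for every record it emits and checks
 * shouldStop() between reads; once no record has been seen for
 * idleTimeoutMillis, run() can return and the job finishes.
 */
public class IdleTimeout {
    private final long idleTimeoutMillis;
    private final LongSupplier clock; // injectable for testing
    private long lastRecordTime;

    public IdleTimeout(long idleTimeoutMillis, LongSupplier clock) {
        this.idleTimeoutMillis = idleTimeoutMillis;
        this.clock = clock;
        this.lastRecordTime = clock.getAsLong();
    }

    /** Call whenever a record is read/emitted. */
    public void recordSeen() {
        lastRecordTime = clock.getAsLong();
    }

    /** True once the idle timeout has expired with no new records. */
    public boolean shouldStop() {
        return clock.getAsLong() - lastRecordTime >= idleTimeoutMillis;
    }
}
```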
>>>>
>>>> I could use FileProcessingMode.PROCESS_CONTINUOUSLY to prevent the job
>>>> from terminating and create the savepoint from outside the app, but
>>>> that would require termination detection etc. and would make the
>>>> solution less clean.
>>>>
>>>> Does anyone have more details on how I could accomplish this?
>>>>
>>>> Br,
>>>> Henkka
>>>>
>>>> public class StreamingJob {
>>>>
>>>>     public static void main(String[] args) throws Exception {
>>>>         if (args.length == 0) {
>>>>             args = "--initFile init.csv".split(" ");
>>>>         }
>>>>
>>>>         // set up the streaming execution environment
>>>>         final StreamExecutionEnvironment env =
>>>>                 StreamExecutionEnvironment.getExecutionEnvironment();
>>>>
>>>>         ParameterTool params = ParameterTool.fromArgs(args);
>>>>
>>>>         String initFile = params.get("initFile");
>>>>         if (initFile != null) {
>>>>             env.readTextFile(initFile)
>>>>                 .map(new MapFunction<String, Tuple4<String, String, String, String>>() {
>>>>                     @Override
>>>>                     public Tuple4<String, String, String, String> map(String s) throws Exception {
>>>>                         String[] data = s.split(",");
>>>>                         return new Tuple4<>(data[0], data[1], data[2], data[3]);
>>>>                     }
>>>>                 })
>>>>                 .keyBy(0, 1)
>>>>                 .map(new ProfileInitMapper());
>>>>         }
>>>>
>>>>         // execute program
>>>>         env.execute("Flink Streaming Java API Skeleton");
>>>>
>>>>         // when all data is read, save the state
>>>>         Thread.sleep(10000);
>>>>     }
>>>> }
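The `ProfileInitMapper` referenced in the job above is not shown in the thread. Its likely shape — enrich per key, falling back to an external store (the thread mentions Cassandra) when the key is not yet in state — can be sketched as follows. This is a guess at the pattern, not Henri's actual class: a `HashMap` stands in for Flink keyed `ValueState`, and the external lookup function is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/**
 * Sketch of the enrichment pattern from the thread: look up per-key state
 * (e.g. a user's registration date) and, if absent, fall back to an
 * external store and cache the result. A HashMap stands in here for
 * Flink's keyed ValueState; externalLookup stands in for a Cassandra query.
 */
public class EnrichingMapper {
    private final Map<String, String> state = new HashMap<>();
    private final Function<String, String> externalLookup;

    public EnrichingMapper(Function<String, String> externalLookup) {
        this.externalLookup = externalLookup;
    }

    /** Returns the event enriched with the cached value for its key. */
    public String enrich(String key, String event) {
        // only hits the external store the first time a key is seen
        String regDate = state.computeIfAbsent(key, externalLookup);
        return event + "," + regDate;
    }
}
```

Bootstrapping from S3 instead of doing these per-key lookups at runtime is exactly what the savepoint approach in the thread would replace.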