Hi Henkka,

You might want to consider implementing a dedicated job for state
bootstrapping that uses the same operator UIDs and state names as your
regular job. That might be easier than integrating the bootstrap logic
into it.

I think you have to use the monitoring file source because AFAIK it won't
be possible to trigger a savepoint once a task has finished: Flink is
not able to inject a checkpoint / savepoint barrier into finished tasks.
Detecting that all data was read is of course tricky, but you could monitor
the processed-records count metrics and take a savepoint once they no
longer change.
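
A minimal sketch of that "count no longer changes" check, in plain Java.
The names StallDetector, observe and awaitStall are hypothetical, and how
the record count is obtained is an assumption left to the caller (for
example, a supplier that polls the job's record count metrics). It only
decides when the counter has stalled; triggering the savepoint is a
separate step.

```java
import java.util.function.LongSupplier;

/**
 * Reports a stall once a growing counter has shown the same value for a
 * given number of consecutive polls. Hypothetical helper, not a Flink API.
 */
final class StallDetector {
    private final int requiredStablePolls;
    private long lastCount = -1;   // sentinel: nothing observed yet
    private int stablePolls = 0;   // consecutive polls without a change

    StallDetector(int requiredStablePolls) {
        if (requiredStablePolls < 1) {
            throw new IllegalArgumentException("requiredStablePolls must be >= 1");
        }
        this.requiredStablePolls = requiredStablePolls;
    }

    /** Feed the latest counter value; returns true once it has stalled. */
    boolean observe(long count) {
        if (count == lastCount) {
            stablePolls++;
        } else {
            lastCount = count;
            stablePolls = 0;
        }
        // Require at least one record so that an idle start-up phase is
        // not mistaken for "all data has been read".
        return count > 0 && stablePolls >= requiredStablePolls;
    }

    /** Polls the supplier at a fixed interval until the counter stalls. */
    static void awaitStall(LongSupplier recordCount,
                           int requiredStablePolls,
                           long pollIntervalMillis) throws InterruptedException {
        StallDetector detector = new StallDetector(requiredStablePolls);
        while (!detector.observe(recordCount.getAsLong())) {
            Thread.sleep(pollIntervalMillis);
        }
    }
}
```

A monitoring script could call awaitStall with a supplier that reads the
metric and, once it returns, trigger the savepoint from outside the job.
The number of stable polls and the poll interval need to be generous
enough that a slow file listing is not taken for the end of the input.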

Best, Fabian

2018-07-23 8:24 GMT+02:00 Henri Heiskanen <henri.heiska...@gmail.com>:

> Hi,
>
> With state bootstrapping I mean loading the state with initial data before
> starting the actual job. For example, in our case I would like to load
> information like registration date of our users (>5 years of data) so that
> I can enrich our event data in streaming (5 days retention).
>
> Before watching the presentation by Lyft, I was loading this data per key
> from Cassandra DB in the mapper if the state was not found.
>
> Br,
> Henkka
>
> On Fri, Jul 20, 2018 at 7:03 PM Vino yang <yanghua1...@gmail.com> wrote:
>
>> Hi Henkka,
>>
>> If you want to customize the DataStream text source for your purpose, you
>> can use a read counter: if the counter's value does not change within a
>> given interval, you can assume all the data has been read. This is just
>> an idea; you can choose another solution.
>>
>> About creating a savepoint automatically on job exit: it sounds like a
>> good idea. I do not know of any plan for this, so I will try to submit
>> the idea to the community.
>>
>> And about "how to bootstrap a state": what does that mean? Can you
>> explain this?
>>
>> Thanks, vino
>>
>>
>> On 2018-07-20 20:00 , Henri Heiskanen <henri.heiska...@gmail.com> Wrote:
>>
>> Hi,
>>
>> Thanks. Just to clarify, where would you then invoke the savepoint
>> creation? I basically need to know when all data is read, create a
>> savepoint and then exit. I think I could just as well use
>> PROCESS_CONTINUOUSLY, monitor the stream from outside and then use the
>> REST API to cancel with savepoint.
>>
>> Are there any plans for a feature that lets Flink make a savepoint
>> on job exit? I am also keen on hearing other ideas on how to bootstrap
>> state. I was initially thinking of just reading data from Cassandra if no
>> state is available.
>>
>> Br,
>> Henkka
>>
>> On Thu, Jul 19, 2018 at 3:15 PM vino yang <yanghua1...@gmail.com> wrote:
>>
>>> Hi Henkka,
>>>
>>> The behavior of the text file source is as expected. Flink will not
>>> keep your source task thread once it exits from its invoke method. That
>>> means you have to keep your source task alive. To implement this, you
>>> should write a custom text file source (implementing the SourceFunction
>>> interface).
>>>
>>> For your requirement, you can track how long no new data has arrived;
>>> once that idle time expires, exit the source, and the job will then stop.
>>>
>>> You can also refer to the implementations of other source connectors.
>>>
>>> Thanks, vino.
>>>
>>> 2018-07-19 19:52 GMT+08:00 Henri Heiskanen <henri.heiska...@gmail.com>:
>>>
>>>> Hi,
>>>>
>>>> I've been looking into how to initialise large state and especially
>>>> checked this presentation by Lyft referenced in this group as well:
>>>> https://www.youtube.com/watch?v=WdMcyN5QZZQ
>>>>
>>>> In our use case we would like to load roughly 4 billion entries into
>>>> this state and I believe loading this data from s3, creating a savepoint
>>>> and then restarting in streaming mode from a savepoint would work very
>>>> well. From the presentation I got the impression that I could read from
>>>> s3 and, when all is done (without any custom termination detector etc),
>>>> just make a savepoint by calling the REST API from the app. However, I've
>>>> noticed that if I read data from files the job will auto-terminate when
>>>> all data is read, and the job appears not to be running even if I add a
>>>> sleep in the main program (very simple app attached below).
>>>>
>>>> I could use FileProcessingMode.PROCESS_CONTINUOUSLY to prevent the job
>>>> from terminating and create the savepoint from outside the app, but that
>>>> would require termination detection etc and would make the solution less
>>>> clean.
>>>>
>>>> Does anyone have more details on how I could accomplish this?
>>>>
>>>> Br,
>>>> Henkka
>>>>
>>>> import org.apache.flink.api.common.functions.MapFunction;
>>>> import org.apache.flink.api.java.tuple.Tuple4;
>>>> import org.apache.flink.api.java.utils.ParameterTool;
>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>
>>>> public class StreamingJob {
>>>>
>>>>    public static void main(String[] args) throws Exception {
>>>>       // fall back to a default input file when no arguments are given
>>>>       if (args.length == 0) {
>>>>          args = "--initFile init.csv".split(" ");
>>>>       }
>>>>
>>>>       // set up the streaming execution environment
>>>>       final StreamExecutionEnvironment env =
>>>>             StreamExecutionEnvironment.getExecutionEnvironment();
>>>>
>>>>       ParameterTool params = ParameterTool.fromArgs(args);
>>>>
>>>>       String initFile = params.get("initFile");
>>>>       if (initFile != null) {
>>>>          // parse each CSV line into a 4-tuple, key by the first two
>>>>          // fields and hand the records to ProfileInitMapper (not shown)
>>>>          env.readTextFile(initFile)
>>>>             .map(new MapFunction<String, Tuple4<String, String, String, String>>() {
>>>>                @Override
>>>>                public Tuple4<String, String, String, String> map(String s)
>>>>                      throws Exception {
>>>>                   String[] data = s.split(",");
>>>>                   return new Tuple4<>(data[0], data[1], data[2], data[3]);
>>>>                }
>>>>             })
>>>>             .keyBy(0, 1)
>>>>             .map(new ProfileInitMapper());
>>>>       }
>>>>
>>>>       // execute program
>>>>       env.execute("Flink Streaming Java API Skeleton");
>>>>
>>>>       // when all data read, save the state
>>>>       Thread.sleep(10000);
>>>>    }
>>>> }
>>>>
>>>>
>>>>
>>>>
>>>
