Hi,

Thanks. Just to clarify, where would you then invoke the savepoint
creation? I basically need to know when all data is read, create a
savepoint and then exit. I think I could just as well use
FileProcessingMode.PROCESS_CONTINUOUSLY, monitor the stream from outside and
then use the REST API to cancel with a savepoint.
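
For reference, a rough sketch of the REST call I mean, assuming a Flink 1.5+
JobManager reachable at localhost:8081; the job id and savepoint directory
are placeholders:

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class CancelWithSavepoint {

   public static void main(String[] args) throws Exception {
      // Placeholders -- adjust to the actual deployment.
      String jobManager = "http://localhost:8081";
      String jobId = "<job-id>";

      // POST /jobs/:jobid/savepoints with "cancel-job": true triggers a
      // savepoint and cancels the job once the savepoint completes.
      URL url = new URL(jobManager + "/jobs/" + jobId + "/savepoints");
      HttpURLConnection conn = (HttpURLConnection) url.openConnection();
      conn.setRequestMethod("POST");
      conn.setRequestProperty("Content-Type", "application/json");
      conn.setDoOutput(true);

      String body = "{\"target-directory\": \"s3://my-bucket/savepoints\","
            + " \"cancel-job\": true}";
      try (OutputStream os = conn.getOutputStream()) {
         os.write(body.getBytes(StandardCharsets.UTF_8));
      }

      // 202 Accepted means the savepoint was triggered; the response body
      // carries a trigger id that can be polled for completion.
      System.out.println("Response code: " + conn.getResponseCode());
   }
}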

Are there any plans for a feature where I could have Flink make a savepoint
on job exit? I am also keen to hear other ideas on how to bootstrap state.
I was initially thinking of just reading the data from Cassandra if no state
is available.

Br,
Henkka

On Thu, Jul 19, 2018 at 3:15 PM vino yang <yanghua1...@gmail.com> wrote:

> Hi Henkka,
>
> The behavior of the text file source meets expectations. Flink will not
> keep your source task thread alive once it exits from its invoke method,
> which means you have to keep the source task alive yourself. To implement
> this, you should customize a text file source (implement the SourceFunction
> interface); a sketch of such a source appears below the quoted thread.
>
> For your requirement, you can track an idle period with no more data; once
> it expires, exit, and the job will then stop.
>
> You can also refer to the implementations of other source connectors.
>
> Thanks, vino.
>
> 2018-07-19 19:52 GMT+08:00 Henri Heiskanen <henri.heiska...@gmail.com>:
>
>> Hi,
>>
>> I've been looking into how to initialise large state, and in particular
>> checked out this presentation by Lyft that was also referenced in this group:
>> https://www.youtube.com/watch?v=WdMcyN5QZZQ
>>
>> In our use case we would like to load roughly 4 billion entries into this
>> state, and I believe loading this data from S3, creating a savepoint and
>> then restarting in streaming mode from that savepoint would work very well.
>> From the presentation I got the impression that I could read from S3 and,
>> when all done (without any custom termination detector etc.), just make a
>> savepoint by calling the REST API from the app. However, I've noticed that
>> if I read data from files, the job auto-terminates when all data is read,
>> and the job appears not to be running even if I add a sleep in the main
>> program (very simple app attached below).
>>
>> I could use FileProcessingMode.PROCESS_CONTINUOUSLY to prevent the job
>> from terminating and create the savepoint from outside the app, but that
>> would require termination detection etc and would make the solution less
>> clean.
>>
>> Has anyone more details how I could accomplish this?
>>
>> Br,
>> Henkka
>>
>> public class StreamingJob {
>>
>>    public static void main(String[] args) throws Exception {
>>       if (args.length == 0) {
>>          args = "--initFile init.csv".split(" ");
>>       }
>>
>>       // set up the streaming execution environment
>>       final StreamExecutionEnvironment env =
>>             StreamExecutionEnvironment.getExecutionEnvironment();
>>
>>       ParameterTool params = ParameterTool.fromArgs(args);
>>
>>       String initFile = params.get("initFile");
>>       if (initFile != null) {
>>          env.readTextFile(initFile)
>>             .map(new MapFunction<String, Tuple4<String, String, String, String>>() {
>>                @Override
>>                public Tuple4<String, String, String, String> map(String s) throws Exception {
>>                   String[] data = s.split(",");
>>                   return new Tuple4<>(data[0], data[1], data[2], data[3]);
>>                }
>>             })
>>             .keyBy(0, 1)
>>             .map(new ProfileInitMapper());
>>       }
>>
>>       // execute program
>>       env.execute("Flink Streaming Java API Skeleton");
>>
>>       // when all data is read, save the state
>>       Thread.sleep(10000);
>>    }
>> }
>>
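
For what it's worth, here is a minimal sketch of the kind of custom source
vino describes: it emits the file once and then idles instead of returning,
so the job stays RUNNING until it is cancelled (e.g. via
cancel-with-savepoint). The class name and the plain-text line format are
assumptions:

import java.io.BufferedReader;
import java.io.FileReader;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class SavepointFriendlyFileSource implements SourceFunction<String> {

   private final String path;
   private volatile boolean running = true;

   public SavepointFriendlyFileSource(String path) {
      this.path = path;
   }

   @Override
   public void run(SourceContext<String> ctx) throws Exception {
      try (BufferedReader reader = new BufferedReader(new FileReader(path))) {
         String line;
         while (running && (line = reader.readLine()) != null) {
            // Emit under the checkpoint lock so records and checkpoints
            // do not interleave.
            synchronized (ctx.getCheckpointLock()) {
               ctx.collect(line);
            }
         }
      }
      // All data emitted: idle instead of returning so the job stays alive
      // and a savepoint can be taken from outside. vino's variant would
      // instead exit here after a configurable idle timeout.
      while (running) {
         Thread.sleep(1000);
      }
   }

   @Override
   public void cancel() {
      running = false;
   }
}

It would be wired in with env.addSource(new SavepointFriendlyFileSource(initFile))
in place of env.readTextFile(initFile).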
