My source function is intrinsically single-threaded. Is there a way to enforce
this?
I can't find a real difference between a RichParallelSourceFunction and
a RichSourceFunction.
Does the latter (RichSourceFunction) implicitly use parallelism = 1?

On Wed, Jun 19, 2019 at 2:25 PM Chesnay Schepler <ches...@apache.org> wrote:

> It returns a list of states so that state can be re-distributed if the
> parallelism changes.
>
> If you hard-code the interface to return a single value then you're
> implicitly locking the parallelism.
> When you reduce the parallelism, you'd no longer be able to restore all
> state, since you'd have fewer instances than stored state entries.
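To make the redistribution argument concrete, here is a small, Flink-free Java model. It assumes the even-split scheme that Flink's documentation describes for list-style operator state (all snapshotted entries are concatenated, then divided into as many sublists as there are new parallel instances). The class, method and partition names are hypothetical; this is a sketch, not Flink's actual repartitioning code.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Simplified model of even-split redistribution of list-style operator state.
 * Hypothetical helper for illustration only; this is not Flink's own code.
 */
public class StateRedistribution {

    /**
     * Concatenates the lists snapshotted by all old subtasks and splits the
     * result into newParallelism contiguous sublists of near-equal size.
     */
    static List<List<String>> redistribute(List<List<String>> snapshots, int newParallelism) {
        List<String> all = new ArrayList<>();
        for (List<String> perSubtask : snapshots) {
            all.addAll(perSubtask);
        }
        List<List<String>> result = new ArrayList<>();
        int base = all.size() / newParallelism;
        int extra = all.size() % newParallelism;
        int pos = 0;
        for (int i = 0; i < newParallelism; i++) {
            // The first `extra` subtasks each receive one additional entry.
            int size = base + (i < extra ? 1 : 0);
            result.add(new ArrayList<>(all.subList(pos, pos + size)));
            pos += size;
        }
        return result;
    }

    public static void main(String[] args) {
        // Three subtasks, each snapshotting one redistributable unit
        // (here: the read offset of one made-up partition).
        List<List<String>> before = List.of(
                List.of("partition-0=17"),
                List.of("partition-1=42"),
                List.of("partition-2=5"));

        // Scale down to 2: one subtask must restore two entries.
        System.out.println(redistribute(before, 2));
        // prints [[partition-0=17, partition-1=42], [partition-2=5]]

        // Scale up to 4: one subtask restores an empty list.
        System.out.println(redistribute(before, 4));
        // prints [[partition-0=17], [partition-1=42], [partition-2=5], []]
    }
}
```

The scale-down case is the situation described above: with a single-value interface there would be three values but only two instances to hand them to, whereas a list lets one instance take two entries.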
>
> On 19/06/2019 14:19, Flavio Pompermaier wrote:
>
> It's not clear to me why the source checkpoint returns a list of
> objects. When would it be useful to return a list instead of a single value?
> The documentation says "The returned list should contain one entry for
> redistributable unit of state", but this is not very clear to me.
>
> Best,
> Flavio
>
> On Wed, Jun 19, 2019 at 12:40 PM Chesnay Schepler <ches...@apache.org>
> wrote:
>
>> This looks fine to me.
>>
>> What exactly were you worried about?
>>
>> On 19/06/2019 12:33, Flavio Pompermaier wrote:
>> > Hi all,
>> > in my use case I have to ingest data from a REST service, which I
>> > poll periodically (of course a queue would be a better choice,
>> > but that decision isn't up to me).
>> >
>> > So I wrote a RichSourceFunction that starts a thread that polls for new
>> > data.
>> > However, I'd like to restart from the last "from" value if
>> > the job is stopped.
>> >
>> > My initial thought was to store the last used date somewhere and, on
>> > job restart, read that date back (from a file, for example). However, a
>> > Flink stateful source should be a better choice here... am I wrong? So I
>> > made my source function implement ListCheckpointed<String>:
>> >
>> > @Override
>> > public List<String> snapshotState(long checkpointId, long timestamp)
>> >         throws Exception {
>> >     return Collections.singletonList(pollingThread.getDateFromAsString());
>> > }
>> > @Override
>> > public void restoreState(List<String> state) throws Exception {
>> >     for (String dateFrom : state) {
>> >         startDateStr = dateFrom;
>> >     }
>> > }
>> >
>> > @Override
>> > public void run(SourceContext<MyEvent> ctx) throws Exception {
>> >     final Object lock = ctx.getCheckpointLock();
>> >     Client httpClient = getHttpClient();
>> >     try {
>> >         pollingThread = new MyPollingThread.Builder(baseUrl, httpClient)
>> >                 .setStartDate(startDateStr, datePatternStr)
>> >                 .build();
>> >         // start the polling thread
>> >         new Thread(pollingThread).start();
>> >         .... (etc)
>> > }
>> >
>> > Is this the correct approach, or did I misunderstand how stateful
>> > source functions work?
>> >
>> > Best,
>> > Flavio
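For the polling source itself, a minimal Flink-free sketch of the state handling might look like the following. It is a model, not Flink API code: the class name is hypothetical, a plain Object stands in for ctx.getCheckpointLock(), a List stands in for the SourceContext, and the dates are assumed to be ISO-8601 LocalDateTime strings (the real format is whatever datePatternStr specifies). It follows the pattern recommended in Flink's SourceFunction Javadoc: emit records and update the checkpointed value inside one synchronized block on the checkpoint lock, so a snapshot never sees a date that does not match what was emitted.

```java
import java.time.LocalDateTime;
import java.util.Collections;
import java.util.List;

/**
 * Flink-free model of the polling source's state handling (hypothetical class).
 * Assumption: the "from" dates are ISO-8601 LocalDateTime strings.
 */
public class PollingSourceStateModel {

    // Stands in for ctx.getCheckpointLock() in the real SourceFunction.
    private final Object checkpointLock = new Object();

    // Last "from" date whose records have been emitted; null before the first batch.
    private String lastDateFrom;

    /** Emits a polled batch and advances the "from" date as one atomic step. */
    public void emitBatch(List<String> records, String newDateFrom, List<String> out) {
        synchronized (checkpointLock) {
            out.addAll(records); // in Flink this would be ctx.collect(...) per record
            lastDateFrom = newDateFrom;
        }
    }

    /** Mirrors ListCheckpointed#snapshotState: zero or one entry, never a null entry. */
    public List<String> snapshotState() {
        // Taking the lock here mimics the snapshot running under the checkpoint lock.
        synchronized (checkpointLock) {
            return lastDateFrom == null
                    ? Collections.emptyList()
                    : Collections.singletonList(lastDateFrom);
        }
    }

    /** Mirrors ListCheckpointed#restoreState: keeps the latest date among all entries. */
    public void restoreState(List<String> state) {
        LocalDateTime latest = null;
        String latestStr = null;
        for (String s : state) {
            LocalDateTime d = LocalDateTime.parse(s);
            if (latest == null || d.isAfter(latest)) {
                latest = d;
                latestStr = s;
            }
        }
        lastDateFrom = latestStr; // stays null if the restored list was empty
    }

    public String getLastDateFrom() {
        synchronized (checkpointLock) {
            return lastDateFrom;
        }
    }
}
```

Since a plain RichSourceFunction (unlike a RichParallelSourceFunction) always runs with parallelism 1, the restored list will normally hold at most one entry; keeping the latest date is only a defensive choice. Returning an empty list before the first poll also avoids snapshotting a null entry.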
>>
>>
>>
>
>
