Ok, great! Thanks everybody for the support.

On Wed, Jun 19, 2019 at 3:05 PM Chesnay Schepler <ches...@apache.org> wrote:

> A (Rich)SourceFunction that does not implement RichParallelSourceFunction
> is always run with a parallelism of 1.
>
> On 19/06/2019 14:36, Flavio Pompermaier wrote:
>
> My SourceFunction is intrinsically single-threaded. Is there a way to
> enforce this?
> I can't find a real difference between a RichParallelSourceFunction and
> a RichSourceFunction.
> Does the latter (RichSourceFunction) implicitly run with parallelism = 1?
>
> On Wed, Jun 19, 2019 at 2:25 PM Chesnay Schepler <ches...@apache.org>
> wrote:
>
>> It returns a list of states so that state can be re-distributed if the
>> parallelism changes.
>>
>> If you hard-code the interface to return a single value then you're
>> implicitly locking the parallelism.
>> When you reduce the parallelism you'd no longer be able to restore all
>> state, since you have fewer instances than stored state entries.
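To make the rescaling argument concrete, here is a minimal pure-Java model (not Flink code, no Flink dependency; the class and method names are hypothetical). It assumes a plain round-robin assignment of checkpointed list entries to the new subtasks. Flink's actual repartitioning may differ in detail, but the point holds: every entry lands on some instance, whereas with a single value per instance, scaling from 3 to 2 instances would leave one entry with no owner.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model, not Flink code: each inner list is the state that one
// subtask would receive on restore after a parallelism change.
public class ListStateRescaleSketch {

    // Assign every checkpointed entry to some subtask (round-robin here, which
    // is an assumption). No entry is dropped, whatever the new parallelism.
    static List<List<String>> redistribute(List<String> entries, int newParallelism) {
        List<List<String>> perSubtask = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            perSubtask.add(new ArrayList<>());
        }
        for (int i = 0; i < entries.size(); i++) {
            perSubtask.get(i % newParallelism).add(entries.get(i));
        }
        return perSubtask;
    }

    public static void main(String[] args) {
        // Three subtasks each checkpointed one "redistributable unit of state".
        List<String> checkpointed = List.of("unit-0", "unit-1", "unit-2");

        // Scale down to 2: one subtask restores two entries, nothing is lost.
        System.out.println(redistribute(checkpointed, 2));
        // prints [[unit-0, unit-2], [unit-1]]

        // Scale down to 1: the single instance restores all three entries.
        System.out.println(redistribute(checkpointed, 1));
        // prints [[unit-0, unit-1, unit-2]]
    }
}
```

The second call is also why a restoreState implementation should loop over the whole list rather than assume it always holds exactly one element.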
>>
>> On 19/06/2019 14:19, Flavio Pompermaier wrote:
>>
>> It's not clear to me why the source checkpoint returns a list of
>> objects. When would it be useful to use a list instead of a single value?
>> The documentation says "The returned list should contain one entry for
>> redistributable unit of state", but this is not very clear to me.
>>
>> Best,
>> Flavio
>>
>> On Wed, Jun 19, 2019 at 12:40 PM Chesnay Schepler <ches...@apache.org>
>> wrote:
>>
>>> This looks fine to me.
>>>
>>> What exactly were you worried about?
>>>
>>> On 19/06/2019 12:33, Flavio Pompermaier wrote:
>>> > Hi to all,
>>> > in my use case I have to ingest data from a REST service, where I
>>> > periodically poll the data (of course a queue would be a better choice,
>>> > but that isn't up to me).
>>> >
>>> > So I wrote a RichSourceFunction that starts a thread that polls for new
>>> > data.
>>> > However, I'd like to restart from the last "from" value (in case
>>> > the job is stopped).
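A minimal pure-Java model of such a polling loop, assuming the REST results are ISO-8601 timestamps returned in ascending order. `lock` stands in for ctx.getCheckpointLock() and the `emitted` list for ctx.collect(...); `fetchSince`, `pollOnce` and `snapshot` are hypothetical names. The idea it illustrates, as in the stateful-source example of the Flink documentation, is to emit records and advance the "from" cursor inside the same synchronized block, so that a snapshot taken under the lock always matches what has actually been emitted.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model, not Flink code: `lock` plays the role of
// ctx.getCheckpointLock(), `emitted` the role of ctx.collect(...), and
// fetchSince() stands in for the REST call.
public class PollingCursorSketch {
    private final Object lock = new Object();
    final List<String> emitted = new ArrayList<>();
    private String fromDate; // the last "from" value: the only state to checkpoint

    PollingCursorSketch(String startDate) {
        this.fromDate = startDate;
    }

    // Stand-in for the REST call: events strictly newer than `from`.
    // ISO-8601 strings of the same format compare correctly as text.
    static List<String> fetchSince(List<String> serverEvents, String from) {
        List<String> out = new ArrayList<>();
        for (String event : serverEvents) {
            if (event.compareTo(from) > 0) {
                out.add(event);
            }
        }
        return out;
    }

    // One polling round. The (simulated) network call happens outside the
    // lock; emitting and advancing the cursor happen together inside it, so
    // snapshot() never sees a cursor that disagrees with what was emitted.
    void pollOnce(List<String> serverEvents) {
        List<String> batch = fetchSince(serverEvents, fromDate);
        synchronized (lock) {
            for (String event : batch) {
                emitted.add(event);
                fromDate = event; // assumes the batch is in ascending order
            }
        }
    }

    // What snapshotState would return as its single list entry.
    String snapshot() {
        synchronized (lock) {
            return fromDate;
        }
    }

    public static void main(String[] args) {
        PollingCursorSketch source = new PollingCursorSketch("2019-06-19T00:00");
        List<String> server = List.of("2019-06-19T10:00", "2019-06-19T11:00");
        source.pollOnce(server);
        System.out.println(source.snapshot()); // prints 2019-06-19T11:00
        source.pollOnce(server); // nothing newer than the cursor
        System.out.println(source.emitted.size()); // prints 2
    }
}
```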
>>> >
>>> > My initial thought was to write the last used date somewhere and, on
>>> > job restart, read that date back (from a file, for example). However, a
>>> > Flink stateful source should be a better choice here... am I wrong? So I
>>> > made my source function implement ListCheckpointed<String>:
>>> >
>>> > @Override
>>> > public List<String> snapshotState(long checkpointId, long timestamp)
>>> >         throws Exception {
>>> >     return Collections.singletonList(pollingThread.getDateFromAsString());
>>> > }
>>> >
>>> > @Override
>>> > public void restoreState(List<String> state) throws Exception {
>>> >     for (String dateFrom : state) {
>>> >         startDateStr = dateFrom;
>>> >     }
>>> > }
>>> >
>>> > @Override
>>> > public void run(SourceContext<MyEvent> ctx) throws Exception {
>>> >     final Object lock = ctx.getCheckpointLock();
>>> >     Client httpClient = getHttpClient();
>>> >     try {
>>> >         pollingThread = new MyPollingThread.Builder(baseUrl, httpClient)
>>> >                 .setStartDate(startDateStr, datePatternStr)
>>> >                 .build();
>>> >         // start the polling thread
>>> >         new Thread(pollingThread).start();
>>> >         .... (etc)
>>> > }
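A hypothetical, Flink-free sketch of the snapshot/restore pair for a source with a single date cursor. The field names and the date pattern `yyyy-MM-dd'T'HH:mm:ss` are assumptions. It makes two defensive choices: snapshotState falls back to the start date when the polling thread has not produced a value yet (rather than dereferencing a polling thread that may not exist), and restoreState resumes from the most recent date if the list ever holds more than one entry, instead of from whichever entry happens to come last.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Collections;
import java.util.List;

// Hypothetical model of the snapshot/restore pair, not Flink code.
public class DateCursorStateSketch {
    // Assumed date pattern; the real one comes from datePatternStr.
    static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss");

    String startDateStr = "2019-01-01T00:00:00"; // configured default start
    String currentDateFrom; // updated by the polling loop; null until it runs

    // A single-threaded source has exactly one unit of state: one entry.
    List<String> snapshotState() {
        String value = currentDateFrom != null ? currentDateFrom : startDateStr;
        return Collections.singletonList(value);
    }

    // Normally the list has at most one entry. If it ever holds several (for
    // instance, if this became a parallel source that was later scaled down),
    // resume from the most recent date. An empty list keeps the default.
    void restoreState(List<String> state) {
        LocalDateTime latest = null;
        for (String s : state) {
            LocalDateTime d = LocalDateTime.parse(s, FMT);
            if (latest == null || d.isAfter(latest)) {
                latest = d;
            }
        }
        if (latest != null) {
            startDateStr = latest.format(FMT);
        }
    }

    public static void main(String[] args) {
        DateCursorStateSketch fn = new DateCursorStateSketch();
        fn.restoreState(List.of("2019-06-19T12:33:00", "2019-06-18T09:00:00"));
        System.out.println(fn.startDateStr); // prints 2019-06-19T12:33:00
        System.out.println(fn.snapshotState()); // prints [2019-06-19T12:33:00]
    }
}
```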
>>> >
>>> > Is this the correct approach, or did I misunderstand how stateful
>>> > source functions work?
>>> >
>>> > Best,
>>> > Flavio
>>>
>>>
>>>
>>
>>
>
>
