A (Rich)SourceFunction that does not implement ParallelSourceFunction (as RichParallelSourceFunction does) is always run with a parallelism of 1.

On 19/06/2019 14:36, Flavio Pompermaier wrote:
My source function is intrinsically single-threaded. Is there a way to enforce this? I can't find a real difference between a RichParallelSourceFunction and a RichSourceFunction.
Does the latter (RichSourceFunction) implicitly use parallelism = 1?

On Wed, Jun 19, 2019 at 2:25 PM Chesnay Schepler <ches...@apache.org> wrote:

    It returns a list of states so that state can be re-distributed if
    the parallelism changes.

    If you hard-code the interface to return a single value then
    you're implicitly locking the parallelism.
    When you reduce the parallelism you'd no longer be able to restore
    all state, since you have fewer instances than stored state entries.
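
To make the redistribution argument concrete, here is a minimal sketch in plain Java with no Flink dependency. It assumes the entries returned by all subtasks are pooled at restore time and dealt out round-robin to the new subtasks. The class and method names are hypothetical, and Flink's actual repartitioning logic may assign entries differently; the point is only that a list of units can be spread over any number of instances.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class StateRedistributionSketch {

    /**
     * Simplified model (not Flink's implementation): deal the pooled
     * checkpointed units round-robin to newParallelism subtasks.
     */
    static <T> List<List<T>> redistribute(List<T> units, int newParallelism) {
        List<List<T>> perSubtask = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            perSubtask.add(new ArrayList<>());
        }
        for (int i = 0; i < units.size(); i++) {
            perSubtask.get(i % newParallelism).add(units.get(i));
        }
        return perSubtask;
    }

    public static void main(String[] args) {
        // Four subtasks each contributed one unit of state to the checkpoint.
        List<String> units = Arrays.asList("a", "b", "c", "d");

        // Scaling down to 2: every unit still has a home.
        System.out.println(redistribute(units, 2)); // [[a, c], [b, d]]

        // Scaling down to 1: the single subtask receives all four units.
        System.out.println(redistribute(units, 1)); // [[a, b, c, d]]
    }
}
```

If each subtask could only accept a single value, the scale-down to one instance would have nowhere to put three of the four units.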

    On 19/06/2019 14:19, Flavio Pompermaier wrote:
    It's not clear to me why the source checkpoint returns a list of
    objects. When is it useful to return a list instead of a
    single value?
    The documentation says "The returned list should contain one entry
    for redistributable unit of state" but this is not very clear to me.

    Best,
    Flavio

    On Wed, Jun 19, 2019 at 12:40 PM Chesnay Schepler
    <ches...@apache.org> wrote:

        This looks fine to me.

        What exactly were you worried about?

        On 19/06/2019 12:33, Flavio Pompermaier wrote:
        > Hi to all,
        > in my use case I have to ingest data from a REST service, where I
        > periodically poll the data (of course a queue would be a better choice
        > but this doesn't depend on me).
        >
        > So I wrote a RichSourceFunction that starts a thread that polls for new
        > data.
        > However, I'd like to restart from the last "from" value (in the case
        > the job is stopped).
        >
        > My initial thought was to write somewhere the last used date and, on
        > job restart, read that date (from a file for example). However, a Flink
        > stateful source should be a better choice here...am I wrong? So I
        > made my source function implement ListCheckpointed<String>:
        >
        > @Override
        > public List<String> snapshotState(long checkpointId, long timestamp)
        > throws Exception {
        >     return Collections.singletonList(pollingThread.getDateFromAsString());
        > }
        > @Override
        > public void restoreState(List<String> state) throws Exception {
        >     for (String dateFrom : state) {
        >         startDateStr = dateFrom;
        >     }
        > }
        >
        > @Override
        > public void run(SourceContext<MyEvent> ctx) throws Exception {
        >     final Object lock = ctx.getCheckpointLock();
        >     Client httpClient = getHttpClient();
        >     try {
        >         pollingThread = new MyPollingThread.Builder(baseUrl, httpClient)//
        >             .setStartDate(startDateStr, datePatternStr)//
        >             .build();
        >         // start the polling thread
        >         new Thread(pr).start();
        >         .... (etc)
        > }
        >
        > Is this the correct approach or did I misunderstand how stateful
        > source functions work?
        >
        > Best,
        > Flavio
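
The snippet above fetches ctx.getCheckpointLock(), but the elided part does not show how it is used. In the SourceFunction API, emitting records and updating the state that snapshotState() reads are expected to happen under that lock, so a checkpoint never captures a "from" date that is ahead of or behind what was actually emitted. The following is a rough plain-Java model of that pattern with no Flink dependency. All names are hypothetical stand-ins, and the explicit lock in snapshotState() is an assumption of this model that mimics the runtime holding the checkpoint lock while it takes a snapshot.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class PollingSourceSketch {
    // Stands in for ctx.getCheckpointLock() in the snippet above.
    private final Object checkpointLock = new Object();
    // Stands in for ctx.collect(...): records handed downstream.
    private final List<String> emitted = new ArrayList<>();
    // The position a restarted job should resume polling from.
    private String dateFrom;

    PollingSourceSketch(String initialDateFrom) {
        this.dateFrom = initialDateFrom;
    }

    /** Emit a polled batch and advance the position in one atomic step. */
    void emitBatch(List<String> events, String nextDateFrom) {
        synchronized (checkpointLock) {
            emitted.addAll(events);
            dateFrom = nextDateFrom;
        }
    }

    /** Model assumption: the snapshot is taken while holding the same lock. */
    List<String> snapshotState() {
        synchronized (checkpointLock) {
            return Collections.singletonList(dateFrom);
        }
    }

    /** With a single unit of state, the last (only) entry wins. */
    void restoreState(List<String> state) {
        synchronized (checkpointLock) {
            for (String restored : state) {
                dateFrom = restored;
            }
        }
    }

    String getDateFrom() {
        synchronized (checkpointLock) {
            return dateFrom;
        }
    }

    public static void main(String[] args) {
        PollingSourceSketch source = new PollingSourceSketch("2019-06-01");
        source.emitBatch(Collections.singletonList("event-1"), "2019-06-02");
        List<String> checkpoint = source.snapshotState();

        // Simulate a restart: a fresh instance resumes from the checkpoint.
        PollingSourceSketch restarted = new PollingSourceSketch("1970-01-01");
        restarted.restoreState(checkpoint);
        System.out.println(restarted.getDateFrom()); // 2019-06-02
    }
}
```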





