This looks fine to me.

What exactly were you worried about?

On 19/06/2019 12:33, Flavio Pompermaier wrote:
Hi all,
in my use case I have to ingest data from a REST service that I poll periodically (a queue would of course be a better choice, but that doesn't depend on me).

So I wrote a RichSourceFunction that starts a thread that polls for new data. However, I'd like to resume from the last "from" value if the job is stopped and restarted.

My initial thought was to write the last used date somewhere (to a file, for example) and read it back on job restart. However, a stateful Flink source should be a better choice here... am I wrong? So I made my source function implement ListCheckpointed<String>:

@Override
public List<String> snapshotState(long checkpointId, long timestamp) throws Exception {
    return Collections.singletonList(pollingThread.getDateFromAsString());
}

@Override
public void restoreState(List<String> state) throws Exception {
    for (String dateFrom : state) {
        startDateStr = dateFrom;
    }
}

@Override
public void run(SourceContext<MyEvent> ctx) throws Exception {
    final Object lock = ctx.getCheckpointLock();
    Client httpClient = getHttpClient();
    try {
        pollingThread = new MyPollingThread.Builder(baseUrl, httpClient)//
            .setStartDate(startDateStr, datePatternStr)//
            .build();
        // start the polling thread
        new Thread(pr).start();
        .... (etc)
}
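
The part elided in the snippet above is where the checkpoint lock matters, so here is a rough sketch of the snapshot/restore cycle in plain Java, with no Flink types involved. It assumes that records are emitted and the "from" value is advanced inside the same synchronized block, so a snapshot never captures one without the other. All names (PollingSourceSketch, pollOnce, emitted) are hypothetical stand-ins, not Flink API and not the actual source function.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Plain-Java stand-in for a stateful polling source; no Flink types are used.
// All names here are hypothetical and only illustrate the locking pattern.
class PollingSourceSketch {
    // Plays the role of the object returned by ctx.getCheckpointLock().
    private final Object checkpointLock = new Object();
    // Stands in for ctx.collect(...): the records handed downstream so far.
    private final List<String> emitted = new ArrayList<>();
    // The "from" value the next poll should start at.
    private String dateFrom;

    PollingSourceSketch(String initialDateFrom) {
        this.dateFrom = initialDateFrom;
    }

    // Emit one polled batch and advance the cursor in one critical section,
    // so a snapshot never sees the records without the matching "from" value.
    void pollOnce(List<String> batch, String nextDateFrom) {
        synchronized (checkpointLock) {
            emitted.addAll(batch);
            dateFrom = nextDateFrom;
        }
    }

    // Same shape as the snapshotState method above, minus the Flink arguments.
    // This sketch takes the lock itself because nothing else does it here.
    List<String> snapshotState() {
        synchronized (checkpointLock) {
            return Collections.singletonList(dateFrom);
        }
    }

    // Same shape as restoreState above: an empty list keeps the configured default.
    void restoreState(List<String> state) {
        synchronized (checkpointLock) {
            for (String restored : state) {
                dateFrom = restored;
            }
        }
    }

    String getDateFrom() {
        synchronized (checkpointLock) {
            return dateFrom;
        }
    }

    List<String> getEmitted() {
        synchronized (checkpointLock) {
            return new ArrayList<>(emitted);
        }
    }

    public static void main(String[] args) {
        PollingSourceSketch source = new PollingSourceSketch("2019-06-18");
        source.pollOnce(Arrays.asList("event-1", "event-2"), "2019-06-19");
        List<String> checkpoint = source.snapshotState();

        // Simulate a restart: a fresh instance picks up the checkpointed cursor.
        PollingSourceSketch restarted = new PollingSourceSketch("1970-01-01");
        restarted.restoreState(checkpoint);
        System.out.println(restarted.getDateFrom());
    }
}
```

In the real source function, the equivalent of pollOnce would be the code that calls ctx.collect(...) from the polling loop, synchronized on the lock obtained in run().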

Is this the correct approach, or did I misunderstand how stateful source functions work?

Best,
Flavio

