Hi all,
in my use case I have to ingest data from a REST service, which I
poll periodically (of course a queue would be a better choice, but
that doesn't depend on me).

So I wrote a RichSourceFunction that starts a thread that polls for new data.
However, I'd like to restart from the last "from" value (in case the
job is stopped).

My initial thought was to write the last used date somewhere and, on job
restart, read that date back (from a file, for example). However, a Flink
stateful source should be a better choice here... am I wrong? So I made my
source function implement ListCheckpointed<String>:

@Override
public List<String> snapshotState(long checkpointId, long timestamp) throws Exception {
    return Collections.singletonList(pollingThread.getDateFromAsString());
}

@Override
public void restoreState(List<String> state) throws Exception {
    for (String dateFrom : state) {
        startDateStr = dateFrom;
    }
}

@Override
public void run(SourceContext<MyEvent> ctx) throws Exception {
    final Object lock = ctx.getCheckpointLock();
    Client httpClient = getHttpClient();
    try {
        pollingThread = new MyPollingThread.Builder(baseUrl, httpClient)
                .setStartDate(startDateStr, datePatternStr)
                .build();
        // start the polling thread
        new Thread(pollingThread).start();
        .... (etc)
}
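
To make the elided part more concrete, this is roughly the pattern I have in
mind, written as a plain-Java sketch with no Flink classes so that it can be
run on its own. The class PollingSourceSketch, its emitBatch method and the
emitted list are hypothetical stand-ins (for the source function, the polling
thread's callback and ctx.collect respectively), and checkpointLock stands in
for the object returned by ctx.getCheckpointLock(). My assumption, which I'd
like to have confirmed, is that records must be emitted and the "from" date
advanced under that lock, so that a snapshot never contains a date whose
records have not been emitted yet:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Plain-Java stand-in for a checkpointed polling source (no Flink classes). */
class PollingSourceSketch {
    // Stands in for the object returned by ctx.getCheckpointLock().
    private final Object checkpointLock = new Object();
    // Stands in for ctx.collect(...): everything emitted downstream so far.
    private final List<String> emitted = new ArrayList<>();
    // The "from" value to use for the next poll.
    private String dateFrom;

    PollingSourceSketch(String initialDateFrom) {
        this.dateFrom = initialDateFrom;
    }

    /** Called by the polling thread after each successful poll. */
    void emitBatch(List<String> records, String nextDateFrom) {
        // Emit and advance the date atomically with respect to snapshots.
        synchronized (checkpointLock) {
            emitted.addAll(records);
            dateFrom = nextDateFrom;
        }
    }

    /** Mirrors ListCheckpointed#snapshotState: one element holding the date. */
    List<String> snapshotState() {
        synchronized (checkpointLock) {
            return Collections.singletonList(dateFrom);
        }
    }

    /** Mirrors ListCheckpointed#restoreState: the last element wins. */
    void restoreState(List<String> state) {
        synchronized (checkpointLock) {
            for (String restored : state) {
                dateFrom = restored;
            }
        }
    }

    String getDateFrom() {
        synchronized (checkpointLock) {
            return dateFrom;
        }
    }

    List<String> getEmitted() {
        synchronized (checkpointLock) {
            return new ArrayList<>(emitted);
        }
    }
}
```

In the real source, emitBatch would call ctx.collect(...) inside
synchronized (lock), and, if I read the documentation correctly, Flink would
invoke snapshotState itself while holding that same lock, so the explicit
synchronization in snapshotState above is only there to keep the sketch
self-contained.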

Is this the correct approach, or did I misunderstand how stateful source
functions work?

Best,
Flavio
