Ok great! Thanks everybody for the support.

On Wed, Jun 19, 2019 at 3:05 PM Chesnay Schepler <ches...@apache.org> wrote:

> A (Rich)SourceFunction that does not implement RichParallelSourceFunction
> is always run with a parallelism of 1.
>
> On 19/06/2019 14:36, Flavio Pompermaier wrote:
>
> My source function is intrinsically single-threaded. Is there a way to force
> this aspect?
> I can't find a real difference between a RichParallelSourceFunction and
> a RichSourceFunction.
> Is the latter (RichSourceFunction) implicitly using parallelism = 1?
>
> On Wed, Jun 19, 2019 at 2:25 PM Chesnay Schepler <ches...@apache.org>
> wrote:
>
>> It returns a list of states so that state can be re-distributed if the
>> parallelism changes.
>>
>> If you hard-code the interface to return a single value then you're
>> implicitly locking the parallelism.
>> When you reduce the parallelism you'd no longer be able to restore all
>> state, since you have fewer instances than stored state.
>>
>> On 19/06/2019 14:19, Flavio Pompermaier wrote:
>>
>> It's not clear to me why the source checkpoint returns a list of
>> objects... when could it be useful to use a list instead of a single value?
>> The documentation says "The returned list should contain one entry for
>> redistributable unit of state" but this is not very clear to me.
>>
>> Best,
>> Flavio
>>
>> On Wed, Jun 19, 2019 at 12:40 PM Chesnay Schepler <ches...@apache.org>
>> wrote:
>>
>>> This looks fine to me.
>>>
>>> What exactly were you worried about?
>>>
>>> On 19/06/2019 12:33, Flavio Pompermaier wrote:
>>> > Hi to all,
>>> > in my use case I have to ingest data from a REST service, where I
>>> > periodically poll the data (of course a queue would be a better choice
>>> > but this doesn't depend on me).
>>> >
>>> > So I wrote a RichSourceFunction that starts a thread that polls for new
>>> > data.
>>> > However, I'd like to restart from the last "from" value (in case
>>> > the job is stopped).
>>> >
>>> > My initial thought was to write the last used date somewhere and, on
>>> > job restart, read that date (from a file for example). However, a Flink
>>> > stateful source should be a better choice here... am I wrong? So I
>>> > made my source function implement ListCheckpointed<String>:
>>> >
>>> > @Override
>>> > public List<String> snapshotState(long checkpointId, long timestamp)
>>> > throws Exception {
>>> >     return Collections.singletonList(pollingThread.getDateFromAsString());
>>> > }
>>> > @Override
>>> > public void restoreState(List<String> state) throws Exception {
>>> >     for (String dateFrom : state) {
>>> >         startDateStr = dateFrom;
>>> >     }
>>> > }
>>> >
>>> > @Override
>>> > public void run(SourceContext<MyEvent> ctx) throws Exception {
>>> >     final Object lock = ctx.getCheckpointLock();
>>> >     Client httpClient = getHttpClient();
>>> >     try {
>>> >         pollingThread = new MyPollingThread.Builder(baseUrl, httpClient)//
>>> >             .setStartDate(startDateStr, datePatternStr)//
>>> >             .build();
>>> >         // start the polling thread
>>> >         new Thread(pr).start();
>>> >         .... (etc)
>>> > }
>>> >
>>> > Is this the correct approach or did I misunderstand how stateful
>>> > source functions work?
>>> >
>>> > Best,
>>> > Flavio
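
To illustrate the point made in the thread about why snapshotState returns a list rather than a single value, here is a minimal sketch in plain Java. It does not use or reproduce Flink's code: the class StateRedistributionSketch, the redistribute helper, and the round-robin assignment are all assumptions made for illustration, and Flink's actual assignment of list entries to instances may differ. It only shows that a list of independent state units can be handed out to a different number of parallel instances on restore.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class StateRedistributionSketch {

    // Hypothetical helper (not a Flink API): spread the checkpointed state
    // units over the new number of parallel instances, round-robin.
    public static List<List<String>> redistribute(List<String> units, int newParallelism) {
        List<List<String>> perInstance = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            perInstance.add(new ArrayList<>());
        }
        for (int i = 0; i < units.size(); i++) {
            perInstance.get(i % newParallelism).add(units.get(i));
        }
        return perInstance;
    }

    public static void main(String[] args) {
        // Assume three instances each snapshotted one unit of state
        // (here, a made-up "from" date per instance).
        List<String> checkpoint = Arrays.asList("2019-06-17", "2019-06-18", "2019-06-19");

        // Scaling down to 2: one instance receives two units on restore.
        System.out.println(redistribute(checkpoint, 2));

        // Scaling up to 4: one instance receives an empty list on restore.
        System.out.println(redistribute(checkpoint, 4));
    }
}
```

Each inner list plays the role of the argument one instance would receive in restoreState. For a source that always runs with a parallelism of 1, as described above for a non-parallel (Rich)SourceFunction, that list holds just the single entry produced by Collections.singletonList, so looping over it in restoreState is sufficient.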