Hi,

Some suggestions from my side:
- Do the work and call ctx.collect inside synchronized (checkpointLock)?
- Move Thread.sleep(interval) out of the try/catch? You probably should not
swallow InterruptedException (for example, when the job is cancelled).
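
A rough sketch of both points, not tested against a real Flink job. To keep
it compilable on its own, SourceContext below is a minimal stand-in for
Flink's SourceFunction.SourceContext (only collect and getCheckpointLock);
PeriodicSource, doSomeWork and sleepIntervalMillis are made-up names for
illustration. Whether failures of the work itself should be swallowed is
still your call; here only those are caught, and the interrupt propagates.

```java
/** Hypothetical stand-in for Flink's SourceFunction.SourceContext (assumption, not the real type). */
interface SourceContext<T> {
    void collect(T element);

    Object getCheckpointLock();
}

/** Illustrative periodic source; the class name and fields are assumptions. */
class PeriodicSource {
    // Volatile so that cancel(), called from another thread, is seen by run().
    private volatile boolean isRunning = true;
    private final long sleepIntervalMillis;
    private int counter = 0;

    PeriodicSource(long sleepIntervalMillis) {
        this.sleepIntervalMillis = sleepIntervalMillis;
    }

    public void cancel() {
        this.isRunning = false;
    }

    // InterruptedException is declared rather than caught, so an interrupt
    // during the sleep ends run() instead of being silently swallowed.
    public void run(SourceContext<String> ctx) throws InterruptedException {
        while (this.isRunning) {
            // Do the work and emit under the checkpoint lock.
            synchronized (ctx.getCheckpointLock()) {
                try {
                    String out = doSomeWork();
                    ctx.collect(out);
                } catch (RuntimeException e) {
                    // Only failures of the work itself are handled here.
                    System.err.println("Work failed, will retry after sleeping: " + e);
                }
            }
            // Sleep outside both the lock and the try/catch.
            Thread.sleep(this.sleepIntervalMillis);
        }
    }

    /** Placeholder for the real work (e.g. reading from S3); purely illustrative. */
    private String doSomeWork() {
        counter++;
        return "record-" + counter;
    }
}
```

In a real SourceFunction, run is declared with "throws Exception", so letting
the InterruptedException escape should be possible there as well.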

Best,
Jingsong Lee

On Fri, May 8, 2020 at 2:52 AM Senthil Kumar <senthi...@vmware.com> wrote:

> I am implementing a source function which periodically wakes up and
> consumes data from S3.
>
>
>
> My current implementation is shown below.
>
> It follows
> org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction
>
>
>
> Is it safe to simply swallow any and all exceptions in the run method and
> just rely on this.isRunning variable to quit the run() method?
>
>
>
> Cheers
>
> Kumar
>
>
>
> ---
>
>
>
> @Override
> public void cancel() {
>     this.isRunning = false;   // Set volatile state variable, initially set to true on Class
> }
>
> @Override
> public void run(SourceFunction.SourceContext<OUT> ctx) {
>     while (this.isRunning) {
>         try {
>             OUT out = /* Do some work */
>             ctx.collect(out);
>
>             Thread.sleep(this.sleepIntervalHours * 60 * 60 * 1000); // Hours to milli seconds
>         } catch (Throwable t) {
>
>             // Simply swallow
>         }
>     }
> }
>
>
>


