Hi Kumar,

They way you've implemented your custom source sounds like the right way:
Having a "running" flag checked by the run() method and changing it in
cancel().
Also, it is good that you are properly handling the interrupt set by Flink
(some people ignore InterruptedExceptions, which make it difficult
(basically impossible) for Flink to stop the job)

Best,
Robert


On Wed, May 27, 2020 at 7:38 PM Senthil Kumar <senthi...@vmware.com> wrote:

> We are on flink 1.9.0
>
>
>
> I have a custom SourceFunction, where I rely on isRunning set to false via
> the cancel() function to exit out of the run loop.
>
> My run loop essentially gets the data from S3, and then simply sleeps
> (Thread.sleep) for a specified time interval.
>
>
>
> When a job gets cancelled, the SourceFunction.cancel() is called, which
> sets the isRunning to false.
>
> In addition, the Thread.sleep gets interrupted, a check Is made on the
> isRunning variable (set to false now) and the run loop is exited.
>
>
>
> We noticed that when we “stop” the flink job, the Thread.sleep does not
> get interrupted.
>
> It also appears that SoruceFunction.cancel() is not getting called (which
> seems like the correct behavior for “stop”)
>
>
>
> My question: what’s the “right” way to exit the run() loop when the flink
> job receives a stop command?
>
>
>
> My understanding was that there was a Stoppable interface (which got
> removed in 1.9.0)
>
>
>
> Would appreciate any insights.
>
>
>
> Cheers
>
> Kumar
>

Reply via email to