Hi Kumar,

This is more of a Java question than a Flink question now :) If your code
allows it easily, I would check the isRunning flag regularly (by sleeping
in short intervals) to get proper cancellation behavior.
If that makes your code very complicated, you could instead interrupt your
worker thread manually. I would only use this approach if you are sure
that your code (and the libraries you are using) handle interrupts
properly.
Sorry that I cannot give you a more actionable response. It depends a lot
on the structure of your code and the libraries you are calling into.
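
A minimal sketch of the first suggestion (sleeping in short slices and re-checking isRunning between them). It assumes a plain Java class whose run()/cancel() pair mirrors the shape of Flink's SourceFunction. No Flink classes are used. The class name SlicedSleepSource, the fetch callback (standing in for the S3 read) and the slice length are hypothetical choices for illustration, not Flink API.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Plain-Java stand-in for a custom source: run() and cancel() mirror the shape
// of Flink's SourceFunction, but no Flink classes are used (assumption).
class SlicedSleepSource {
    // volatile: cancel() is called from a different thread than run().
    private volatile boolean isRunning = true;

    private final long intervalMillis; // long pause between fetches (e.g. hours)
    private final long sliceMillis;    // how often isRunning is re-checked
    private final Runnable fetch;      // hypothetical stand-in for the S3 read

    SlicedSleepSource(long intervalMillis, long sliceMillis, Runnable fetch) {
        this.intervalMillis = intervalMillis;
        this.sliceMillis = sliceMillis;
        this.fetch = fetch;
    }

    void run() {
        while (isRunning) {
            fetch.run();
            // Wait out the full interval in short slices so that a cleared flag
            // is noticed within about sliceMillis, even without an interrupt.
            long remaining = intervalMillis;
            while (isRunning && remaining > 0) {
                long slice = Math.min(sliceMillis, remaining);
                try {
                    Thread.sleep(slice);
                } catch (InterruptedException e) {
                    // Interrupted (e.g. by Flink on cancel): keep the interrupt
                    // status for the caller and leave run().
                    Thread.currentThread().interrupt();
                    return;
                }
                remaining -= slice;
            }
        }
    }

    void cancel() {
        isRunning = false; // no interrupt needed; run() sees this within one slice
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger fetches = new AtomicInteger();
        // One-hour interval, flag re-checked every 100 ms.
        SlicedSleepSource source =
                new SlicedSleepSource(60 * 60 * 1000L, 100L, fetches::incrementAndGet);
        Thread worker = new Thread(source::run);
        worker.start();
        Thread.sleep(300); // let it fetch once and start waiting
        source.cancel();   // flag only, no interrupt
        worker.join();     // returns after roughly one slice
        System.out.println("fetches: " + fetches.get());
    }
}
```

The slice length trades shutdown latency against wakeups: with a 100 ms slice, run() returns within roughly 100 ms of cancel() being called, unless a fetch is in progress at that moment.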

Best,
Robert


On Fri, May 29, 2020 at 10:48 PM Senthil Kumar <senthi...@vmware.com> wrote:

> Hi Robert,
>
> Would appreciate more insights, please.
>
> What we are noticing: when the Flink job is issued a stop command, the
> Thread.sleep() call does not receive an InterruptedException.
>
> It does receive the exception when the Flink job is issued a cancel
> command.
>
> In both cases (cancel and stop), the cancel() method is called (to set the
> isRunning variable to false).
>
> However, given that the thread does not get interrupted on stop, we are
> not in a position to check the isRunning variable.
>
> For now, we call Thread.sleep() with a 5-minute interval (instead of the
> normal interval, which is in hours).
>
> Sleeping for 5 minutes at a time gives us a chance to check the isRunning
> variable.
>
> Another approach would be to save the current thread
> (Thread.currentThread()) before calling Thread.sleep(), and then call
> interrupt() on that saved thread from the cancel() function.
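
A minimal sketch of that second approach: capture the thread that executes run() and interrupt it from cancel(). As before, this is a plain Java class shaped like a SourceFunction rather than real Flink code. InterruptingSource and the fetch callback are hypothetical names. As noted in the reply above, this is only safe if the fetch code and the libraries it calls handle interrupts correctly, because the interrupt can also arrive while a fetch is in progress.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Plain-Java stand-in shaped like a SourceFunction (no Flink classes; assumption).
class InterruptingSource {
    private volatile boolean isRunning = true;
    // The thread executing run(), saved so that cancel() can interrupt it.
    private volatile Thread worker;

    private final long intervalMillis;
    private final Runnable fetch; // hypothetical stand-in for the S3 read

    InterruptingSource(long intervalMillis, Runnable fetch) {
        this.intervalMillis = intervalMillis;
        this.fetch = fetch;
    }

    void run() {
        worker = Thread.currentThread();
        try {
            while (isRunning) {
                fetch.run();
                try {
                    Thread.sleep(intervalMillis); // may be a long sleep (hours)
                } catch (InterruptedException e) {
                    // Woken by cancel() (or by Flink itself): preserve the interrupt
                    // status and stop instead of swallowing the exception.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        } finally {
            // Narrows the window in which cancel() could interrupt this thread
            // after run() has already returned.
            worker = null;
        }
    }

    void cancel() {
        isRunning = false;  // covers the case where run() is between sleeps
        Thread t = worker;  // read once; run() may clear it concurrently
        if (t != null) {
            t.interrupt();  // wakes a thread blocked in Thread.sleep()
        }
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger fetches = new AtomicInteger();
        InterruptingSource source =
                new InterruptingSource(60 * 60 * 1000L, fetches::incrementAndGet);
        Thread thread = new Thread(source::run);
        thread.start();
        Thread.sleep(300); // let it fetch once and fall asleep
        source.cancel();   // clears the flag and interrupts the sleeping thread
        thread.join();
        System.out.println("fetches: " + fetches.get());
    }
}
```
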
>
> What is your recommendation?
>
> Cheers
>
> Kumar
>
> *From: *Robert Metzger <rmetz...@apache.org>
> *Date: *Friday, May 29, 2020 at 4:38 AM
> *To: *Senthil Kumar <senthi...@vmware.com>
> *Cc: *"user@flink.apache.org" <user@flink.apache.org>
> *Subject: *Re: Age old stop vs cancel debate
>
> Hi Kumar,
>
> The way you've implemented your custom source sounds like the right way:
> having a "running" flag checked by the run() method and changing it in
> cancel().
>
> Also, it is good that you are properly handling the interrupt set by Flink
> (some people ignore InterruptedExceptions, which makes it difficult,
> basically impossible, for Flink to stop the job).
>
> Best,
>
> Robert
>
> On Wed, May 27, 2020 at 7:38 PM Senthil Kumar <senthi...@vmware.com>
> wrote:
>
> We are on Flink 1.9.0.
>
> I have a custom SourceFunction in which I rely on isRunning being set to
> false via the cancel() function to exit the run loop.
>
> My run loop essentially gets the data from S3 and then simply sleeps
> (Thread.sleep) for a specified time interval.
>
> When a job gets cancelled, SourceFunction.cancel() is called, which sets
> isRunning to false.
>
> In addition, the Thread.sleep gets interrupted, a check is made on the
> isRunning variable (now set to false), and the run loop is exited.
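
The loop described here can be sketched roughly as follows (plain Java, no Flink classes; LongSleepSource and the fetch callback are hypothetical names). The usage in main reproduces the behavior reported in this thread: clearing isRunning alone leaves the thread asleep until the interval ends, while an interrupt, which is what the thread receives on a cancel, ends run() right away.

```java
// Plain-Java sketch of a fetch-then-sleep source loop (no Flink classes; assumption).
class LongSleepSource {
    private volatile boolean isRunning = true;
    private final long intervalMillis;
    private final Runnable fetch; // hypothetical stand-in for the S3 read

    LongSleepSource(long intervalMillis, Runnable fetch) {
        this.intervalMillis = intervalMillis;
        this.fetch = fetch;
    }

    void run() {
        while (isRunning) {
            fetch.run();
            try {
                Thread.sleep(intervalMillis); // one long sleep per iteration
            } catch (InterruptedException e) {
                // Interrupt arrives on cancel: restore the status and leave run().
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    void cancel() {
        isRunning = false; // only takes effect once the current sleep ends
    }

    public static void main(String[] args) throws InterruptedException {
        LongSleepSource source = new LongSleepSource(60 * 60 * 1000L, () -> { });
        Thread thread = new Thread(source::run);
        thread.start();
        Thread.sleep(100);
        source.cancel();    // flag only, no interrupt
        thread.join(500);
        System.out.println("alive after cancel() alone: " + thread.isAlive());
        thread.interrupt(); // what the thread observes on a cancel
        thread.join();
        System.out.println("alive after interrupt: " + thread.isAlive());
    }
}
```
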
>
> We noticed that when we “stop” the Flink job, the Thread.sleep does not
> get interrupted.
>
> It also appears that SourceFunction.cancel() is not getting called (which
> seems like the correct behavior for “stop”).
>
> My question: what’s the “right” way to exit the run() loop when the Flink
> job receives a stop command?
>
> My understanding was that there used to be a Stoppable interface (which
> was removed in 1.9.0).
>
> Would appreciate any insights.
>
> Cheers
>
> Kumar
>
