I understand. Thanks for looking into it, Senthil!

Kostas

On Tue, Jun 9, 2020 at 7:32 PM Senthil Kumar <senthi...@vmware.com> wrote:
>
> OK, will do and report back.
>
> We are on 1.9.1.
>
> Upgrading to 1.10 will take some time.
>
> On 6/9/20, 2:06 AM, "Kostas Kloudas" <kklou...@gmail.com> wrote:
>
>     Hi Senthil,
>
>     From a quick look at the code, it seems that the cancel() of your
>     source function should be called, and the thread that it is running on
>     should be interrupted.
>
>     In order to pin down the problem and help us see if this is an actual
>     bug, could you please:
>     1) enable debug logging and see if you can spot some lines like this:
>
>     "Starting checkpoint (XXXX-ID) SYNC_SAVEPOINT on task XXXXX", or something
>     similar mentioning a synchronous savepoint,
>
>     and any other message afterwards with XXXX-ID in it to see if the
>     savepoint is completed successfully.
>
>     2) check whether this behavior persists in Flink 1.10.
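Once DEBUG logging is enabled, the search described in 1) (find the SYNC_SAVEPOINT start line, then follow its checkpoint ID) can be scripted instead of done by eye. A minimal sketch in plain Java: the class and method names are hypothetical, and it assumes the start message has the shape quoted above with a numeric ID in parentheses, which may not match the real log text exactly. Matching on "checkpoint" plus the ID is deliberately rough.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Hypothetical helper for the DEBUG-log check: find the IDs of synchronous
 * savepoints, then return every checkpoint-related line that mentions one of them.
 * ASSUMPTION: the start message looks like
 * "Starting checkpoint (<numeric id>) SYNC_SAVEPOINT on task <name>".
 */
final class SyncSavepointLogFilter {
    private static final Pattern SYNC_START =
            Pattern.compile("Starting checkpoint \\((\\d+)\\) SYNC_SAVEPOINT");

    static List<String> linesForSyncSavepoints(List<String> logLines) {
        // Pass 1: collect the IDs of all synchronous savepoints that were started.
        Set<String> ids = new LinkedHashSet<>();
        for (String line : logLines) {
            Matcher m = SYNC_START.matcher(line);
            if (m.find()) {
                ids.add(m.group(1));
            }
        }
        // Pass 2: keep lines that mention "checkpoint" and one of those IDs as a
        // whole number (so ID 7 does not match 71). This is a rough filter.
        List<Pattern> idPatterns = new ArrayList<>();
        for (String id : ids) {
            idPatterns.add(Pattern.compile("(?<!\\d)" + id + "(?!\\d)"));
        }
        List<String> result = new ArrayList<>();
        for (String line : logLines) {
            if (!line.toLowerCase(Locale.ROOT).contains("checkpoint")) {
                continue;
            }
            for (Pattern p : idPatterns) {
                if (p.matcher(line).find()) {
                    result.add(line);
                    break;
                }
            }
        }
        return result;
    }
}
```

For example, feed it the result of `Files.readAllLines(...)` on the TaskManager log; where that log lives depends on the deployment.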
>
>     Thanks,
>     Kostas
>
>     On Tue, Jun 2, 2020 at 4:20 PM Senthil Kumar <senthi...@vmware.com> wrote:
>     >
>     > Robert,
>     >
>     >
>     >
>     > Thank you once again! We are currently using the “short” Thread.sleep() approach. It seems to be working fine.
>     >
>     >
>     >
>     > Cheers
>     >
>     > Kumar
>     >
>     >
>     >
>     > From: Robert Metzger <rmetz...@apache.org>
>     > Date: Tuesday, June 2, 2020 at 2:40 AM
>     > To: Senthil Kumar <senthi...@vmware.com>
>     > Cc: "user@flink.apache.org" <user@flink.apache.org>
>     > Subject: Re: Age old stop vs cancel debate
>     >
>     >
>     >
>     > Hi Kumar,
>     >
>     >
>     > this is more a Java question than a Flink question now :) If it is easily possible in your code, then I would regularly check the isRunning flag (by using short Thread.sleep() calls) to get proper cancellation behavior.
>     >
>     > If this makes your code very complicated, then you could manually interrupt your worker thread. I would only use this method if you are sure that your code (and the libraries you are using) handle interrupts properly.
>     >
>     > Sorry that I cannot give you a more actionable response. It depends a lot on the structure of your code and the libraries you are calling into.
>     >
>     >
>     >
>     > Best,
>     >
>     > Robert
>     >
>     >
>     >
>     >
>     >
>     > On Fri, May 29, 2020 at 10:48 PM Senthil Kumar <senthi...@vmware.com> wrote:
>     >
>     > Hi Robert,
>     >
>     >
>     >
>     > Would appreciate more insights please.
>     >
>     >
>     >
>     > What we are noticing: when the Flink job is issued a stop command, the Thread.sleep() call does not receive an InterruptedException.
>     >
>     >
>     >
>     > It certainly receives the exception when the Flink job is issued a cancel command.
>     >
>     >
>     >
>     > In both cases (cancel and stop), the cancel() method is called (setting the isRunning variable to false).
>     >
>     >
>     >
>     > However, since the thread is not interrupted on stop, the loop never gets a chance to check the isRunning variable.
>     >
>     >
>     >
>     >
>     >
>     > For now, we call Thread.sleep() in 5-minute increments (instead of the normal interval, which is measured in hours).
>     >
>     > Sleeping for 5 minutes at a time gives us a chance to check the isRunning variable.
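The short-sleep workaround (and Robert's suggestion of regularly checking isRunning) amounts to splitting one long wait into short slices and re-reading the flag between them. A minimal, Flink-free sketch: in a real job `run()` and `cancel()` would be the `SourceFunction` methods, and the class name, the `fetchOnce()` placeholder and the slice length are assumptions for illustration.

```java
import java.util.concurrent.TimeUnit;

/**
 * Flink-free stand-in for a polling source's run loop. In a real job, run()
 * and cancel() would be the SourceFunction methods. Instead of one long
 * Thread.sleep(), the wait is split into short slices, so a cancel() that
 * is not followed by an interrupt is noticed within one slice.
 */
final class ChunkedSleepPoller {
    private volatile boolean isRunning = true;
    private final long intervalMillis; // time between two fetches (hours in the real job)
    private final long sliceMillis;    // longest time a cancel() can go unnoticed
    private int fetches;               // only touched by the thread running run()

    ChunkedSleepPoller(long intervalMillis, long sliceMillis) {
        this.intervalMillis = intervalMillis;
        this.sliceMillis = sliceMillis;
    }

    void run() throws InterruptedException {
        while (isRunning) {
            fetchOnce();
            sleepWhileRunning(intervalMillis);
        }
    }

    void cancel() {
        isRunning = false;
    }

    int fetches() {
        return fetches;
    }

    // Sleeps until totalMillis have elapsed or isRunning turns false, whichever comes first.
    private void sleepWhileRunning(long totalMillis) throws InterruptedException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(totalMillis);
        while (isRunning) {
            long remainingMillis = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
            if (remainingMillis <= 0) {
                return;
            }
            Thread.sleep(Math.min(sliceMillis, remainingMillis));
        }
    }

    // Hypothetical placeholder for the S3 read.
    private void fetchOnce() {
        fetches++;
    }
}
```

The slice length bounds how long a stop can go unnoticed, at the cost of more frequent wake-ups; unlike the interrupt approach, it needs nothing from the libraries called in between.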
>     >
>     >
>     >
>     > Another approach would be to save the current thread (Thread.currentThread()) before calling Thread.sleep() and then manually call interrupt() on it from the cancel() function.
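The second approach (remember the thread and interrupt it from cancel()) could look roughly like this. Again a Flink-free sketch with hypothetical names; `run()` and `cancel()` stand in for the `SourceFunction` methods.

```java
/**
 * Flink-free stand-in for a source whose cancel() interrupts the thread that
 * is executing run(), instead of relying on someone else to interrupt it.
 * In a real job, run() and cancel() would be the SourceFunction methods.
 */
final class SelfInterruptingPoller {
    private volatile boolean isRunning = true;
    private volatile Thread runner; // thread currently inside run(), or null
    private final long intervalMillis;

    SelfInterruptingPoller(long intervalMillis) {
        this.intervalMillis = intervalMillis;
    }

    void run() {
        // Publish the thread before the first isRunning check, so a concurrent
        // cancel() either stops the loop via the flag or finds the thread to interrupt.
        runner = Thread.currentThread();
        try {
            while (isRunning) {
                fetchOnce();
                Thread.sleep(intervalMillis);
            }
        } catch (InterruptedException e) {
            // Woken by cancel() (or by the framework): keep the interrupt status and leave.
            Thread.currentThread().interrupt();
        } finally {
            runner = null;
        }
    }

    void cancel() {
        isRunning = false;  // flip the flag first ...
        Thread t = runner;  // ... then wake the sleeping thread, if any
        if (t != null) {
            t.interrupt();
        }
    }

    // Hypothetical placeholder for the S3 read.
    private void fetchOnce() {
    }
}
```

As Robert notes, this is only safe if everything running on that thread, including the S3 client, tolerates interrupts. It also has a small race: cancel() can read the thread reference just before run() clears it and deliver an interrupt to a thread that has already left run().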
>     >
>     >
>     >
>     > What is your recommendation?
>     >
>     >
>     >
>     > Cheers
>     >
>     > Kumar
>     >
>     >
>     >
>     >
>     >
>     > From: Robert Metzger <rmetz...@apache.org>
>     > Date: Friday, May 29, 2020 at 4:38 AM
>     > To: Senthil Kumar <senthi...@vmware.com>
>     > Cc: "user@flink.apache.org" <user@flink.apache.org>
>     > Subject: Re: Age old stop vs cancel debate
>     >
>     >
>     >
>     > Hi Kumar,
>     >
>     >
>     >
>     > The way you've implemented your custom source sounds right: a "running" flag that is checked by the run() method and changed in cancel().
>     >
>     > Also, it is good that you are properly handling the interrupt set by Flink (some people ignore InterruptedExceptions, which makes it difficult, if not impossible, for Flink to stop the job).
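In plain Java terms, "properly handling" the interrupt means: when Thread.sleep() throws InterruptedException, stop waiting, restore the interrupt status, and return, rather than catching and ignoring the exception. A small illustrative contrast (not Flink code; the class and method names are hypothetical):

```java
/**
 * Two ways to react to an interrupt during Thread.sleep()
 * (plain Java, not Flink code; the names are hypothetical).
 */
final class InterruptHandling {
    // Anti-pattern: the interrupt is caught and forgotten, so the loop carries on
    // with the remaining sleeps and interrupt-based cancellation has no effect.
    static void swallowingLoop(long sleepMillis, int iterations) {
        for (int i = 0; i < iterations; i++) {
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                // ignored: this is exactly the problem
            }
        }
    }

    // Recommended: stop waiting, restore the interrupt status for the caller,
    // and return. Returns how many sleeps completed before the interrupt.
    static int honoringLoop(long sleepMillis, int iterations) {
        int completed = 0;
        try {
            for (; completed < iterations; completed++) {
                Thread.sleep(sleepMillis);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return completed;
    }
}
```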
>     >
>     >
>     >
>     > Best,
>     >
>     > Robert
>     >
>     >
>     >
>     >
>     >
>     > On Wed, May 27, 2020 at 7:38 PM Senthil Kumar <senthi...@vmware.com> wrote:
>     >
>     > We are on Flink 1.9.0.
>     >
>     >
>     >
>     > I have a custom SourceFunction where I rely on isRunning being set to false via the cancel() function to exit the run loop.
>     >
>     > My run loop essentially fetches data from S3 and then simply sleeps (Thread.sleep()) for a specified time interval.
>     >
>     >
>     >
>     > When a job gets cancelled, SourceFunction.cancel() is called, which sets isRunning to false.
>     >
>     > In addition, the Thread.sleep() call gets interrupted, a check is made on the isRunning variable (now false), and the run loop is exited.
>     >
>     >
>     >
>     > We noticed that when we “stop” the Flink job, the Thread.sleep() call does not get interrupted.
>     >
>     > It also appears that SourceFunction.cancel() is not getting called (which seems like the correct behavior for “stop”).
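To make the two reported paths concrete, here is a Flink-free sketch of the source as described. Assumptions: `PeriodicS3Source` and `fetchFromS3()` are hypothetical, and a real implementation would implement `SourceFunction<T>` with `run(SourceContext<T>)`. The only difference between the paths is whether anything interrupts the sleeping thread after the flag is flipped.

```java
/**
 * Flink-free stand-in for the source described above. In a real job this class
 * would implement SourceFunction<T> and run() would take a SourceContext<T>;
 * fetchFromS3() is a hypothetical placeholder.
 */
final class PeriodicS3Source {
    private volatile boolean isRunning = true; // written by cancel(), read by run()
    private final long intervalMillis;

    PeriodicS3Source(long intervalMillis) {
        this.intervalMillis = intervalMillis;
    }

    void run() {
        while (isRunning) {
            fetchFromS3();
            try {
                Thread.sleep(intervalMillis);
            } catch (InterruptedException e) {
                // Interrupted (on job cancel this follows cancel()):
                // keep the interrupt status and leave the loop.
                Thread.currentThread().interrupt();
                return;
            }
            // Without an interrupt, the flag is only re-checked here,
            // after the full interval has elapsed.
        }
    }

    void cancel() {
        isRunning = false;
    }

    private void fetchFromS3() {
        // placeholder for the actual S3 read
    }
}
```

If stop only flips the flag (cancel() being called without an interrupt, as reported elsewhere in this thread), run() returns only when the current sleep finishes, which with an interval of hours looks like a hung job.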
>     >
>     >
>     >
>     > My question: what’s the “right” way to exit the run() loop when the Flink job receives a stop command?
>     >
>     >
>     >
>     > My understanding was that there used to be a Stoppable interface (which was removed in 1.9.0).
>     >
>     >
>     >
>     > Would appreciate any insights.
>     >
>     >
>     >
>     > Cheers
>     >
>     > Kumar
>
>
