Hi Kumar,

this is more a Java question than a Flink question now :) If your code makes it easy, I would check the isRunning flag regularly (by sleeping in short intervals with Thread.sleep()) to get proper cancellation behavior. If that makes your code very complicated, you could instead interrupt your worker thread manually. I would only use that approach if you are sure your code (and the libraries you are using) handle interrupts properly. Sorry that I cannot give you a more actionable response; it depends a lot on the structure of your code and the libraries you are calling into.
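A minimal sketch of the first option (re-checking the flag between short sleeps), written in plain Java so it runs without Flink. The class name ShortSleepPoller, the fetchOnce() stand-in for the S3 read, and the two interval parameters are hypothetical; in a real job the loop would be the body of the custom SourceFunction's run(), and cancel() would be the SourceFunction's cancel().

```java
/**
 * Plain-Java sketch (no Flink dependency) of a polling loop that re-checks a
 * cancellation flag between short sleeps instead of relying on an interrupt.
 * All names are hypothetical.
 */
class ShortSleepPoller implements Runnable {
    // volatile: cancel() is called from a different thread than run()
    private volatile boolean isRunning = true;

    private final long pollIntervalMillis;  // time between fetches (hours in the original use case)
    private final long checkIntervalMillis; // max time before isRunning is re-checked
    private int fetchCount = 0;             // written only by the thread executing run()

    ShortSleepPoller(long pollIntervalMillis, long checkIntervalMillis) {
        this.pollIntervalMillis = pollIntervalMillis;
        this.checkIntervalMillis = checkIntervalMillis;
    }

    @Override
    public void run() {
        while (isRunning) {
            fetchOnce();
            // Sleep the long interval in short slices, re-checking the flag after each one.
            long remaining = pollIntervalMillis;
            while (isRunning && remaining > 0) {
                long slice = Math.min(checkIntervalMillis, remaining);
                try {
                    Thread.sleep(slice);
                } catch (InterruptedException e) {
                    // An interrupt also ends the loop; restore the interrupt status for the caller.
                    Thread.currentThread().interrupt();
                    return;
                }
                remaining -= slice;
            }
        }
    }

    void cancel() {
        isRunning = false;
    }

    // Hypothetical stand-in for "get the data from S3".
    private void fetchOnce() {
        fetchCount++;
    }

    int getFetchCount() {
        return fetchCount;
    }
}
```

With this shape, cancel() takes effect within roughly one checkIntervalMillis slice, whether or not an interrupt is ever delivered; the cost is a few extra wake-ups per long interval.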
Best,
Robert

On Fri, May 29, 2020 at 10:48 PM Senthil Kumar <senthi...@vmware.com> wrote:
> Hi Robert,
>
> Would appreciate more insights please.
>
> What we are noticing: when the Flink job is issued a stop command, the Thread.sleep is not receiving the InterruptedException.
>
> It certainly receives the exception when the Flink job is issued a cancel command.
>
> In both cases (cancel and stop) the cancel() method is getting called (to set the isRunning variable to false).
>
> However, given that the thread does not get interrupted on stop, we are not in a position to check the isRunning variable.
>
> For now, we are doing a Thread.sleep every 5 minutes (instead of the normal interval, which is in hours).
> Sleeping for 5 minutes gives us a chance to check the isRunning variable.
>
> Another approach would be to save the current thread (Thread.currentThread()) before doing a Thread.sleep()
> and manually call Thread.interrupt() from the cancel function.
>
> What is your recommendation?
>
> Cheers
> Kumar
>
> *From: *Robert Metzger <rmetz...@apache.org>
> *Date: *Friday, May 29, 2020 at 4:38 AM
> *To: *Senthil Kumar <senthi...@vmware.com>
> *Cc: *"user@flink.apache.org" <user@flink.apache.org>
> *Subject: *Re: Age old stop vs cancel debate
>
> Hi Kumar,
>
> The way you've implemented your custom source sounds like the right way: having a "running" flag checked by the run() method and changing it in cancel().
>
> Also, it is good that you are properly handling the interrupt set by Flink (some people ignore InterruptedExceptions, which makes it difficult (basically impossible) for Flink to stop the job).
>
> Best,
> Robert
>
> On Wed, May 27, 2020 at 7:38 PM Senthil Kumar <senthi...@vmware.com> wrote:
>
> > We are on Flink 1.9.0.
> >
> > I have a custom SourceFunction, where I rely on isRunning being set to false via the cancel() function to exit the run loop.
> >
> > My run loop essentially gets the data from S3 and then simply sleeps (Thread.sleep) for a specified time interval.
> >
> > When a job gets cancelled, SourceFunction.cancel() is called, which sets isRunning to false.
> > In addition, the Thread.sleep gets interrupted, a check is made on the isRunning variable (now false), and the run loop is exited.
> >
> > We noticed that when we “stop” the Flink job, the Thread.sleep does not get interrupted.
> > It also appears that SourceFunction.cancel() is not getting called (which seems like the correct behavior for “stop”).
> >
> > My question: what’s the “right” way to exit the run() loop when the Flink job receives a stop command?
> >
> > My understanding was that there was a Stoppable interface (which got removed in 1.9.0).
> >
> > Would appreciate any insights.
> >
> > Cheers
> > Kumar
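Kumar's second approach (remembering the thread that executes run() and interrupting it from cancel()) could look roughly like the following plain-Java sketch. The names InterruptingPoller, workerThread and fetchOnce() are hypothetical, and, as Robert cautions, this is only safe if everything called inside the loop (including the S3 client) handles interrupts properly.

```java
/**
 * Plain-Java sketch (no Flink dependency) of ending a long Thread.sleep()
 * by interrupting the worker thread from cancel(). All names are hypothetical.
 */
class InterruptingPoller implements Runnable {
    private volatile boolean isRunning = true;
    // Thread currently executing run(); volatile because cancel() reads it from another thread.
    private volatile Thread workerThread;

    private final long pollIntervalMillis;
    private int fetchCount = 0; // written only by the thread executing run()

    InterruptingPoller(long pollIntervalMillis) {
        this.pollIntervalMillis = pollIntervalMillis;
    }

    @Override
    public void run() {
        workerThread = Thread.currentThread();
        try {
            while (isRunning) {
                fetchOnce();
                // May block for hours; cancel() interrupts it.
                Thread.sleep(pollIntervalMillis);
            }
        } catch (InterruptedException e) {
            // Woken up by cancel() (or by whoever owns the thread): restore the status and exit.
            Thread.currentThread().interrupt();
        } finally {
            workerThread = null;
        }
    }

    void cancel() {
        // Set the flag first so the loop exits even if run() has not reached sleep() yet.
        isRunning = false;
        Thread t = workerThread;
        if (t != null) {
            t.interrupt();
        }
    }

    // Hypothetical stand-in for "get the data from S3".
    private void fetchOnce() {
        fetchCount++;
    }

    int getFetchCount() {
        return fetchCount;
    }
}
```

Compared with re-checking the flag between short sleeps, this keeps the original long sleep, but every blocking call inside the loop must react correctly to an interrupt.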