Hi,

One clarification: `RichFunction#close` is of course always called, not only after an internal failure. It is called after an internal failure, an external failure, or a clean shutdown.
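
To illustrate, here is a minimal sketch of that lifecycle in plain Java. It is not Flink code: `FileSourceSketch`, `isFileOpen` and the `Consumer<String>` standing in for the source context are hypothetical names, and the methods only mirror `#run`, `#cancel` and `#close`. In a real job the class would presumably extend `RichSourceFunction<String>` and override the methods of the same names.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.function.Consumer;

/**
 * Flink-free stand-in (hypothetical) for a file-reading source.
 * The method names mirror SourceFunction#run, SourceFunction#cancel
 * and RichFunction#close; nothing here depends on Flink itself.
 */
class FileSourceSketch {
    private final Path path;
    private volatile boolean running = true;
    private volatile BufferedReader reader;

    FileSourceSketch(Path path) {
        this.path = path;
    }

    /** Mirrors #run: emit lines until the file ends or cancel() is called. */
    void run(Consumer<String> out) throws IOException {
        BufferedReader r = Files.newBufferedReader(path);
        reader = r;
        String line;
        while (running && (line = r.readLine()) != null) {
            // An exception thrown here leaves the file open until close() runs.
            out.accept(line);
        }
    }

    /** Mirrors #cancel: only ask the read loop to stop. */
    void cancel() {
        running = false;
    }

    /** Mirrors #close: release the file handle; safe to call more than once. */
    void close() throws IOException {
        BufferedReader r = reader;
        reader = null;
        if (r != null) {
            r.close();
        }
    }

    /** Helper for the sketch only: true while the file handle is still held. */
    boolean isFileOpen() {
        return reader != null;
    }
}
```

The point of the design is that the file is released in `close()`, which runs on every exit path, while `cancel()` does nothing more than stop the loop.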
`SourceFunction#cancel` is intended to tell the `SourceFunction` to cleanly exit its `#run` method/loop (note that SIGINT will be issued anyway). In this case `#close` will also be called, after the source's threads exit.

Piotrek

> On 25 May 2020, at 21:37, Laurent Exsteens <laurent.exste...@euranova.eu> wrote:
>
> Thank you, we'll try that.
>
> On Mon, May 25, 2020, 21:09 Piotr Nowojski <pi...@ververica.com> wrote:
> Hi,
>
> The cancel method is invoked only when the SourceTask is being cancelled from the outside, by the JobManager - for example after detecting a failure of a different Task.
>
> > What is the proper way to handle this issue? Is there some kind of closable source interface we should implement?
>
> Have you tried implementing `org.apache.flink.api.common.functions.RichFunction#close` (extending `AbstractRichFunction` and overriding `#close`)? This method should be invoked when StreamTask is disposing its operators after an internal failure.
>
> Piotrek
>
>> On 25 May 2020, at 10:49, Laurent Exsteens <laurent.exste...@euranova.eu> wrote:
>>
>> Hello,
>>
>> we had to implement a specific source to read files in a certain way. The files we read are on a NAS mounted through NFS.
>>
>> If an error occurs in a map after this specific source while the file is still being read, the file is never closed. As a result, the task manager keeps the file open (apparently) indefinitely, and the file cannot be moved until the task manager releases it.
>> We then have to kill the whole task manager in order to release the file.
>>
>> I already added a closing of the file in the cancel method of the source. But this does not seem to be sufficient.
>>
>> What is the proper way to handle this issue? Is there some kind of closable source interface we should implement?
>>
>> Thanks in advance for your help.
>>
>> Best Regards,
>>
>> Laurent.
>>
>> --
>> Laurent Exsteens
>> Data Engineer
>> (M) +32 (0) 486 20 48 36
>>
>> EURA NOVA
>> Rue Emile Francqui, 4
>> 1435 Mont-Saint-Guibert
>> (T) +32 10 75 02 00
>>
>> euranova.eu
>> research.euranova.eu
>> ♻ Be green, keep it on the screen