Hi Chesnay,

That is an interesting proposal, thank you! I was doing something similar with the OutputFormat#close() method, respecting the format's parallelism. Using FinalizeOnMaster will make things easier. But the problem is that several OutputFormats must be synchronized externally - every output must check whether the other outputs have already finished... Quite cumbersome. There is also a problem with exceptions - an OutputFormat may never be opened, and thus never closed.
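For context, FinalizeOnMaster's contract is a single finalizeGlobal(int parallelism) callback that Flink invokes once on the master after all subtasks have been closed, which removes the cross-output bookkeeping I described. A minimal sketch of the idea - a stub interface stands in for org.apache.flink.api.common.io.FinalizeOnMaster so it compiles without a Flink dependency, and the lock-release helper is hypothetical:

```java
import java.io.IOException;

// Stub standing in for org.apache.flink.api.common.io.FinalizeOnMaster.
// The real interface declares the same single method; Flink calls it on
// the master when an OutputFormat implements the interface.
interface FinalizeOnMasterStub {
    void finalizeGlobal(int parallelism) throws IOException;
}

public class LockReleasingOutput implements FinalizeOnMasterStub {
    private boolean lockReleased = false;

    // Runs once, after all subtasks have been closed, so no output has
    // to check whether the other outputs finished already.
    @Override
    public void finalizeGlobal(int parallelism) throws IOException {
        releaseDistributedLock();
    }

    // Hypothetical cleanup; a real implementation would talk to the
    // external lock service here.
    private void releaseDistributedLock() {
        lockReleased = true;
    }

    public boolean isLockReleased() {
        return lockReleased;
    }
}
```

Note the caveat from above still applies: if the job fails before the format is ever opened, this callback is not a guaranteed cleanup path.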
Mark

‐‐‐‐‐‐‐ Original Message ‐‐‐‐‐‐‐
On Monday, June 8, 2020 5:50 PM, Chesnay Schepler <ches...@apache.org> wrote:

> This goes in the right direction; have a look at
> org.apache.flink.api.common.io.FinalizeOnMaster, which an OutputFormat can
> implement to run something on the Master after all subtasks have been closed.
>
> On 08/06/2020 17:25, Andrey Zagrebin wrote:
>
>> Hi Mark,
>>
>> I do not know how you output the results in your pipeline.
>> If you use DataSet#output(OutputFormat<T> outputFormat), you could try to
>> extend the format with a custom close method which should be called once the
>> task of the sink batch operator is done in the task manager.
>> I also cc Aljoscha; maybe he has more ideas.
>>
>> Best,
>> Andrey
>>
>> On Sun, Jun 7, 2020 at 1:35 PM Mark Davis <moda...@protonmail.com> wrote:
>>
>>> Hi Jeff,
>>>
>>> Unfortunately this is not good enough for me.
>>> My clients are very volatile; they start a batch and can go away at any
>>> moment without waiting for it to finish. Think of an elastic web
>>> application or an AWS Lambda.
>>>
>>> I hoped to find something which could be deployed to the cluster together
>>> with the batch code - maybe a hook in the JobManager or similar. I do not
>>> plan to run anything heavy there, just some formal cleanups.
>>> Is there something like this?
>>>
>>> Thank you!
>>>
>>> Mark
>>>
>>> ‐‐‐‐‐‐‐ Original Message ‐‐‐‐‐‐‐
>>> On Saturday, June 6, 2020 4:29 PM, Jeff Zhang <zjf...@gmail.com> wrote:
>>>
>>>> It would run on the client side, where the ExecutionEnvironment is created.
>>>>
>>>> On Saturday, June 6, 2020 8:14 PM, Mark Davis <moda...@protonmail.com> wrote:
>>>>
>>>>> Hi Jeff,
>>>>>
>>>>> Thank you very much! That is exactly what I need.
>>>>>
>>>>> Where will the listener code run in a cluster deployment (YARN, k8s)?
>>>>> Will it be sent over the network?
>>>>>
>>>>> Thank you!
>>>>>
>>>>> Mark
>>>>>
>>>>> ‐‐‐‐‐‐‐ Original Message ‐‐‐‐‐‐‐
>>>>> On Friday, June 5, 2020 6:13 PM, Jeff Zhang <zjf...@gmail.com> wrote:
>>>>>
>>>>>> You can try JobListener, which you can register with the ExecutionEnvironment:
>>>>>>
>>>>>> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/execution/JobListener.java
>>>>>>
>>>>>> On Saturday, June 6, 2020 12:00 AM, Mark Davis <moda...@protonmail.com> wrote:
>>>>>>
>>>>>>> Hi there,
>>>>>>>
>>>>>>> I am running a batch job with several outputs.
>>>>>>> Is there a way to run some code (e.g. release a distributed lock) after
>>>>>>> all outputs are finished?
>>>>>>>
>>>>>>> Currently I do this in a try-finally block around the
>>>>>>> ExecutionEnvironment.execute() call, but I have to switch to the
>>>>>>> detached execution mode - and in this mode the finally block is never run.
>>>>>>>
>>>>>>> Thank you!
>>>>>>>
>>>>>>> Mark
>>>>>>
>>>>>> --
>>>>>> Best Regards
>>>>>>
>>>>>> Jeff Zhang
>>>>
>>>> --
>>>> Best Regards
>>>>
>>>> Jeff Zhang
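For readers landing on this thread: Jeff's JobListener suggestion amounts to registering a client-side callback before calling execute(), with onJobExecuted firing once the job finishes. A hedged sketch of that pattern - a stub interface stands in for Flink's org.apache.flink.core.execution.JobListener so it compiles standalone, and the lock-release helper is hypothetical; as discussed above, this only helps while the submitting client stays alive:

```java
// Stub mirroring the callback shape of Flink's JobListener, whose real
// onJobExecuted takes a JobExecutionResult and a Throwable (either may
// be null) and runs on the client that called execute().
interface JobListenerStub {
    void onJobExecuted(Object jobExecutionResult, Throwable failure);
}

public class CleanupListener implements JobListenerStub {
    private boolean lockReleased = false;

    @Override
    public void onJobExecuted(Object jobExecutionResult, Throwable failure) {
        // Runs regardless of success or failure of the job, but only if
        // the submitting client is still around -- with a volatile client
        // (detached mode, AWS Lambda) it may never fire, which is the
        // limitation Mark raised in this thread.
        releaseDistributedLock();
    }

    // Hypothetical cleanup; a real implementation would release the
    // external lock here.
    private void releaseDistributedLock() {
        lockReleased = true;
    }

    public boolean isLockReleased() {
        return lockReleased;
    }
}
```

With the real API, the listener would be registered via ExecutionEnvironment#registerJobListener before execute() is called.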