[ https://issues.apache.org/jira/browse/FLINK-14184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17316420#comment-17316420 ]
Bob Tiernay edited comment on FLINK-14184 at 4/7/21, 3:22 PM:
--------------------------------------------------------------

This is a very important feature for a number of use cases (JMX registration for session clusters, background per-JVM threads, dependency injection frameworks, etc.). I've seen numerous posts on the user list that try to work around this issue in user space, and it is currently intractable:

* [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Is-there-a-lifecycle-listener-that-gets-notified-when-a-topology-starts-stops-on-a-task-manager-td30148.html]
* [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/entrypoint-for-executing-job-in-task-manager-td17415.html]

Curious if the current work is slated for the 1.13 release. Big thanks for your efforts here!

> Provide a stage listener API to be invoked per task manager
> -----------------------------------------------------------
>
> Key: FLINK-14184
> URL: https://issues.apache.org/jira/browse/FLINK-14184
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Task
> Reporter: Stephen Connolly
> Priority: Major
> Labels: pull-request-available
> Time Spent: 10m
> Remaining Estimate: 0h
>
> Often times a topology has nodes that need to use 3rd party APIs.
> Not every 3rd party API is written in a style that suits usage from within Flink.
> At present, implementing a `Rich___` function provides each stage with the
> `open(...)` and `close()` callbacks, as the stage is accepted for execution
> on each task manager.
> There is, however, a need to be able to listen for the first stage being
> opened on any given task manager, as well as the last stage being closed.
> Critically, the last stage being closed is the opportunity to release any
> resources that are shared across multiple stages in the topology, e.g.
> database connection pools, Async HTTP Client thread pools, etc.
> Without such a clean-up hook, the connections and threads can act as GC roots
> that prevent the topology's classloader from being unloaded, resulting in a
> memory and resource leak in the task manager... never mind that a database
> connection pool may also be consuming resources on the database itself.
> There are three workarounds available at present:
> # Each stage allocates its own resources and cleans up afterwards. This
> is, in many ways, the ideal... however, it can result in more database
> connections than intended, e.g. each stage that accesses the database needs
> its own connection rather than the whole topology sharing one or two
> connections through a connection pool. Similarly, if the 3rd party library
> uses a static singleton for the whole classloader, there is no way for the
> independent stages to know when it is safe to shut down the singleton.
> # Implement a reference-counting proxy for the 3rd party API. This is a lot
> of work: you need to ensure that deserialization of the proxy returns a
> classloader singleton (so you can maintain the reference counts), and if the
> count goes wrong, you have leaked the resource.
> # Use a ReferenceQueue-backed proxy.
> This is even more complex than implementing reference counting, but has the
> advantage of not requiring the count to be maintained correctly. On the other
> hand, it does not provide for eager release of the resources.
> If Flink provided a listener contract that could be registered with the
> execution environment, then this would allow the resources to be cleared out.
> My proposed interface would look something like
> {code:java}
> import java.io.Serializable;
>
> import org.apache.flink.api.common.functions.RichFunction;
> import org.apache.flink.configuration.Configuration;
>
> public interface EnvironmentLocalTopologyListener extends Serializable {
>     /**
>      * Called immediately prior to the first {@link RichFunction#open(Configuration)}
>      * being invoked for the topology on the current task manager JVM for this
>      * classloader. Will not be called again unless {@link #close()} has been
>      * invoked first.
>      * Use this method to eagerly initialize any ClassLoader-scoped resources that
>      * are pooled across the stages of the topology.
>      *
>      * @param parameters // I am unsure if this makes sense
>      */
>     default void open(Configuration parameters) throws Exception {}
>
>     /**
>      * Called after the last {@link RichFunction#close()} has completed and the
>      * topology is effectively being stopped (for the current ClassLoader).
>      * This method will only be invoked if a call to {@link #open(Configuration)}
>      * was attempted, and will be invoked irrespective of whether the call to
>      * {@link #open(Configuration)} terminated normally or exceptionally.
>      * Use this method to release any ClassLoader-scoped resources that have been
>      * pooled across the stages of the topology.
>      */
>     default void close() throws Exception {}
>
>     /**
>      * Decorate the threads that are used to invoke the stages of the topology.
>      * Use this method, for example, to seed the {@link org.slf4j.MDC} with
>      * topology-specific details, e.g.
>      * <pre>
>      * Runnable decorate(Runnable task) {
>      *     return () -> {
>      *         try (MDC.MDCCloseable ctx = MDC.putCloseable("foo", "bar")) {
>      *             task.run();
>      *         }
>      *     };
>      * }
>      * </pre>
>      *
>      * @param task // might not be the most appropriate type, I haven't
>      *             // checked how Flink implements dispatch. May or may not
>      *             // want a parameters argument also.
>      */
>     default Runnable decorate(Runnable task) { return task; }
> }
> {code}
> (Names subject to change)
> Then you would use this something like
> {code:java}
> env.addEnvironmentLocalTopologyListener(...);
> ...
> env.execute(...);
> {code}
> The listener would be serialized to each task manager. Before the first task
> is executed, the `open(...)` method would be invoked. Each thread that is
> running a task would be decorated by the listener, and once all the stages
> have stopped, the `close()` method would be invoked.
>
> --
> This message was sent by Atlassian Jira
> (v8.3.4#803005)
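The proposed lifecycle has three parts: open before the first stage, decorate each task thread, and close after the last stage. It can be sketched as a small reference-counting dispatcher.

This is a minimal, dependency-free sketch under stated assumptions. It is not Flink's implementation. The names are hypothetical:
* `TopologyListener` stands in for the proposed `EnvironmentLocalTopologyListener`, with the `Configuration` parameter dropped so the sketch compiles without Flink.
* `TopologyLifecycle`, `stageOpened()` and `stageClosed()` name hooks that a runtime would call around each stage's own `open(...)` and `close()`.

The sketch assumes that every `stageOpened()` is paired with a `stageClosed()`, even when opening fails.

```java
import java.util.Objects;

/**
 * Hypothetical stand-in for the proposed EnvironmentLocalTopologyListener.
 * The Flink Configuration parameter is omitted so this compiles on its own.
 */
interface TopologyListener {
    default void open() throws Exception {}
    default void close() throws Exception {}
    default Runnable decorate(Runnable task) { return task; }
}

/**
 * Hypothetical per-classloader dispatcher. It calls listener.open() just before
 * the first stage opens and listener.close() just after the last stage closes.
 * Assumption: callers pair every stageOpened() with exactly one stageClosed(),
 * even if opening failed.
 */
final class TopologyLifecycle {
    private final TopologyListener listener;
    private int activeStages = 0; // guarded by "this"

    TopologyLifecycle(TopologyListener listener) {
        this.listener = Objects.requireNonNull(listener);
    }

    /** Call before a stage's own open(); the first caller triggers listener.open(). */
    synchronized void stageOpened() throws Exception {
        // Increment before open() so that a failing open() still counts as
        // attempted; the matching stageClosed() then triggers listener.close().
        if (activeStages++ == 0) {
            listener.open();
        }
    }

    /** Call after a stage's own close(); the last caller triggers listener.close(). */
    synchronized void stageClosed() throws Exception {
        if (activeStages == 0) {
            throw new IllegalStateException("stageClosed() without matching stageOpened()");
        }
        if (--activeStages == 0) {
            listener.close();
        }
    }

    /** Wraps the code a task thread will run, e.g. to seed the SLF4J MDC. */
    Runnable decorate(Runnable task) {
        return listener.decorate(task);
    }
}
```

The dispatcher holds the lock while `listener.open()` runs. As a result, a stage that starts concurrently waits until the shared resources are ready, but a slow `open()` also delays every other stage using the same dispatcher.

The counter is essentially workaround #2 from the description, moved into the runtime. The runtime, rather than user code, would then be responsible for keeping the count correct.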