[ 
https://issues.apache.org/jira/browse/FLINK-14184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17316420#comment-17316420
 ] 

Bob Tiernay edited comment on FLINK-14184 at 4/7/21, 3:22 PM:
--------------------------------------------------------------

This is a very important feature for a number of use cases (JMX registration 
for session clusters, background per-JVM threads, dependency injection 
frameworks, etc.). I've seen numerous posts on the user list that try to work 
around this issue in user space and it is currently intractable:
 * 
[http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Is-there-a-lifecycle-listener-that-gets-notified-when-a-topology-starts-stops-on-a-task-manager-td30148.html]
 * 
[http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/entrypoint-for-executing-job-in-task-manager-td17415.html]

Curious if the current work is slated for the 1.13 release. Big thanks for your 
efforts here!

 


was (Author: btiernay):
This is a very important feature for a number of use cases (JMX registration 
for session clusters, background per-JVM threads, etc.). I've seen numerous 
posts on the user list that try to work around this issue in user space and it 
is currently intractable:
 * 
[http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Is-there-a-lifecycle-listener-that-gets-notified-when-a-topology-starts-stops-on-a-task-manager-td30148.html]
 * 
[http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/entrypoint-for-executing-job-in-task-manager-td17415.html]

Curious if the current work is slated for the 1.13 release. Big thanks for your 
efforts here!

 

> Provide a stage listener API to be invoked per task manager
> -----------------------------------------------------------
>
>                 Key: FLINK-14184
>                 URL: https://issues.apache.org/jira/browse/FLINK-14184
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Task
>            Reporter: Stephen Connolly
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> Often times a topology has nodes that need to use 3rd party APIs. Not every 
> 3rd party API is written in a good style for usage from within Flink.
> At present, implementing a `Rich___` will provide each stage with the 
> `open(...)` and `close()` callbacks, as the stage is accepted for execution 
> on each task manager.
> There is, however, a need for being able to listen for the first stage being 
> opened on any given task manager as well as the last stage being closed. 
> Critically the last stage being closed is the opportunity to release any 
> resources that are shared across multiple stages in the topology, e.g. 
> Database connection pools, Async HTTP Client thread pools, etc.
> Without such a clean-up hook, the connections and threads can act as GC roots 
> that prevent the topology's classloader from being unloaded and result in a 
> memory and resource leak in the task manager... nevermind that if it is a 
> Database connection pool, it may also be consuming resources from the 
> database.
> There are three workarounds available at present:
>  # Each stage just allocates its own resources and cleans up afterwards. This 
> is, in many ways, the ideal... however this can result in higher than 
> intended database connections, e.f. as each stage that accesses the database 
> stage needs to have a separate database connection rather than letting the 
> whole topology share the use of one or two connections through a connection 
> pool. Similarly, if the 3rd party library uses a static singleton for the 
> whole classloader there is no way for the independent stages to know when it 
> is safe to shut down the singleton
>  # Implement a reference counting proxy for the 3rd party API. This is a lot 
> of work, you need to ensure that deserialization of the proxy returns a 
> classloader singleton (so you can maintain the reference counts) and if the 
> count goes wrong you have leaked the resource
>  # Use a ReferenceQueue backed proxy. This is even more complex than 
> implementing reference counting, but has the advantage of not requiring the 
> count be maintained correctly. On the other hand, it does not provide for 
> eager release of the resources.
> If Flink provided a listener contract that could be registered with the 
> execution environment then this would allow the resources to be cleared out. 
> My proposed interface would look something like
> {code:java}
> public interface EnvironmentLocalTopologyListener extends Serializable {
>   /** 
>    * Called immediately prior to the first {@link 
> RichFunction#open(Configuration)}
>    * being invoked for the topology on the current task manager JVM for this
>    * classloader. Will not be called again unless {#close()} has been invoked 
> first.
>    * Use this method to eagerly initialize any ClassLoader scoped resources 
> that
>    * are pooled across the stages of the topology.
>    *
>    * @param parameters // I am unsure if this makes sense
>    */ 
>   default void open(Configuration parameters) throws Exception {}
>   /**
>    * Called after the last {@link RichFunction#close()} has completed and the
>    * topology is effectively being stopped (for the current ClassLoader).
>    * This method will only be invoked if a call to {@link 
> #open(Configuration)}
>    * was attempted, and will be invoked irrespective of whether the call to
>    * {@link #open(Configuration)} terminated normally or exceptionally.
>    * Use this method to release any ClassLoader scoped resources that have 
> been
>    * pooled across the stages of the topology.
>    */
>   default void close() throws Exception {}
>   /**
>    * Decorate the threads that are used to invoke the stages of the topology.
>    * Use this method, for example, to seed the {@link org.slf4j.MDC} with
>    * topology specific details, e.g.
>    * <pre>
>    * Runnable decorate(Runnable task) {
>    *   return () -> {
>    *     try (MDC.MDCClosable ctx = MDC.putCloseable("foo", "bar")){
>    *       task.run();
>    *     };
>    * }
>    * </pre>
>    *
>    * @param task // might not be the most appropriate type, I haven't 
>    *             // checked how Flink implements dispatch. May or may not
>    *             // want a parameters argument also. 
>    */
>   default Runnable decorate(Runnable task) { return task; }
> }{code}
> (Names subject to change)
> Then you would use this something like
> {code:java}
> env.addEnvironmentLocalTopologyListener(...);
> ...
> env.execute(...); {code}
> The listener would be serialized to each task manager and then before the 
> first task is executed the `open(...)` method would get invoked. Each thread 
> that is running a task would be decorated by the listener, and then once all 
> the stages are stopped the `close()` method would be invoked.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to