[GitHub] [flink] AHeise commented on a change in pull request #11303: [FLINK-16245] Decoupling user classloader from context classloader.
AHeise commented on a change in pull request #11303: [FLINK-16245] Decoupling user classloader from context classloader.
URL: https://github.com/apache/flink/pull/11303#discussion_r394150929

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders.java ##
@@ -82,4 +89,61 @@ public static ResolveOrder fromString(String resolveOrder) {
 			super(urls, parent);
 		}
 	}
+
+	/**
+	 * Ensures that holding a reference on the context class loader outliving the scope of user code does not prevent
+	 * the user classloader to be garbage collected (FLINK-16245).
+	 *
+	 * This classloader delegates to the actual user classloader. Upon {@link #close()}, the delegate is nulled
+	 * and can be garbage collected. Additional class resolution will be resolved solely through the bootstrap
+	 * classloader and most likely result in ClassNotFound exceptions.
+	 */
+	private static class SafetyNetWrapperClassLoader extends URLClassLoader
+			implements Closeable {
+		private static final Logger LOG = LoggerFactory.getLogger(SafetyNetWrapperClassLoader.class);
+
+		private FlinkUserCodeClassLoader inner;
+
+		SafetyNetWrapperClassLoader(FlinkUserCodeClassLoader inner) {
+			super(new URL[0], null);
+			this.inner = inner;
+		}
+
+		@Override
+		public void close() {
+			if (inner != null) {
+				try {
+					inner.close();
+				} catch (IOException e) {
+					LOG.warn("Could not close user classloader", e);
+				}
+			}
+			inner = null;
+		}
+
+		@Override
+		protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
+			if (inner == null) {
+				return super.loadClass(name, resolve);

Review comment:
> No, with fail early I meant that the (leaked) thread with the leaked classloader reference would crash ASAP if it tries to load a class when the CL was closed.

That sounds completely fine with me, but are we sure the CL is only leaked by leaking the thread? I understood that the Apache Commons Logging framework lives outside of the thread (in the app classloader) and keeps the reference to the user classloader for the whole TM lifecycle.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
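A minimal sketch of the concern raised here, assuming a framework class that lives in the application classloader and caches state keyed by the context classloader (roughly the Apache Commons Logging situation described above). All names (`LongLivedCache`, `DropOnCloseWrapper`, `CacheLeakDemo`) are hypothetical, and class-loading delegation is omitted on purpose; the sketch only shows the reference chain. The cache keeps pinning the wrapper after the job ends, but once the wrapper is closed it no longer pins the user classloader.

```java
import java.io.Closeable;
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a framework loaded by the app classloader that caches
// per-context-classloader state for the whole lifetime of the process.
final class LongLivedCache {
    static final Map<ClassLoader, String> CACHE = new ConcurrentHashMap<>();

    static void registerCurrentContext() {
        // Strong reference to whatever the context classloader is at call time.
        CACHE.put(Thread.currentThread().getContextClassLoader(), "cached factory");
    }
}

// Hypothetical safety-net wrapper: user code sees it as the context classloader,
// and close() drops its only reference to the real user classloader.
final class DropOnCloseWrapper extends ClassLoader implements Closeable {
    private volatile URLClassLoader inner;

    DropOnCloseWrapper(URLClassLoader inner) {
        super(null);
        this.inner = inner;
    }

    boolean holdsDelegate() {
        return inner != null;
    }

    @Override
    public void close() throws IOException {
        URLClassLoader current = inner;
        inner = null; // from now on the cache can only pin this small wrapper
        if (current != null) {
            current.close();
        }
    }
}

final class CacheLeakDemo {
    static boolean run() throws IOException {
        DropOnCloseWrapper wrapper = new DropOnCloseWrapper(new URLClassLoader(new URL[0], null));
        Thread thread = Thread.currentThread();
        ClassLoader previous = thread.getContextClassLoader();
        thread.setContextClassLoader(wrapper);
        try {
            LongLivedCache.registerCurrentContext(); // "user code" touches the caching framework
        } finally {
            thread.setContextClassLoader(previous);
        }
        wrapper.close(); // job finished, the classloader is released
        // The cache still references the wrapper, but the wrapper no longer references the user classloader.
        return LongLivedCache.CACHE.containsKey(wrapper) && !wrapper.holdsDelegate();
    }
}
```

The wrapper object itself still leaks; only the much larger user classloader (and every class it defined) becomes collectable. That is why a later class-loading attempt through such a cached wrapper may come from outside any leaked thread.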
[GitHub] [flink] AHeise commented on a change in pull request #11303: [FLINK-16245] Decoupling user classloader from context classloader.
AHeise commented on a change in pull request #11303: [FLINK-16245] Decoupling user classloader from context classloader.
URL: https://github.com/apache/flink/pull/11303#discussion_r393616067

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders.java ##
@@ -82,4 +89,61 @@ public static ResolveOrder fromString(String resolveOrder) {
 			super(urls, parent);
 		}
 	}
+
+	/**
+	 * Ensures that holding a reference on the context class loader outliving the scope of user code does not prevent
+	 * the user classloader to be garbage collected (FLINK-16245).
+	 *
+	 * This classloader delegates to the actual user classloader. Upon {@link #close()}, the delegate is nulled
+	 * and can be garbage collected. Additional class resolution will be resolved solely through the bootstrap
+	 * classloader and most likely result in ClassNotFound exceptions.
+	 */
+	private static class SafetyNetWrapperClassLoader extends URLClassLoader
+			implements Closeable {
+		private static final Logger LOG = LoggerFactory.getLogger(SafetyNetWrapperClassLoader.class);
+
+		private FlinkUserCodeClassLoader inner;
+
+		SafetyNetWrapperClassLoader(FlinkUserCodeClassLoader inner) {
+			super(new URL[0], null);
+			this.inner = inner;
+		}
+
+		@Override
+		public void close() {
+			if (inner != null) {
+				try {
+					inner.close();
+				} catch (IOException e) {
+					LOG.warn("Could not close user classloader", e);
+				}
+			}
+			inner = null;
+		}
+
+		@Override
+		protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
+			if (inner == null) {
+				return super.loadClass(name, resolve);

Review comment:
I thought that's what you meant by "fail early". If you look into https://issues.apache.org/jira/browse/FLINK-11205, you can see the main motivation for this PR. There, Stephan wrote:

> Some thoughts on how to approach this are:
> ...
> As a final safety-net, the TMs kill/restart themselves when the metaspace blows up FLINK-16225
> ...
> Joey Echeverria A generic mechanism to prevent leaks through ClassLoader caching (as in Apache Commons Logging) would be FLINK-16245 (use a delegating class loader where we drop the reference to the real one when closing it).

Btw, this probably answers your initial point in this thread: throwing ISE it is. Now I just need to figure out how to make sure that this exception actually makes the TM fail rather than some later job.
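A sketch of the "throw ISE after close" variant, assuming the wrapper simply delegates to its delegate's public `loadClass(String)` while open. The class name `FailFastWrapperClassLoader` is hypothetical, and the sketch does not address the open question of making the TM, rather than a later job, fail.

```java
import java.io.Closeable;

// Hypothetical fail-fast wrapper: delegates while open, throws IllegalStateException once closed.
final class FailFastWrapperClassLoader extends ClassLoader implements Closeable {
    private volatile ClassLoader inner;

    FailFastWrapperClassLoader(ClassLoader inner) {
        super(null); // no parent: never silently fall back to another loader
        this.inner = inner;
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        ClassLoader current = inner;
        if (current == null) {
            // A leaked reference is used after the user code was released: fail loudly.
            throw new IllegalStateException(
                "Trying to load class '" + name + "' after the user classloader has been closed");
        }
        Class<?> loaded = current.loadClass(name);
        if (resolve) {
            resolveClass(loaded);
        }
        return loaded;
    }

    @Override
    public void close() {
        inner = null;
    }
}
```

Unlike the diff above, this version does not fall back to `super.loadClass`, so even bootstrap classes such as `java.lang.String` fail after `close()`.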
[GitHub] [flink] AHeise commented on a change in pull request #11303: [FLINK-16245] Decoupling user classloader from context classloader.
AHeise commented on a change in pull request #11303: [FLINK-16245] Decoupling user classloader from context classloader.
URL: https://github.com/apache/flink/pull/11303#discussion_r393553966

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders.java ##
@@ -82,4 +89,61 @@ public static ResolveOrder fromString(String resolveOrder) {
 			super(urls, parent);
 		}
 	}
+
+	/**
+	 * Ensures that holding a reference on the context class loader outliving the scope of user code does not prevent
+	 * the user classloader to be garbage collected (FLINK-16245).
+	 *
+	 * This classloader delegates to the actual user classloader. Upon {@link #close()}, the delegate is nulled
+	 * and can be garbage collected. Additional class resolution will be resolved solely through the bootstrap
+	 * classloader and most likely result in ClassNotFound exceptions.
+	 */
+	private static class SafetyNetWrapperClassLoader extends URLClassLoader
+			implements Closeable {
+		private static final Logger LOG = LoggerFactory.getLogger(SafetyNetWrapperClassLoader.class);
+
+		private FlinkUserCodeClassLoader inner;
+
+		SafetyNetWrapperClassLoader(FlinkUserCodeClassLoader inner) {
+			super(new URL[0], null);
+			this.inner = inner;
+		}
+
+		@Override
+		public void close() {
+			if (inner != null) {
+				try {
+					inner.close();
+				} catch (IOException e) {
+					LOG.warn("Could not close user classloader", e);
+				}
+			}
+			inner = null;
+		}
+
+		@Override
+		protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
+			if (inner == null) {
+				return super.loadClass(name, resolve);

Review comment:
> Given that the CL is only closed when all tasks on the TE have stopped, how could it affect a job? Leftover threads, sure, but what happens to those shouldn't affect job execution, and failing as early as possible might actually be a benefit.

I actually meant it the other way around. What happens if we have job1 and job2, and the CL of job1 is leaked into the execution of job2? The TM would die, taking job2 with it, with a seemingly unrelated error. In particular, I think this would be pretty confusing and inefficient for larger batch clusters. @StephanEwen WDYT?

But since I'm also a fan of failing early, I'd go that way once Stephan agrees (I kinda had the impression that he wanted it to be graceful).
[GitHub] [flink] AHeise commented on a change in pull request #11303: [FLINK-16245] Decoupling user classloader from context classloader.
AHeise commented on a change in pull request #11303: [FLINK-16245] Decoupling user classloader from context classloader.
URL: https://github.com/apache/flink/pull/11303#discussion_r393237279

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders.java ##
@@ -82,4 +89,61 @@ public static ResolveOrder fromString(String resolveOrder) {
 			super(urls, parent);
 		}
 	}
+
+	/**
+	 * Ensures that holding a reference on the context class loader outliving the scope of user code does not prevent
+	 * the user classloader to be garbage collected (FLINK-16245).
+	 *
+	 * This classloader delegates to the actual user classloader. Upon {@link #close()}, the delegate is nulled
+	 * and can be garbage collected. Additional class resolution will be resolved solely through the bootstrap
+	 * classloader and most likely result in ClassNotFound exceptions.
+	 */
+	private static class SafetyNetWrapperClassLoader extends URLClassLoader
+			implements Closeable {
+		private static final Logger LOG = LoggerFactory.getLogger(SafetyNetWrapperClassLoader.class);
+
+		private FlinkUserCodeClassLoader inner;
+
+		SafetyNetWrapperClassLoader(FlinkUserCodeClassLoader inner) {
+			super(new URL[0], null);
+			this.inner = inner;
+		}
+
+		@Override
+		public void close() {
+			if (inner != null) {
+				try {
+					inner.close();
+				} catch (IOException e) {
+					LOG.warn("Could not close user classloader", e);
+				}
+			}
+			inner = null;
+		}
+
+		@Override
+		protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
+			if (inner == null) {
+				return super.loadClass(name, resolve);

Review comment:
I understand where you are coming from, but this will effectively make any job with a leaked context classloader fail eventually, even if only basic Java classes need to be loaded. If we want to go that route, we should think twice about what to throw. An `IllegalStateException` would be the correct one, but does it fit in all cases? We could also throw a `ClassNotFoundException` with a good description to not change the behavior too much, but that may trigger some unwanted fallback logic in user libraries.
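To illustrate the trade-off, here is a sketch with a hypothetical library helper that uses the common "try the context classloader, then fall back" pattern. A closed loader that throws `ClassNotFoundException` is silently bypassed, so the leak goes unnoticed, while one that throws `IllegalStateException` surfaces immediately. All class names are made up for illustration.

```java
// Two hypothetical "closed" classloaders that differ only in what they throw.
final class ClosedLoaderThrowingCnfe extends ClassLoader {
    ClosedLoaderThrowingCnfe() {
        super(null);
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        throw new ClassNotFoundException(name + " (user classloader already closed)");
    }
}

final class ClosedLoaderThrowingIse extends ClassLoader {
    ClosedLoaderThrowingIse() {
        super(null);
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) {
        throw new IllegalStateException("User classloader already closed; cannot load " + name);
    }
}

// Hypothetical library code with a typical fallback on ClassNotFoundException.
final class LibraryLookup {
    static Class<?> lookup(String name, ClassLoader context) throws ClassNotFoundException {
        try {
            return context.loadClass(name);
        } catch (ClassNotFoundException e) {
            // Indistinguishable from "class is not on the user classpath": fall back quietly.
            return LibraryLookup.class.getClassLoader().loadClass(name);
        }
    }
}
```

With the first loader the lookup appears to succeed; with the second, the `IllegalStateException` escapes the library, because it only catches `ClassNotFoundException`.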
[GitHub] [flink] AHeise commented on a change in pull request #11303: [FLINK-16245] Decoupling user classloader from context classloader.
AHeise commented on a change in pull request #11303: [FLINK-16245] Decoupling user classloader from context classloader.
URL: https://github.com/apache/flink/pull/11303#discussion_r393232164

## File path: flink-tests/src/test/java/org/apache/flink/test/recovery/BatchFineGrainedRecoveryITCase.java ##
@@ -396,14 +397,16 @@ private TaskExecutorFailureStrategy(int failAfterCallNumber) {
 		@Override
 		void fail(int trackingIndex) throws Exception {
 			//noinspection OverlyBroadCatchBlock
-			try {
-				restartTaskManager();
-			} catch (InterruptedException e) {
-				// ignore the exception, task should have been failed while stopping TM
-				Thread.currentThread().interrupt();
-			} catch (Throwable t) {
-				failureTracker.unrelatedFailure(t);
-				throw t;
+			try (TemporaryClassLoaderContext unused = TemporaryClassLoaderContext.of(ClassLoader.getSystemClassLoader())) {

Review comment:
The test is badly written, but I haven't found a better solution. We are restarting a task manager from within a UDF (with the context classloader set to the user classloader). Restarting means closing the TM, which closes the context classloader, so that starting the TM again fails. (I had hoped that the commit message was clear enough on the issue; happy to take suggestions.)
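The test change wraps the TM restart in `TemporaryClassLoaderContext.of(ClassLoader.getSystemClassLoader())`, so the restart no longer runs with the soon-to-be-closed user classloader as context classloader. Below is a minimal stand-in for that swap-and-restore pattern. `ContextClassLoaderSwap` is a hypothetical name, not Flink's class, and its exact behavior is an assumption.

```java
// Hypothetical swap-and-restore helper for the thread context classloader,
// intended for use with try-with-resources.
final class ContextClassLoaderSwap implements AutoCloseable {
    private final Thread thread;
    private final ClassLoader original;

    private ContextClassLoaderSwap(Thread thread, ClassLoader original) {
        this.thread = thread;
        this.original = original;
    }

    static ContextClassLoaderSwap of(ClassLoader temporary) {
        Thread current = Thread.currentThread();
        ClassLoader original = current.getContextClassLoader();
        current.setContextClassLoader(temporary);
        return new ContextClassLoaderSwap(current, original);
    }

    @Override
    public void close() {
        // Runs even if the try body throws, so the UDF thread gets its original loader back.
        thread.setContextClassLoader(original);
    }
}
```

Threads created inside the `try` block inherit the creating thread's context classloader, so a TM started there would see the system classloader rather than the user classloader that the shutdown closes.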
[GitHub] [flink] AHeise commented on a change in pull request #11303: [FLINK-16245] Decoupling user classloader from context classloader.
AHeise commented on a change in pull request #11303: [FLINK-16245] Decoupling user classloader from context classloader.
URL: https://github.com/apache/flink/pull/11303#discussion_r393234335

## File path: flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java ##
@@ -240,7 +240,11 @@ public void closeSession(String sessionId) throws SqlExecutionException {
 			}
 		});
 		// Remove the session's ExecutionContext from contextMap.
-		this.contextMap.remove(sessionId);
+		try {
+			this.contextMap.remove(sessionId).close();
+		} catch (IOException e) {
+			// ignore any throwable to keep the clean up running

Review comment:
Should we then also log in cancelQuery on L237? I tried to keep it symmetric.
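A detail worth checking in this diff: `contextMap.remove(sessionId).close()` throws a `NullPointerException` if the session could ever be absent at that point. Below is a sketch of a null-safe variant that also logs, as discussed above, instead of silently ignoring the `IOException`. `SessionContexts` is a hypothetical class, and `java.util.logging` stands in for the SLF4J logger Flink would use.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical session registry mirroring the cleanup step in closeSession.
final class SessionContexts {
    private static final Logger LOG = Logger.getLogger(SessionContexts.class.getName());

    final Map<String, Closeable> contextMap = new ConcurrentHashMap<>();

    void closeSession(String sessionId) {
        Closeable context = contextMap.remove(sessionId);
        if (context == null) {
            return; // unknown or already closed session: nothing to release
        }
        try {
            context.close(); // releases the session's user classloader
        } catch (IOException e) {
            // Log and continue so that the rest of the cleanup still runs.
            LOG.log(Level.WARNING, "Could not close execution context of session " + sessionId, e);
        }
    }
}
```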
[GitHub] [flink] AHeise commented on a change in pull request #11303: [FLINK-16245] Decoupling user classloader from context classloader.
AHeise commented on a change in pull request #11303: [FLINK-16245] Decoupling user classloader from context classloader.
URL: https://github.com/apache/flink/pull/11303#discussion_r392210373

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders.java ##
@@ -82,4 +91,78 @@ public static ResolveOrder fromString(String resolveOrder) {
 			super(urls, parent);
 		}
 	}
+
+	/**
+	 * Ensures that holding a reference on the context class loader outliving the scope of user code does not prevent
+	 * the user classloader to be garbage collected (FLINK-16245).
+	 *
+	 * This classloader delegates to the actual user classloader. Upon {@link #close()}, the delegate is nulled
+	 * and can be garbage collected. Additional class resolution will be resolved solely through the bootstrap
+	 * classloader and most likely result in ClassNotFound exceptions.
+	 *
+	 * @param <T> the classloader type that also needs to be closeable.
+	 */
+	private static class SafetyNetWrapperClassLoader extends CloseableClassLoader
+			implements Closeable {
+		private static final Logger LOG = LoggerFactory.getLogger(SafetyNetWrapperClassLoader.class);
+
+		private T inner;
+
+		SafetyNetWrapperClassLoader(T inner) {
+			super(null);
+			this.inner = inner;
+		}
+
+		@Override
+		public void close() {
+			if (inner != null) {
+				try {
+					inner.close();
+				} catch (IOException e) {
+					LOG.warn("Could not close user classloader", e);
+				}
+			}
+			inner = null;
+		}
+
+		@Override
+		protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
+			if (inner == null) {
+				return super.loadClass(name, resolve);
+			}
+
+			synchronized (getClassLoadingLock(name)) {

Review comment:
Added a common ancestor now and used delegation.
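A sketch of the "common ancestor + delegation" approach, under the assumption that the ancestor's only job is to make `loadClass(String, boolean)` callable from the wrapper. Here it does that by widening the method to `public`; a `protected` override would also do if the wrapper lives in the same package. All class names are hypothetical, not the ones in the PR.

```java
import java.io.Closeable;
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;

// Hypothetical common ancestor of the parent-first/child-first user code classloaders.
abstract class UserCodeClassLoaderBase extends URLClassLoader {
    UserCodeClassLoaderBase(URL[] urls, ClassLoader parent) {
        super(urls, parent);
    }

    // Widened from protected to public; the behavior itself is unchanged.
    @Override
    public Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        return super.loadClass(name, resolve);
    }
}

final class ParentFirstUserCodeClassLoader extends UserCodeClassLoaderBase {
    ParentFirstUserCodeClassLoader(URL[] urls, ClassLoader parent) {
        super(urls, parent);
    }
}

// Hypothetical safety net that delegates everything, including the resolve flag.
final class DelegatingSafetyNet extends ClassLoader implements Closeable {
    private volatile UserCodeClassLoaderBase inner;

    DelegatingSafetyNet(UserCodeClassLoaderBase inner) {
        super(null);
        this.inner = inner;
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        UserCodeClassLoaderBase current = inner;
        if (current == null) {
            throw new IllegalStateException("User classloader already closed; cannot load " + name);
        }
        // Straight delegation: no findLoadedClass/resolveClass bookkeeping in the wrapper.
        return current.loadClass(name, resolve);
    }

    @Override
    public void close() throws IOException {
        UserCodeClassLoaderBase current = inner;
        inner = null;
        if (current != null) {
            current.close();
        }
    }
}
```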
[GitHub] [flink] AHeise commented on a change in pull request #11303: [FLINK-16245] Decoupling user classloader from context classloader.
AHeise commented on a change in pull request #11303: [FLINK-16245] Decoupling user classloader from context classloader.
URL: https://github.com/apache/flink/pull/11303#discussion_r392122863

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders.java ##
@@ -82,4 +91,78 @@ public static ResolveOrder fromString(String resolveOrder) {
 			super(urls, parent);
 		}
 	}
+
+	/**
+	 * Ensures that holding a reference on the context class loader outliving the scope of user code does not prevent
+	 * the user classloader to be garbage collected (FLINK-16245).
+	 *
+	 * This classloader delegates to the actual user classloader. Upon {@link #close()}, the delegate is nulled
+	 * and can be garbage collected. Additional class resolution will be resolved solely through the bootstrap
+	 * classloader and most likely result in ClassNotFound exceptions.
+	 *
+	 * @param <T> the classloader type that also needs to be closeable.
+	 */
+	private static class SafetyNetWrapperClassLoader extends CloseableClassLoader
+			implements Closeable {
+		private static final Logger LOG = LoggerFactory.getLogger(SafetyNetWrapperClassLoader.class);
+
+		private T inner;
+
+		SafetyNetWrapperClassLoader(T inner) {
+			super(null);
+			this.inner = inner;
+		}
+
+		@Override
+		public void close() {
+			if (inner != null) {
+				try {
+					inner.close();
+				} catch (IOException e) {
+					LOG.warn("Could not close user classloader", e);
+				}
+			}
+			inner = null;
+		}
+
+		@Override
+		protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
+			if (inner == null) {
+				return super.loadClass(name, resolve);
+			}
+
+			synchronized (getClassLoadingLock(name)) {
+				final Class<?> loadedClass = findLoadedClass(name);
+				if (loadedClass != null) {
+					return resolveIfNeeded(resolve, loadedClass);
+				}
+
+				return resolveIfNeeded(resolve, inner.loadClass(name));
+			}
+		}
+
+		private Class<?> resolveIfNeeded(final boolean resolve, final Class<?> loadedClass) {

Review comment:
Added a common ancestor now.
[GitHub] [flink] AHeise commented on a change in pull request #11303: [FLINK-16245] Decoupling user classloader from context classloader.
AHeise commented on a change in pull request #11303: [FLINK-16245] Decoupling user classloader from context classloader.
URL: https://github.com/apache/flink/pull/11303#discussion_r391564148

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders.java ##
@@ -82,4 +91,78 @@ public static ResolveOrder fromString(String resolveOrder) {
 			super(urls, parent);
 		}
 	}
+
+	/**
+	 * Ensures that holding a reference on the context class loader outliving the scope of user code does not prevent
+	 * the user classloader to be garbage collected (FLINK-16245).
+	 *
+	 * This classloader delegates to the actual user classloader. Upon {@link #close()}, the delegate is nulled
+	 * and can be garbage collected. Additional class resolution will be resolved solely through the bootstrap
+	 * classloader and most likely result in ClassNotFound exceptions.
+	 *
+	 * @param <T> the classloader type that also needs to be closeable.
+	 */
+	private static class SafetyNetWrapperClassLoader extends CloseableClassLoader
+			implements Closeable {
+		private static final Logger LOG = LoggerFactory.getLogger(SafetyNetWrapperClassLoader.class);
+
+		private T inner;
+
+		SafetyNetWrapperClassLoader(T inner) {
+			super(null);
+			this.inner = inner;
+		}
+
+		@Override
+		public void close() {
+			if (inner != null) {
+				try {
+					inner.close();
+				} catch (IOException e) {
+					LOG.warn("Could not close user classloader", e);
+				}
+			}
+			inner = null;
+		}
+
+		@Override
+		protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
+			if (inner == null) {
+				return super.loadClass(name, resolve);
+			}
+
+			synchronized (getClassLoadingLock(name)) {
+				final Class<?> loadedClass = findLoadedClass(name);
+				if (loadedClass != null) {
+					return resolveIfNeeded(resolve, loadedClass);
+				}
+
+				return resolveIfNeeded(resolve, inner.loadClass(name));
+			}
+		}
+
+		private Class<?> resolveIfNeeded(final boolean resolve, final Class<?> loadedClass) {

Review comment:
No, still not possible.
[GitHub] [flink] AHeise commented on a change in pull request #11303: [FLINK-16245] Decoupling user classloader from context classloader.
AHeise commented on a change in pull request #11303: [FLINK-16245] Decoupling user classloader from context classloader.
URL: https://github.com/apache/flink/pull/11303#discussion_r390953027

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders.java ##
@@ -18,35 +18,44 @@

 package org.apache.flink.runtime.execution.librarycache;

+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.util.ChildFirstClassLoader;

+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
 import java.net.URL;
 import java.net.URLClassLoader;
+import java.util.Enumeration;

 /**
  * Gives the URLClassLoader a nicer name for debugging purposes.
  */
 public class FlinkUserCodeClassLoaders {

-	public static URLClassLoader parentFirst(URL[] urls, ClassLoader parent) {
+	@VisibleForTesting
+	static URLClassLoader parentFirst(URL[] urls, ClassLoader parent) {

Review comment:
I was not considering it initially because the delegation pattern at the implementation level felt wrong, but ClassLoader is messy in that respect anyway, and it resolves this discussion, so I adjusted the PR. Now we just need to converge on `loadClass`.
[GitHub] [flink] AHeise commented on a change in pull request #11303: [FLINK-16245] Decoupling user classloader from context classloader.
AHeise commented on a change in pull request #11303: [FLINK-16245] Decoupling user classloader from context classloader.
URL: https://github.com/apache/flink/pull/11303#discussion_r388301548

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders.java ##
@@ -82,4 +91,78 @@ public static ResolveOrder fromString(String resolveOrder) {
 			super(urls, parent);
 		}
 	}
+
+	/**
+	 * Ensures that holding a reference on the context class loader outliving the scope of user code does not prevent
+	 * the user classloader to be garbage collected (FLINK-16245).
+	 *
+	 * This classloader delegates to the actual user classloader. Upon {@link #close()}, the delegate is nulled
+	 * and can be garbage collected. Additional class resolution will be resolved solely through the bootstrap
+	 * classloader and most likely result in ClassNotFound exceptions.
+	 *
+	 * @param <T> the classloader type that also needs to be closeable.
+	 */
+	private static class SafetyNetWrapperClassLoader extends CloseableClassLoader
+			implements Closeable {
+		private static final Logger LOG = LoggerFactory.getLogger(SafetyNetWrapperClassLoader.class);
+
+		private T inner;
+
+		SafetyNetWrapperClassLoader(T inner) {
+			super(null);
+			this.inner = inner;
+		}
+
+		@Override
+		public void close() {
+			if (inner != null) {
+				try {
+					inner.close();
+				} catch (IOException e) {
+					LOG.warn("Could not close user classloader", e);
+				}
+			}
+			inner = null;
+		}
+
+		@Override
+		protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
+			if (inner == null) {
+				return super.loadClass(name, resolve);
+			}
+
+			synchronized (getClassLoadingLock(name)) {

Review comment:
We would need to introduce a common ancestor to `Flink[Parent|Child]FirstClassLoader` that does nothing but expose `loadClass`. I'm really not convinced that this is better than doing the resolution as is. It would certainly not be more concise, and IMHO it would increase the complexity.
[GitHub] [flink] AHeise commented on a change in pull request #11303: [FLINK-16245] Decoupling user classloader from context classloader.
AHeise commented on a change in pull request #11303: [FLINK-16245] Decoupling user classloader from context classloader.
URL: https://github.com/apache/flink/pull/11303#discussion_r388299382

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders.java ##
@@ -18,35 +18,44 @@

 package org.apache.flink.runtime.execution.librarycache;

+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.util.ChildFirstClassLoader;

+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
 import java.net.URL;
 import java.net.URLClassLoader;
+import java.util.Enumeration;

 /**
  * Gives the URLClassLoader a nicer name for debugging purposes.
  */
 public class FlinkUserCodeClassLoaders {

-	public static URLClassLoader parentFirst(URL[] urls, ClassLoader parent) {
+	@VisibleForTesting
+	static URLClassLoader parentFirst(URL[] urls, ClassLoader parent) {

Review comment:
Hm, it's not a public API, and callers could achieve the same thing through `#create` with one extra parameter. Unless I change the signature of this method, it cannot use the safety net (or the safety net would also need to be a URLClassLoader, which I wanted to avoid). That may also be fine, but I wanted to point it out explicitly.
[GitHub] [flink] AHeise commented on a change in pull request #11303: [FLINK-16245] Decoupling user classloader from context classloader.
AHeise commented on a change in pull request #11303: [FLINK-16245] Decoupling user classloader from context classloader. URL: https://github.com/apache/flink/pull/11303#discussion_r387586947 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders.java ## @@ -82,4 +91,78 @@ public static ResolveOrder fromString(String resolveOrder) { super(urls, parent); } } + + /** +* Ensures that holding a reference on the context class loader outliving the scope of user code does not prevent +* the user classloader to be garbage collected (FLINK-16245). +* +* This classloader delegates to the actual user classloader. Upon {@link #close()}, the delegate is nulled +* and can be garbage collected. Additional class resolution will be resolved solely through the bootstrap +* classloader and most likely result in ClassNotFound exceptions. +* +* @param the classloader type that also needs to be closeable. +*/ + private static class SafetyNetWrapperClassLoader extends CloseableClassLoader + implements Closeable { + private static final Logger LOG = LoggerFactory.getLogger(SafetyNetWrapperClassLoader.class); + + private T inner; + + SafetyNetWrapperClassLoader(T inner) { + super(null); + this.inner = inner; + } + + @Override + public void close() { + if (inner != null) { + try { + inner.close(); + } catch (IOException e) { + LOG.warn("Could not close user classloader", e); + } + } + inner = null; + } + + @Override + protected Class loadClass(String name, boolean resolve) throws ClassNotFoundException { + if (inner == null) { + return super.loadClass(name, resolve); + } + + synchronized (getClassLoadingLock(name)) { Review comment: `inner` is typically a `URLClassLoader`. We could subclass it, but I'd actually like to avoid that unless you have a strong opinion on that. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
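The composition approach from the comment above (holding the `URLClassLoader` as a delegate instead of subclassing it) can be illustrated with a self-contained sketch. This is not the code from the PR: the class name `SafetyNetSketch` and the `canLoad` helper are hypothetical, the delegate is typed as `URLClassLoader` rather than the PR's generic parameter, `inner` is made `volatile` and read once into a local, and the warning goes to `System.err` instead of SLF4J so the example has no dependencies.

```java
import java.io.Closeable;
import java.io.IOException;
import java.net.URLClassLoader;

// Hypothetical sketch; not Flink's SafetyNetWrapperClassLoader.
final class SafetyNetSketch extends ClassLoader implements Closeable {
    // Delegate held by composition, so URLClassLoader is not subclassed.
    private volatile URLClassLoader inner;

    SafetyNetSketch(URLClassLoader inner) {
        super(null); // parent is the bootstrap loader only
        this.inner = inner;
    }

    @Override
    public void close() {
        URLClassLoader current = inner;
        inner = null; // drop the reference so the user loader can be garbage collected
        if (current != null) {
            try {
                current.close();
            } catch (IOException e) {
                // Assumption: stderr stands in for the LOG.warn call in the review snippet.
                System.err.println("Could not close user classloader: " + e);
            }
        }
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        URLClassLoader current = inner; // read once to avoid racing with close()
        if (current == null) {
            // After close(): only bootstrap classes are found; anything else throws ClassNotFoundException.
            return super.loadClass(name, resolve);
        }
        synchronized (getClassLoadingLock(name)) {
            Class<?> loaded = findLoadedClass(name);
            if (loaded == null) {
                // Only the public, non-resolving overload is accessible on the delegate.
                loaded = current.loadClass(name);
            }
            if (resolve) {
                resolveClass(loaded);
            }
            return loaded;
        }
    }

    // Hypothetical helper for the usage example: reports visibility without a checked exception.
    static boolean canLoad(ClassLoader loader, String name) {
        try {
            loader.loadClass(name);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }
}
```

As a usage example, wrap `new URLClassLoader(new URL[0], SafetyNetSketch.class.getClassLoader())`. Before `close()`, `canLoad(wrapper, SafetyNetSketch.class.getName())` succeeds through the delegate's parent. Afterwards, the same lookup falls through to the bootstrap loader and fails, while `java.lang.String` still loads. Reading `inner` into a local prevents a concurrent `close()` from causing a `NullPointerException` between the null check and the delegation.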
[GitHub] [flink] AHeise commented on a change in pull request #11303: [Flink-16245] Decoupling user classloader from context classloader.
AHeise commented on a change in pull request #11303: [Flink-16245] Decoupling user classloader from context classloader.
URL: https://github.com/apache/flink/pull/11303#discussion_r387574848

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders.java ##
@@ -82,4 +91,78 @@ public static ResolveOrder fromString(String resolveOrder) {
 			super(urls, parent);
 		}
 	}
+
+	/**
+	 * Ensures that holding a reference on the context class loader outliving the scope of user code does not prevent
+	 * the user classloader to be garbage collected (FLINK-16245).
+	 *
+	 * This classloader delegates to the actual user classloader. Upon {@link #close()}, the delegate is nulled
+	 * and can be garbage collected. Additional class resolution will be resolved solely through the bootstrap
+	 * classloader and most likely result in ClassNotFound exceptions.
+	 *
+	 * @param the classloader type that also needs to be closeable.
+	 */
+	private static class SafetyNetWrapperClassLoader extends CloseableClassLoader
+			implements Closeable {
+		private static final Logger LOG = LoggerFactory.getLogger(SafetyNetWrapperClassLoader.class);
+
+		private T inner;
+
+		SafetyNetWrapperClassLoader(T inner) {
+			super(null);
+			this.inner = inner;
+		}
+
+		@Override
+		public void close() {
+			if (inner != null) {
+				try {
+					inner.close();
+				} catch (IOException e) {
+					LOG.warn("Could not close user classloader", e);
+				}
+			}
+			inner = null;
+		}
+
+		@Override
+		protected Class loadClass(String name, boolean resolve) throws ClassNotFoundException {
+			if (inner == null) {
+				return super.loadClass(name, resolve);
+			}
+
+			synchronized (getClassLoadingLock(name)) {

Review comment:
`loadClass(String name, boolean resolve)` is protected, and `loadClass(String name)` does not resolve by default.
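The point about the two overloads can be checked directly. `ClassLoader.loadClass(String)` is documented as equivalent to `loadClass(name, false)`, and the protected two-argument overload cannot be called on `inner` from a wrapper in another package. A delegating loader that can only use the public overload therefore gets back a class that is not necessarily resolved, and must call `resolveClass` itself when `resolve` is true (presumably the job of the `resolveIfNeeded` helper in the PR). `ResolveFlagProbe` below is a hypothetical demo class, not part of Flink. It only records the flag that the public overload forwards.

```java
// Hypothetical demo class: records which resolve flag reaches loadClass(String, boolean).
final class ResolveFlagProbe extends ClassLoader {
    private boolean lastResolve = true; // start at true so a recorded false is meaningful

    ResolveFlagProbe() {
        super(null); // bootstrap parent is enough for java.* classes
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        lastResolve = resolve; // remember what the caller asked for
        return super.loadClass(name, resolve);
    }

    // Calls the public one-argument overload and reports the flag it forwarded.
    static boolean publicOverloadResolves() {
        ResolveFlagProbe probe = new ResolveFlagProbe();
        try {
            probe.loadClass("java.lang.String");
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
        return probe.lastResolve;
    }
}
```

`ResolveFlagProbe.publicOverloadResolves()` returns `false`, which is why a wrapper that honors `resolve` has to resolve the delegate's result on its own.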
[GitHub] [flink] AHeise commented on a change in pull request #11303: [Flink-16245] Decoupling user classloader from context classloader.
AHeise commented on a change in pull request #11303: [Flink-16245] Decoupling user classloader from context classloader.
URL: https://github.com/apache/flink/pull/11303#discussion_r387574830

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders.java ##
@@ -82,4 +91,78 @@ public static ResolveOrder fromString(String resolveOrder) {
 			super(urls, parent);
 		}
 	}
+
+	/**
+	 * Ensures that holding a reference on the context class loader outliving the scope of user code does not prevent
+	 * the user classloader to be garbage collected (FLINK-16245).
+	 *
+	 * This classloader delegates to the actual user classloader. Upon {@link #close()}, the delegate is nulled
+	 * and can be garbage collected. Additional class resolution will be resolved solely through the bootstrap
+	 * classloader and most likely result in ClassNotFound exceptions.
+	 *
+	 * @param the classloader type that also needs to be closeable.
+	 */
+	private static class SafetyNetWrapperClassLoader extends CloseableClassLoader
+			implements Closeable {
+		private static final Logger LOG = LoggerFactory.getLogger(SafetyNetWrapperClassLoader.class);
+
+		private T inner;
+
+		SafetyNetWrapperClassLoader(T inner) {
+			super(null);
+			this.inner = inner;
+		}
+
+		@Override
+		public void close() {
+			if (inner != null) {
+				try {
+					inner.close();
+				} catch (IOException e) {
+					LOG.warn("Could not close user classloader", e);
+				}
+			}
+			inner = null;
+		}
+
+		@Override
+		protected Class loadClass(String name, boolean resolve) throws ClassNotFoundException {
+			if (inner == null) {
+				return super.loadClass(name, resolve);
+			}
+
+			synchronized (getClassLoadingLock(name)) {
+				final Class loadedClass = findLoadedClass(name);
+				if (loadedClass != null) {
+					return resolveIfNeeded(resolve, loadedClass);
+				}
+
+				return resolveIfNeeded(resolve, inner.loadClass(name));
+			}
+		}
+
+		private Class resolveIfNeeded(final boolean resolve, final Class loadedClass) {

Review comment:
`loadClass(String name, boolean resolve)` is protected, and `loadClass(String name)` does not resolve by default.