azagrebin commented on a change in pull request #13355:
URL: https://github.com/apache/flink/pull/13355#discussion_r485755685



##########
File path: 
flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
##########
@@ -123,6 +123,17 @@
         */
        ClassLoader getUserCodeClassLoader();
 
+       /**
+        * Registers a release hook for the user code class loader which is 
executed just before the
+        * user code class loader is being released.
+        * Registration only happens if no hook has been registered under this 
name already.

Review comment:
       ```suggestion
         * Registers a custom hook for the user code class loader release.
         *
         * <p>The release hook is executed just before the user code class 
loader is being released.
         * Registration only happens if no hook has been registered under this 
name already.
   ```
   Maybe not the final version. I think we usually keep the first 
title-sentence short. Also for other such doc comments

##########
File path: 
flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
##########
@@ -123,6 +123,17 @@
         */
        ClassLoader getUserCodeClassLoader();
 
+       /**
+        * Registers a release hook for the user code class loader which is 
executed just before the
+        * user code class loader is being released.
+        * Registration only happens if no hook has been registered under this 
name already.
+        *
+        * @param releaseHookName name of the release hook.
+        * @param releaseHook releaseHook which is executed just before the 
user code class loader is being released

Review comment:
       ```suggestion
         * @param releaseHook release hook which is executed just before the 
user code class loader is being released
   ```

##########
File path: flink-connectors/flink-connector-cassandra/pom.xml
##########
@@ -187,6 +187,13 @@ under the License.
                        <scope>test</scope>
                        <type>test-jar</type>
                </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-core</artifactId>
+                       <version>${project.version}</version>
+                       <type>test-jar</type>

Review comment:
       why do we need all these `flink-core/test-jar` changes? are they related 
to the PR?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java
##########
@@ -63,8 +64,8 @@
 
        
        public void setup(TaskConfig config, String taskName, Collector<OT> 
outputCollector,
-                       AbstractInvokable parent, ClassLoader 
userCodeClassLoader, ExecutionConfig executionConfig,
-                       Map<String, Accumulator<?,?>> accumulatorMap)
+                                         AbstractInvokable parent, 
UserCodeClassLoader userCodeClassLoader, ExecutionConfig executionConfig,

Review comment:
       Why do we need to pass here `UserCodeClassLoader`?
   I do not miss something, we only use it `asClassLoader` afterwards.
   Then same for `BatchTask#initOutputs`.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
##########
@@ -504,6 +506,41 @@ public void closingLibraryCacheManager_closesClassLoader() 
throws IOException {
                assertTrue(classLoader.isClosed());
        }
 
+       @Test
+       public void releaseUserCodeClassLoader_willRunReleaseHooks() throws 
Exception {
+               final BlobLibraryCacheManager libraryCacheManager = new 
TestingBlobLibraryCacheManagerBuilder().build();
+
+               final LibraryCacheManager.ClassLoaderLease classLoaderLease = 
libraryCacheManager.registerClassLoaderLease(new JobID());
+               final UserCodeClassLoader userCodeClassLoader = 
classLoaderLease.getOrResolveClassLoader(Collections.emptyList(), 
Collections.emptyList());
+
+               final OneShotLatch releaseHookLatch = new OneShotLatch();
+               userCodeClassLoader.registerReleaseHookIfAbsent("test", 
releaseHookLatch::trigger);
+
+               // this should trigger the release of the class loader
+               classLoaderLease.release();
+
+               releaseHookLatch.await();
+       }
+
+       @Test
+       public void releaseUserCodeClassLoader_willRegisterOnce() throws 
Exception {

Review comment:
       ```suggestion
        public void releaseUserCodeClassLoader_willRegisterOnce() throws 
IOException, InterruptedException {
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java
##########
@@ -63,8 +64,8 @@
 
        
        public void setup(TaskConfig config, String taskName, Collector<OT> 
outputCollector,
-                       AbstractInvokable parent, ClassLoader 
userCodeClassLoader, ExecutionConfig executionConfig,
-                       Map<String, Accumulator<?,?>> accumulatorMap)
+                                         AbstractInvokable parent, 
UserCodeClassLoader userCodeClassLoader, ExecutionConfig executionConfig,

Review comment:
       the formatting got off

##########
File path: 
flink-connectors/flink-connector-kinesis/src/test/resources/log4j2-test.properties
##########
@@ -0,0 +1,29 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level = OFF

Review comment:
       is this new logging file related to the PR?

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
##########
@@ -504,6 +506,41 @@ public void closingLibraryCacheManager_closesClassLoader() 
throws IOException {
                assertTrue(classLoader.isClosed());
        }
 
+       @Test
+       public void releaseUserCodeClassLoader_willRunReleaseHooks() throws 
Exception {

Review comment:
       ```suggestion
        public void releaseUserCodeClassLoader_willRunReleaseHooks() throws 
IOException, InterruptedException {
   ```
   maybe nit but it will safe a bit from IDE warnings which might be useful in 
other cases

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java
##########
@@ -95,4 +96,5 @@
                 */
                void release();
        }
+

Review comment:
       unintentional new line?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
##########
@@ -157,7 +158,7 @@
 
        private final FatalErrorHandler fatalErrorHandler;
 
-       private final ClassLoader userCodeLoader;
+       private final UserCodeClassLoader userCodeLoader;

Review comment:
       How is the `UserCodeClassLoader` going to be used in `JobMaster`?
   atm I see only `asClassLoader` usages.
   is there a further plan how to expose the `UserCodeClassLoader` in JM user 
hooks?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
##########
@@ -413,6 +425,20 @@ private void 
verifyClassLoader(Collection<PermanentBlobKey> requiredLibraries, C
                 * and the cached libraries are deleted immediately.
                 */
                private void releaseClassLoader() {
+                       Set<Map.Entry<String, Runnable>> hooks = 
releaseHooks.entrySet();
+                       if (!hooks.isEmpty()) {
+                               LOG.debug("Running {} class loader shutdown 
hook(s): {}.", hooks.size(), releaseHooks.keySet());
+                               for (Map.Entry<String, Runnable> hookEntry : 
hooks) {
+                                       try {
+                                               hookEntry.getValue().run();

Review comment:
       maybe it is better to log each hook run?
   I would also consider moving the loop into `runHooks` method or so




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to