This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new b3d0e51bdf9 [FLINK-32265][runtime] Use system classloader in JM for jobs without extra jars
b3d0e51bdf9 is described below

commit b3d0e51bdf9374848b03843115c97fa092e2e6b2
Author: Shammon FY <zjur...@gmail.com>
AuthorDate: Mon Jun 26 19:10:04 2023 +0800

    [FLINK-32265][runtime] Use system classloader in JM for jobs without extra jars
---
 .../librarycache/BlobLibraryCacheManager.java      | 43 +++++++++++++------
 .../jobmaster/JobManagerSharedServices.java        |  3 +-
 .../runtime/taskexecutor/TaskManagerServices.java  |  3 +-
 .../librarycache/BlobLibraryCacheManagerTest.java  | 50 ++++++++++++++++------
 .../BlobLibraryCacheRecoveryITCase.java            | 15 ++++++-
 5 files changed, 87 insertions(+), 27 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
index e2723dd2e1e..9af717008b6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
@@ -37,6 +37,7 @@ import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
 import javax.annotation.concurrent.ThreadSafe;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.net.URL;
 import java.net.URLClassLoader;
@@ -75,12 +76,18 @@ public class BlobLibraryCacheManager implements 
LibraryCacheManager {
 
     private final ClassLoaderFactory classLoaderFactory;
 
+    /** If true, it will use system class loader when the jars and classpaths of job are empty. */
+    private final boolean wrapsSystemClassLoader;
+
     // 
--------------------------------------------------------------------------------------------
 
     public BlobLibraryCacheManager(
-            PermanentBlobService blobService, ClassLoaderFactory 
classLoaderFactory) {
+            PermanentBlobService blobService,
+            ClassLoaderFactory classLoaderFactory,
+            boolean wrapsSystemClassLoader) {
         this.blobService = checkNotNull(blobService);
         this.classLoaderFactory = checkNotNull(classLoaderFactory);
+        this.wrapsSystemClassLoader = wrapsSystemClassLoader;
     }
 
     @Override
@@ -226,11 +233,17 @@ public class BlobLibraryCacheManager implements 
LibraryCacheManager {
                 verifyIsNotReleased();
 
                 if (resolvedClassLoader == null) {
+                    boolean systemClassLoader =
+                            wrapsSystemClassLoader && libraries.isEmpty() && 
classPaths.isEmpty();
                     resolvedClassLoader =
                             new ResolvedClassLoader(
-                                    createUserCodeClassLoader(jobId, 
libraries, classPaths),
+                                    systemClassLoader
+                                            ? 
ClassLoader.getSystemClassLoader()
+                                            : createUserCodeClassLoader(
+                                                    jobId, libraries, 
classPaths),
                                     libraries,
-                                    classPaths);
+                                    classPaths,
+                                    systemClassLoader);
                 } else {
                     resolvedClassLoader.verifyClassLoader(libraries, 
classPaths);
                 }
@@ -357,7 +370,7 @@ public class BlobLibraryCacheManager implements 
LibraryCacheManager {
     }
 
     private static final class ResolvedClassLoader implements 
UserCodeClassLoader {
-        private final URLClassLoader classLoader;
+        private final ClassLoader classLoader;
 
         /**
          * Set of BLOB keys used for a previous job/task registration.
@@ -375,12 +388,15 @@ public class BlobLibraryCacheManager implements 
LibraryCacheManager {
          */
         private final Set<String> classPaths;
 
+        private final boolean wrapsSystemClassLoader;
+
         private final Map<String, Runnable> releaseHooks;
 
         private ResolvedClassLoader(
-                URLClassLoader classLoader,
+                ClassLoader classLoader,
                 Collection<PermanentBlobKey> requiredLibraries,
-                Collection<URL> requiredClassPaths) {
+                Collection<URL> requiredClassPaths,
+                boolean wrapsSystemClassLoader) {
             this.classLoader = classLoader;
 
             // NOTE: do not store the class paths, i.e. URLs, into a set for 
performance reasons
@@ -391,6 +407,7 @@ public class BlobLibraryCacheManager implements 
LibraryCacheManager {
                 classPaths.add(url.toString());
             }
             this.libraries = new HashSet<>(requiredLibraries);
+            this.wrapsSystemClassLoader = wrapsSystemClassLoader;
 
             this.releaseHooks = new HashMap<>();
         }
@@ -447,12 +464,14 @@ public class BlobLibraryCacheManager implements 
LibraryCacheManager {
         private void releaseClassLoader() {
             runReleaseHooks();
 
-            try {
-                classLoader.close();
-            } catch (IOException e) {
-                LOG.warn(
-                        "Failed to release user code class loader for "
-                                + Arrays.toString(libraries.toArray()));
+            if (!wrapsSystemClassLoader) {
+                try {
+                    ((Closeable) classLoader).close();
+                } catch (IOException e) {
+                    LOG.warn(
+                            "Failed to release user code class loader for "
+                                    + Arrays.toString(libraries.toArray()));
+                }
             }
             // clear potential references to user-classes in the singleton 
cache
             TypeFactory.defaultInstance().clearCache();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerSharedServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerSharedServices.java
index d403893c06e..a20d06e7032 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerSharedServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerSharedServices.java
@@ -162,7 +162,8 @@ public class JobManagerSharedServices {
                                         classLoaderResolveOrder),
                                 alwaysParentFirstLoaderPatterns,
                                 failOnJvmMetaspaceOomError ? fatalErrorHandler 
: null,
-                                checkClassLoaderLeak));
+                                checkClassLoaderLeak),
+                        true);
 
         final int numberCPUCores = Hardware.getNumberCPUCores();
         final int jobManagerFuturePoolSize =
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index bf4e11486b5..976ab04e166 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -368,7 +368,8 @@ public class TaskManagerServices {
                                 taskManagerServicesConfiguration
                                         .getAlwaysParentFirstLoaderPatterns(),
                                 failOnJvmMetaspaceOomError ? fatalErrorHandler 
: null,
-                                checkClassLoaderLeak));
+                                checkClassLoaderLeak),
+                        false);
 
         final SlotAllocationSnapshotPersistenceService 
slotAllocationSnapshotPersistenceService;
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
index 163a2d63b3c..dda0e2a9580 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
@@ -35,6 +35,8 @@ import org.apache.flink.util.UserCodeClassLoader;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.io.File;
 import java.io.IOException;
@@ -42,6 +44,7 @@ import java.net.InetSocketAddress;
 import java.net.URL;
 import java.net.URLClassLoader;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -58,10 +61,18 @@ import static org.junit.Assert.fail;
 import static org.junit.Assume.assumeTrue;
 
 /** Tests for {@link BlobLibraryCacheManager}. */
+@RunWith(Parameterized.class)
 public class BlobLibraryCacheManagerTest extends TestLogger {
 
     @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder();
 
+    @Parameterized.Parameters(name = "Use system class loader: {0}")
+    public static List<Boolean> useSystemClassLoader() {
+        return Arrays.asList(true, false);
+    }
+
+    @Parameterized.Parameter public boolean wrapsSystemClassLoader;
+
     /**
      * Tests that the {@link BlobLibraryCacheManager} cleans up after the 
class loader leases for
      * different jobs are closed.
@@ -117,7 +128,7 @@ public class BlobLibraryCacheManagerTest extends TestLogger 
{
             assertEquals(1, libCache.getNumberOfManagedJobs());
             assertEquals(1, libCache.getNumberOfReferenceHolders(jobId1));
             assertEquals(0, libCache.getNumberOfReferenceHolders(jobId2));
-            assertEquals(2, checkFilesExist(jobId1, keys1, cache, true));
+            assertEquals(2, checkFilesExist(jobId1, keys1, cache, false));
             checkFileCountForJob(2, jobId1, server);
             checkFileCountForJob(2, jobId1, cache);
             assertEquals(0, checkFilesExist(jobId2, keys2, cache, false));
@@ -148,10 +159,10 @@ public class BlobLibraryCacheManagerTest extends 
TestLogger {
             assertEquals(2, libCache.getNumberOfManagedJobs());
             assertEquals(1, libCache.getNumberOfReferenceHolders(jobId1));
             assertEquals(1, libCache.getNumberOfReferenceHolders(jobId2));
-            assertEquals(2, checkFilesExist(jobId1, keys1, cache, true));
+            assertEquals(2, checkFilesExist(jobId1, keys1, cache, false));
             checkFileCountForJob(2, jobId1, server);
             checkFileCountForJob(2, jobId1, cache);
-            assertEquals(1, checkFilesExist(jobId2, keys2, cache, true));
+            assertEquals(1, checkFilesExist(jobId2, keys2, cache, false));
             checkFileCountForJob(1, jobId2, server);
             checkFileCountForJob(1, jobId2, cache);
 
@@ -160,10 +171,10 @@ public class BlobLibraryCacheManagerTest extends 
TestLogger {
             assertEquals(1, libCache.getNumberOfManagedJobs());
             assertEquals(0, libCache.getNumberOfReferenceHolders(jobId1));
             assertEquals(1, libCache.getNumberOfReferenceHolders(jobId2));
-            assertEquals(2, checkFilesExist(jobId1, keys1, cache, true));
+            assertEquals(2, checkFilesExist(jobId1, keys1, cache, false));
             checkFileCountForJob(2, jobId1, server);
             checkFileCountForJob(2, jobId1, cache);
-            assertEquals(1, checkFilesExist(jobId2, keys2, cache, true));
+            assertEquals(1, checkFilesExist(jobId2, keys2, cache, false));
             checkFileCountForJob(1, jobId2, server);
             checkFileCountForJob(1, jobId2, cache);
 
@@ -172,10 +183,10 @@ public class BlobLibraryCacheManagerTest extends 
TestLogger {
             assertEquals(0, libCache.getNumberOfManagedJobs());
             assertEquals(0, libCache.getNumberOfReferenceHolders(jobId1));
             assertEquals(0, libCache.getNumberOfReferenceHolders(jobId2));
-            assertEquals(2, checkFilesExist(jobId1, keys1, cache, true));
+            assertEquals(2, checkFilesExist(jobId1, keys1, cache, false));
             checkFileCountForJob(2, jobId1, server);
             checkFileCountForJob(2, jobId1, cache);
-            assertEquals(1, checkFilesExist(jobId2, keys2, cache, true));
+            assertEquals(1, checkFilesExist(jobId2, keys2, cache, false));
             checkFileCountForJob(1, jobId2, server);
             checkFileCountForJob(1, jobId2, cache);
 
@@ -478,7 +489,9 @@ public class BlobLibraryCacheManagerTest extends TestLogger 
{
         final LibraryCacheManager.ClassLoaderLease classLoaderLease2 =
                 libraryCacheManager.registerClassLoaderLease(jobId);
 
-        classLoaderLease1.getOrResolveClassLoader(Collections.emptyList(), 
Collections.emptyList());
+        UserCodeClassLoader userCodeClassLoader =
+                classLoaderLease1.getOrResolveClassLoader(
+                        Collections.emptyList(), Collections.emptyList());
 
         classLoaderLease1.release();
 
@@ -486,7 +499,12 @@ public class BlobLibraryCacheManagerTest extends 
TestLogger {
 
         classLoaderLease2.release();
 
-        assertTrue(classLoader.isClosed());
+        if (wrapsSystemClassLoader) {
+            assertEquals(userCodeClassLoader.asClassLoader(), 
ClassLoader.getSystemClassLoader());
+            assertFalse(classLoader.isClosed());
+        } else {
+            assertTrue(classLoader.isClosed());
+        }
     }
 
     @Test
@@ -531,11 +549,18 @@ public class BlobLibraryCacheManagerTest extends 
TestLogger {
 
         final LibraryCacheManager.ClassLoaderLease classLoaderLease =
                 libraryCacheManager.registerClassLoaderLease(new JobID());
-        classLoaderLease.getOrResolveClassLoader(Collections.emptyList(), 
Collections.emptyList());
+        UserCodeClassLoader userCodeClassLoader =
+                classLoaderLease.getOrResolveClassLoader(
+                        Collections.emptyList(), Collections.emptyList());
 
         libraryCacheManager.shutdown();
 
-        assertTrue(classLoader.isClosed());
+        if (wrapsSystemClassLoader) {
+            assertEquals(userCodeClassLoader.asClassLoader(), 
ClassLoader.getSystemClassLoader());
+            assertFalse(classLoader.isClosed());
+        } else {
+            assertTrue(classLoader.isClosed());
+        }
     }
 
     @Test
@@ -628,7 +653,8 @@ public class BlobLibraryCacheManagerTest extends TestLogger 
{
         }
 
         BlobLibraryCacheManager build() {
-            return new BlobLibraryCacheManager(permanentBlobCache, 
classLoaderFactory);
+            return new BlobLibraryCacheManager(
+                    permanentBlobCache, classLoaderFactory, 
wrapsSystemClassLoader);
         }
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
index 8aa960aa102..95eb16ab4ff 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
@@ -35,6 +35,8 @@ import org.hamcrest.collection.IsEmptyCollection;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.io.File;
 import java.io.FileInputStream;
@@ -42,6 +44,7 @@ import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
 import java.util.Random;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -51,9 +54,17 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 
 /** Integration test for {@link BlobLibraryCacheManager}. */
+@RunWith(Parameterized.class)
 public class BlobLibraryCacheRecoveryITCase extends TestLogger {
 
     @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+    @Parameterized.Parameters(name = "Use system class loader: {0}")
+    public static List<Boolean> useSystemClassLoader() {
+        return Arrays.asList(true, false);
+    }
+
+    @Parameterized.Parameter public boolean wrapsSystemClassLoader;
     /**
      * Tests that with {@link HighAvailabilityMode#ZOOKEEPER} distributed JARs 
are recoverable from
      * any participating BlobLibraryCacheManager.
@@ -90,7 +101,9 @@ public class BlobLibraryCacheRecoveryITCase extends 
TestLogger {
                 server[i] = new BlobServer(config, 
temporaryFolder.newFolder(), blobStoreService);
                 server[i].start();
                 serverAddress[i] = new InetSocketAddress("localhost", 
server[i].getPort());
-                libServer[i] = new BlobLibraryCacheManager(server[i], 
classLoaderFactory);
+                libServer[i] =
+                        new BlobLibraryCacheManager(
+                                server[i], classLoaderFactory, 
wrapsSystemClassLoader);
             }
 
             // Random data

Reply via email to