This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new b3d0e51bdf9 [FLINK-32265][runtime] Use system classloader in JM for jobs without extra jars b3d0e51bdf9 is described below commit b3d0e51bdf9374848b03843115c97fa092e2e6b2 Author: Shammon FY <zjur...@gmail.com> AuthorDate: Mon Jun 26 19:10:04 2023 +0800 [FLINK-32265][runtime] Use system classloader in JM for jobs without extra jars --- .../librarycache/BlobLibraryCacheManager.java | 43 +++++++++++++------ .../jobmaster/JobManagerSharedServices.java | 3 +- .../runtime/taskexecutor/TaskManagerServices.java | 3 +- .../librarycache/BlobLibraryCacheManagerTest.java | 50 ++++++++++++++++------ .../BlobLibraryCacheRecoveryITCase.java | 15 ++++++- 5 files changed, 87 insertions(+), 27 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java index e2723dd2e1e..9af717008b6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java @@ -37,6 +37,7 @@ import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.ThreadSafe; +import java.io.Closeable; import java.io.IOException; import java.net.URL; import java.net.URLClassLoader; @@ -75,12 +76,18 @@ public class BlobLibraryCacheManager implements LibraryCacheManager { private final ClassLoaderFactory classLoaderFactory; + /** If true, it will use system class loader when the jars and classpaths of job are empty. */ + private final boolean wrapsSystemClassLoader; + // -------------------------------------------------------------------------------------------- public BlobLibraryCacheManager( - PermanentBlobService blobService, ClassLoaderFactory classLoaderFactory) { + PermanentBlobService blobService, + ClassLoaderFactory classLoaderFactory, + boolean wrapsSystemClassLoader) { this.blobService = checkNotNull(blobService); this.classLoaderFactory = checkNotNull(classLoaderFactory); + this.wrapsSystemClassLoader = wrapsSystemClassLoader; } @Override @@ -226,11 +233,17 @@ public class BlobLibraryCacheManager implements LibraryCacheManager { verifyIsNotReleased(); if (resolvedClassLoader == null) { + boolean systemClassLoader = + wrapsSystemClassLoader && libraries.isEmpty() && classPaths.isEmpty(); resolvedClassLoader = new ResolvedClassLoader( - createUserCodeClassLoader(jobId, libraries, classPaths), + systemClassLoader + ? ClassLoader.getSystemClassLoader() + : createUserCodeClassLoader( + jobId, libraries, classPaths), libraries, - classPaths); + classPaths, + systemClassLoader); } else { resolvedClassLoader.verifyClassLoader(libraries, classPaths); } @@ -357,7 +370,7 @@ public class BlobLibraryCacheManager implements LibraryCacheManager { } private static final class ResolvedClassLoader implements UserCodeClassLoader { - private final URLClassLoader classLoader; + private final ClassLoader classLoader; /** * Set of BLOB keys used for a previous job/task registration. @@ -375,12 +388,15 @@ public class BlobLibraryCacheManager implements LibraryCacheManager { */ private final Set<String> classPaths; + private final boolean wrapsSystemClassLoader; + private final Map<String, Runnable> releaseHooks; private ResolvedClassLoader( - URLClassLoader classLoader, + ClassLoader classLoader, Collection<PermanentBlobKey> requiredLibraries, - Collection<URL> requiredClassPaths) { + Collection<URL> requiredClassPaths, + boolean wrapsSystemClassLoader) { this.classLoader = classLoader; // NOTE: do not store the class paths, i.e. URLs, into a set for performance reasons @@ -391,6 +407,7 @@ public class BlobLibraryCacheManager implements LibraryCacheManager { classPaths.add(url.toString()); } this.libraries = new HashSet<>(requiredLibraries); + this.wrapsSystemClassLoader = wrapsSystemClassLoader; this.releaseHooks = new HashMap<>(); } @@ -447,12 +464,14 @@ public class BlobLibraryCacheManager implements LibraryCacheManager { private void releaseClassLoader() { runReleaseHooks(); - try { - classLoader.close(); - } catch (IOException e) { - LOG.warn( - "Failed to release user code class loader for " - + Arrays.toString(libraries.toArray())); + if (!wrapsSystemClassLoader) { + try { + ((Closeable) classLoader).close(); + } catch (IOException e) { + LOG.warn( + "Failed to release user code class loader for " + + Arrays.toString(libraries.toArray())); + } } // clear potential references to user-classes in the singleton cache TypeFactory.defaultInstance().clearCache(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerSharedServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerSharedServices.java index d403893c06e..a20d06e7032 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerSharedServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerSharedServices.java @@ -162,7 +162,8 @@ public class JobManagerSharedServices { classLoaderResolveOrder), alwaysParentFirstLoaderPatterns, failOnJvmMetaspaceOomError ? fatalErrorHandler : null, - checkClassLoaderLeak)); + checkClassLoaderLeak), + true); final int numberCPUCores = Hardware.getNumberCPUCores(); final int jobManagerFuturePoolSize = diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java index bf4e11486b5..976ab04e166 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java @@ -368,7 +368,8 @@ public class TaskManagerServices { taskManagerServicesConfiguration .getAlwaysParentFirstLoaderPatterns(), failOnJvmMetaspaceOomError ? fatalErrorHandler : null, - checkClassLoaderLeak)); + checkClassLoaderLeak), + false); final SlotAllocationSnapshotPersistenceService slotAllocationSnapshotPersistenceService; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java index 163a2d63b3c..dda0e2a9580 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java @@ -35,6 +35,8 @@ import org.apache.flink.util.UserCodeClassLoader; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import java.io.File; import java.io.IOException; @@ -42,6 +44,7 @@ import java.net.InetSocketAddress; import java.net.URL; import java.net.URLClassLoader; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -58,10 +61,18 @@ import static org.junit.Assert.fail; import static org.junit.Assume.assumeTrue; /** Tests for {@link BlobLibraryCacheManager}. */ +@RunWith(Parameterized.class) public class BlobLibraryCacheManagerTest extends TestLogger { @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); + @Parameterized.Parameters(name = "Use system class loader: {0}") + public static List<Boolean> useSystemClassLoader() { + return Arrays.asList(true, false); + } + + @Parameterized.Parameter public boolean wrapsSystemClassLoader; + /** * Tests that the {@link BlobLibraryCacheManager} cleans up after the class loader leases for * different jobs are closed. @@ -117,7 +128,7 @@ public class BlobLibraryCacheManagerTest extends TestLogger { assertEquals(1, libCache.getNumberOfManagedJobs()); assertEquals(1, libCache.getNumberOfReferenceHolders(jobId1)); assertEquals(0, libCache.getNumberOfReferenceHolders(jobId2)); - assertEquals(2, checkFilesExist(jobId1, keys1, cache, true)); + assertEquals(2, checkFilesExist(jobId1, keys1, cache, false)); checkFileCountForJob(2, jobId1, server); checkFileCountForJob(2, jobId1, cache); assertEquals(0, checkFilesExist(jobId2, keys2, cache, false)); @@ -148,10 +159,10 @@ public class BlobLibraryCacheManagerTest extends TestLogger { assertEquals(2, libCache.getNumberOfManagedJobs()); assertEquals(1, libCache.getNumberOfReferenceHolders(jobId1)); assertEquals(1, libCache.getNumberOfReferenceHolders(jobId2)); - assertEquals(2, checkFilesExist(jobId1, keys1, cache, true)); + assertEquals(2, checkFilesExist(jobId1, keys1, cache, false)); checkFileCountForJob(2, jobId1, server); checkFileCountForJob(2, jobId1, cache); - assertEquals(1, checkFilesExist(jobId2, keys2, cache, true)); + assertEquals(1, checkFilesExist(jobId2, keys2, cache, false)); checkFileCountForJob(1, jobId2, server); checkFileCountForJob(1, jobId2, cache); @@ -160,10 +171,10 @@ public class BlobLibraryCacheManagerTest extends TestLogger { assertEquals(1, libCache.getNumberOfManagedJobs()); assertEquals(0, libCache.getNumberOfReferenceHolders(jobId1)); assertEquals(1, libCache.getNumberOfReferenceHolders(jobId2)); - assertEquals(2, checkFilesExist(jobId1, keys1, cache, true)); + assertEquals(2, checkFilesExist(jobId1, keys1, cache, false)); checkFileCountForJob(2, jobId1, server); checkFileCountForJob(2, jobId1, cache); - assertEquals(1, checkFilesExist(jobId2, keys2, cache, true)); + assertEquals(1, checkFilesExist(jobId2, keys2, cache, false)); checkFileCountForJob(1, jobId2, server); checkFileCountForJob(1, jobId2, cache); @@ -172,10 +183,10 @@ public class BlobLibraryCacheManagerTest extends TestLogger { assertEquals(0, libCache.getNumberOfManagedJobs()); assertEquals(0, libCache.getNumberOfReferenceHolders(jobId1)); assertEquals(0, libCache.getNumberOfReferenceHolders(jobId2)); - assertEquals(2, checkFilesExist(jobId1, keys1, cache, true)); + assertEquals(2, checkFilesExist(jobId1, keys1, cache, false)); checkFileCountForJob(2, jobId1, server); checkFileCountForJob(2, jobId1, cache); - assertEquals(1, checkFilesExist(jobId2, keys2, cache, true)); + assertEquals(1, checkFilesExist(jobId2, keys2, cache, false)); checkFileCountForJob(1, jobId2, server); checkFileCountForJob(1, jobId2, cache); @@ -478,7 +489,9 @@ public class BlobLibraryCacheManagerTest extends TestLogger { final LibraryCacheManager.ClassLoaderLease classLoaderLease2 = libraryCacheManager.registerClassLoaderLease(jobId); - classLoaderLease1.getOrResolveClassLoader(Collections.emptyList(), Collections.emptyList()); + UserCodeClassLoader userCodeClassLoader = + classLoaderLease1.getOrResolveClassLoader( + Collections.emptyList(), Collections.emptyList()); classLoaderLease1.release(); @@ -486,7 +499,12 @@ public class BlobLibraryCacheManagerTest extends TestLogger { classLoaderLease2.release(); - assertTrue(classLoader.isClosed()); + if (wrapsSystemClassLoader) { + assertEquals(userCodeClassLoader.asClassLoader(), ClassLoader.getSystemClassLoader()); + assertFalse(classLoader.isClosed()); + } else { + assertTrue(classLoader.isClosed()); + } } @Test @@ -531,11 +549,18 @@ public class BlobLibraryCacheManagerTest extends TestLogger { final LibraryCacheManager.ClassLoaderLease classLoaderLease = libraryCacheManager.registerClassLoaderLease(new JobID()); - classLoaderLease.getOrResolveClassLoader(Collections.emptyList(), Collections.emptyList()); + UserCodeClassLoader userCodeClassLoader = + classLoaderLease.getOrResolveClassLoader( + Collections.emptyList(), Collections.emptyList()); libraryCacheManager.shutdown(); - assertTrue(classLoader.isClosed()); + if (wrapsSystemClassLoader) { + assertEquals(userCodeClassLoader.asClassLoader(), ClassLoader.getSystemClassLoader()); + assertFalse(classLoader.isClosed()); + } else { + assertTrue(classLoader.isClosed()); + } } @Test @@ -628,7 +653,8 @@ public class BlobLibraryCacheManagerTest extends TestLogger { } BlobLibraryCacheManager build() { - return new BlobLibraryCacheManager(permanentBlobCache, classLoaderFactory); + return new BlobLibraryCacheManager( + permanentBlobCache, classLoaderFactory, wrapsSystemClassLoader); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java index 8aa960aa102..95eb16ab4ff 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java @@ -35,6 +35,8 @@ import org.hamcrest.collection.IsEmptyCollection; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import java.io.File; import java.io.FileInputStream; @@ -42,6 +44,7 @@ import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.List; import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -51,9 +54,17 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; /** Integration test for {@link BlobLibraryCacheManager}. */ +@RunWith(Parameterized.class) public class BlobLibraryCacheRecoveryITCase extends TestLogger { @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @Parameterized.Parameters(name = "Use system class loader: {0}") + public static List<Boolean> useSystemClassLoader() { + return Arrays.asList(true, false); + } + + @Parameterized.Parameter public boolean wrapsSystemClassLoader; /** * Tests that with {@link HighAvailabilityMode#ZOOKEEPER} distributed JARs are recoverable from * any participating BlobLibraryCacheManager. @@ -90,7 +101,9 @@ public class BlobLibraryCacheRecoveryITCase extends TestLogger { server[i] = new BlobServer(config, temporaryFolder.newFolder(), blobStoreService); server[i].start(); serverAddress[i] = new InetSocketAddress("localhost", server[i].getPort()); - libServer[i] = new BlobLibraryCacheManager(server[i], classLoaderFactory); + libServer[i] = + new BlobLibraryCacheManager( + server[i], classLoaderFactory, wrapsSystemClassLoader); } // Random data