This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 137b65c5f23e5ba546ce52bd15fa7aa528ab95de
Author: slinkydeveloper <francescogu...@gmail.com>
AuthorDate: Wed Jan 5 16:30:11 2022 +0100

    [FLINK-25487][core][table-planner-loader] Improve verbosity of classloading errors
    
    This closes #18283.
---
 .../core/classloading/ComponentClassLoader.java    | 56 +++++++++++++++-------
 .../core/classloading/SubmoduleClassLoader.java    |  4 +-
 .../org/apache/flink/core/plugin/PluginLoader.java |  8 +++-
 .../classloading/ComponentClassLoaderTest.java     | 45 +++++++++++++----
 .../flink/table/planner/loader/PlannerModule.java  | 50 ++++++++++++-------
 5 files changed, 118 insertions(+), 45 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/core/classloading/ComponentClassLoader.java b/flink-core/src/main/java/org/apache/flink/core/classloading/ComponentClassLoader.java
index 52d886b..76e6b32 100644
--- a/flink-core/src/main/java/org/apache/flink/core/classloading/ComponentClassLoader.java
+++ b/flink-core/src/main/java/org/apache/flink/core/classloading/ComponentClassLoader.java
@@ -28,6 +28,8 @@ import java.net.URLClassLoader;
 import java.util.Arrays;
 import java.util.Enumeration;
 import java.util.Iterator;
+import java.util.Map;
+import java.util.Optional;
 
 /**
  * A {@link URLClassLoader} that restricts which classes can be loaded to those contained within the
@@ -62,17 +64,22 @@ public class ComponentClassLoader extends URLClassLoader {
     private final String[] ownerFirstResourcePrefixes;
     private final String[] componentFirstResourcePrefixes;
 
+    private final Map<String, String> knownPackagePrefixesModuleAssociation;
+
     public ComponentClassLoader(
             URL[] classpath,
             ClassLoader ownerClassLoader,
             String[] ownerFirstPackages,
-            String[] componentFirstPackages) {
+            String[] componentFirstPackages,
+            Map<String, String> knownPackagePrefixesModuleAssociation) {
         super(classpath, PLATFORM_OR_BOOTSTRAP_LOADER);
         this.ownerClassLoader = ownerClassLoader;
 
         this.ownerFirstPackages = ownerFirstPackages;
         this.componentFirstPackages = componentFirstPackages;
 
+        this.knownPackagePrefixesModuleAssociation = 
knownPackagePrefixesModuleAssociation;
+
         ownerFirstResourcePrefixes = 
convertPackagePrefixesToPathPrefixes(ownerFirstPackages);
         componentFirstResourcePrefixes =
                 convertPackagePrefixesToPathPrefixes(componentFirstPackages);
@@ -86,22 +93,39 @@ public class ComponentClassLoader extends URLClassLoader {
     protected Class<?> loadClass(final String name, final boolean resolve)
             throws ClassNotFoundException {
         synchronized (getClassLoadingLock(name)) {
-            final Class<?> loadedClass = findLoadedClass(name);
-            if (loadedClass != null) {
-                return resolveIfNeeded(resolve, loadedClass);
-            }
-
-            if (isComponentFirstClass(name)) {
-                return loadClassFromComponentFirst(name, resolve);
+            try {
+                final Class<?> loadedClass = findLoadedClass(name);
+                if (loadedClass != null) {
+                    return resolveIfNeeded(resolve, loadedClass);
+                }
+
+                if (isComponentFirstClass(name)) {
+                    return loadClassFromComponentFirst(name, resolve);
+                }
+                if (isOwnerFirstClass(name)) {
+                    return loadClassFromOwnerFirst(name, resolve);
+                }
+
+                // making this behavior configurable (component-only/component-first/owner-first)
+                // would allow this class to subsume the FlinkUserCodeClassLoader (with an added
+                // exception handler)
+                return loadClassFromComponentOnly(name, resolve);
+            } catch (ClassNotFoundException e) {
+                // If we know the package of this class
+                Optional<String> foundAssociatedModule =
+                        
knownPackagePrefixesModuleAssociation.entrySet().stream()
+                                .filter(entry -> 
name.startsWith(entry.getKey()))
+                                .map(Map.Entry::getValue)
+                                .findFirst();
+                if (foundAssociatedModule.isPresent()) {
+                    throw new ClassNotFoundException(
+                            String.format(
+                                    "Class '%s' not found. Perhaps you forgot to add the module '%s' to the classpath?",
+                                    name, foundAssociatedModule.get()),
+                            e);
+                }
+                throw e;
             }
-            if (isOwnerFirstClass(name)) {
-                return loadClassFromOwnerFirst(name, resolve);
-            }
-
-            // making this behavior configurable (component-only/component-first/owner-first) would
-            // allow this class to subsume the FlinkUserCodeClassLoader (with an added exception
-            // handler)
-            return loadClassFromComponentOnly(name, resolve);
         }
     }
 
diff --git a/flink-core/src/main/java/org/apache/flink/core/classloading/SubmoduleClassLoader.java b/flink-core/src/main/java/org/apache/flink/core/classloading/SubmoduleClassLoader.java
index 87c489a..61aa139 100644
--- a/flink-core/src/main/java/org/apache/flink/core/classloading/SubmoduleClassLoader.java
+++ b/flink-core/src/main/java/org/apache/flink/core/classloading/SubmoduleClassLoader.java
@@ -20,6 +20,7 @@ package org.apache.flink.core.classloading;
 import org.apache.flink.configuration.CoreOptions;
 
 import java.net.URL;
+import java.util.Collections;
 
 /**
  * Loads all classes from the submodule jar, except for explicitly white-listed packages.
@@ -39,6 +40,7 @@ public class SubmoduleClassLoader extends 
ComponentClassLoader {
                 classpath,
                 parentClassLoader,
                 CoreOptions.PARENT_FIRST_LOGGING_PATTERNS,
-                new String[] {"org.apache.flink"});
+                new String[] {"org.apache.flink"},
+                Collections.emptyMap());
     }
 }
diff --git a/flink-core/src/main/java/org/apache/flink/core/plugin/PluginLoader.java b/flink-core/src/main/java/org/apache/flink/core/plugin/PluginLoader.java
index a81877d..3847b29 100644
--- a/flink-core/src/main/java/org/apache/flink/core/plugin/PluginLoader.java
+++ b/flink-core/src/main/java/org/apache/flink/core/plugin/PluginLoader.java
@@ -31,6 +31,7 @@ import javax.annotation.concurrent.ThreadSafe;
 import java.io.IOException;
 import java.net.URL;
 import java.net.URLClassLoader;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.ServiceLoader;
 
@@ -151,7 +152,12 @@ public class PluginLoader implements AutoCloseable {
                 URL[] pluginResourceURLs,
                 ClassLoader flinkClassLoader,
                 String[] allowedFlinkPackages) {
-            super(pluginResourceURLs, flinkClassLoader, allowedFlinkPackages, 
new String[0]);
+            super(
+                    pluginResourceURLs,
+                    flinkClassLoader,
+                    allowedFlinkPackages,
+                    new String[0],
+                    Collections.emptyMap());
         }
     }
 }
diff --git a/flink-core/src/test/java/org/apache/flink/core/classloading/ComponentClassLoaderTest.java b/flink-core/src/test/java/org/apache/flink/core/classloading/ComponentClassLoaderTest.java
index 1d795a1..8d8ee4e 100644
--- a/flink-core/src/test/java/org/apache/flink/core/classloading/ComponentClassLoaderTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/classloading/ComponentClassLoaderTest.java
@@ -67,7 +67,8 @@ public class ComponentClassLoaderTest extends TestLogger {
                 new TestUrlClassLoader(NON_EXISTENT_CLASS_NAME, 
CLASS_RETURNED_BY_OWNER);
 
         final ComponentClassLoader componentClassLoader =
-                new ComponentClassLoader(new URL[0], owner, new String[0], new 
String[0]);
+                new ComponentClassLoader(
+                        new URL[0], owner, new String[0], new String[0], 
Collections.emptyMap());
 
         componentClassLoader.loadClass(NON_EXISTENT_CLASS_NAME);
     }
@@ -79,7 +80,11 @@ public class ComponentClassLoaderTest extends TestLogger {
 
         final ComponentClassLoader componentClassLoader =
                 new ComponentClassLoader(
-                        new URL[0], owner, new String[] 
{CLASS_TO_LOAD.getName()}, new String[0]);
+                        new URL[0],
+                        owner,
+                        new String[] {CLASS_TO_LOAD.getName()},
+                        new String[0],
+                        Collections.emptyMap());
 
         final Class<?> loadedClass = 
componentClassLoader.loadClass(CLASS_TO_LOAD.getName());
         assertThat(loadedClass, sameInstance(CLASS_RETURNED_BY_OWNER));
@@ -91,7 +96,11 @@ public class ComponentClassLoaderTest extends TestLogger {
 
         final ComponentClassLoader componentClassLoader =
                 new ComponentClassLoader(
-                        new URL[0], owner, new String[] 
{CLASS_TO_LOAD.getName()}, new String[0]);
+                        new URL[0],
+                        owner,
+                        new String[] {CLASS_TO_LOAD.getName()},
+                        new String[0],
+                        Collections.emptyMap());
 
         final Class<?> loadedClass = 
componentClassLoader.loadClass(CLASS_TO_LOAD.getName());
         assertThat(loadedClass, sameInstance(CLASS_TO_LOAD));
@@ -104,7 +113,11 @@ public class ComponentClassLoaderTest extends TestLogger {
 
         final ComponentClassLoader componentClassLoader =
                 new ComponentClassLoader(
-                        new URL[0], owner, new String[0], new String[] 
{CLASS_TO_LOAD.getName()});
+                        new URL[0],
+                        owner,
+                        new String[0],
+                        new String[] {CLASS_TO_LOAD.getName()},
+                        Collections.emptyMap());
 
         final Class<?> loadedClass = 
componentClassLoader.loadClass(CLASS_TO_LOAD.getName());
         assertThat(loadedClass, sameInstance(CLASS_TO_LOAD));
@@ -117,7 +130,11 @@ public class ComponentClassLoaderTest extends TestLogger {
 
         final ComponentClassLoader componentClassLoader =
                 new ComponentClassLoader(
-                        new URL[0], owner, new String[0], new String[] 
{NON_EXISTENT_CLASS_NAME});
+                        new URL[0],
+                        owner,
+                        new String[0],
+                        new String[] {NON_EXISTENT_CLASS_NAME},
+                        Collections.emptyMap());
 
         final Class<?> loadedClass = 
componentClassLoader.loadClass(NON_EXISTENT_CLASS_NAME);
         assertThat(loadedClass, sameInstance(CLASS_RETURNED_BY_OWNER));
@@ -132,7 +149,8 @@ public class ComponentClassLoaderTest extends TestLogger {
         TestUrlClassLoader owner = new TestUrlClassLoader();
 
         final ComponentClassLoader componentClassLoader =
-                new ComponentClassLoader(new URL[0], owner, new String[0], new 
String[0]);
+                new ComponentClassLoader(
+                        new URL[0], owner, new String[0], new String[0], 
Collections.emptyMap());
 
         
assertThat(componentClassLoader.getResource(NON_EXISTENT_RESOURCE_NAME), 
nullValue());
         assertThat(
@@ -147,7 +165,11 @@ public class ComponentClassLoaderTest extends TestLogger {
 
         final ComponentClassLoader componentClassLoader =
                 new ComponentClassLoader(
-                        new URL[] {}, owner, new String[] {resourceToLoad}, 
new String[0]);
+                        new URL[] {},
+                        owner,
+                        new String[] {resourceToLoad},
+                        new String[0],
+                        Collections.emptyMap());
 
         final URL loadedResource = 
componentClassLoader.getResource(resourceToLoad);
         assertThat(loadedResource, sameInstance(RESOURCE_RETURNED_BY_OWNER));
@@ -162,7 +184,8 @@ public class ComponentClassLoaderTest extends TestLogger {
                         new URL[] {TMP.getRoot().toURI().toURL()},
                         owner,
                         new String[] {resourceToLoad},
-                        new String[0]);
+                        new String[0],
+                        Collections.emptyMap());
 
         final URL loadedResource = 
componentClassLoader.getResource(resourceToLoad);
         assertThat(loadedResource.toString(), containsString(resourceToLoad));
@@ -178,7 +201,8 @@ public class ComponentClassLoaderTest extends TestLogger {
                         new URL[] {TMP.getRoot().toURI().toURL()},
                         owner,
                         new String[0],
-                        new String[] {resourceToLoad});
+                        new String[] {resourceToLoad},
+                        Collections.emptyMap());
 
         final URL loadedResource = 
componentClassLoader.getResource(resourceToLoad);
         assertThat(loadedResource.toString(), containsString(resourceToLoad));
@@ -194,7 +218,8 @@ public class ComponentClassLoaderTest extends TestLogger {
                         new URL[0],
                         owner,
                         new String[0],
-                        new String[] {NON_EXISTENT_RESOURCE_NAME});
+                        new String[] {NON_EXISTENT_RESOURCE_NAME},
+                        Collections.emptyMap());
 
         final URL loadedResource = 
componentClassLoader.getResource(NON_EXISTENT_RESOURCE_NAME);
         assertThat(loadedResource, sameInstance(RESOURCE_RETURNED_BY_OWNER));
diff --git a/flink-table/flink-table-planner-loader/src/main/java/org/apache/flink/table/planner/loader/PlannerModule.java b/flink-table/flink-table-planner-loader/src/main/java/org/apache/flink/table/planner/loader/PlannerModule.java
index 2630fc9..7d72fdc 100644
--- a/flink-table/flink-table-planner-loader/src/main/java/org/apache/flink/table/planner/loader/PlannerModule.java
+++ b/flink-table/flink-table-planner-loader/src/main/java/org/apache/flink/table/planner/loader/PlannerModule.java
@@ -37,6 +37,8 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.UUID;
 import java.util.stream.Stream;
 
@@ -55,9 +57,36 @@ class PlannerModule {
      */
     static final String FLINK_TABLE_PLANNER_FAT_JAR = 
"flink-table-planner.jar";
 
-    static final String HINT_USAGE =
+    private static final String HINT_USAGE =
             "mvn clean package -pl 
flink-table/flink-table-planner,flink-table/flink-table-planner-loader 
-DskipTests";
 
+    private static final String[] OWNER_CLASSPATH =
+            Stream.concat(
+                            
Arrays.stream(CoreOptions.PARENT_FIRST_LOGGING_PATTERNS),
+                            Stream.of(
+                                    // These packages are shipped either by
+                                    // flink-table-runtime or flink-dist itself
+                                    "org.codehaus.janino",
+                                    "org.codehaus.commons",
+                                    "org.apache.commons.lang3",
+                                    // Used by org.reflections
+                                    "javassist"))
+                    .toArray(String[]::new);
+
+    private static final String[] COMPONENT_CLASSPATH = new String[] 
{"org.apache.flink"};
+
+    private static final Map<String, String> KNOWN_MODULE_ASSOCIATIONS = new 
HashMap<>();
+
+    static {
+        KNOWN_MODULE_ASSOCIATIONS.put("org.apache.flink.table.runtime", 
"flink-table-runtime");
+        KNOWN_MODULE_ASSOCIATIONS.put("org.apache.flink.formats.raw", 
"flink-table-runtime");
+
+        KNOWN_MODULE_ASSOCIATIONS.put("org.codehaus.janino", 
"flink-table-runtime");
+        KNOWN_MODULE_ASSOCIATIONS.put("org.codehaus.commons", 
"flink-table-runtime");
+        KNOWN_MODULE_ASSOCIATIONS.put(
+                "org.apache.flink.table.shaded.com.jayway", 
"flink-table-runtime");
+    }
+
     private final ClassLoader submoduleClassLoader;
 
     private PlannerModule() {
@@ -85,26 +114,13 @@ class PlannerModule {
 
             IOUtils.copyBytes(resourceStream, Files.newOutputStream(tempFile));
 
-            String[] ownerClassPath =
-                    Stream.concat(
-                                    
Arrays.stream(CoreOptions.PARENT_FIRST_LOGGING_PATTERNS),
-                                    Stream.of(
-                                            // These packages are shipped either by
-                                            // flink-table-runtime or flink-dist itself
-                                            "org.codehaus.janino",
-                                            "org.codehaus.commons",
-                                            "org.apache.commons.lang3",
-                                            // Used by org.reflections
-                                            "javassist"))
-                            .toArray(String[]::new);
-            String[] componentClassPath = new String[] {"org.apache.flink"};
-
             this.submoduleClassLoader =
                     new ComponentClassLoader(
                             new URL[] {tempFile.toUri().toURL()},
                             flinkClassLoader,
-                            ownerClassPath,
-                            componentClassPath);
+                            OWNER_CLASSPATH,
+                            COMPONENT_CLASSPATH,
+                            KNOWN_MODULE_ASSOCIATIONS);
         } catch (IOException e) {
             throw new TableException(
                     "Could not initialize the table planner components 
loader.", e);

Reply via email to