This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new d375c0a [FLINK-10655][rpc] fix RemoteRpcInvocation not overwriting ObjectInputStream's ClassNotFoundException d375c0a is described below commit d375c0a5824629c701c15ea228ac1ce22e46dc3b Author: Nico Kruber <n...@data-artisans.com> AuthorDate: Tue Oct 23 18:47:02 2018 +0200 [FLINK-10655][rpc] fix RemoteRpcInvocation not overwriting ObjectInputStream's ClassNotFoundException --- .../typeutils/base/EnumSerializerUpgradeTest.java | 37 ++------------ .../apache/flink/testutils/ClassLoaderUtils.java | 59 ++++++++++++++++++++++ .../runtime/rpc/messages/RemoteRpcInvocation.java | 42 ++++++++++++--- .../runtime/classloading/ClassLoaderTest.java | 59 +++++++++++++++++++++- 4 files changed, 155 insertions(+), 42 deletions(-) diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerUpgradeTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerUpgradeTest.java index e906f62..fb11945 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerUpgradeTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerUpgradeTest.java @@ -23,21 +23,17 @@ import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerialization import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.testutils.ClassLoaderUtils; import org.apache.flink.util.TestLogger; + import org.junit.Assert; import org.junit.ClassRule; import org.junit.Test; import org.junit.rules.TemporaryFolder; -import javax.tools.JavaCompiler; -import javax.tools.ToolProvider; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.FileWriter; import java.io.IOException; -import java.net.URL; -import java.net.URLClassLoader; public class EnumSerializerUpgradeTest extends TestLogger { @@ -87,7 +83,7 @@ public class EnumSerializerUpgradeTest extends TestLogger { private static TypeSerializerSchemaCompatibility checkCompatibility(String enumSourceA, String enumSourceB) throws IOException, ClassNotFoundException { - ClassLoader classLoader = compileAndLoadEnum( + ClassLoader classLoader = ClassLoaderUtils.compileAndLoadJava( temporaryFolder.newFolder(), ENUM_NAME + ".java", enumSourceA); EnumSerializer enumSerializer = new EnumSerializer(classLoader.loadClass(ENUM_NAME)); @@ -103,7 +99,7 @@ public class EnumSerializerUpgradeTest extends TestLogger { snapshotBytes = outBuffer.toByteArray(); } - ClassLoader classLoader2 = compileAndLoadEnum( + ClassLoader classLoader2 = ClassLoaderUtils.compileAndLoadJava( temporaryFolder.newFolder(), ENUM_NAME + ".java", enumSourceB); TypeSerializerSnapshot restoredSnapshot; @@ -118,29 +114,4 @@ public class EnumSerializerUpgradeTest extends TestLogger { EnumSerializer enumSerializer2 = new EnumSerializer(classLoader2.loadClass(ENUM_NAME)); return restoredSnapshot.resolveSchemaCompatibility(enumSerializer2); } - - private static ClassLoader compileAndLoadEnum(File root, String filename, String source) throws IOException { - File file = writeSourceFile(root, filename, source); - - compileClass(file); - - return new URLClassLoader( - new URL[]{root.toURI().toURL()}, - Thread.currentThread().getContextClassLoader()); - } - - private static File writeSourceFile(File root, String filename, String source) throws IOException { - File file = new File(root, filename); - FileWriter fileWriter = new FileWriter(file); - - fileWriter.write(source); - fileWriter.close(); - - return file; - } - - private static int compileClass(File sourceFile) { - JavaCompiler compiler = ToolProvider.getSystemJavaCompiler(); - return compiler.run(null, null, null, "-proc:none", sourceFile.getPath()); - } } diff --git a/flink-core/src/test/java/org/apache/flink/testutils/ClassLoaderUtils.java b/flink-core/src/test/java/org/apache/flink/testutils/ClassLoaderUtils.java new file mode 100644 index 0000000..0688c1d --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/testutils/ClassLoaderUtils.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.testutils; + +import javax.tools.JavaCompiler; +import javax.tools.ToolProvider; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.net.URL; +import java.net.URLClassLoader; + +/** + * Utilities to create class loaders. + */ +public class ClassLoaderUtils { + public static URLClassLoader compileAndLoadJava(File root, String filename, String source) throws + IOException { + File file = writeSourceFile(root, filename, source); + + compileClass(file); + + return new URLClassLoader( + new URL[]{root.toURI().toURL()}, + Thread.currentThread().getContextClassLoader()); + } + + private static File writeSourceFile(File root, String filename, String source) throws IOException { + File file = new File(root, filename); + FileWriter fileWriter = new FileWriter(file); + + fileWriter.write(source); + fileWriter.close(); + + return file; + } + + private static int compileClass(File sourceFile) { + JavaCompiler compiler = ToolProvider.getSystemJavaCompiler(); + return compiler.run(null, null, null, "-proc:none", sourceFile.getPath()); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RemoteRpcInvocation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RemoteRpcInvocation.java index 7b9fb88..486816d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RemoteRpcInvocation.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RemoteRpcInvocation.java @@ -203,12 +203,18 @@ public class RemoteRpcInvocation implements RpcInvocation, Serializable { try { parameterTypes[i] = (Class<?>) ois.readObject(); } catch (IOException e) { + StringBuilder incompleteMethod = getIncompleteMethodString(i, 0); throw new IOException("Could not deserialize " + i + "th parameter type of method " + - methodName + '.', e); + incompleteMethod + '.', e); } catch (ClassNotFoundException e) { - throw new ClassNotFoundException("Could not deserialize " + i + "th " + - "parameter type of method " + methodName + ". This indicates that the parameter " + - "type is not part of the system class loader.", e); + // note: wrapping this CNFE into another CNFE does not overwrite the Exception + // stored in the ObjectInputStream (see ObjectInputStream#readSerialData) + // -> add a suppressed exception that adds a more specific message + StringBuilder incompleteMethod = getIncompleteMethodString(i, 0); + e.addSuppressed(new ClassNotFoundException("Could not deserialize " + i + "th " + + "parameter type of method " + incompleteMethod + ". This indicates that the parameter " + + "type is not part of the system class loader.")); + throw e; } } @@ -221,17 +227,37 @@ public class RemoteRpcInvocation implements RpcInvocation, Serializable { try { args[i] = ois.readObject(); } catch (IOException e) { + StringBuilder incompleteMethod = getIncompleteMethodString(length, i); throw new IOException("Could not deserialize " + i + "th argument of method " + - methodName + '.', e); + incompleteMethod + '.', e); } catch (ClassNotFoundException e) { - throw new ClassNotFoundException("Could not deserialize " + i + "th " + - "argument of method " + methodName + ". This indicates that the argument " + - "type is not part of the system class loader.", e); + // note: wrapping this CNFE into another CNFE does not overwrite the Exception + // stored in the ObjectInputStream (see ObjectInputStream#readSerialData) + // -> add a suppressed exception that adds a more specific message + StringBuilder incompleteMethod = getIncompleteMethodString(length, i); + e.addSuppressed(new ClassNotFoundException("Could not deserialize " + i + "th " + + "argument of method " + incompleteMethod + ". This indicates that the argument " + + "type is not part of the system class loader.")); + throw e; } } } else { args = null; } } + + private StringBuilder getIncompleteMethodString(int lastMethodTypeIdx, int lastArgumentIdx) { + StringBuilder incompleteMethod = new StringBuilder(); + incompleteMethod.append(methodName).append('('); + for (int i = 0; i < lastMethodTypeIdx; ++i) { + incompleteMethod.append(parameterTypes[i].getCanonicalName()); + if (i < lastArgumentIdx) { + incompleteMethod.append(": ").append(args[i]); + } + incompleteMethod.append(", "); + } + incompleteMethod.append("...)"); // some parameters could not be deserialized + return incompleteMethod; + } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/classloading/ClassLoaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/classloading/ClassLoaderTest.java index c02278c..7c664ce 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/classloading/ClassLoaderTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/classloading/ClassLoaderTest.java @@ -19,21 +19,78 @@ package org.apache.flink.runtime.classloading; import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders; +import org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation; +import org.apache.flink.testutils.ClassLoaderUtils; +import org.apache.flink.util.SerializedValue; import org.apache.flink.util.TestLogger; +import org.junit.ClassRule; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.rules.TemporaryFolder; import java.net.URL; import java.net.URLClassLoader; +import static org.hamcrest.CoreMatchers.allOf; +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.isA; +import static org.hamcrest.Matchers.hasItemInArray; +import static org.hamcrest.Matchers.hasProperty; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; /** - * Tests for classloading and class loder utilities. + * Tests for classloading and class loader utilities. */ public class ClassLoaderTest extends TestLogger { + @ClassRule + public static TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Test + public void testMessageDecodingWithUnavailableClass() throws Exception { + final ClassLoader systemClassLoader = getClass().getClassLoader(); + + final String className = "UserClass"; + final URLClassLoader userClassLoader = ClassLoaderUtils.compileAndLoadJava( + temporaryFolder.newFolder(), + className + ".java", + "import java.io.Serializable;\n" + + "public class " + className + " implements Serializable {}"); + + RemoteRpcInvocation method = new RemoteRpcInvocation( + "test", + new Class<?>[] { + int.class, + Class.forName(className, false, userClassLoader)}, + new Object[] { + 1, + Class.forName(className, false, userClassLoader).newInstance()}); + + SerializedValue<RemoteRpcInvocation> serializedMethod = new SerializedValue<>(method); + + expectedException.expect(ClassNotFoundException.class); + expectedException.expect( + allOf( + isA(ClassNotFoundException.class), + hasProperty("suppressed", + hasItemInArray( + allOf( + isA(ClassNotFoundException.class), + hasProperty("message", + containsString("Could not deserialize 1th parameter type of method test(int, ...)."))))))); + + RemoteRpcInvocation deserializedMethod = serializedMethod.deserializeValue(systemClassLoader); + deserializedMethod.getMethodName(); + + userClassLoader.close(); + } + @Test public void testParentFirstClassLoading() throws Exception { final ClassLoader parentClassLoader = getClass().getClassLoader();