[FLINK-8499] [core] Force Kryo to be parent-first loaded.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/15cb057b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/15cb057b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/15cb057b Branch: refs/heads/release-1.4 Commit: 15cb057bffd32ba8a853b46b207a5b7ea6bba430 Parents: da8446e Author: Stephan Ewen <[email protected]> Authored: Tue Jan 23 19:58:10 2018 +0100 Committer: Stephan Ewen <[email protected]> Committed: Wed Jan 24 18:06:14 2018 +0100 ---------------------------------------------------------------------- .../apache/flink/configuration/CoreOptions.java | 2 +- .../formats/avro/AvroKryoClassloadingTest.java | 89 ++++++++++++++++++++ .../core/testutils/FilteredClassLoader.java | 60 +++++++++++++ 3 files changed, 150 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/15cb057b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java index c48e5ef..27f39a4 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java @@ -36,7 +36,7 @@ public class CoreOptions { public static final ConfigOption<String> ALWAYS_PARENT_FIRST_LOADER = ConfigOptions .key("classloader.parent-first-patterns") - .defaultValue("java.;scala.;org.apache.flink.;org.apache.hadoop.;javax.annotation.;org.slf4j;org.apache.log4j;org.apache.logging.log4j;ch.qos.logback"); + .defaultValue("java.;scala.;org.apache.flink.;com.esotericsoftware.kryo;org.apache.hadoop.;javax.annotation.;org.slf4j;org.apache.log4j;org.apache.logging.log4j;ch.qos.logback"); // 
------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/15cb057b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroKryoClassloadingTest.java ---------------------------------------------------------------------- diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroKryoClassloadingTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroKryoClassloadingTest.java new file mode 100644 index 0000000..6eaca15 --- /dev/null +++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroKryoClassloadingTest.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.formats.avro; + +import org.apache.flink.configuration.CoreOptions; +import org.apache.flink.core.testutils.FilteredClassLoader; +import org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils; +import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders; + +import com.esotericsoftware.kryo.Kryo; +import org.junit.Test; + +import java.lang.reflect.Method; +import java.net.URL; +import java.util.LinkedHashMap; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; + +/** + * This test makes sure that reversed classloading works for the Avro/Kryo integration when + * Kryo is in the application jar file. + * + * <p>If Kryo is not loaded consistently through the same classloader (parent-first), the following + * error happens: + * + * <pre> + * java.lang.VerifyError: Bad type on operand stack + * Exception Details: + * Location: + * org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.addAvroGenericDataArrayRegistration(Ljava/util/LinkedHashMap;)V @23: invokespecial + * Reason: + * Type 'org/apache/flink/api/java/typeutils/runtime/kryo/Serializers$SpecificInstanceCollectionSerializerForArrayList' (current frame, stack[7]) is not assignable to 'com/esotericsoftware/kryo/Serializer' + * Current Frame: + * bci: @23 + * flags: { } + * locals: { 'org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils', 'java/util/LinkedHashMap' } + * stack: { 'java/util/LinkedHashMap', 'java/lang/String', uninitialized 6, uninitialized 6, 'java/lang/Class', uninitialized 12, uninitialized 12, 'org/apache/flink/api/java/typeutils/runtime/kryo/Serializers$SpecificInstanceCollectionSerializerForArrayList' } + * Bytecode: + * 0x0000000: 2b12 05b6 000b bb00 0c59 1205 bb00 0d59 + * 0x0000010: bb00 0659 b700 0eb7 000f b700 10b6 0011 + * 0x0000020: 57b1 + * </pre> + */ +public class AvroKryoClassloadingTest { + + @Test + public void testKryoInChildClasspath() throws Exception { + final 
Class<?> avroClass = AvroKryoSerializerUtils.class; + + final URL avroLocation = avroClass.getProtectionDomain().getCodeSource().getLocation(); + final URL kryoLocation = Kryo.class.getProtectionDomain().getCodeSource().getLocation(); + + final ClassLoader parentClassLoader = new FilteredClassLoader( + avroClass.getClassLoader(), AvroKryoSerializerUtils.class.getName()); + + final ClassLoader userAppClassLoader = FlinkUserCodeClassLoaders.childFirst( + new URL[] { avroLocation, kryoLocation }, + parentClassLoader, + CoreOptions.ALWAYS_PARENT_FIRST_LOADER.defaultValue().split(";")); + + final Class<?> userLoadedAvroClass = Class.forName(avroClass.getName(), false, userAppClassLoader); + assertNotEquals(avroClass, userLoadedAvroClass); + + // call the 'addAvroGenericDataArrayRegistration(...)' method + final Method m = userLoadedAvroClass.getMethod("addAvroGenericDataArrayRegistration", LinkedHashMap.class); + + final LinkedHashMap<String, ?> map = new LinkedHashMap<>(); + m.invoke(userLoadedAvroClass.newInstance(), map); + + assertEquals(1, map.size()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/15cb057b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/FilteredClassLoader.java ---------------------------------------------------------------------- diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/FilteredClassLoader.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/FilteredClassLoader.java new file mode 100644 index 0000000..f04393b --- /dev/null +++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/FilteredClassLoader.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.testutils; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Objects; + +/** + * A ClassLoader that filters out certain classes (by name) and throws a ClassNotFoundException + * when they should be loaded. + * + * <p>This utility is useful when trying to eliminate certain classes from a class loader + * to force loading them through another class loader. + */ +public class FilteredClassLoader extends ClassLoader { + + /** The set of class names for the filtered classes. */ + private final HashSet<String> filteredClassNames; + + /** + * Creates a new filtered classloader. + * + * @param delegate The class loader that is filtered by this classloader. + * @param filteredClassNames The class names to filter out. + */ + public FilteredClassLoader(ClassLoader delegate, String... filteredClassNames) { + super(Objects.requireNonNull(delegate)); + + this.filteredClassNames = new HashSet<>(Arrays.asList(filteredClassNames)); + } + + @Override + protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException { + synchronized (this) { + if (filteredClassNames.contains(name)) { + throw new ClassNotFoundException(name); + } + else { + return super.loadClass(name, resolve); + } + } + } +}
