Repository: flink Updated Branches: refs/heads/master 641a0d436 -> b1e508645
[hotfix] [core] Improve error messages of the Java Closure Cleaner Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b1e50864 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b1e50864 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b1e50864 Branch: refs/heads/master Commit: b1e5086455257e5b7d967fd0715f5a2ab30734aa Parents: 641a0d4 Author: Stephan Ewen <se...@apache.org> Authored: Tue Apr 5 11:59:54 2016 +0200 Committer: Stephan Ewen <se...@apache.org> Committed: Tue Apr 5 11:59:54 2016 +0200 ---------------------------------------------------------------------- .../apache/flink/api/java/ClosureCleaner.java | 144 ++++++++++++++----- 1 file changed, 108 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/b1e50864/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java b/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java index 8eaebb8..2f22a75 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java @@ -18,62 +18,110 @@ package org.apache.flink.api.java; - import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.util.InstantiationUtil; + import org.objectweb.asm.ClassReader; import org.objectweb.asm.ClassVisitor; import org.objectweb.asm.MethodVisitor; import org.objectweb.asm.Opcodes; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.lang.reflect.Field; +/** + * The closure cleaner is a utility that tries to truncate the closure (enclosing instance) + * of non-static inner 
classes (created for inline transformation functions). That makes non-static + * inner classes in many cases serializable, where Java's default behavior renders them non-serializable + * without good reason. + */ @Internal public class ClosureCleaner { + private static Logger LOG = LoggerFactory.getLogger(ClosureCleaner.class); - - private static ClassReader getClassReader(Class<?> cls) { - String className = cls.getName().replaceFirst("^.*\\.", "") + ".class"; - try { - return new ClassReader(cls.getResourceAsStream(className)); - } catch (IOException e) { - throw new RuntimeException("Could not create ClassReader: " + e); - } - } - + + /** + * Tries to clean the closure of the given object, if the object is a non-static inner + * class. + * + * @param func The object whose closure should be cleaned. + * @param checkSerializable Flag to indicate whether serializability should be checked after + * the closure cleaning attempt. + * + * @throws InvalidProgramException Thrown, if 'checkSerializable' is true, and the object was + * not serializable after the closure cleaning. + * + * @throws RuntimeException A RuntimeException may be thrown, if the code of the class could not + * be loaded for processing during the closure cleaning.
+ */ public static void clean(Object func, boolean checkSerializable) { - Class<?> cls = func.getClass(); + if (func == null) { + return; + } + + final Class<?> cls = func.getClass(); // First find the field name of the "this$0" field, this can - // be "field$x" depending on the nesting + // be "this$x" depending on the nesting + boolean closureAccessed = false; + for (Field f: cls.getDeclaredFields()) { if (f.getName().startsWith("this$")) { - // found our field: - cleanThis0(func, cls, f.getName()); + // found a closure referencing field - now try to clean + closureAccessed |= cleanThis0(func, cls, f.getName()); } } - + if (checkSerializable) { - ensureSerializable(func); + try { + InstantiationUtil.serializeObject(func); + } + catch (Exception e) { + String functionType = getSuperClassOrInterfaceName(func.getClass()); + + String msg = functionType == null ? + (func + " is not serializable.") : + ("The implementation of the " + functionType + " is not serializable."); + + + if (closureAccessed) { + msg += " The implementation accesses fields of its enclosing class, which is " + + "a common reason for non-serializability. 
" + "A common solution is to make the function a proper (non-inner) class, or " + "a static inner class."; + } else { + msg += " The object probably contains or references non serializable fields."; + } + + throw new InvalidProgramException(msg, e); + } } } - private static void cleanThis0(Object func, Class<?> cls, String this0Name) { - + public static void ensureSerializable(Object obj) { + try { + InstantiationUtil.serializeObject(obj); + } catch (Exception e) { + throw new InvalidProgramException("Object " + obj + " is not serializable", e); + } + } + + private static boolean cleanThis0(Object func, Class<?> cls, String this0Name) { + This0AccessFinder this0Finder = new This0AccessFinder(this0Name); - getClassReader(cls).accept(this0Finder, 0); - - + + final boolean accessesClosure = this0Finder.isThis0Accessed(); + if (LOG.isDebugEnabled()) { - LOG.debug(this0Name + " is accessed: " + this0Finder.isThis0Accessed()); + LOG.debug(this0Name + " is accessed: " + accessesClosure); } - if (!this0Finder.isThis0Accessed()) { + if (!accessesClosure) { Field this0; try { this0 = func.getClass().getDeclaredField(this0Name); @@ -81,30 +129,54 @@ public class ClosureCleaner { // has no this$0, just return throw new RuntimeException("Could not set " + this0Name + ": " + e); } - this0.setAccessible(true); + try { + this0.setAccessible(true); this0.set(func, null); - } catch (IllegalAccessException e) { + } + catch (Exception e) { // should not happen, since we use setAccessible - throw new RuntimeException("Could not set " + this0Name + ": " + e); + throw new RuntimeException("Could not set " + this0Name + " to null. 
" + e.getMessage(), e); } } + + return accessesClosure; } - - - public static void ensureSerializable(Object obj) { + + private static ClassReader getClassReader(Class<?> cls) { + String className = cls.getName().replaceFirst("^.*\\.", "") + ".class"; try { - InstantiationUtil.serializeObject(obj); - } catch (Exception e) { - throw new InvalidProgramException("Object " + obj + " not serializable", e); + return new ClassReader(cls.getResourceAsStream(className)); + } catch (IOException e) { + throw new RuntimeException("Could not create ClassReader: " + e.getMessage(), e); + } + } + + + private static String getSuperClassOrInterfaceName(Class<?> cls) { + Class<?> superclass = cls.getSuperclass(); + if (superclass != null && superclass.getName().startsWith("org.apache.flink")) { + return superclass.getSimpleName(); + } else { + for (Class<?> inFace : cls.getInterfaces()) { + if (inFace.getName().startsWith("org.apache.flink")) { + return inFace.getSimpleName(); + } + } + return null; } } - } +/** + * This visitor walks methods and finds accesses to the field with the reference to + * the enclosing class. + */ class This0AccessFinder extends ClassVisitor { - private boolean isThis0Accessed = false; - private String this0Name; + + private final String this0Name; + private boolean isThis0Accessed; + public This0AccessFinder(String this0Name) { super(Opcodes.ASM5);