Repository: flink
Updated Branches:
  refs/heads/master 641a0d436 -> b1e508645


[hotfix] [core] Improve error messages of the Java Closure Cleaner


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b1e50864
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b1e50864
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b1e50864

Branch: refs/heads/master
Commit: b1e5086455257e5b7d967fd0715f5a2ab30734aa
Parents: 641a0d4
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Apr 5 11:59:54 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Apr 5 11:59:54 2016 +0200

----------------------------------------------------------------------
 .../apache/flink/api/java/ClosureCleaner.java   | 144 ++++++++++++++-----
 1 file changed, 108 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b1e50864/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java 
b/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java
index 8eaebb8..2f22a75 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java
@@ -18,62 +18,110 @@
 
 package org.apache.flink.api.java;
 
-
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.util.InstantiationUtil;
+
 import org.objectweb.asm.ClassReader;
 import org.objectweb.asm.ClassVisitor;
 import org.objectweb.asm.MethodVisitor;
 import org.objectweb.asm.Opcodes;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.lang.reflect.Field;
 
+/**
+ * The closure cleaner is a utility that tries to truncate the closure 
(enclosing instance)
+ * of non-static inner classes (created for inline transformation functions). 
That makes non-static
+ * inner classes in many cases serializable, where Java's default behavior 
renders them non-serializable
+ * without good reason.
+ */
 @Internal
 public class ClosureCleaner {
+       
        private static Logger LOG = 
LoggerFactory.getLogger(ClosureCleaner.class);
-
-       private static ClassReader getClassReader(Class<?> cls) {
-               String className = cls.getName().replaceFirst("^.*\\.", "") + 
".class";
-               try {
-                       return new 
ClassReader(cls.getResourceAsStream(className));
-               } catch (IOException e) {
-                       throw new RuntimeException("Could not create 
ClassReader: " + e);
-               }
-       }
-
+       
+       /**
+        * Tries to clean the closure of the given object, if the object is a 
non-static inner
+        * class.
+        * 
+        * @param func The object whose closure should be cleaned.
+        * @param checkSerializable Flag to indicate whether serializability 
should be checked after
+        *                          the closure cleaning attempt.
+        * 
+        * @throws InvalidProgramException Thrown, if 'checkSerializable' is 
true, and the object was
+        *                                 not serializable after the closure 
cleaning.
+        * 
+        * @throws RuntimeException A RuntimeException may be thrown, if the 
code of the class could not
+        *                          be loaded, in order to process during the 
closure cleaning.
+        */
        public static void clean(Object func, boolean checkSerializable) {
-               Class<?> cls = func.getClass();
+               if (func == null) {
+                       return;
+               }
+               
+               final Class<?> cls = func.getClass();
 
                // First find the field name of the "this$0" field, this can
-               // be "field$x" depending on the nesting
+               // be "this$x" depending on the nesting
+               boolean closureAccessed = false;
+               
                for (Field f: cls.getDeclaredFields()) {
                        if (f.getName().startsWith("this$")) {
-                               // found our field:
-                               cleanThis0(func, cls, f.getName());
+                               // found a closure referencing field - now try 
to clean
+                               closureAccessed |= cleanThis0(func, cls, 
f.getName());
                        }
                }
-
+               
                if (checkSerializable) {
-                       ensureSerializable(func);
+                       try {
+                               InstantiationUtil.serializeObject(func);
+                       }
+                       catch (Exception e) {
+                               String functionType = 
getSuperClassOrInterfaceName(func.getClass());
+                               
+                               String msg = functionType == null ?
+                                               (func + " is not 
serializable.") :
+                                               ("The implementation of the " + 
functionType + " is not serializable.");
+                               
+                               
+                               if (closureAccessed) {
+                                       msg += " The implementation accesses 
fields of its enclosing class, which is " +
+                                                       "a common reason for 
non-serializability. " +
+                                                       "A common solution is 
to make the function a proper (non-inner) class, or" +
+                                                       "a static inner class.";
+                               } else {
+                                       msg += " The object probably contains 
or references non serializable fields.";
+                               }
+                               
+                               throw new InvalidProgramException(msg, e);
+                       }
                }
        }
 
-       private static void cleanThis0(Object func, Class<?> cls, String 
this0Name) {
-
+       public static void ensureSerializable(Object obj) {
+               try {
+                       InstantiationUtil.serializeObject(obj);
+               } catch (Exception e) {
+                       throw new InvalidProgramException("Object " + obj + " 
is not serializable", e);
+               }
+       }
+       
+       private static boolean cleanThis0(Object func, Class<?> cls, String 
this0Name) {
+               
                This0AccessFinder this0Finder = new 
This0AccessFinder(this0Name);
-
                getClassReader(cls).accept(this0Finder, 0);
-
-
+               
+               final boolean accessesClosure = this0Finder.isThis0Accessed();
+                               
                if (LOG.isDebugEnabled()) {
-                       LOG.debug(this0Name + " is accessed: " + 
this0Finder.isThis0Accessed());
+                       LOG.debug(this0Name + " is accessed: " + 
accessesClosure);
                }
 
-               if (!this0Finder.isThis0Accessed()) {
+               if (!accessesClosure) {
                        Field this0;
                        try {
                                this0 = 
func.getClass().getDeclaredField(this0Name);
@@ -81,30 +129,54 @@ public class ClosureCleaner {
                                // has no this$0, just return
                                throw new RuntimeException("Could not set " + 
this0Name + ": " + e);
                        }
-                       this0.setAccessible(true);
+                       
                        try {
+                               this0.setAccessible(true);
                                this0.set(func, null);
-                       } catch (IllegalAccessException e) {
+                       }
+                       catch (Exception e) {
                                // should not happen, since we use setAccessible
-                               throw new RuntimeException("Could not set " + 
this0Name + ": " + e);
+                               throw new RuntimeException("Could not set " + 
this0Name + " to null. " + e.getMessage(), e);
                        }
                }
+               
+               return accessesClosure;
        }
-
-
-       public static void ensureSerializable(Object obj) {
+       
+       private static ClassReader getClassReader(Class<?> cls) {
+               String className = cls.getName().replaceFirst("^.*\\.", "") + 
".class";
                try {
-                       InstantiationUtil.serializeObject(obj);
-               } catch (Exception e) {
-                       throw new InvalidProgramException("Object " + obj + " 
not serializable", e);
+                       return new 
ClassReader(cls.getResourceAsStream(className));
+               } catch (IOException e) {
+                       throw new RuntimeException("Could not create 
ClassReader: " + e.getMessage(), e);
+               }
+       }
+       
+       
+       private static String getSuperClassOrInterfaceName(Class<?> cls) {
+               Class<?> superclass = cls.getSuperclass();
+               if (superclass.getName().startsWith("org.apache.flink")) {
+                       return superclass.getSimpleName();
+               } else {
+                       for (Class<?> inFace : cls.getInterfaces()) {
+                               if 
(inFace.getName().startsWith("org.apache.flink")) {
+                                       return inFace.getSimpleName();
+                               }
+                       }
+                       return null;
                }
        }
-
 }
 
+/**
+ * This visitor walks methods and finds accesses to the field with the 
reference to
+ * the enclosing class.
+ */
 class This0AccessFinder extends ClassVisitor {
-       private boolean isThis0Accessed = false;
-       private String this0Name;
+
+       private final String this0Name;
+       private boolean isThis0Accessed;
+       
 
        public This0AccessFinder(String this0Name) {
                super(Opcodes.ASM5);

Reply via email to