[GitHub] [spark] maropu commented on a change in pull request #28463: [SPARK-31399][CORE] Support indylambda Scala closure in ClosureCleaner

2020-05-08 Thread GitBox


maropu commented on a change in pull request #28463:
URL: https://github.com/apache/spark/pull/28463#discussion_r422454936



##
File path: core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
##
@@ -414,6 +434,296 @@ private[spark] object ClosureCleaner extends Logging {
   }
 }
 
+private[spark] object IndylambdaScalaClosures extends Logging {
+  // internal name of java.lang.invoke.LambdaMetafactory
+  val LambdaMetafactoryClassName = "java/lang/invoke/LambdaMetafactory"
+  // the method that Scala indylambda use for bootstrap method
+  val LambdaMetafactoryMethodName = "altMetafactory"
+  val LambdaMetafactoryMethodDesc = "(Ljava/lang/invoke/MethodHandles$Lookup;" 
+
+"Ljava/lang/String;Ljava/lang/invoke/MethodType;[Ljava/lang/Object;)" +
+"Ljava/lang/invoke/CallSite;"
+
+  /**
+   * Check if the given reference is a indylambda style Scala closure.
+   * If so (e.g. for Scala 2.12+ closures), return a non-empty serialization 
proxy
+   * (SerializedLambda) of the closure;
+   * otherwise (e.g. for Scala 2.11 closures) return None.
+   *
+   * @param maybeClosure the closure to check.
+   */
+  def getSerializationProxy(maybeClosure: AnyRef): Option[SerializedLambda] = {
+def isClosureCandidate(cls: Class[_]): Boolean = {
+  // TODO: maybe lift this restriction to support other functional 
interfaces in the future
+  val implementedInterfaces = ClassUtils.getAllInterfaces(cls).asScala
+  implementedInterfaces.exists(_.getName.startsWith("scala.Function"))
+}
+
+maybeClosure.getClass match {
+  // shortcut the fast check:
+  // 1. indylambda closure classes are generated by Java's 
LambdaMetafactory, and they're
+  //always synthetic.
+  // 2. We only care about Serializable closures, so let's check that as 
well
+  case c if !c.isSynthetic || !maybeClosure.isInstanceOf[Serializable] => 
None
+
+  case c if isClosureCandidate(c) =>
+try {
+  Option(inspect(maybeClosure)).filter(isIndylambdaScalaClosure)
+} catch {
+  case e: Exception =>
+logDebug("The given reference is not an indylambda Scala 
closure.", e)
+None
+}
+
+  case _ => None
+}
+  }
+
+  def isIndylambdaScalaClosure(lambdaProxy: SerializedLambda): Boolean = {
+lambdaProxy.getImplMethodKind == MethodHandleInfo.REF_invokeStatic &&
+  lambdaProxy.getImplMethodName.contains("$anonfun$")
+  }
+
+  def inspect(closure: AnyRef): SerializedLambda = {
+val writeReplace = closure.getClass.getDeclaredMethod("writeReplace")
+writeReplace.setAccessible(true)
+writeReplace.invoke(closure).asInstanceOf[SerializedLambda]
+  }
+
+  /**
+   * Check if the handle represents the LambdaMetafactory that indylambda 
Scala closures
+   * use for creating the lambda class and getting a closure instance.
+   */
+  def isLambdaMetafactory(bsmHandle: Handle): Boolean = {
+bsmHandle.getOwner == LambdaMetafactoryClassName &&
+  bsmHandle.getName == LambdaMetafactoryMethodName &&
+  bsmHandle.getDesc == LambdaMetafactoryMethodDesc
+  }
+
+  /**
+   * Check if the handle represents a target method that is:
+   * - a STATIC method that implements a Scala lambda body in the indylambda 
style
+   * - captures the enclosing `this`, i.e. the first argument is a reference 
to the same type as
+   *   the owning class.
+   * Returns true if both criteria above are met.
+   */
+  def isLambdaBodyCapturingOuter(handle: Handle, ownerInternalName: String): 
Boolean = {
+handle.getTag == H_INVOKESTATIC &&
+  handle.getName.contains("$anonfun$") &&
+  handle.getOwner == ownerInternalName &&
+  handle.getDesc.startsWith(s"(L$ownerInternalName;")
+  }
+
+  /**
+   * Check if the callee of a call site is a inner class constructor.
+   * - A constructor has to be invoked via INVOKESPECIAL
+   * - A constructor's internal name is "init" and the return type is 
"V" (void)
+   * - An inner class' first argument in the signature has to be a reference 
to the
+   *   enclosing "this", aka `$outer` in Scala.
+   */
+  def isInnerClassCtorCapturingOuter(
+  op: Int, owner: String, name: String, desc: String, callerInternalName: 
String): Boolean = {
+op == INVOKESPECIAL && name == "" && 
desc.startsWith(s"(L$callerInternalName;")
+  }
+
+  /**
+   * Scans an indylambda Scala closure, along with its lexically nested 
closures, and populate
+   * the accessed fields info on which fields on the outer object are accessed.
+   *
+   * This is equivalent to getInnerClosureClasses() + InnerClosureFinder + 
FieldAccessFinder fused
+   * into one for processing indylambda closures. The traversal order along 
the call graph is the
+   * same for all three combined, so they can be fused together easily while 
maintaining the same
+   * ordering as the existing implementation.
+   *
+   * Precondition: this function expects the `accessedFields` to be populated 
with all known
+   *   

[GitHub] [spark] maropu commented on a change in pull request #28463: [SPARK-31399][CORE] Support indylambda Scala closure in ClosureCleaner

2020-05-08 Thread GitBox


maropu commented on a change in pull request #28463:
URL: https://github.com/apache/spark/pull/28463#discussion_r422433941



##
File path: core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
##
@@ -414,6 +434,296 @@ private[spark] object ClosureCleaner extends Logging {
   }
 }
 
+private[spark] object IndylambdaScalaClosures extends Logging {
+  // internal name of java.lang.invoke.LambdaMetafactory
+  val LambdaMetafactoryClassName = "java/lang/invoke/LambdaMetafactory"
+  // the method that Scala indylambda use for bootstrap method
+  val LambdaMetafactoryMethodName = "altMetafactory"
+  val LambdaMetafactoryMethodDesc = "(Ljava/lang/invoke/MethodHandles$Lookup;" 
+
+"Ljava/lang/String;Ljava/lang/invoke/MethodType;[Ljava/lang/Object;)" +
+"Ljava/lang/invoke/CallSite;"
+
+  /**
+   * Check if the given reference is a indylambda style Scala closure.
+   * If so (e.g. for Scala 2.12+ closures), return a non-empty serialization 
proxy
+   * (SerializedLambda) of the closure;
+   * otherwise (e.g. for Scala 2.11 closures) return None.
+   *
+   * @param maybeClosure the closure to check.
+   */
+  def getSerializationProxy(maybeClosure: AnyRef): Option[SerializedLambda] = {
+def isClosureCandidate(cls: Class[_]): Boolean = {
+  // TODO: maybe lift this restriction to support other functional 
interfaces in the future
+  val implementedInterfaces = ClassUtils.getAllInterfaces(cls).asScala
+  implementedInterfaces.exists(_.getName.startsWith("scala.Function"))
+}
+
+maybeClosure.getClass match {
+  // shortcut the fast check:
+  // 1. indylambda closure classes are generated by Java's 
LambdaMetafactory, and they're
+  //always synthetic.
+  // 2. We only care about Serializable closures, so let's check that as 
well
+  case c if !c.isSynthetic || !maybeClosure.isInstanceOf[Serializable] => 
None
+
+  case c if isClosureCandidate(c) =>
+try {
+  Option(inspect(maybeClosure)).filter(isIndylambdaScalaClosure)
+} catch {
+  case e: Exception =>
+logDebug("The given reference is not an indylambda Scala 
closure.", e)
+None
+}
+
+  case _ => None
+}
+  }
+
+  def isIndylambdaScalaClosure(lambdaProxy: SerializedLambda): Boolean = {
+lambdaProxy.getImplMethodKind == MethodHandleInfo.REF_invokeStatic &&
+  lambdaProxy.getImplMethodName.contains("$anonfun$")
+  }
+
+  def inspect(closure: AnyRef): SerializedLambda = {
+val writeReplace = closure.getClass.getDeclaredMethod("writeReplace")
+writeReplace.setAccessible(true)
+writeReplace.invoke(closure).asInstanceOf[SerializedLambda]
+  }
+
+  /**
+   * Check if the handle represents the LambdaMetafactory that indylambda 
Scala closures
+   * use for creating the lambda class and getting a closure instance.
+   */
+  def isLambdaMetafactory(bsmHandle: Handle): Boolean = {
+bsmHandle.getOwner == LambdaMetafactoryClassName &&
+  bsmHandle.getName == LambdaMetafactoryMethodName &&
+  bsmHandle.getDesc == LambdaMetafactoryMethodDesc
+  }
+
+  /**
+   * Check if the handle represents a target method that is:
+   * - a STATIC method that implements a Scala lambda body in the indylambda 
style
+   * - captures the enclosing `this`, i.e. the first argument is a reference 
to the same type as
+   *   the owning class.
+   * Returns true if both criteria above are met.
+   */
+  def isLambdaBodyCapturingOuter(handle: Handle, ownerInternalName: String): 
Boolean = {
+handle.getTag == H_INVOKESTATIC &&
+  handle.getName.contains("$anonfun$") &&
+  handle.getOwner == ownerInternalName &&
+  handle.getDesc.startsWith(s"(L$ownerInternalName;")
+  }
+
+  /**
+   * Check if the callee of a call site is a inner class constructor.
+   * - A constructor has to be invoked via INVOKESPECIAL
+   * - A constructor's internal name is "init" and the return type is 
"V" (void)
+   * - An inner class' first argument in the signature has to be a reference 
to the
+   *   enclosing "this", aka `$outer` in Scala.
+   */
+  def isInnerClassCtorCapturingOuter(
+  op: Int, owner: String, name: String, desc: String, callerInternalName: 
String): Boolean = {
+op == INVOKESPECIAL && name == "" && 
desc.startsWith(s"(L$callerInternalName;")
+  }
+
+  /**
+   * Scans an indylambda Scala closure, along with its lexically nested 
closures, and populate
+   * the accessed fields info on which fields on the outer object are accessed.
+   *
+   * This is equivalent to getInnerClosureClasses() + InnerClosureFinder + 
FieldAccessFinder fused
+   * into one for processing indylambda closures. The traversal order along 
the call graph is the
+   * same for all three combined, so they can be fused together easily while 
maintaining the same
+   * ordering as the existing implementation.
+   *
+   * Precondition: this function expects the `accessedFields` to be populated 
with all known
+   *   

[GitHub] [spark] maropu commented on a change in pull request #28463: [SPARK-31399][CORE] Support indylambda Scala closure in ClosureCleaner

2020-05-08 Thread GitBox


maropu commented on a change in pull request #28463:
URL: https://github.com/apache/spark/pull/28463#discussion_r422425889



##
File path: core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
##
@@ -414,6 +434,296 @@ private[spark] object ClosureCleaner extends Logging {
   }
 }
 
+private[spark] object IndylambdaScalaClosures extends Logging {
+  // internal name of java.lang.invoke.LambdaMetafactory
+  val LambdaMetafactoryClassName = "java/lang/invoke/LambdaMetafactory"
+  // the method that Scala indylambda use for bootstrap method
+  val LambdaMetafactoryMethodName = "altMetafactory"
+  val LambdaMetafactoryMethodDesc = "(Ljava/lang/invoke/MethodHandles$Lookup;" 
+
+"Ljava/lang/String;Ljava/lang/invoke/MethodType;[Ljava/lang/Object;)" +
+"Ljava/lang/invoke/CallSite;"
+
+  /**
+   * Check if the given reference is a indylambda style Scala closure.
+   * If so (e.g. for Scala 2.12+ closures), return a non-empty serialization 
proxy
+   * (SerializedLambda) of the closure;
+   * otherwise (e.g. for Scala 2.11 closures) return None.
+   *
+   * @param maybeClosure the closure to check.
+   */
+  def getSerializationProxy(maybeClosure: AnyRef): Option[SerializedLambda] = {
+def isClosureCandidate(cls: Class[_]): Boolean = {
+  // TODO: maybe lift this restriction to support other functional 
interfaces in the future
+  val implementedInterfaces = ClassUtils.getAllInterfaces(cls).asScala
+  implementedInterfaces.exists(_.getName.startsWith("scala.Function"))
+}
+
+maybeClosure.getClass match {
+  // shortcut the fast check:
+  // 1. indylambda closure classes are generated by Java's 
LambdaMetafactory, and they're
+  //always synthetic.
+  // 2. We only care about Serializable closures, so let's check that as 
well
+  case c if !c.isSynthetic || !maybeClosure.isInstanceOf[Serializable] => 
None
+
+  case c if isClosureCandidate(c) =>
+try {
+  Option(inspect(maybeClosure)).filter(isIndylambdaScalaClosure)
+} catch {
+  case e: Exception =>
+logDebug("The given reference is not an indylambda Scala 
closure.", e)
+None
+}
+
+  case _ => None
+}
+  }
+
+  def isIndylambdaScalaClosure(lambdaProxy: SerializedLambda): Boolean = {
+lambdaProxy.getImplMethodKind == MethodHandleInfo.REF_invokeStatic &&
+  lambdaProxy.getImplMethodName.contains("$anonfun$")
+  }
+
+  def inspect(closure: AnyRef): SerializedLambda = {
+val writeReplace = closure.getClass.getDeclaredMethod("writeReplace")
+writeReplace.setAccessible(true)
+writeReplace.invoke(closure).asInstanceOf[SerializedLambda]
+  }
+
+  /**
+   * Check if the handle represents the LambdaMetafactory that indylambda 
Scala closures
+   * use for creating the lambda class and getting a closure instance.
+   */
+  def isLambdaMetafactory(bsmHandle: Handle): Boolean = {
+bsmHandle.getOwner == LambdaMetafactoryClassName &&
+  bsmHandle.getName == LambdaMetafactoryMethodName &&
+  bsmHandle.getDesc == LambdaMetafactoryMethodDesc
+  }
+
+  /**
+   * Check if the handle represents a target method that is:
+   * - a STATIC method that implements a Scala lambda body in the indylambda 
style
+   * - captures the enclosing `this`, i.e. the first argument is a reference 
to the same type as
+   *   the owning class.
+   * Returns true if both criteria above are met.
+   */
+  def isLambdaBodyCapturingOuter(handle: Handle, ownerInternalName: String): 
Boolean = {
+handle.getTag == H_INVOKESTATIC &&
+  handle.getName.contains("$anonfun$") &&
+  handle.getOwner == ownerInternalName &&
+  handle.getDesc.startsWith(s"(L$ownerInternalName;")
+  }
+
+  /**
+   * Check if the callee of a call site is a inner class constructor.
+   * - A constructor has to be invoked via INVOKESPECIAL
+   * - A constructor's internal name is "init" and the return type is 
"V" (void)
+   * - An inner class' first argument in the signature has to be a reference 
to the
+   *   enclosing "this", aka `$outer` in Scala.
+   */
+  def isInnerClassCtorCapturingOuter(
+  op: Int, owner: String, name: String, desc: String, callerInternalName: 
String): Boolean = {
+op == INVOKESPECIAL && name == "" && 
desc.startsWith(s"(L$callerInternalName;")
+  }
+
+  /**
+   * Scans an indylambda Scala closure, along with its lexically nested 
closures, and populate
+   * the accessed fields info on which fields on the outer object are accessed.
+   *
+   * This is equivalent to getInnerClosureClasses() + InnerClosureFinder + 
FieldAccessFinder fused
+   * into one for processing indylambda closures. The traversal order along 
the call graph is the
+   * same for all three combined, so they can be fused together easily while 
maintaining the same
+   * ordering as the existing implementation.
+   *
+   * Precondition: this function expects the `accessedFields` to be populated 
with all known
+   *   

[GitHub] [spark] maropu commented on a change in pull request #28463: [SPARK-31399][CORE] Support indylambda Scala closure in ClosureCleaner

2020-05-08 Thread GitBox


maropu commented on a change in pull request #28463:
URL: https://github.com/apache/spark/pull/28463#discussion_r422059408



##
File path: core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
##
@@ -372,14 +342,64 @@ private[spark] object ClosureCleaner extends Logging {
 
   logDebug(s" +++ closure $func (${func.getClass.getName}) is now cleaned 
+++")
 } else {
-  logDebug(s"Cleaning lambda: ${lambdaFunc.get.getImplMethodName}")
+  val lambdaProxy = maybeIndylambdaProxy.get
+  val implMethodName = lambdaProxy.getImplMethodName
+
+  logDebug(s"Cleaning indylambda closure: $implMethodName")
+
+  // capturing class is the class that declared this lambda
+  val capturingClassName = lambdaProxy.getCapturingClass.replace('/', '.')
+  val classLoader = func.getClass.getClassLoader // this is the safest 
option
+  // scalastyle:off classforname
+  val capturingClass = Class.forName(capturingClassName, false, 
classLoader)
+  // scalastyle:on classforname
 
-  val captClass = 
Utils.classForName(lambdaFunc.get.getCapturingClass.replace('/', '.'),
-initialize = false, noSparkClassLoader = true)
   // Fail fast if we detect return statements in closures
-  getClassReader(captClass)
-.accept(new 
ReturnStatementFinder(Some(lambdaFunc.get.getImplMethodName)), 0)
-  logDebug(s" +++ Lambda closure (${lambdaFunc.get.getImplMethodName}) is 
now cleaned +++")
+  val capturingClassReader = getClassReader(capturingClass)
+  capturingClassReader.accept(new 
ReturnStatementFinder(Option(implMethodName)), 0)
+
+  val isClosureDeclaredInScalaRepl = 
capturingClassName.startsWith("$line") &&
+capturingClassName.endsWith("$iw")

Review comment:
   Does this check return true iff the given class is a Scala REPL line 
object? I mean, is there no chance that this pattern accidentally conflicts 
with one in user-provided Scala programs?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] maropu commented on a change in pull request #28463: [SPARK-31399][CORE] Support indylambda Scala closure in ClosureCleaner

2020-05-08 Thread GitBox


maropu commented on a change in pull request #28463:
URL: https://github.com/apache/spark/pull/28463#discussion_r422055779



##
File path: core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
##
@@ -372,14 +342,64 @@ private[spark] object ClosureCleaner extends Logging {
 
   logDebug(s" +++ closure $func (${func.getClass.getName}) is now cleaned 
+++")
 } else {
-  logDebug(s"Cleaning lambda: ${lambdaFunc.get.getImplMethodName}")
+  val lambdaProxy = maybeIndylambdaProxy.get
+  val implMethodName = lambdaProxy.getImplMethodName
+
+  logDebug(s"Cleaning indylambda closure: $implMethodName")
+
+  // capturing class is the class that declared this lambda
+  val capturingClassName = lambdaProxy.getCapturingClass.replace('/', '.')
+  val classLoader = func.getClass.getClassLoader // this is the safest 
option
+  // scalastyle:off classforname
+  val capturingClass = Class.forName(capturingClassName, false, 
classLoader)
+  // scalastyle:on classforname
 
-  val captClass = 
Utils.classForName(lambdaFunc.get.getCapturingClass.replace('/', '.'),
-initialize = false, noSparkClassLoader = true)
   // Fail fast if we detect return statements in closures
-  getClassReader(captClass)
-.accept(new 
ReturnStatementFinder(Some(lambdaFunc.get.getImplMethodName)), 0)
-  logDebug(s" +++ Lambda closure (${lambdaFunc.get.getImplMethodName}) is 
now cleaned +++")
+  val capturingClassReader = getClassReader(capturingClass)
+  capturingClassReader.accept(new 
ReturnStatementFinder(Option(implMethodName)), 0)
+
+  val isClosureDeclaredInScalaRepl = 
capturingClassName.startsWith("$line") &&
+capturingClassName.endsWith("$iw")
+  val outerThisOpt = if (lambdaProxy.getCapturedArgCount > 0) {
+Option(lambdaProxy.getCapturedArg(0))
+  } else {
+None
+  }
+
+  // only need to clean when there is an enclosing "this" captured by the 
closure, and it
+  // should be something cleanable, i.e. a Scala REPL line object
+  val needsCleaning = isClosureDeclaredInScalaRepl &&
+outerThisOpt.isDefined && outerThisOpt.get.getClass.getName == 
capturingClassName
+
+  if (needsCleaning) {
+// indylambda closures do not reference enclosing closures via an 
`$outer` chain, so no
+// transitive cleaning on the `$outer` chain is needed.
+// Thus clean() shouldn't be recursively called with a non-empty 
accessedFields.
+assert(accessedFields.isEmpty)
+
+initAccessedFields(accessedFields, Seq(capturingClass))
+IndylambdaScalaClosures.findAccessedFields(
+  lambdaProxy, classLoader, accessedFields, cleanTransitively)
+
+logDebug(s" + fields accessed by starting closure: 
${accessedFields.size} classes")
+accessedFields.foreach { f => logDebug(" " + f) }
+
+if (accessedFields(capturingClass).size < 
capturingClass.getDeclaredFields.length) {

Review comment:
   Ah, I see. Thanks for the explanation.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] maropu commented on a change in pull request #28463: [SPARK-31399][CORE] Support indylambda Scala closure in ClosureCleaner

2020-05-08 Thread GitBox


maropu commented on a change in pull request #28463:
URL: https://github.com/apache/spark/pull/28463#discussion_r422047047



##
File path: core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
##
@@ -372,14 +342,64 @@ private[spark] object ClosureCleaner extends Logging {
 
   logDebug(s" +++ closure $func (${func.getClass.getName}) is now cleaned 
+++")
 } else {
-  logDebug(s"Cleaning lambda: ${lambdaFunc.get.getImplMethodName}")
+  val lambdaProxy = maybeIndylambdaProxy.get
+  val implMethodName = lambdaProxy.getImplMethodName
+
+  logDebug(s"Cleaning indylambda closure: $implMethodName")
+
+  // capturing class is the class that declared this lambda
+  val capturingClassName = lambdaProxy.getCapturingClass.replace('/', '.')
+  val classLoader = func.getClass.getClassLoader // this is the safest 
option
+  // scalastyle:off classforname
+  val capturingClass = Class.forName(capturingClassName, false, 
classLoader)
+  // scalastyle:on classforname
 
-  val captClass = 
Utils.classForName(lambdaFunc.get.getCapturingClass.replace('/', '.'),
-initialize = false, noSparkClassLoader = true)
   // Fail fast if we detect return statements in closures
-  getClassReader(captClass)
-.accept(new 
ReturnStatementFinder(Some(lambdaFunc.get.getImplMethodName)), 0)
-  logDebug(s" +++ Lambda closure (${lambdaFunc.get.getImplMethodName}) is 
now cleaned +++")
+  val capturingClassReader = getClassReader(capturingClass)
+  capturingClassReader.accept(new 
ReturnStatementFinder(Option(implMethodName)), 0)
+
+  val isClosureDeclaredInScalaRepl = 
capturingClassName.startsWith("$line") &&
+capturingClassName.endsWith("$iw")
+  val outerThisOpt = if (lambdaProxy.getCapturedArgCount > 0) {
+Option(lambdaProxy.getCapturedArg(0))
+  } else {
+None
+  }
+
+  // only need to clean when there is an enclosing "this" captured by the 
closure, and it
+  // should be something cleanable, i.e. a Scala REPL line object
+  val needsCleaning = isClosureDeclaredInScalaRepl &&
+outerThisOpt.isDefined && outerThisOpt.get.getClass.getName == 
capturingClassName
+
+  if (needsCleaning) {
+// indylambda closures do not reference enclosing closures via an 
`$outer` chain, so no
+// transitive cleaning on the `$outer` chain is needed.
+// Thus clean() shouldn't be recursively called with a non-empty 
accessedFields.
+assert(accessedFields.isEmpty)
+
+initAccessedFields(accessedFields, Seq(capturingClass))
+IndylambdaScalaClosures.findAccessedFields(
+  lambdaProxy, classLoader, accessedFields, cleanTransitively)
+
+logDebug(s" + fields accessed by starting closure: 
${accessedFields.size} classes")
+accessedFields.foreach { f => logDebug(" " + f) }
+
+if (accessedFields(capturingClass).size < 
capturingClass.getDeclaredFields.length) {

Review comment:
   Does this part come from the old code? I looked over the existing code 
but I couldn't find it. Anyway, what I just want to know is: is this 
comparison correct, `# of outer classes` < `# of fields in the given class`?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] maropu commented on a change in pull request #28463: [SPARK-31399][CORE] Support indylambda Scala closure in ClosureCleaner

2020-05-08 Thread GitBox


maropu commented on a change in pull request #28463:
URL: https://github.com/apache/spark/pull/28463#discussion_r422047047



##
File path: core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
##
@@ -372,14 +342,64 @@ private[spark] object ClosureCleaner extends Logging {
 
   logDebug(s" +++ closure $func (${func.getClass.getName}) is now cleaned 
+++")
 } else {
-  logDebug(s"Cleaning lambda: ${lambdaFunc.get.getImplMethodName}")
+  val lambdaProxy = maybeIndylambdaProxy.get
+  val implMethodName = lambdaProxy.getImplMethodName
+
+  logDebug(s"Cleaning indylambda closure: $implMethodName")
+
+  // capturing class is the class that declared this lambda
+  val capturingClassName = lambdaProxy.getCapturingClass.replace('/', '.')
+  val classLoader = func.getClass.getClassLoader // this is the safest 
option
+  // scalastyle:off classforname
+  val capturingClass = Class.forName(capturingClassName, false, 
classLoader)
+  // scalastyle:on classforname
 
-  val captClass = 
Utils.classForName(lambdaFunc.get.getCapturingClass.replace('/', '.'),
-initialize = false, noSparkClassLoader = true)
   // Fail fast if we detect return statements in closures
-  getClassReader(captClass)
-.accept(new 
ReturnStatementFinder(Some(lambdaFunc.get.getImplMethodName)), 0)
-  logDebug(s" +++ Lambda closure (${lambdaFunc.get.getImplMethodName}) is 
now cleaned +++")
+  val capturingClassReader = getClassReader(capturingClass)
+  capturingClassReader.accept(new 
ReturnStatementFinder(Option(implMethodName)), 0)
+
+  val isClosureDeclaredInScalaRepl = 
capturingClassName.startsWith("$line") &&
+capturingClassName.endsWith("$iw")
+  val outerThisOpt = if (lambdaProxy.getCapturedArgCount > 0) {
+Option(lambdaProxy.getCapturedArg(0))
+  } else {
+None
+  }
+
+  // only need to clean when there is an enclosing "this" captured by the 
closure, and it
+  // should be something cleanable, i.e. a Scala REPL line object
+  val needsCleaning = isClosureDeclaredInScalaRepl &&
+outerThisOpt.isDefined && outerThisOpt.get.getClass.getName == 
capturingClassName
+
+  if (needsCleaning) {
+// indylambda closures do not reference enclosing closures via an 
`$outer` chain, so no
+// transitive cleaning on the `$outer` chain is needed.
+// Thus clean() shouldn't be recursively called with a non-empty 
accessedFields.
+assert(accessedFields.isEmpty)
+
+initAccessedFields(accessedFields, Seq(capturingClass))
+IndylambdaScalaClosures.findAccessedFields(
+  lambdaProxy, classLoader, accessedFields, cleanTransitively)
+
+logDebug(s" + fields accessed by starting closure: 
${accessedFields.size} classes")
+accessedFields.foreach { f => logDebug(" " + f) }
+
+if (accessedFields(capturingClass).size < 
capturingClass.getDeclaredFields.length) {

Review comment:
   Does this part come from the old code? I looked over the existing code 
but I couldn't find it. Anyway, what I just want to know is: is this 
comparison correct, `# of outer classes` < `# of fields in the given class`?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] maropu commented on a change in pull request #28463: [SPARK-31399][CORE] Support indylambda Scala closure in ClosureCleaner

2020-05-08 Thread GitBox


maropu commented on a change in pull request #28463:
URL: https://github.com/apache/spark/pull/28463#discussion_r422031235



##
File path: core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
##
@@ -256,7 +226,7 @@ private[spark] object ClosureCleaner extends Logging {
   return
 }
 
-if (lambdaFunc.isEmpty) {
+if (maybeIndylambdaProxy.isEmpty) {
   logDebug(s"+++ Cleaning closure $func (${func.getClass.getName}) +++")

Review comment:
   nit: how about explicitly saying this is an old style closure here? 
e.g., `Cleaning old style closure $func`, `Cleaning inline closure $func`, ...?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] maropu commented on a change in pull request #28463: [SPARK-31399][CORE] Support indylambda Scala closure in ClosureCleaner

2020-05-08 Thread GitBox


maropu commented on a change in pull request #28463:
URL: https://github.com/apache/spark/pull/28463#discussion_r422030014



##
File path: core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
##
@@ -372,14 +342,64 @@ private[spark] object ClosureCleaner extends Logging {
 
   logDebug(s" +++ closure $func (${func.getClass.getName}) is now cleaned 
+++")
 } else {
-  logDebug(s"Cleaning lambda: ${lambdaFunc.get.getImplMethodName}")
+  val lambdaProxy = maybeIndylambdaProxy.get
+  val implMethodName = lambdaProxy.getImplMethodName
+
+  logDebug(s"Cleaning indylambda closure: $implMethodName")
+
+  // capturing class is the class that declared this lambda
+  val capturingClassName = lambdaProxy.getCapturingClass.replace('/', '.')
+  val classLoader = func.getClass.getClassLoader // this is the safest 
option
+  // scalastyle:off classforname
+  val capturingClass = Class.forName(capturingClassName, false, 
classLoader)
+  // scalastyle:on classforname
 
-  val captClass = 
Utils.classForName(lambdaFunc.get.getCapturingClass.replace('/', '.'),
-initialize = false, noSparkClassLoader = true)
   // Fail fast if we detect return statements in closures
-  getClassReader(captClass)
-.accept(new 
ReturnStatementFinder(Some(lambdaFunc.get.getImplMethodName)), 0)
-  logDebug(s" +++ Lambda closure (${lambdaFunc.get.getImplMethodName}) is 
now cleaned +++")
+  val capturingClassReader = getClassReader(capturingClass)
+  capturingClassReader.accept(new 
ReturnStatementFinder(Option(implMethodName)), 0)
+
+  val isClosureDeclaredInScalaRepl = 
capturingClassName.startsWith("$line") &&
+capturingClassName.endsWith("$iw")
+  val outerThisOpt = if (lambdaProxy.getCapturedArgCount > 0) {
+Option(lambdaProxy.getCapturedArg(0))
+  } else {
+None
+  }
+
+  // only need to clean when there is an enclosing "this" captured by the 
closure, and it
+  // should be something cleanable, i.e. a Scala REPL line object
+  val needsCleaning = isClosureDeclaredInScalaRepl &&
+outerThisOpt.isDefined && outerThisOpt.get.getClass.getName == 
capturingClassName
+
+  if (needsCleaning) {
+// indylambda closures do not reference enclosing closures via an 
`$outer` chain, so no
+// transitive cleaning on the `$outer` chain is needed.
+// Thus clean() shouldn't be recursively called with a non-empty 
accessedFields.
+assert(accessedFields.isEmpty)
+
+initAccessedFields(accessedFields, Seq(capturingClass))
+IndylambdaScalaClosures.findAccessedFields(
+  lambdaProxy, classLoader, accessedFields, cleanTransitively)
+
+logDebug(s" + fields accessed by starting closure: 
${accessedFields.size} classes")
+accessedFields.foreach { f => logDebug(" " + f) }
+
+if (accessedFields(capturingClass).size < 
capturingClass.getDeclaredFields.length) {

Review comment:
   Not `accessedFields(capturingClass).size` but 
`accessedFields.map(_._2.size).sum` here?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] maropu commented on a change in pull request #28463: [SPARK-31399][CORE] Support indylambda Scala closure in ClosureCleaner

2020-05-08 Thread GitBox


maropu commented on a change in pull request #28463:
URL: https://github.com/apache/spark/pull/28463#discussion_r422011000



##
File path: core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
##
@@ -414,6 +434,296 @@ private[spark] object ClosureCleaner extends Logging {
   }
 }
 
+private[spark] object IndylambdaScalaClosures extends Logging {
+  // internal name of java.lang.invoke.LambdaMetafactory
+  val LambdaMetafactoryClassName = "java/lang/invoke/LambdaMetafactory"
+  // the method that Scala indylambda use for bootstrap method
+  val LambdaMetafactoryMethodName = "altMetafactory"
+  val LambdaMetafactoryMethodDesc = "(Ljava/lang/invoke/MethodHandles$Lookup;" 
+
+"Ljava/lang/String;Ljava/lang/invoke/MethodType;[Ljava/lang/Object;)" +
+"Ljava/lang/invoke/CallSite;"
+
+  /**
+   * Check if the given reference is an indylambda style Scala closure.
+   * If so, return a non-empty serialization proxy (SerializedLambda) of the 
closure;
+   * otherwise return None.
+   *
+   * @param maybeClosure the closure to check.
+   */
+  def getSerializationProxy(maybeClosure: AnyRef): Option[SerializedLambda] = {
+val maybeClosureClass = maybeClosure.getClass
+
+// shortcut the fast check:
+// 1. indylambda closure classes are generated by Java's 
LambdaMetafactory, and they're always
+//synthetic.
+// 2. We only care about Serializable closures, so let's check that as well
+if (!maybeClosureClass.isSynthetic || 
!maybeClosure.isInstanceOf[Serializable]) return None

Review comment:
   nit: How about using pattern-matching here like this:
   ```
   
   def isClosureCandidate(clazz: Class[_]): Boolean = {
 val implementedInterfaces = ClassUtils.getAllInterfaces(clazz).asScala
 implementedInterfaces.exists(_.getName.startsWith("scala.Function"))
   }
   
   maybeClosure.getClass match {
 // shortcut the fast check:
 // 1. indylambda closure classes are generated by Java's 
LambdaMetafactory, and they're always
 //synthetic.
 // 2. We only care about Serializable closures, so let's check that as 
well
 case c if !c.isSynthetic || !maybeClosure.isInstanceOf[Serializable] 
=> None
   
 case c if isClosureCandidate(c) =>
   try {
 val lambdaProxy = inspect(maybeClosure)
 Option(lambdaProxy).filter(isIndylambdaScalaClosure)
   } catch {
 case e: Exception =>
   // no need to check if debug is enabled here; the Spark logging 
API covers this.
   logDebug("The given reference is not an indylambda Scala 
closure.", e)
   None
   }
   
 case _ => None
   }
   ```





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] maropu commented on a change in pull request #28463: [SPARK-31399][CORE] Support indylambda Scala closure in ClosureCleaner

2020-05-08 Thread GitBox


maropu commented on a change in pull request #28463:
URL: https://github.com/apache/spark/pull/28463#discussion_r422011000



##
File path: core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
##
@@ -414,6 +434,296 @@ private[spark] object ClosureCleaner extends Logging {
   }
 }
 
+private[spark] object IndylambdaScalaClosures extends Logging {
+  // internal name of java.lang.invoke.LambdaMetafactory
+  val LambdaMetafactoryClassName = "java/lang/invoke/LambdaMetafactory"
+  // the method that Scala indylambda use for bootstrap method
+  val LambdaMetafactoryMethodName = "altMetafactory"
+  val LambdaMetafactoryMethodDesc = "(Ljava/lang/invoke/MethodHandles$Lookup;" 
+
+"Ljava/lang/String;Ljava/lang/invoke/MethodType;[Ljava/lang/Object;)" +
+"Ljava/lang/invoke/CallSite;"
+
+  /**
+   * Check if the given reference is an indylambda style Scala closure.
+   * If so, return a non-empty serialization proxy (SerializedLambda) of the 
closure;
+   * otherwise return None.
+   *
+   * @param maybeClosure the closure to check.
+   */
+  def getSerializationProxy(maybeClosure: AnyRef): Option[SerializedLambda] = {
+val maybeClosureClass = maybeClosure.getClass
+
+// shortcut the fast check:
+// 1. indylambda closure classes are generated by Java's 
LambdaMetafactory, and they're always
+//synthetic.
+// 2. We only care about Serializable closures, so let's check that as well
+if (!maybeClosureClass.isSynthetic || 
!maybeClosure.isInstanceOf[Serializable]) return None

Review comment:
   nit: How about using pattern-matching here like this:
   ```
   
   def isClosureCandidate(clazz: Class[_]): Boolean = {
 val implementedInterfaces = ClassUtils.getAllInterfaces(clazz).asScala
 implementedInterfaces.exists(_.getName.startsWith("scala.Function"))
   }
   
   maybeClosure.getClass match {
 // shortcut the fast check:
 // 1. indylambda closure classes are generated by Java's 
LambdaMetafactory, and they're always
 //synthetic.
 // 2. We only care about Serializable closures, so let's check that as 
well
 case c if !c.isSynthetic || !maybeClosure.isInstanceOf[Serializable] 
=> None
   
 case c if isClosureCandidate(c) =>
   try {
 Option(inspect(maybeClosure)).filter(isIndylambdaScalaClosure)
   } catch {
 case e: Exception =>
   // no need to check if debug is enabled here; the Spark logging 
API covers this.
   logDebug("The given reference is not an indylambda Scala 
closure.", e)
   None
   }
   
 case _ => None
   }
   ```





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org