[GitHub] spark pull request #21930: [SPARK-14540][Core] Fix remaining major issues fo...

2018-08-02 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/21930


---




[GitHub] spark pull request #21930: [SPARK-14540][Core] Fix remaining major issues fo...

2018-08-01 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/21930#discussion_r207009415
  
--- Diff: core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala ---
@@ -366,14 +423,26 @@ private[spark] object ClosureCleaner extends Logging {
 private[spark] class ReturnStatementInClosureException
   extends SparkException("Return statements aren't allowed in Spark closures")
 
-private class ReturnStatementFinder extends ClassVisitor(ASM5) {
+private class ReturnStatementFinder(targetMethodName: Option[String] = None)
+  extends ClassVisitor(ASM5) {
   override def visitMethod(access: Int, name: String, desc: String,
       sig: String, exceptions: Array[String]): MethodVisitor = {
+
     // $anonfun$ covers Java 8 lambdas
     if (name.contains("apply") || name.contains("$anonfun$")) {
+      // A method with suffix "$adapted" will be generated in cases like
+      // { _:Int => return; Seq()} but not { _:Int => return; true}
+      // closure passed is $anonfun$t$1$adapted while actual code resides in $anonfun$s$1
+      // visitor will see only $anonfun$s$1$adapted, so we remove the suffix, see
+      // https://github.com/scala/scala-dev/issues/109
+      val isTargetMethod = if (targetMethodName.isEmpty) {
--- End diff --

OK, will give it a shot.
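For context, a tiny illustration of the compiler behavior the quoted comment describes (our own example, matching the two cases named in the comment): under Scala 2.12 the body of a lambda lands in a synthetic `$anonfun$...` method, and an extra `$adapted` bridge method appears only when boxing adaptation is needed:

```scala
// Needs boxing of the Int argument, so an "$adapted" bridge method is emitted.
val t = { _: Int => Seq() }

// Int => Boolean hits a specialized Function1 variant, so no bridge is emitted.
val s = { _: Int => true }
```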


---




[GitHub] spark pull request #21930: [SPARK-14540][Core] Fix remaining major issues fo...

2018-08-01 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/21930#discussion_r207009284
  
--- Diff: core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala ---

[GitHub] spark pull request #21930: [SPARK-14540][Core] Fix remaining major issues fo...

2018-08-01 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/21930#discussion_r207009194
  
--- Diff: core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala ---

[GitHub] spark pull request #21930: [SPARK-14540][Core] Fix remaining major issues fo...

2018-08-01 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/21930#discussion_r207009085
  
--- Diff: core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala ---
@@ -218,118 +261,132 @@ private[spark] object ClosureCleaner extends Logging {
       return
     }
 
-    logDebug(s"+++ Cleaning closure $func (${func.getClass.getName}) +++")
-
-    // A list of classes that represents closures enclosed in the given one
-    val innerClasses = getInnerClosureClasses(func)
-
-    // A list of enclosing objects and their respective classes, from innermost to outermost
-    // An outer object at a given index is of type outer class at the same index
-    val (outerClasses, outerObjects) = getOuterClassesAndObjects(func)
-
-    // For logging purposes only
-    val declaredFields = func.getClass.getDeclaredFields
-    val declaredMethods = func.getClass.getDeclaredMethods
-
-    if (log.isDebugEnabled) {
-      logDebug(" + declared fields: " + declaredFields.size)
-      declaredFields.foreach { f => logDebug(" " + f) }
-      logDebug(" + declared methods: " + declaredMethods.size)
-      declaredMethods.foreach { m => logDebug(" " + m) }
-      logDebug(" + inner classes: " + innerClasses.size)
-      innerClasses.foreach { c => logDebug(" " + c.getName) }
-      logDebug(" + outer classes: " + outerClasses.size)
-      outerClasses.foreach { c => logDebug(" " + c.getName) }
-      logDebug(" + outer objects: " + outerObjects.size)
-      outerObjects.foreach { o => logDebug(" " + o) }
-    }
+    if(lambdaFunc.isEmpty) {
+      logDebug(s"+++ Cleaning closure $func (${func.getClass.getName}) +++")
+
+      // A list of classes that represents closures enclosed in the given one
+      val innerClasses = getInnerClosureClasses(func)
+
+      // A list of enclosing objects and their respective classes, from innermost to outermost
+      // An outer object at a given index is of type outer class at the same index
+      val (outerClasses, outerObjects) = getOuterClassesAndObjects(func)
+
+      // For logging purposes only
+      val declaredFields = func.getClass.getDeclaredFields
+      val declaredMethods = func.getClass.getDeclaredMethods
+
+      if (log.isDebugEnabled) {
+        logDebug(" + declared fields: " + declaredFields.size)
--- End diff --

These lines pre-existed. The git diff just shows them again because I added that if statement at the top.


---




[GitHub] spark pull request #21930: [SPARK-14540][Core] Fix remaining major issues fo...

2018-08-01 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/21930#discussion_r207008830
  
--- Diff: core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala ---
@@ -159,6 +160,43 @@ private[spark] object ClosureCleaner extends Logging {
 clean(closure, checkSerializable, cleanTransitively, Map.empty)
   }
 
+  /**
+   * Try to get a serialized Lambda from the closure.
+   *
+   * @param closure the closure to check.
+   */
+  private def getSerializedLambda(closure: AnyRef): Option[SerializedLambda] = {
+    if (scala.util.Properties.versionString.contains("2.11")) {
+      return None
+    }
+    val isClosureCandidate =
+      closure.getClass.isSynthetic &&
+        closure
+          .getClass
+          .getInterfaces.exists{x: Class[_] => x.getName.equals("scala.Serializable") }
--- End diff --

I need to test; I think I had to use it.


---




[GitHub] spark pull request #21930: [SPARK-14540][Core] Fix remaining major issues fo...

2018-08-01 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/21930#discussion_r207008732
  
--- Diff: core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala ---
@@ -159,6 +160,43 @@ private[spark] object ClosureCleaner extends Logging {
 clean(closure, checkSerializable, cleanTransitively, Map.empty)
   }
 
+  /**
+   * Try to get a serialized Lambda from the closure.
+   *
+   * @param closure the closure to check.
+   */
+  private def getSerializedLambda(closure: AnyRef): Option[SerializedLambda] = {
+    if (scala.util.Properties.versionString.contains("2.11")) {
--- End diff --

sure.


---




[GitHub] spark pull request #21930: [SPARK-14540][Core] Fix remaining major issues fo...

2018-08-01 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/21930#discussion_r206969720
  
--- Diff: core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala ---
@@ -366,14 +423,26 @@ private[spark] object ClosureCleaner extends Logging {
 private[spark] class ReturnStatementInClosureException
   extends SparkException("Return statements aren't allowed in Spark closures")
 
-private class ReturnStatementFinder extends ClassVisitor(ASM5) {
+private class ReturnStatementFinder(targetMethodName: Option[String] = None)
+  extends ClassVisitor(ASM5) {
   override def visitMethod(access: Int, name: String, desc: String,
       sig: String, exceptions: Array[String]): MethodVisitor = {
+
     // $anonfun$ covers Java 8 lambdas
     if (name.contains("apply") || name.contains("$anonfun$")) {
+      // A method with suffix "$adapted" will be generated in cases like
+      // { _:Int => return; Seq()} but not { _:Int => return; true}
+      // closure passed is $anonfun$t$1$adapted while actual code resides in $anonfun$s$1
+      // visitor will see only $anonfun$s$1$adapted, so we remove the suffix, see
+      // https://github.com/scala/scala-dev/issues/109
+      val isTargetMethod = if (targetMethodName.isEmpty) {
--- End diff --

Simplify to `isTargetMethod = targetMethodName.isEmpty || ... || ...`?
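A sketch of the suggested simplification (hedged; it assumes the `name` and `targetMethodName` values from the diff above):

```scala
// Match every method when no target is given; otherwise match the target name
// directly or with the synthetic "$adapted" suffix stripped.
val isTargetMethod = targetMethodName.isEmpty ||
  name == targetMethodName.get ||
  name == targetMethodName.get.stripSuffix("$adapted")
```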


---




[GitHub] spark pull request #21930: [SPARK-14540][Core] Fix remaining major issues fo...

2018-08-01 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/21930#discussion_r206969409
  
--- Diff: core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala ---

[GitHub] spark pull request #21930: [SPARK-14540][Core] Fix remaining major issues fo...

2018-08-01 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/21930#discussion_r206969057
  
--- Diff: core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala ---
@@ -218,118 +261,132 @@ private[spark] object ClosureCleaner extends Logging {
       return
     }
 
-    logDebug(s"+++ Cleaning closure $func (${func.getClass.getName}) +++")
-
-    // A list of classes that represents closures enclosed in the given one
-    val innerClasses = getInnerClosureClasses(func)
-
-    // A list of enclosing objects and their respective classes, from innermost to outermost
-    // An outer object at a given index is of type outer class at the same index
-    val (outerClasses, outerObjects) = getOuterClassesAndObjects(func)
-
-    // For logging purposes only
-    val declaredFields = func.getClass.getDeclaredFields
-    val declaredMethods = func.getClass.getDeclaredMethods
-
-    if (log.isDebugEnabled) {
-      logDebug(" + declared fields: " + declaredFields.size)
-      declaredFields.foreach { f => logDebug(" " + f) }
-      logDebug(" + declared methods: " + declaredMethods.size)
-      declaredMethods.foreach { m => logDebug(" " + m) }
-      logDebug(" + inner classes: " + innerClasses.size)
-      innerClasses.foreach { c => logDebug(" " + c.getName) }
-      logDebug(" + outer classes: " + outerClasses.size)
-      outerClasses.foreach { c => logDebug(" " + c.getName) }
-      logDebug(" + outer objects: " + outerObjects.size)
-      outerObjects.foreach { o => logDebug(" " + o) }
-    }
+    if(lambdaFunc.isEmpty) {
+      logDebug(s"+++ Cleaning closure $func (${func.getClass.getName}) +++")
+
+      // A list of classes that represents closures enclosed in the given one
+      val innerClasses = getInnerClosureClasses(func)
+
+      // A list of enclosing objects and their respective classes, from innermost to outermost
+      // An outer object at a given index is of type outer class at the same index
+      val (outerClasses, outerObjects) = getOuterClassesAndObjects(func)
+
+      // For logging purposes only
+      val declaredFields = func.getClass.getDeclaredFields
+      val declaredMethods = func.getClass.getDeclaredMethods
+
+      if (log.isDebugEnabled) {
+        logDebug(" + declared fields: " + declaredFields.size)
+        declaredFields.foreach { f => logDebug(" " + f) }
+        logDebug(" + declared methods: " + declaredMethods.size)
+        declaredMethods.foreach { m => logDebug(" " + m) }
+        logDebug(" + inner classes: " + innerClasses.size)
+        innerClasses.foreach { c => logDebug(" " + c.getName) }
+        logDebug(" + outer classes: " + outerClasses.size)
+        outerClasses.foreach { c => logDebug(" " + c.getName) }
+        logDebug(" + outer objects: " + outerObjects.size)
+        outerObjects.foreach { o => logDebug(" " + o) }
+      }
 
-    // Fail fast if we detect return statements in closures
-    getClassReader(func.getClass).accept(new ReturnStatementFinder(), 0)
-
-    // If accessed fields is not populated yet, we assume that
-    // the closure we are trying to clean is the starting one
-    if (accessedFields.isEmpty) {
-      logDebug(s" + populating accessed fields because this is the starting closure")
-      // Initialize accessed fields with the outer classes first
-      // This step is needed to associate the fields to the correct classes later
-      initAccessedFields(accessedFields, outerClasses)
-
-      // Populate accessed fields by visiting all fields and methods accessed by this and
-      // all of its inner closures. If transitive cleaning is enabled, this may recursively
-      // visits methods that belong to other classes in search of transitively referenced fields.
-      for (cls <- func.getClass :: innerClasses) {
-        getClassReader(cls).accept(new FieldAccessFinder(accessedFields, cleanTransitively), 0)
+      // Fail fast if we detect return statements in closures
+      getClassReader(func.getClass).accept(new ReturnStatementFinder(), 0)
+
+      // If accessed fields is not populated yet, we assume that
+      // the closure we are trying to clean is the starting one
+      if (accessedFields.isEmpty) {
+        logDebug(s" + populating accessed fields because this is the starting closure")
--- End diff --

It doesn't actually need interpolation after all.
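In other words (a one-line sketch of the fix): the `s` prefix can simply be dropped, since the string contains no interpolations:

```scala
logDebug(" + populating accessed fields because this is the starting closure")
```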


---




[GitHub] spark pull request #21930: [SPARK-14540][Core] Fix remaining major issues fo...

2018-08-01 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/21930#discussion_r206968792
  
--- Diff: core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala ---
@@ -218,118 +261,132 @@ private[spark] object ClosureCleaner extends Logging {
       return
     }
 
-    logDebug(s"+++ Cleaning closure $func (${func.getClass.getName}) +++")
-
-    // A list of classes that represents closures enclosed in the given one
-    val innerClasses = getInnerClosureClasses(func)
-
-    // A list of enclosing objects and their respective classes, from innermost to outermost
-    // An outer object at a given index is of type outer class at the same index
-    val (outerClasses, outerObjects) = getOuterClassesAndObjects(func)
-
-    // For logging purposes only
-    val declaredFields = func.getClass.getDeclaredFields
-    val declaredMethods = func.getClass.getDeclaredMethods
-
-    if (log.isDebugEnabled) {
-      logDebug(" + declared fields: " + declaredFields.size)
-      declaredFields.foreach { f => logDebug(" " + f) }
-      logDebug(" + declared methods: " + declaredMethods.size)
-      declaredMethods.foreach { m => logDebug(" " + m) }
-      logDebug(" + inner classes: " + innerClasses.size)
-      innerClasses.foreach { c => logDebug(" " + c.getName) }
-      logDebug(" + outer classes: " + outerClasses.size)
-      outerClasses.foreach { c => logDebug(" " + c.getName) }
-      logDebug(" + outer objects: " + outerObjects.size)
-      outerObjects.foreach { o => logDebug(" " + o) }
-    }
+    if(lambdaFunc.isEmpty) {
--- End diff --

Nit: space after if


---




[GitHub] spark pull request #21930: [SPARK-14540][Core] Fix remaining major issues fo...

2018-08-01 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/21930#discussion_r206969293
  
--- Diff: core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala ---

[GitHub] spark pull request #21930: [SPARK-14540][Core] Fix remaining major issues fo...

2018-08-01 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/21930#discussion_r206968600
  
--- Diff: core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala ---
@@ -159,6 +160,43 @@ private[spark] object ClosureCleaner extends Logging {
 clean(closure, checkSerializable, cleanTransitively, Map.empty)
   }
 
+  /**
+   * Try to get a serialized Lambda from the closure.
+   *
+   * @param closure the closure to check.
+   */
+  private def getSerializedLambda(closure: AnyRef): Option[SerializedLambda] = {
+    if (scala.util.Properties.versionString.contains("2.11")) {
+      return None
+    }
+    val isClosureCandidate =
+      closure.getClass.isSynthetic &&
+        closure
+          .getClass
+          .getInterfaces.exists{x: Class[_] => x.getName.equals("scala.Serializable") }
--- End diff --

More nits: does `(_.getName.equals...)` not work?
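That is, the shorthand being suggested here (which should be equivalent to the quoted predicate) is:

```scala
closure.getClass.getInterfaces.exists(_.getName.equals("scala.Serializable"))
```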


---




[GitHub] spark pull request #21930: [SPARK-14540][Core] Fix remaining major issues fo...

2018-08-01 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/21930#discussion_r206968962
  
--- Diff: core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala ---
@@ -218,118 +261,132 @@ private[spark] object ClosureCleaner extends Logging {
       return
     }
 
-    logDebug(s"+++ Cleaning closure $func (${func.getClass.getName}) +++")
-
-    // A list of classes that represents closures enclosed in the given one
-    val innerClasses = getInnerClosureClasses(func)
-
-    // A list of enclosing objects and their respective classes, from innermost to outermost
-    // An outer object at a given index is of type outer class at the same index
-    val (outerClasses, outerObjects) = getOuterClassesAndObjects(func)
-
-    // For logging purposes only
-    val declaredFields = func.getClass.getDeclaredFields
-    val declaredMethods = func.getClass.getDeclaredMethods
-
-    if (log.isDebugEnabled) {
-      logDebug(" + declared fields: " + declaredFields.size)
-      declaredFields.foreach { f => logDebug(" " + f) }
-      logDebug(" + declared methods: " + declaredMethods.size)
-      declaredMethods.foreach { m => logDebug(" " + m) }
-      logDebug(" + inner classes: " + innerClasses.size)
-      innerClasses.foreach { c => logDebug(" " + c.getName) }
-      logDebug(" + outer classes: " + outerClasses.size)
-      outerClasses.foreach { c => logDebug(" " + c.getName) }
-      logDebug(" + outer objects: " + outerObjects.size)
-      outerObjects.foreach { o => logDebug(" " + o) }
-    }
+    if(lambdaFunc.isEmpty) {
+      logDebug(s"+++ Cleaning closure $func (${func.getClass.getName}) +++")
+
+      // A list of classes that represents closures enclosed in the given one
+      val innerClasses = getInnerClosureClasses(func)
+
+      // A list of enclosing objects and their respective classes, from innermost to outermost
+      // An outer object at a given index is of type outer class at the same index
+      val (outerClasses, outerObjects) = getOuterClassesAndObjects(func)
+
+      // For logging purposes only
+      val declaredFields = func.getClass.getDeclaredFields
+      val declaredMethods = func.getClass.getDeclaredMethods
+
+      if (log.isDebugEnabled) {
+        logDebug(" + declared fields: " + declaredFields.size)
--- End diff --

How about using interpolation, while you're updating these debug statements anyway?
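A sketch of what the first two quoted statements could look like with interpolation (assuming the surrounding `logDebug` and `declaredFields` names from the diff):

```scala
logDebug(s" + declared fields: ${declaredFields.size}")
declaredFields.foreach { f => logDebug(s" $f") }
```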


---




[GitHub] spark pull request #21930: [SPARK-14540][Core] Fix remaining major issues fo...

2018-08-01 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/21930#discussion_r206968430
  
--- Diff: core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala ---
@@ -159,6 +160,43 @@ private[spark] object ClosureCleaner extends Logging {
 clean(closure, checkSerializable, cleanTransitively, Map.empty)
   }
 
+  /**
+   * Try to get a serialized Lambda from the closure.
+   *
+   * @param closure the closure to check.
+   */
+  private def getSerializedLambda(closure: AnyRef): Option[SerializedLambda] = {
+    if (scala.util.Properties.versionString.contains("2.11")) {
--- End diff --

Ah, this part of the diff was collapsed and I didn't see it. I have a few more minor questions/suggestions on the closure cleaner change, but would indeed like to get this in for 2.4, and it looks close.

Here, what about storing the result of this in a private field so as not to compute it every time? It might not be a big deal, I don't know.
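A sketch of that suggestion (hedged; the object and field names here are ours, not the PR's):

```scala
object ScalaVersionCheck {
  // Evaluate the version-string check once instead of on every call.
  lazy val isScala2_11: Boolean =
    scala.util.Properties.versionString.contains("2.11")
}
```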


---




[GitHub] spark pull request #21930: [SPARK-14540][Core] Fix remaining major issues fo...

2018-08-01 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/21930#discussion_r206968703
  
--- Diff: core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala ---
@@ -159,6 +160,43 @@ private[spark] object ClosureCleaner extends Logging {
 clean(closure, checkSerializable, cleanTransitively, Map.empty)
   }
 
+  /**
+   * Try to get a serialized Lambda from the closure.
+   *
+   * @param closure the closure to check.
+   */
+  private def getSerializedLambda(closure: AnyRef): Option[SerializedLambda] = {
+    if (scala.util.Properties.versionString.contains("2.11")) {
+      return None
+    }
+    val isClosureCandidate =
+      closure.getClass.isSynthetic &&
+        closure
+          .getClass
+          .getInterfaces.exists{x: Class[_] => x.getName.equals("scala.Serializable") }
+
+    if (isClosureCandidate) {
+      try {
+        val res = inspect(closure)
--- End diff --

Inline `res`? Just little stuff that might streamline this a bit; it doesn't matter much.


---




[GitHub] spark pull request #21930: [SPARK-14540][Core] Fix remaining major issues fo...

2018-07-31 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/21930#discussion_r206605419
  
--- Diff: core/src/main/scala/org/apache/spark/TaskContext.scala ---
@@ -123,7 +123,7 @@ abstract class TaskContext extends Serializable {
*
* Exceptions thrown by the listener will result in failure of the task.
*/
-  def addTaskCompletionListener(f: (TaskContext) => Unit): TaskContext = {
+  def addTaskCompletionListener[U](f: (TaskContext) => U): TaskContext = {
--- End diff --

OK, if it's binary- and source-compatible with existing user programs for 
2.11 users, that's fine. Bets are off for 2.12 users anyway. 

When the release notes are crafted for 2.4, we'll want to mention this JIRA 
(I'll tag it) and issues like this.


---




[GitHub] spark pull request #21930: [SPARK-14540][Core] Fix remaining major issues fo...

2018-07-31 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/21930#discussion_r206562302
  
--- Diff: core/src/main/scala/org/apache/spark/TaskContext.scala ---
@@ -123,7 +123,7 @@ abstract class TaskContext extends Serializable {
*
* Exceptions thrown by the listener will result in failure of the task.
*/
-  def addTaskCompletionListener(f: (TaskContext) => Unit): TaskContext = {
+  def addTaskCompletionListener[U](f: (TaskContext) => U): TaskContext = {
--- End diff --

Yes, this covers that bug. If you build with 2.11 you don't need to specify the type Unit when you make the call (I tried that), since there is no ambiguity and the compiler does not face an overloading issue. With 2.12, both addTaskCompletionListener methods end up being SAM types, and the Unit adaptation causes this issue. I am not sure we can do anything more here; @retronym or @lrytz may add more context.


---




[GitHub] spark pull request #21930: [SPARK-14540][Core] Fix remaining major issues fo...

2018-07-31 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/21930#discussion_r206549150
  
--- Diff: core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala ---
@@ -538,17 +543,22 @@ class ClosureCleanerSuite2 extends SparkFunSuite with BeforeAndAfterAll with Pri
       // As before, this closure is neither serializable nor cleanable
       verifyCleaning(inner1, serializableBefore = false, serializableAfter = false)
 
-      // This closure is no longer serializable because it now has a pointer to the outer closure,
-      // which is itself not serializable because it has a pointer to the ClosureCleanerSuite2.
-      // If we do not clean transitively, we will not null out this indirect reference.
-      verifyCleaning(
-        inner2, serializableBefore = false, serializableAfter = false, transitive = false)
-
-      // If we clean transitively, we will find that method `a` does not actually reference the
-      // outer closure's parent (i.e. the ClosureCleanerSuite), so we can additionally null out
-      // the outer closure's parent pointer. This will make `inner2` serializable.
-      verifyCleaning(
-        inner2, serializableBefore = false, serializableAfter = true, transitive = true)
+      if(!ClosureCleanerSuite2.supportsLMFs) {
--- End diff --

sure.


---




[GitHub] spark pull request #21930: [SPARK-14540][Core] Fix remaining major issues fo...

2018-07-31 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/21930#discussion_r206541904
  
--- Diff: core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala ---
@@ -538,17 +543,22 @@ class ClosureCleanerSuite2 extends SparkFunSuite with BeforeAndAfterAll with Pri
       // As before, this closure is neither serializable nor cleanable
       verifyCleaning(inner1, serializableBefore = false, serializableAfter = false)
 
-      // This closure is no longer serializable because it now has a pointer to the outer closure,
-      // which is itself not serializable because it has a pointer to the ClosureCleanerSuite2.
-      // If we do not clean transitively, we will not null out this indirect reference.
-      verifyCleaning(
-        inner2, serializableBefore = false, serializableAfter = false, transitive = false)
-
-      // If we clean transitively, we will find that method `a` does not actually reference the
-      // outer closure's parent (i.e. the ClosureCleanerSuite), so we can additionally null out
-      // the outer closure's parent pointer. This will make `inner2` serializable.
-      verifyCleaning(
-        inner2, serializableBefore = false, serializableAfter = true, transitive = true)
+      if(!ClosureCleanerSuite2.supportsLMFs) {
--- End diff --

Nit: space after "if"; scalastyle might flag that. While you're at it, what about flipping the blocks to avoid a negation? Just for a tiny bit of clarity.


---




[GitHub] spark pull request #21930: [SPARK-14540][Core] Fix remaining major issues fo...

2018-07-31 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/21930#discussion_r206542154
  
--- Diff: core/src/main/scala/org/apache/spark/TaskContext.scala ---
@@ -123,7 +123,7 @@ abstract class TaskContext extends Serializable {
*
* Exceptions thrown by the listener will result in failure of the task.
*/
-  def addTaskCompletionListener(f: (TaskContext) => Unit): TaskContext = {
+  def addTaskCompletionListener[U](f: (TaskContext) => U): TaskContext = {
--- End diff --

This is to work around https://github.com/scala/bug/issues/11016, right? I'd prefer any solution that doesn't involve changing all the callers, but it looks like both workarounds require something to be done. At least I'd document the purpose of U here.

That said, user code can call this, right? And it would have to implement a similar change to work with 2.12? That's probably OK in the sense that any user app must make several changes to be compatible with 2.12.

I don't think 2.11 users would find there is a change to the binary API. Would a 2.11 user need to change their calls to specify a type for U with this change? Because it looks like it's not optional, given that Spark code has to change its calls. Is that not a source incompatibility?

If it's not, then I guess I wonder if you can avoid changing all the calls in Spark.


---




[GitHub] spark pull request #21930: [SPARK-14540][Core] Fix remaining major issues fo...

2018-07-31 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/21930#discussion_r206540368
  
--- Diff: core/src/main/scala/org/apache/spark/TaskContext.scala ---
@@ -123,7 +123,7 @@ abstract class TaskContext extends Serializable {
*
* Exceptions thrown by the listener will result in failure of the task.
*/
-  def addTaskCompletionListener(f: (TaskContext) => Unit): TaskContext = {
+  def addTaskCompletionListener[U](f: (TaskContext) => U): TaskContext = {
--- End diff --

Do we need to change this? I don't think it is a problem binary-compatibility-wise, but it seems a bit weird since we don't use the result of the function.


---




[GitHub] spark pull request #21930: [SPARK-14540][Core] Fix remaining major issues fo...

2018-07-31 Thread skonto
GitHub user skonto opened a pull request:

https://github.com/apache/spark/pull/21930

[SPARK-14540][Core] Fix remaining major issues for Scala 2.12 Support 

## What changes were proposed in this pull request?
This PR addresses issues 2 and 3 in the [document](https://docs.google.com/document/d/1fbkjEL878witxVQpOCbjlvOvadHtVjYXeB-2mgzDTvk).

* We modified the closure cleaner to identify closures that are implemented via the LambdaMetaFactory mechanism (serialized lambdas) (issue 2); see the sketch after this list.

* We also fix scala/bug#11016. There are two options for solving the Unit issue: either add () at the end of the closure, or use the trick described in the doc; otherwise overloading resolution does not work (we are not going to eliminate either of the methods here). The compiler tries to adapt the closure to Unit, which makes both methods candidates for overloading; with polymorphic overloading there is no ambiguity (that is the workaround implemented). This does not look great, but it serves its purpose, as we need to support two different uses of the method `addTaskCompletionListener`: one that passes a TaskCompletionListener, and one that passes a closure that is later wrapped in a TaskCompletionListener (issue 3).
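To make the first bullet concrete, here is a minimal sketch (not the PR's exact code; the helper name `asSerializedLambda` is ours) of how a LambdaMetaFactory-generated closure can be recognized: a serializable lambda's synthetic class carries a `writeReplace` method that returns a `java.lang.invoke.SerializedLambda`:

```scala
import java.lang.invoke.SerializedLambda

// Hypothetical helper: returns the SerializedLambda form of a closure, if any.
def asSerializedLambda(closure: AnyRef): Option[SerializedLambda] = {
  if (!closure.getClass.isSynthetic) {
    None
  } else {
    try {
      // Serializable lambdas compile to a class with a synthetic writeReplace()
      // that yields the SerializedLambda representation.
      val writeReplace = closure.getClass.getDeclaredMethod("writeReplace")
      writeReplace.setAccessible(true)
      writeReplace.invoke(closure) match {
        case lambda: SerializedLambda => Some(lambda)
        case _ => None
      }
    } catch {
      case _: NoSuchMethodException => None
    }
  }
}
```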

Note: regarding issue 1 the plan is:

> Do Nothing. Don’t try to fix this as this is only a problem for Java 
users who would want to use 2.11 binaries. In that case they can cast to 
MapFunction to be able to utilize lambdas. In Spark 3.0.0 the API should be 
simplified so that this issue is removed.

## How was this patch tested?
This was manually tested:
```
./dev/change-scala-version.sh 2.12
./build/mvn -DskipTests -Pscala-2.12 clean package
./build/mvn -Pscala-2.12 clean package -DwildcardSuites=org.apache.spark.serializer.ProactiveClosureSerializationSuite -Dtest=None
./build/mvn -Pscala-2.12 clean package -DwildcardSuites=org.apache.spark.util.ClosureCleanerSuite -Dtest=None
./build/mvn -Pscala-2.12 clean package -DwildcardSuites=org.apache.spark.streaming.DStreamClosureSuite -Dtest=None
```

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/skonto/spark scala2.12-sup

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21930.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21930


commit d466a9c2529e5fe1a680b6bab6b80fd57bef8c35
Author: Stavros Kontopoulos 
Date:   2018-07-30T22:51:14Z

initial




---
