[GitHub] spark pull request #21930: [SPARK-14540][Core] Fix remaining major issues fo...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/21930
[GitHub] spark pull request #21930: [SPARK-14540][Core] Fix remaining major issues fo...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/21930#discussion_r207009415
--- Diff: core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala ---
@@ -366,14 +423,26 @@ (same ReturnStatementFinder hunk as quoted in the srowen comment below, ending at)
+      val isTargetMethod = if (targetMethodName.isEmpty) {
--- End diff --
OK, will give it a shot.
[GitHub] spark pull request #21930: [SPARK-14540][Core] Fix remaining major issues fo...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/21930#discussion_r207009284
--- Diff: core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala ---
@@ -218,118 +261,132 @@ private[spark] object ClosureCleaner extends Logging {
[the hunk wraps the existing body of ClosureCleaner's private clean() method in a new `if (lambdaFunc.isEmpty) { ... }` block and re-indents it; the rest of this archived message is truncated]
[GitHub] spark pull request #21930: [SPARK-14540][Core] Fix remaining major issues fo...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/21930#discussion_r207009194
--- Diff: core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala ---
@@ -218,118 +261,132 @@ [same clean() hunk as above; the rest of this archived message is truncated]
[GitHub] spark pull request #21930: [SPARK-14540][Core] Fix remaining major issues fo...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/21930#discussion_r207009085
--- Diff: core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala ---
@@ -218,118 +261,132 @@ (same clean() hunk as above, with the quoted context ending at)
+        logDebug(" + declared fields: " + declaredFields.size)
--- End diff --
This stuff pre-existed. The git diff just shows them again because I added that if statement at the top.
[GitHub] spark pull request #21930: [SPARK-14540][Core] Fix remaining major issues fo...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/21930#discussion_r207008830
--- Diff: core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala ---
@@ -159,6 +160,43 @@ private[spark] object ClosureCleaner extends Logging {
     clean(closure, checkSerializable, cleanTransitively, Map.empty)
   }

+  /**
+   * Try to get a serialized Lambda from the closure.
+   *
+   * @param closure the closure to check.
+   */
+  private def getSerializedLambda(closure: AnyRef): Option[SerializedLambda] = {
+    if (scala.util.Properties.versionString.contains("2.11")) {
+      return None
+    }
+    val isClosureCandidate =
+      closure.getClass.isSynthetic &&
+        closure
+          .getClass
+          .getInterfaces.exists{x: Class[_] => x.getName.equals("scala.Serializable") }
--- End diff --
Need to test, I think I had to use it.
[GitHub] spark pull request #21930: [SPARK-14540][Core] Fix remaining major issues fo...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/21930#discussion_r207008732
--- Diff: core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala ---
@@ -159,6 +160,43 @@ (same getSerializedLambda hunk as quoted above, ending at)
+    if (scala.util.Properties.versionString.contains("2.11")) {
--- End diff --
Sure.
[GitHub] spark pull request #21930: [SPARK-14540][Core] Fix remaining major issues fo...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/21930#discussion_r206969720
--- Diff: core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala ---
@@ -366,14 +423,26 @@ private[spark] object ClosureCleaner extends Logging {
 private[spark] class ReturnStatementInClosureException
   extends SparkException("Return statements aren't allowed in Spark closures")
-private class ReturnStatementFinder extends ClassVisitor(ASM5) {
+private class ReturnStatementFinder(targetMethodName: Option[String] = None)
+  extends ClassVisitor(ASM5) {
   override def visitMethod(access: Int, name: String, desc: String,
       sig: String, exceptions: Array[String]): MethodVisitor = {
+    // $anonfun$ covers Java 8 lambdas
     if (name.contains("apply") || name.contains("$anonfun$")) {
+      // A method with suffix "$adapted" will be generated in cases like
+      // { _:Int => return; Seq()} but not { _:Int => return; true}
+      // closure passed is $anonfun$t$1$adapted while actual code resides in $anonfun$s$1
+      // visitor will see only $anonfun$s$1$adapted, so we remove the suffix, see
+      // https://github.com/scala/scala-dev/issues/109
+      val isTargetMethod = if (targetMethodName.isEmpty) {
--- End diff --
Simplify to `isTargetMethod = targetMethodName.isEmpty || ... || ...`?
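For illustration, a standalone sketch of the condensed check suggested above (the helper name and the exact direction of the `$adapted` stripping are assumptions, not the merged code):

```scala
// Hypothetical helper mirroring the suggested single-expression form.
// `name` is the method name seen by the ASM ClassVisitor; `targetMethodName`
// is the optional lambda implementation method. "$adapted" is the
// compiler-generated suffix discussed in scala/scala-dev#109.
def isTargetMethod(name: String, targetMethodName: Option[String]): Boolean = {
  targetMethodName.isEmpty ||
    name == targetMethodName.get ||
    name.stripSuffix("$adapted") == targetMethodName.get.stripSuffix("$adapted")
}
```

Which side the suffix must be stripped from is exactly the subtlety the quoted comment describes, so the real condition may differ.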
[GitHub] spark pull request #21930: [SPARK-14540][Core] Fix remaining major issues fo...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/21930#discussion_r206969409
--- Diff: core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala ---
@@ -218,118 +261,132 @@ [same clean() hunk as above; the rest of this archived message is truncated]
[GitHub] spark pull request #21930: [SPARK-14540][Core] Fix remaining major issues fo...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/21930#discussion_r206969057
--- Diff: core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala ---
@@ -218,118 +261,132 @@ (same clean() hunk as above, with the quoted context ending at)
+        logDebug(s" + populating accessed fields because this is the starting closure")
--- End diff --
Doesn't actually need interpolation after all.
[GitHub] spark pull request #21930: [SPARK-14540][Core] Fix remaining major issues fo...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/21930#discussion_r206968792
--- Diff: core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala ---
@@ -218,118 +261,132 @@ (same clean() hunk as above, with the quoted context ending at)
+    if(lambdaFunc.isEmpty) {
--- End diff --
Nit: space after `if`.
[GitHub] spark pull request #21930: [SPARK-14540][Core] Fix remaining major issues fo...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/21930#discussion_r206969293
--- Diff: core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala ---
@@ -218,118 +261,132 @@ [same clean() hunk as above; the rest of this archived message is truncated]
[GitHub] spark pull request #21930: [SPARK-14540][Core] Fix remaining major issues fo...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/21930#discussion_r206968600
--- Diff: core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala ---
@@ -159,6 +160,43 @@ (same getSerializedLambda hunk as quoted above, ending at)
+          .getInterfaces.exists{x: Class[_] => x.getName.equals("scala.Serializable") }
--- End diff --
More nits: does `(_.getName.equals...)` not work?
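As a sketch, the same candidate check written with the placeholder syntax being asked about (whether the explicit `x: Class[_]` annotation is actually required was left as something to test):

```scala
// Illustrative only: the interface check from getSerializedLambda with an
// underscore placeholder instead of the explicitly annotated parameter.
def isClosureCandidate(closure: AnyRef): Boolean = {
  closure.getClass.isSynthetic &&
    closure.getClass.getInterfaces.exists(_.getName == "scala.Serializable")
}
```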
[GitHub] spark pull request #21930: [SPARK-14540][Core] Fix remaining major issues fo...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/21930#discussion_r206968962
--- Diff: core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala ---
@@ -218,118 +261,132 @@ (same clean() hunk as above, with the quoted context ending at)
+        logDebug(" + declared fields: " + declaredFields.size)
--- End diff --
How about using string interpolation, while you're updating these debug statements anyway?
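For example, the first pair of statements in the interpolated form being suggested (a fragment only; `logDebug` and `declaredFields` come from the surrounding method):

```scala
// Interpolated equivalents of the concatenation-style debug lines.
logDebug(s" + declared fields: ${declaredFields.size}")
declaredFields.foreach { f => logDebug(s"     $f") }
```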
[GitHub] spark pull request #21930: [SPARK-14540][Core] Fix remaining major issues fo...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/21930#discussion_r206968430
--- Diff: core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala ---
@@ -159,6 +160,43 @@ (same getSerializedLambda hunk as quoted above, ending at)
+    if (scala.util.Properties.versionString.contains("2.11")) {
--- End diff --
Ah, this part of the diff was collapsed and I didn't see it. I have a few more minor questions/suggestions on the closure cleaner change, but would indeed like to get this in for 2.4, and it looks close. Here, what about storing the result of this in a private field so as not to compute it every time? It might not be a big deal, I don't know.
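One way to do that, sketched under the assumption that the Scala version cannot change within a JVM (the field name is made up):

```scala
// Cached once on the ClosureCleaner object instead of re-parsing the version
// string on every call.
private lazy val isScala2_11: Boolean =
  scala.util.Properties.versionString.contains("2.11")
```

`getSerializedLambda` would then start with `if (isScala2_11) return None`.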
[GitHub] spark pull request #21930: [SPARK-14540][Core] Fix remaining major issues fo...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/21930#discussion_r206968703
--- Diff: core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala ---
@@ -159,6 +160,43 @@ (same getSerializedLambda hunk as quoted above, continuing with)
+
+    if (isClosureCandidate) {
+      try {
+        val res = inspect(closure)
--- End diff --
Inline `res`? Just little stuff that might streamline this a bit; it doesn't matter.
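Inlined as suggested, the tail of `getSerializedLambda` might read roughly as follows (a sketch only; the catch branch is an assumption, since the quoted hunk stops at the `inspect` call):

```scala
if (isClosureCandidate) {
  try {
    // inspect(closure) reflectively extracts the SerializedLambda; wrapping it
    // in Option(...) removes the need for the temporary `res`.
    Option(inspect(closure))
  } catch {
    case _: Exception => None  // not a serialized lambda after all
  }
} else {
  None
}
```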
[GitHub] spark pull request #21930: [SPARK-14540][Core] Fix remaining major issues fo...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/21930#discussion_r206605419
--- Diff: core/src/main/scala/org/apache/spark/TaskContext.scala ---
@@ -123,7 +123,7 @@ abstract class TaskContext extends Serializable {
    *
    * Exceptions thrown by the listener will result in failure of the task.
    */
-  def addTaskCompletionListener(f: (TaskContext) => Unit): TaskContext = {
+  def addTaskCompletionListener[U](f: (TaskContext) => U): TaskContext = {
--- End diff --
OK, if it's binary- and source-compatible with existing user programs for 2.11 users, that's fine. Bets are off for 2.12 users anyway. When the release notes are crafted for 2.4, we'll want to mention this JIRA (I'll tag it) and issues like this.
[GitHub] spark pull request #21930: [SPARK-14540][Core] Fix remaining major issues fo...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/21930#discussion_r206562302
--- Diff: core/src/main/scala/org/apache/spark/TaskContext.scala ---
@@ -123,7 +123,7 @@ abstract class TaskContext extends Serializable {
    *
    * Exceptions thrown by the listener will result in failure of the task.
    */
-  def addTaskCompletionListener(f: (TaskContext) => Unit): TaskContext = {
+  def addTaskCompletionListener[U](f: (TaskContext) => U): TaskContext = {
--- End diff --
Yes, this covers that bug. So if you build with 2.11 you don't need to specify the Unit type when you make the call (I tried that), since there is no ambiguity and the compiler does not hit an overload-resolution issue. With 2.12, both addTaskCompletionListener methods end up being SAM types and the Unit adaptation causes this issue. I am not sure we can do anything more here. @retronym or @lrytz may add more context.
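A simplified sketch of the two overloads and why the type parameter removes the ambiguity (not the exact Spark source; `TaskCompletionListener` stands in for Spark's existing listener trait, and `cleanup()` in the comment is hypothetical):

```scala
trait TaskCompletionListener {
  def onTaskCompletion(context: TaskContext): Unit
}

abstract class TaskContext {
  // SAM-style overload taking the listener directly.
  def addTaskCompletionListener(listener: TaskCompletionListener): TaskContext

  // Function-taking overload. On 2.12 a call like
  //   ctx.addTaskCompletionListener { c => cleanup() }
  // would otherwise match both overloads once the body is adapted to Unit;
  // per the explanation above, making the result type polymorphic keeps
  // overload resolution unambiguous without changing call sites.
  def addTaskCompletionListener[U](f: TaskContext => U): TaskContext =
    addTaskCompletionListener(new TaskCompletionListener {
      override def onTaskCompletion(context: TaskContext): Unit = f(context)
    })
}
```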
[GitHub] spark pull request #21930: [SPARK-14540][Core] Fix remaining major issues fo...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/21930#discussion_r206549150
--- Diff: core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala ---
@@ -538,17 +543,22 @@ (same ClosureCleanerSuite2 hunk as quoted in the comment below, ending at)
+    if(!ClosureCleanerSuite2.supportsLMFs) {
--- End diff --
Sure.
[GitHub] spark pull request #21930: [SPARK-14540][Core] Fix remaining major issues fo...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/21930#discussion_r206541904
--- Diff: core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala ---
@@ -538,17 +543,22 @@ class ClosureCleanerSuite2 extends SparkFunSuite with BeforeAndAfterAll with Pri
     // As before, this closure is neither serializable nor cleanable
     verifyCleaning(inner1, serializableBefore = false, serializableAfter = false)

-    // This closure is no longer serializable because it now has a pointer to the outer closure,
-    // which is itself not serializable because it has a pointer to the ClosureCleanerSuite2.
-    // If we do not clean transitively, we will not null out this indirect reference.
-    verifyCleaning(
-      inner2, serializableBefore = false, serializableAfter = false, transitive = false)
-
-    // If we clean transitively, we will find that method `a` does not actually reference the
-    // outer closure's parent (i.e. the ClosureCleanerSuite), so we can additionally null out
-    // the outer closure's parent pointer. This will make `inner2` serializable.
-    verifyCleaning(
-      inner2, serializableBefore = false, serializableAfter = true, transitive = true)
+    if(!ClosureCleanerSuite2.supportsLMFs) {
--- End diff --
Nit: space after "if". scalastyle might flag that. While you're at it, what about flipping the blocks to avoid a negation? just for a tiny bit of clarity.
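Flipped as suggested, the test would read roughly like this (a sketch; the 2.12 branch holds whatever assertions the PR adds for lambda closures, elided here):

```scala
if (ClosureCleanerSuite2.supportsLMFs) {
  // 2.12 / LambdaMetaFactory path: the PR's new assertions go here.
} else {
  // Pre-2.12 behaviour, unchanged from the existing test.
  verifyCleaning(
    inner2, serializableBefore = false, serializableAfter = false, transitive = false)
  verifyCleaning(
    inner2, serializableBefore = false, serializableAfter = true, transitive = true)
}
```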
[GitHub] spark pull request #21930: [SPARK-14540][Core] Fix remaining major issues fo...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/21930#discussion_r206542154
--- Diff: core/src/main/scala/org/apache/spark/TaskContext.scala ---
@@ -123,7 +123,7 @@ abstract class TaskContext extends Serializable {
    *
    * Exceptions thrown by the listener will result in failure of the task.
    */
-  def addTaskCompletionListener(f: (TaskContext) => Unit): TaskContext = {
+  def addTaskCompletionListener[U](f: (TaskContext) => U): TaskContext = {
--- End diff --
This is to work around https://github.com/scala/bug/issues/11016, right? I'd prefer any solution that doesn't involve changing all the callers, but it looks like both workarounds require something to be done. At least I'd document the purpose of U here. That said, user code can call this, right? And it would have to implement a similar change to work with 2.12? That's probably OK in the sense that any user app must make several changes to be compatible with 2.12. I don't think 2.11 users would find there is a change to the binary API. Would a 2.11 user need to change their calls to specify a type for U with this change? Because it looks like it's not optional, given that Spark code has to change its calls. Is that not a source incompatibility? If it's not, then I guess I wonder if you can avoid changing all the calls in Spark?
[GitHub] spark pull request #21930: [SPARK-14540][Core] Fix remaining major issues fo...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/21930#discussion_r206540368
--- Diff: core/src/main/scala/org/apache/spark/TaskContext.scala ---
@@ -123,7 +123,7 @@ abstract class TaskContext extends Serializable {
    *
    * Exceptions thrown by the listener will result in failure of the task.
    */
-  def addTaskCompletionListener(f: (TaskContext) => Unit): TaskContext = {
+  def addTaskCompletionListener[U](f: (TaskContext) => U): TaskContext = {
--- End diff --
Do we need to change this? I don't think it is a problem binary-compatibility-wise, but it seems a bit weird since we don't use the result of the function.
[GitHub] spark pull request #21930: [SPARK-14540][Core] Fix remaining major issues fo...
GitHub user skonto opened a pull request: https://github.com/apache/spark/pull/21930

[SPARK-14540][Core] Fix remaining major issues for Scala 2.12 Support

## What changes were proposed in this pull request?

This PR addresses issues 2 and 3 in the [document](https://docs.google.com/document/d/1fbkjEL878witxVQpOCbjlvOvadHtVjYXeB-2mgzDTvk).
* We modified the closure cleaner to identify closures that are implemented via the LambdaMetaFactory mechanism (serialized lambdas) (issue 2).
* We also fix scala/bug#11016. There are two options for solving the Unit issue: either add () at the end of the closure or use the trick described in the doc. Otherwise overload resolution does not work (we are not going to eliminate either of the methods here). The compiler tries to adapt the closure's result to Unit, which makes both methods candidates during overload resolution; with polymorphic overloading there is no ambiguity (that is the workaround implemented). This does not look that good, but it serves its purpose, as we need to support two different uses of the method `addTaskFailureListener`: one that passes a TaskCompletionListener and one that passes a closure that is wrapped with a TaskCompletionListener later on (issue 3).

Note: regarding issue 1 the plan is:
> Do Nothing. Don't try to fix this as this is only a problem for Java users who would want to use 2.11 binaries. In that case they can cast to MapFunction to be able to utilize lambdas. In Spark 3.0.0 the API should be simplified so that this issue is removed.

## How was this patch tested?

This was manually tested:
```
./dev/change-scala-version.sh 2.12
./build/mvn -DskipTests -Pscala-2.12 clean package
./build/mvn -Pscala-2.12 clean package -DwildcardSuites=org.apache.spark.serializer.ProactiveClosureSerializationSuite -Dtest=None
./build/mvn -Pscala-2.12 clean package -DwildcardSuites=org.apache.spark.util.ClosureCleanerSuite -Dtest=None
./build/mvn -Pscala-2.12 clean package -DwildcardSuites=org.apache.spark.streaming.DStreamClosureSuite -Dtest=None
```

You can merge this pull request into a Git repository by running:
$ git pull https://github.com/skonto/spark scala2.12-sup
Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21930.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21930

commit d466a9c2529e5fe1a680b6bab6b80fd57bef8c35
Author: Stavros Kontopoulos
Date: 2018-07-30T22:51:14Z
initial
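For context on the closure-cleaner half of the change, a minimal sketch (not the PR's implementation) of how a LambdaMetaFactory closure can be recognised: a serializable Scala 2.12 lambda compiles to a synthetic class whose generated `writeReplace` method returns a `java.lang.invoke.SerializedLambda` describing the implementing method.

```scala
import java.lang.invoke.SerializedLambda

object LambdaInspector {
  // Reflectively invoke the closure's writeReplace to obtain its
  // SerializedLambda; this throws if the object is not a serializable lambda.
  def inspect(closure: AnyRef): SerializedLambda = {
    val writeReplace = closure.getClass.getDeclaredMethod("writeReplace")
    writeReplace.setAccessible(true)
    writeReplace.invoke(closure).asInstanceOf[SerializedLambda]
  }
}
```

For a 2.12 lambda such as `val f = (x: Int) => x + 1`, `LambdaInspector.inspect(f).getImplMethodName` should return the synthetic `$anonfun$...` method name, which is presumably what the modified cleaner hands to `ReturnStatementFinder` as the target method.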