Repository: spark Updated Branches: refs/heads/branch-2.2 6b4ec22e3 -> 8b7f72ed3
[SPARK-19644][SQL] Clean up Scala reflection garbage after creating Encoder (branch-2.2) ## What changes were proposed in this pull request? Backport #19687 to branch-2.2. The major difference is `cleanUpReflectionObjects` is protected by `ScalaReflectionLock.synchronized` in this PR for Scala 2.10. ## How was this patch tested? Jenkins Author: Shixiong Zhu <zsxw...@gmail.com> Closes #19718 from zsxwing/SPARK-19644-2.2. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8b7f72ed Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8b7f72ed Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8b7f72ed Branch: refs/heads/branch-2.2 Commit: 8b7f72ed37dac0daf5158a7f96b38fb1eab1d676 Parents: 6b4ec22 Author: Shixiong Zhu <zsxw...@gmail.com> Authored: Fri Nov 10 14:14:47 2017 -0800 Committer: Shixiong Zhu <zsxw...@gmail.com> Committed: Fri Nov 10 14:14:47 2017 -0800 ---------------------------------------------------------------------- .../spark/sql/catalyst/ScalaReflection.scala | 28 ++++++++++++---- .../encoders/ExpressionEncoderSuite.scala | 35 ++++++++++++++++++-- 2 files changed, 54 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/8b7f72ed/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala index 3b3d566..ad21842 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala @@ -62,7 +62,7 @@ object ScalaReflection extends ScalaReflection { */ def dataTypeFor[T : TypeTag]: DataType = dataTypeFor(localTypeOf[T]) - private def dataTypeFor(tpe: `Type`): DataType = ScalaReflectionLock.synchronized { + private def dataTypeFor(tpe: `Type`): DataType = cleanUpReflectionObjects { tpe match { case t if t <:< definitions.IntTpe => IntegerType case t if t <:< definitions.LongTpe => LongType @@ -92,7 +92,7 @@ object ScalaReflection extends ScalaReflection { * Array[T]. Special handling is performed for primitive types to map them back to their raw * JVM form instead of the Scala Array that handles auto boxing. */ - private def arrayClassFor(tpe: `Type`): ObjectType = ScalaReflectionLock.synchronized { + private def arrayClassFor(tpe: `Type`): ObjectType = cleanUpReflectionObjects { val cls = tpe match { case t if t <:< definitions.IntTpe => classOf[Array[Int]] case t if t <:< definitions.LongTpe => classOf[Array[Long]] @@ -145,7 +145,7 @@ object ScalaReflection extends ScalaReflection { private def deserializerFor( tpe: `Type`, path: Option[Expression], - walkedTypePath: Seq[String]): Expression = ScalaReflectionLock.synchronized { + walkedTypePath: Seq[String]): Expression = cleanUpReflectionObjects { /** Returns the current path with a sub-field extracted. */ def addToPath(part: String, dataType: DataType, walkedTypePath: Seq[String]): Expression = { @@ -452,7 +452,7 @@ object ScalaReflection extends ScalaReflection { inputObject: Expression, tpe: `Type`, walkedTypePath: Seq[String], - seenTypeSet: Set[`Type`] = Set.empty): Expression = ScalaReflectionLock.synchronized { + seenTypeSet: Set[`Type`] = Set.empty): Expression = cleanUpReflectionObjects { def toCatalystArray(input: Expression, elementType: `Type`): Expression = { dataTypeFor(elementType) match { @@ -638,7 +638,7 @@ object ScalaReflection extends ScalaReflection { * Returns true if the given type is option of product type, e.g. `Option[Tuple2]`. Note that, * we also treat [[DefinedByConstructorParams]] as product type. */ - def optionOfProductType(tpe: `Type`): Boolean = ScalaReflectionLock.synchronized { + def optionOfProductType(tpe: `Type`): Boolean = cleanUpReflectionObjects { tpe match { case t if t <:< localTypeOf[Option[_]] => val TypeRef(_, _, Seq(optType)) = t @@ -700,7 +700,7 @@ object ScalaReflection extends ScalaReflection { def schemaFor[T: TypeTag]: Schema = schemaFor(localTypeOf[T]) /** Returns a catalyst DataType and its nullability for the given Scala Type using reflection. */ - def schemaFor(tpe: `Type`): Schema = ScalaReflectionLock.synchronized { + def schemaFor(tpe: `Type`): Schema = cleanUpReflectionObjects { tpe match { case t if t.typeSymbol.annotations.exists(_.tpe =:= typeOf[SQLUserDefinedType]) => val udt = getClassFromType(t).getAnnotation(classOf[SQLUserDefinedType]).udt().newInstance() @@ -766,7 +766,7 @@ object ScalaReflection extends ScalaReflection { /** * Whether the fields of the given type is defined entirely by its constructor parameters. */ - def definedByConstructorParams(tpe: Type): Boolean = { + def definedByConstructorParams(tpe: Type): Boolean = cleanUpReflectionObjects { tpe <:< localTypeOf[Product] || tpe <:< localTypeOf[DefinedByConstructorParams] } @@ -796,6 +796,20 @@ trait ScalaReflection { import scala.collection.Map /** + * Any codes calling `scala.reflect.api.Types.TypeApi.<:<` should be wrapped by this method to + * clean up the Scala reflection garbage automatically. Otherwise, it will leak some objects to + * `scala.reflect.runtime.JavaUniverse.undoLog`. + * + * This method will also wrap `func` with `ScalaReflectionLock.synchronized` so the caller doesn't + * need to call it again. + * + * @see https://github.com/scala/bug/issues/8302 + */ + def cleanUpReflectionObjects[T](func: => T): T = ScalaReflectionLock.synchronized { + universe.asInstanceOf[scala.reflect.runtime.JavaUniverse].undoLog.undo(func) + } + + /** * Return the Scala Type for `T` in the current classloader mirror. * * Use this method instead of the convenience method `universe.typeOf`, which http://git-wip-us.apache.org/repos/asf/spark/blob/8b7f72ed/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala index bb1955a..e6d09bd 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala @@ -34,6 +34,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Project} import org.apache.spark.sql.catalyst.util.ArrayData import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String +import org.apache.spark.util.ClosureCleaner case class RepeatedStruct(s: Seq[PrimitiveData]) @@ -114,7 +115,9 @@ object ReferenceValueClass { class ExpressionEncoderSuite extends PlanTest with AnalysisTest { OuterScopes.addOuterScope(this) - implicit def encoder[T : TypeTag]: ExpressionEncoder[T] = ExpressionEncoder() + implicit def encoder[T : TypeTag]: ExpressionEncoder[T] = verifyNotLeakingReflectionObjects { + ExpressionEncoder() + } // test flat encoders encodeDecodeTest(false, "primitive boolean") @@ -370,8 +373,12 @@ class ExpressionEncoderSuite extends PlanTest with AnalysisTest { private def encodeDecodeTest[T : ExpressionEncoder]( input: T, testName: String): Unit = { - test(s"encode/decode for $testName: $input") { + testAndVerifyNotLeakingReflectionObjects(s"encode/decode for $testName: $input") { val encoder = implicitly[ExpressionEncoder[T]] + + // Make sure encoder is serializable. + ClosureCleaner.clean((s: String) => encoder.getClass.getName) + val row = encoder.toRow(input) val schema = encoder.schema.toAttributes val boundEncoder = encoder.resolveAndBind() @@ -441,4 +448,28 @@ class ExpressionEncoderSuite extends PlanTest with AnalysisTest { } } } + + /** + * Verify the size of scala.reflect.runtime.JavaUniverse.undoLog before and after `func` to + * ensure we don't leak Scala reflection garbage. + * + * @see org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects + */ + private def verifyNotLeakingReflectionObjects[T](func: => T): T = { + def undoLogSize: Int = { + scala.reflect.runtime.universe + .asInstanceOf[scala.reflect.runtime.JavaUniverse].undoLog.log.size + } + + val previousUndoLogSize = undoLogSize + val r = func + assert(previousUndoLogSize == undoLogSize) + r + } + + private def testAndVerifyNotLeakingReflectionObjects(testName: String)(testFun: => Any) { + test(testName) { + verifyNotLeakingReflectionObjects(testFun) + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org