xuanyuanking commented on a change in pull request #26164: [SPARK-21492][SQL] 
Fix memory leak in SortMergeJoin
URL: https://github.com/apache/spark/pull/26164#discussion_r336720473
 
 

 ##########
 File path: sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
 ##########
 @@ -1040,3 +1044,49 @@ class JoinSuite extends QueryTest with 
SharedSparkSession {
     checkAnswer(df, Row(1, 2, 1, 2) :: Nil)
   }
 }
+
+class JoinWithResourceCleanSuite extends JoinSuite with BeforeAndAfterAll {
+  import testImplicits._
+  import scala.collection.mutable.ArrayBuffer
+
+  private def checkCleanupResourceTriggered(plan: SparkPlan) : 
ArrayBuffer[SortExec] = {
+    // Check cleanupResources are finally triggered in SortExec node
+    val sorts = new ArrayBuffer[SortExec]()
+    plan.foreachUp {
+      case s: SortExec => sorts += s
+      case _ =>
+    }
+    sorts.foreach { sort =>
+      val sortExec = spy(sort)
+      verify(sortExec, atLeastOnce).cleanupResources()
+      verify(sortExec.rowSorter, atLeastOnce).cleanupResources()
+    }
+    sorts
+  }
+
+  override def checkAnswer(df: => DataFrame, rows: Seq[Row]): Unit = {
+    withSQLConf(
+      SQLConf.SORT_MERGE_JOIN_EXEC_EAGER_CLEANUP_RESOURCES.key -> "true") {
+      checkCleanupResourceTriggered(df.queryExecution.sparkPlan)
+      super.checkAnswer(df, rows)
+    }
+  }
+
+  test("cleanupResource in code generation") {
+    withSQLConf(
+      SQLConf.SORT_MERGE_JOIN_EXEC_EAGER_CLEANUP_RESOURCES.key -> "true",
+      SQLConf.SHUFFLE_PARTITIONS.key -> "1",
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+      val df1 = spark.range(0, 10, 1, 2)
+      val df2 = spark.range(10).select($"id".as("b1"), (- $"id").as("b2"))
+      val res = df1.join(df2, $"id" === $"b1" && $"id" === 
$"b2").select($"b1", $"b2", $"id")
+
+      val sorts = checkCleanupResourceTriggered(res.queryExecution.sparkPlan)
 
 Review comment:
   Yeah, the first call is also meant to collect all the SortExec nodes, to 
make sure code generation takes effect.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to