This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 9440590a909 [SPARK-38948][TESTS] Fix `DiskRowQueue` leak in `PythonForeachWriterSuite`
9440590a909 is described below

commit 9440590a909d9222db838426c8e528ddec90e196
Author: yangjie01 <yangji...@baidu.com>
AuthorDate: Mon Apr 25 09:53:02 2022 +0900

    [SPARK-38948][TESTS] Fix `DiskRowQueue` leak in `PythonForeachWriterSuite`

    ### What changes were proposed in this pull request?
    This pr adds a `try-finally` to the `run` method of `BufferTester.thread` and calls `buffer.close()` in the `finally` block to ensure the resources held by `BufferTester.buffer` are released. Before this pr, the `DiskRowQueue` resource held by `BufferTester.buffer` was left unclosed.

    ### Why are the changes needed?
    Minor fix of a UT resource leak.

    ### Does this PR introduce _any_ user-facing change?
    No

    ### How was this patch tested?
    Pass GA.

    Closes #36261 from LuciferYang/SPARK-38948.

    Authored-by: yangjie01 <yangji...@baidu.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../sql/execution/python/PythonForeachWriterSuite.scala     | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonForeachWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonForeachWriterSuite.scala
index 61c9782bd17..02d6ff87f89 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonForeachWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonForeachWriterSuite.scala
@@ -102,11 +102,15 @@ class PythonForeachWriterSuite extends SparkFunSuite with Eventually with Mockit
     private val intProj = UnsafeProjection.create(Array[DataType](IntegerType))
     private val thread = new Thread() {
       override def run(): Unit = {
-        while (iterator.hasNext) {
-          outputBuffer.synchronized {
-            outputBuffer += iterator.next().getInt(0)
+        try {
+          while (iterator.hasNext) {
+            outputBuffer.synchronized {
+              outputBuffer += iterator.next().getInt(0)
+            }
+            Thread.sleep(sleepPerRowReadMs)
           }
-          Thread.sleep(sleepPerRowReadMs)
+        } finally {
+          buffer.close()
         }
       }
     }
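For context, the sketch below is a minimal, self-contained illustration of the cleanup pattern the patch applies: a background thread drains an iterator backed by a closable resource and releases that resource in a `finally` block, so it is closed whether the loop finishes normally or throws. The names (`SimpleQueue`, `Drain`) are illustrative only and are not part of the Spark test suite.

```scala
import java.io.Closeable
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a disk-backed row queue that must be closed.
class SimpleQueue extends Closeable {
  private val rows = ArrayBuffer(1, 2, 3)
  def iterator: Iterator[Int] = rows.iterator
  override def close(): Unit = println("queue closed") // release file handles, temp files, etc.
}

object Drain {
  def main(args: Array[String]): Unit = {
    val queue = new SimpleQueue
    val output = ArrayBuffer.empty[Int]
    val thread = new Thread() {
      override def run(): Unit = {
        try {
          val it = queue.iterator
          while (it.hasNext) {
            // Mirror the test's pattern of synchronizing on the shared output buffer.
            output.synchronized { output += it.next() }
          }
        } finally {
          queue.close() // runs even if the loop above throws
        }
      }
    }
    thread.start()
    thread.join()
    println(output.mkString(", "))
  }
}
```

Closing the resource inside the thread that consumes it keeps the cleanup on the same code path as the reads, which is the same idea the patch applies to `BufferTester.buffer`.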