This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.1 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push: new ed842ba8e82 [SPARK-39932][SQL] WindowExec should clear the final partition buffer ed842ba8e82 is described below commit ed842ba8e82e845efc38bd115909ce54faef318a Author: ulysses-you <ulyssesyo...@gmail.com> AuthorDate: Tue Aug 2 18:05:48 2022 +0900 [SPARK-39932][SQL] WindowExec should clear the final partition buffer ### What changes were proposed in this pull request? Explicitly clear final partition buffer if can not find next in `WindowExec`. The same fix in `WindowInPandasExec` ### Why are the changes needed? We do a repartition after a window, then we need do a local sort after window due to RoundRobinPartitioning shuffle. The error stack: ```java ExternalAppendOnlyUnsafeRowArray INFO - Reached spill threshold of 4096 rows, switching to org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 65536 bytes of memory, got 0 at org.apache.spark.memory.MemoryConsumer.throwOom(MemoryConsumer.java:157) at org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:97) at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.growPointerArrayIfNecessary(UnsafeExternalSorter.java:352) at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.allocateMemoryForRecordIfNecessary(UnsafeExternalSorter.java:435) at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:455) at org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:138) at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:226) at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$.$anonfun$prepareShuffleDependency$10(ShuffleExchangeExec.scala:355) ``` `WindowExec` only clear buffer in `fetchNextPartition` so the final partition buffer miss to clear. It is not a big problem since we have task completion listener. ```scala taskContext.addTaskCompletionListener(context -> { cleanupResources(); }); ``` This bug only affects if the window is not the last operator for this task and the follow operator like sort. ### Does this PR introduce _any_ user-facing change? yes, bug fix ### How was this patch tested? N/A Closes #37358 from ulysses-you/window. Authored-by: ulysses-you <ulyssesyo...@gmail.com> Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> (cherry picked from commit 1fac870126c289a7ec75f45b6b61c93b9a4965d4) Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> --- .../apache/spark/sql/execution/python/WindowInPandasExec.scala | 10 ++++++++-- .../org/apache/spark/sql/execution/window/WindowExec.scala | 10 ++++++++-- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala index 983fe9db738..a87024cfcd7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala @@ -353,8 +353,14 @@ case class WindowInPandasExec( // Iteration var rowIndex = 0 - override final def hasNext: Boolean = - (bufferIterator != null && bufferIterator.hasNext) || nextRowAvailable + override final def hasNext: Boolean = { + val found = (bufferIterator != null && bufferIterator.hasNext) || nextRowAvailable + if (!found) { + // clear final partition + buffer.clear() + } + found + } override final def next(): Iterator[UnsafeRow] = { // Load the next partition if we need to. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala index 6e0e36cbe59..963decb4cf4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala @@ -178,8 +178,14 @@ case class WindowExec( // Iteration var rowIndex = 0 - override final def hasNext: Boolean = - (bufferIterator != null && bufferIterator.hasNext) || nextRowAvailable + override final def hasNext: Boolean = { + val found = (bufferIterator != null && bufferIterator.hasNext) || nextRowAvailable + if (!found) { + // clear final partition + buffer.clear() + } + found + } val join = new JoinedRow override final def next(): InternalRow = { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org