This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-3.1 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push: new 59b6a71 [SPARK-31952][SQL] Fix incorrect memory spill metric when doing Aggregate 59b6a71 is described below commit 59b6a71a960b8965cb77b0bb7c885477736446a3 Author: yi.wu <yi...@databricks.com> AuthorDate: Mon Jan 11 07:15:28 2021 +0000 [SPARK-31952][SQL] Fix incorrect memory spill metric when doing Aggregate ### What changes were proposed in this pull request? This PR takes over https://github.com/apache/spark/pull/28780. 1. Counted the spilled memory size when creating the `UnsafeExternalSorter` with the existing `InMemorySorter` 2. Accumulate the `totalSpillBytes` when merging two `UnsafeExternalSorter` ### Why are the changes needed? As mentioned in https://github.com/apache/spark/pull/28780: > It happens when hash aggregate downgrades to sort based aggregate. `UnsafeExternalSorter.createWithExistingInMemorySorter` calls spill on an `InMemorySorter` immediately, but the memory pointed by `InMemorySorter` is acquired by the outside `BytesToBytesMap`, instead of the allocatedPages in `UnsafeExternalSorter`. So the memory spill bytes metric is always 0, but the disk bytes spill metric is right. Besides, this PR also fixes `UnsafeExternalSorter.merge` by accumulating the `totalSpillBytes` of the two sorters. Thus, we can report the correct spilled size in `HashAggregateExec.finishAggregate`. 
Issues can be reproduced by the following step by checking the SQL metrics in UI: ``` bin/spark-shell --driver-memory 512m --executor-memory 512m --executor-cores 1 --conf "spark.default.parallelism=1" scala> sql("select id, count(1) from range(10000000) group by id").write.csv("/tmp/result.json") ``` Before: <img width="200" alt="WeChatfe5146180d91015e03b9a27852e9a443" src="https://user-images.githubusercontent.com/16397174/103625414-e6fc6280-4f75-11eb-8b93-c55095bdb5b8.png"> After: <img width="200" alt="WeChat42ab0e73c5fbc3b14c12ab85d232071d" src="https://user-images.githubusercontent.com/16397174/103625420-e8c62600-4f75-11eb-8e1f-6f5e8ab561b9.png"> ### Does this PR introduce _any_ user-facing change? Yes, users can see the correct spill metrics after this PR. ### How was this patch tested? Tested manually and added UTs. Closes #31035 from Ngone51/SPARK-31952. Lead-authored-by: yi.wu <yi...@databricks.com> Co-authored-by: wangguangxin.cn <wangguangxin...@bytedance.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> (cherry picked from commit 4afca0f706504ac22e956c498538023dc64b0413) Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../unsafe/sort/UnsafeExternalSorter.java | 6 +- .../sql/execution/UnsafeKVExternalSorter.java | 3 +- .../execution/UnsafeKVExternalSorterSuite.scala | 95 ++++++++++++++++++---- 3 files changed, 86 insertions(+), 18 deletions(-) diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java index dda8ed4..c38327c 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java @@ -104,11 +104,14 @@ public final class UnsafeExternalSorter extends MemoryConsumer { int initialSize, long pageSizeBytes, int numElementsForSpillThreshold, - UnsafeInMemorySorter 
inMemorySorter) throws IOException { + UnsafeInMemorySorter inMemorySorter, + long existingMemoryConsumption) throws IOException { UnsafeExternalSorter sorter = new UnsafeExternalSorter(taskMemoryManager, blockManager, serializerManager, taskContext, recordComparatorSupplier, prefixComparator, initialSize, pageSizeBytes, numElementsForSpillThreshold, inMemorySorter, false /* ignored */); sorter.spill(Long.MAX_VALUE, sorter); + taskContext.taskMetrics().incMemoryBytesSpilled(existingMemoryConsumption); + sorter.totalSpillBytes += existingMemoryConsumption; // The external sorter will be used to insert records, in-memory sorter is not needed. sorter.inMemSorter = null; return sorter; @@ -496,6 +499,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer { */ public void merge(UnsafeExternalSorter other) throws IOException { other.spill(); + totalSpillBytes += other.totalSpillBytes; spillWriters.addAll(other.spillWriters); // remove them from `spillWriters`, or the files will be deleted in `cleanupResources`. other.spillWriters.clear(); diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java index 7a9f61a..42ceebc 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java @@ -165,7 +165,8 @@ public final class UnsafeKVExternalSorter { (int) (long) SparkEnv.get().conf().get(package$.MODULE$.SHUFFLE_SORT_INIT_BUFFER_SIZE()), pageSizeBytes, numElementsForSpillThreshold, - inMemSorter); + inMemSorter, + map.getTotalMemoryConsumption()); // reset the map, so we can re-use it to insert new records. the inMemSorter will not used // anymore, so the underline array could be used by map again. 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala index 8aa003a..f630cd8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala @@ -210,23 +210,8 @@ class UnsafeKVExternalSorterSuite extends SparkFunSuite with SharedSparkSession test("SPARK-23376: Create UnsafeKVExternalSorter with BytesToByteMap having duplicated keys") { val memoryManager = new TestMemoryManager(new SparkConf()) val taskMemoryManager = new TaskMemoryManager(memoryManager, 0) - val map = new BytesToBytesMap(taskMemoryManager, 64, taskMemoryManager.pageSizeBytes()) - - // Key/value are a unsafe rows with a single int column + val map = createBytesToBytesMapWithDuplicateKeys(taskMemoryManager) val schema = new StructType().add("i", IntegerType) - val key = new UnsafeRow(1) - key.pointTo(new Array[Byte](32), 32) - key.setInt(0, 1) - val value = new UnsafeRow(1) - value.pointTo(new Array[Byte](32), 32) - value.setInt(0, 2) - - for (_ <- 1 to 65) { - val loc = map.lookup(key.getBaseObject, key.getBaseOffset, key.getSizeInBytes) - loc.append( - key.getBaseObject, key.getBaseOffset, key.getSizeInBytes, - value.getBaseObject, value.getBaseOffset, value.getSizeInBytes) - } // Make sure we can successfully create a UnsafeKVExternalSorter with a `BytesToBytesMap` // which has duplicated keys and the number of entries exceeds its capacity. 
@@ -245,4 +230,82 @@ class UnsafeKVExternalSorterSuite extends SparkFunSuite with SharedSparkSession TaskContext.unset() } } + + test("SPARK-31952: create UnsafeKVExternalSorter with existing map should count spilled memory " + + "size correctly") { + val memoryManager = new TestMemoryManager(new SparkConf()) + val taskMemoryManager = new TaskMemoryManager(memoryManager, 0) + val map = createBytesToBytesMapWithDuplicateKeys(taskMemoryManager) + val schema = new StructType().add("i", IntegerType) + + try { + val context = new TaskContextImpl(0, 0, 0, 0, 0, taskMemoryManager, new Properties(), null) + TaskContext.setTaskContext(context) + val expectedSpillSize = map.getTotalMemoryConsumption + val sorter = new UnsafeKVExternalSorter( + schema, + schema, + sparkContext.env.blockManager, + sparkContext.env.serializerManager, + taskMemoryManager.pageSizeBytes(), + Int.MaxValue, + map) + assert(sorter.getSpillSize === expectedSpillSize) + } finally { + TaskContext.unset() + } + } + + test("SPARK-31952: UnsafeKVExternalSorter.merge should accumulate totalSpillBytes") { + val memoryManager = new TestMemoryManager(new SparkConf()) + val taskMemoryManager = new TaskMemoryManager(memoryManager, 0) + val map1 = createBytesToBytesMapWithDuplicateKeys(taskMemoryManager) + val map2 = createBytesToBytesMapWithDuplicateKeys(taskMemoryManager) + val schema = new StructType().add("i", IntegerType) + + try { + val context = new TaskContextImpl(0, 0, 0, 0, 0, taskMemoryManager, new Properties(), null) + TaskContext.setTaskContext(context) + val expectedSpillSize = map1.getTotalMemoryConsumption + map2.getTotalMemoryConsumption + val sorter1 = new UnsafeKVExternalSorter( + schema, + schema, + sparkContext.env.blockManager, + sparkContext.env.serializerManager, + taskMemoryManager.pageSizeBytes(), + Int.MaxValue, + map1) + val sorter2 = new UnsafeKVExternalSorter( + schema, + schema, + sparkContext.env.blockManager, + sparkContext.env.serializerManager, + 
taskMemoryManager.pageSizeBytes(), + Int.MaxValue, + map2) + sorter1.merge(sorter2) + assert(sorter1.getSpillSize === expectedSpillSize) + } finally { + TaskContext.unset() + } + } + + private def createBytesToBytesMapWithDuplicateKeys(taskMemoryManager: TaskMemoryManager) + : BytesToBytesMap = { + val map = new BytesToBytesMap(taskMemoryManager, 64, taskMemoryManager.pageSizeBytes()) + // Key/value are a unsafe rows with a single int column + val key = new UnsafeRow(1) + key.pointTo(new Array[Byte](32), 32) + key.setInt(0, 1) + val value = new UnsafeRow(1) + value.pointTo(new Array[Byte](32), 32) + value.setInt(0, 2) + for (_ <- 1 to 65) { + val loc = map.lookup(key.getBaseObject, key.getBaseOffset, key.getSizeInBytes) + loc.append( + key.getBaseObject, key.getBaseOffset, key.getSizeInBytes, + value.getBaseObject, value.getBaseOffset, value.getSizeInBytes) + } + map + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org