This is an automated email from the ASF dual-hosted git repository. yumwang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new b6aea1a8d99 [SPARK-39325][CORE] Improve MapOutputTracker convertMapStatuses performance b6aea1a8d99 is described below commit b6aea1a8d99b3d99e91f7f195b23169d3d61b6a7 Author: Kun Wan <wan...@apache.org> AuthorDate: Sat Jun 11 18:16:43 2022 +0800 [SPARK-39325][CORE] Improve MapOutputTracker convertMapStatuses performance ### What changes were proposed in this pull request? Optimize the `MapOutputTracker.convertMapStatuses()` method. ### Why are the changes needed? `MapOutputTracker.convertMapStatuses()` will be very slow if there are tens of thousands of MapStatuses and MergeStatuses. Benchmark code: ```scala val benchmark = new Benchmark("MapStatuses Convert", 1, output = output) val blockManagerNumber = 1000 val mapNumber = 50000 val shufflePartitions = 10000 val shuffleId: Int = 0 // First reduce task will fetch map data from startPartition to endPartition val startPartition = 0 val startMapIndex = 0 val endMapIndex = mapNumber val blockManagers = Array.tabulate(blockManagerNumber) { i => BlockManagerId("a", "host" + i, 7337) } val mapStatuses: Array[MapStatus] = Array.tabulate(mapNumber) { mapTaskId => HighlyCompressedMapStatus( blockManagers(mapTaskId % blockManagerNumber), Array.tabulate(shufflePartitions)(i => if (i % 50 == 0) 1 else 0), mapTaskId) } val bitmap = new RoaringBitmap() Range(0, 4000).foreach(bitmap.add(_)) val mergeStatuses = Array.tabulate(shufflePartitions) { part => MergeStatus(blockManagers(part % blockManagerNumber), shuffleId, bitmap, 100) } Array(499, 999, 1499).foreach { endPartition => benchmark.addCase( s"Num Maps: $mapNumber Fetch partitions:${endPartition - startPartition + 1}", numIters) { _ => MapOutputTracker.convertMapStatuses( shuffleId, startPartition, endPartition, mapStatuses, startMapIndex, endMapIndex, Some(mergeStatuses)) } } ``` Before this PR ``` ================================================================================================ MapStatuses 
Convert Benchmark ================================================================================================ Java HotSpot(TM) 64-Bit Server VM 1.8.0_281-b09 on Mac OS X 10.15.7 Intel(R) Core(TM) i9-9980HK CPU 2.40GHz MapStatuses Convert: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ Num Maps: 50000 Fetch partitions:500 3393 3483 96 0.0 3393439257.0 1.0X Num Maps: 50000 Fetch partitions:1000 6640 6772 121 0.0 6639654832.0 0.5X Num Maps: 50000 Fetch partitions:1500 10035 10143 108 0.0 10035100069.0 0.3X ``` After this PR ``` ================================================================================================ MapStatuses Convert Benchmark ================================================================================================ Java HotSpot(TM) 64-Bit Server VM 1.8.0_281-b09 on Mac OS X 10.15.7 Intel(R) Core(TM) i9-9980HK CPU 2.40GHz MapStatuses Convert: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ Num Maps: 50000 Fetch partitions:500 667 679 15 0.0 666562302.0 1.0X Num Maps: 50000 Fetch partitions:1000 1285 1397 115 0.0 1284808865.0 0.5X Num Maps: 50000 Fetch partitions:1500 2045 2068 32 0.0 2044951906.0 0.3X ``` ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing UTs. Closes #36709 from wankunde/convert_mapstatus. 
Lead-authored-by: Kun Wan <wan...@apache.org> Co-authored-by: wankun <wanku...@163.com> Signed-off-by: Yuming Wang <yumw...@ebay.com> --- .../MapStatusesConvertBenchmark-results.txt | 13 +++ .../scala/org/apache/spark/MapOutputTracker.scala | 50 ++++++------ .../org/apache/spark/scheduler/MergeStatus.scala | 9 --- .../apache/spark/MapStatusesConvertBenchmark.scala | 92 ++++++++++++++++++++++ 4 files changed, 130 insertions(+), 34 deletions(-) diff --git a/core/benchmarks/MapStatusesConvertBenchmark-results.txt b/core/benchmarks/MapStatusesConvertBenchmark-results.txt new file mode 100644 index 00000000000..f41401bbe2e --- /dev/null +++ b/core/benchmarks/MapStatusesConvertBenchmark-results.txt @@ -0,0 +1,13 @@ +================================================================================================ +MapStatuses Convert Benchmark +================================================================================================ + +OpenJDK 64-Bit Server VM 1.8.0_332-b09 on Linux 5.13.0-1025-azure +Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz +MapStatuses Convert: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +Num Maps: 50000 Fetch partitions:500 1330 1359 26 0.0 1329827185.0 1.0X +Num Maps: 50000 Fetch partitions:1000 2648 2666 20 0.0 2647944453.0 0.5X +Num Maps: 50000 Fetch partitions:1500 4155 4436 383 0.0 4154563448.0 0.3X + + diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index e6ed469250b..79cae483b22 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -1596,7 +1596,7 @@ private[spark] object MapOutputTracker extends Logging { mapStatuses: Array[MapStatus], startMapIndex : Int, endMapIndex: Int, - mergeStatuses: Option[Array[MergeStatus]] = 
None): MapSizesByExecutorId = { + mergeStatusesOpt: Option[Array[MergeStatus]] = None): MapSizesByExecutorId = { assert (mapStatuses != null) val splitsByAddress = new HashMap[BlockManagerId, ListBuffer[(BlockId, Long, Int)]] var enableBatchFetch = true @@ -1608,39 +1608,39 @@ private[spark] object MapOutputTracker extends Logging { // TODO: SPARK-35036: Instead of reading map blocks in case of AQE with Push based shuffle, // TODO: improve push based shuffle to read partial merged blocks satisfying the start/end // TODO: map indexes - if (mergeStatuses.exists(_.exists(_ != null)) && startMapIndex == 0 + if (mergeStatusesOpt.exists(_.exists(_ != null)) && startMapIndex == 0 && endMapIndex == mapStatuses.length) { enableBatchFetch = false logDebug(s"Disable shuffle batch fetch as Push based shuffle is enabled for $shuffleId.") - // We have MergeStatus and full range of mapIds are requested so return a merged block. - val numMaps = mapStatuses.length - mergeStatuses.get.zipWithIndex.slice(startPartition, endPartition).foreach { - case (mergeStatus, partId) => - val remainingMapStatuses = if (mergeStatus != null && mergeStatus.totalSize > 0) { - // If MergeStatus is available for the given partition, add location of the - // pre-merged shuffle partition for this partition ID. Here we create a - // ShuffleMergedBlockId to indicate this is a merged shuffle block. 
- splitsByAddress.getOrElseUpdate(mergeStatus.location, ListBuffer()) += - ((ShuffleMergedBlockId(shuffleId, mergeStatus.shuffleMergeId, partId), - mergeStatus.totalSize, SHUFFLE_PUSH_MAP_ID)) - // For the "holes" in this pre-merged shuffle partition, i.e., unmerged mapper - // shuffle partition blocks, fetch the original map produced shuffle partition blocks - val mapStatusesWithIndex = mapStatuses.zipWithIndex - mergeStatus.getMissingMaps(numMaps).map(mapStatusesWithIndex) - } else { - // If MergeStatus is not available for the given partition, fall back to - // fetching all the original mapper shuffle partition blocks - mapStatuses.zipWithIndex.toSeq - } - // Add location for the mapper shuffle partition blocks - for ((mapStatus, mapIndex) <- remainingMapStatuses) { - validateStatus(mapStatus, shuffleId, partId) + val mergeStatuses = mergeStatusesOpt.get + for (partId <- startPartition until endPartition) { + val mergeStatus = mergeStatuses(partId) + if (mergeStatus != null && mergeStatus.totalSize > 0) { + // If MergeStatus is available for the given partition, add location of the + // pre-merged shuffle partition for this partition ID. Here we create a + // ShuffleMergedBlockId to indicate this is a merged shuffle block. 
+ splitsByAddress.getOrElseUpdate(mergeStatus.location, ListBuffer()) += + ((ShuffleMergedBlockId(shuffleId, mergeStatus.shuffleMergeId, partId), + mergeStatus.totalSize, SHUFFLE_PUSH_MAP_ID)) + } + } + + // Add location for the mapper shuffle partition blocks + for ((mapStatus, mapIndex) <- mapStatuses.iterator.zipWithIndex) { + validateStatus(mapStatus, shuffleId, startPartition) + for (partId <- startPartition until endPartition) { + // For the "holes" in this pre-merged shuffle partition, i.e., unmerged mapper + // shuffle partition blocks, fetch the original map produced shuffle partition blocks + val mergeStatus = mergeStatuses(partId) + if (mergeStatus == null || mergeStatus.totalSize == 0 || + !mergeStatus.tracker.contains(mapIndex)) { val size = mapStatus.getSizeForBlock(partId) if (size != 0) { splitsByAddress.getOrElseUpdate(mapStatus.location, ListBuffer()) += ((ShuffleBlockId(shuffleId, mapStatus.mapId, partId), size, mapIndex)) } } + } } } else { val iter = mapStatuses.iterator.zipWithIndex diff --git a/core/src/main/scala/org/apache/spark/scheduler/MergeStatus.scala b/core/src/main/scala/org/apache/spark/scheduler/MergeStatus.scala index 6d160264538..850756b50a3 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/MergeStatus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/MergeStatus.scala @@ -58,15 +58,6 @@ private[spark] class MergeStatus( def tracker: RoaringBitmap = mapTracker - /** - * Get the list of mapper IDs for missing mapper partition blocks that are not merged. - * The reducer will use this information to decide which shuffle partition blocks to - * fetch in the original way. - */ - def getMissingMaps(numMaps: Int): Seq[Int] = { - (0 until numMaps).filter(i => !mapTracker.contains(i)) - } - /** * Get the number of missing map outputs for missing mapper partition blocks that are not merged. 
*/ diff --git a/core/src/test/scala/org/apache/spark/MapStatusesConvertBenchmark.scala b/core/src/test/scala/org/apache/spark/MapStatusesConvertBenchmark.scala new file mode 100644 index 00000000000..7f25c86497f --- /dev/null +++ b/core/src/test/scala/org/apache/spark/MapStatusesConvertBenchmark.scala @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import org.roaringbitmap.RoaringBitmap + +import org.apache.spark.benchmark.{Benchmark, BenchmarkBase} +import org.apache.spark.scheduler.{HighlyCompressedMapStatus, MapStatus, MergeStatus} +import org.apache.spark.storage.BlockManagerId + +/** + * Benchmark to measure performance for converting mapStatuses and mergeStatuses. + * To run this benchmark: + * {{{ + * 1. without sbt: + * bin/spark-submit --class <this class> --jars <spark core test jar> + * 2. build/sbt "core/test:runMain <this class>" + * 3. generate result: + * SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "core/test:runMain <this class>" + * Results will be written to "benchmarks/MapStatusesConvertBenchmark-results.txt". 
+ * }}} + * */ +object MapStatusesConvertBenchmark extends BenchmarkBase { + + private def convertMapStatus(numIters: Int): Unit = { + + val benchmark = new Benchmark("MapStatuses Convert", 1, output = output) + + val blockManagerNumber = 1000 + val mapNumber = 50000 + val shufflePartitions = 10000 + + val shuffleId: Int = 0 + // First reduce task will fetch map data from startPartition to endPartition + val startPartition = 0 + val startMapIndex = 0 + val endMapIndex = mapNumber + val blockManagers = Array.tabulate(blockManagerNumber) { i => + BlockManagerId("a", "host" + i, 7337) + } + val mapStatuses: Array[MapStatus] = Array.tabulate(mapNumber) { mapTaskId => + HighlyCompressedMapStatus( + blockManagers(mapTaskId % blockManagerNumber), + Array.tabulate(shufflePartitions)(i => if (i % 50 == 0) 1 else 0), + mapTaskId) + } + val bitmap = new RoaringBitmap() + Range(0, 4000).foreach(bitmap.add(_)) + val mergeStatuses = Array.tabulate(shufflePartitions) { part => + MergeStatus(blockManagers(part % blockManagerNumber), shuffleId, bitmap, 100) + } + + Array(499, 999, 1499).foreach { endPartition => + benchmark.addCase( + s"Num Maps: $mapNumber Fetch partitions:${endPartition - startPartition + 1}", + numIters) { _ => + MapOutputTracker.convertMapStatuses( + shuffleId, + startPartition, + endPartition, + mapStatuses, + startMapIndex, + endMapIndex, + Some(mergeStatuses)) + } + } + + benchmark.run() + } + + override def runBenchmarkSuite(mainArgs: Array[String]): Unit = { + val numIters = 3 + runBenchmark("MapStatuses Convert Benchmark") { + convertMapStatus(numIters) + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org