This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 2a667fbff7e [MINOR] Improve updates to some mutable hash maps
2a667fbff7e is described below

commit 2a667fbff7e7fd21bf69668ef78b175f60a24dba
Author: weixiuli <weixi...@jd.com>
AuthorDate: Mon Apr 18 09:22:24 2022 -0500

    [MINOR] Improve updates to some mutable hash maps

    ### What changes were proposed in this pull request?
    Improve updates to some mutable hash maps.

    ### Why are the changes needed?
    Reduce some mutable hash map calls and clean up code.

    ### Does this PR introduce _any_ user-facing change?
    No.

    ### How was this patch tested?
    Existing unit tests.

    Closes #36179 from weixiuli/update-numBlocksInFlightPerAddress.

    Authored-by: weixiuli <weixi...@jd.com>
    Signed-off-by: Sean Owen <sro...@gmail.com>
---
 .../main/scala/org/apache/spark/deploy/worker/Worker.scala   |  4 ++--
 .../scala/org/apache/spark/resource/ResourceAllocator.scala  |  4 ++--
 .../scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala  |  2 +-
 .../apache/spark/storage/ShuffleBlockFetcherIterator.scala   | 11 +++++------
 .../scala/org/apache/spark/ml/classification/NaiveBayes.scala |  2 +-
 .../src/main/scala/org/apache/spark/ml/stat/Summarizer.scala |  2 +-
 6 files changed, 12 insertions(+), 13 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 2c7021bdcb9..b26a0f82d69 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -264,13 +264,13 @@ private[deploy] class Worker(
 
   private def addResourcesUsed(deltaInfo: Map[String, ResourceInformation]): Unit = {
     deltaInfo.foreach { case (rName, rInfo) =>
-      resourcesUsed(rName) = resourcesUsed(rName) + rInfo
+      resourcesUsed(rName) += rInfo
     }
   }
 
   private def removeResourcesUsed(deltaInfo: Map[String, ResourceInformation]): Unit = {
     deltaInfo.foreach { case (rName, rInfo) =>
-      resourcesUsed(rName) = resourcesUsed(rName) - rInfo
+      resourcesUsed(rName) -= rInfo
     }
   }
 
diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala b/core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala
index 7605e8c44b9..10cf0402d5f 100644
--- a/core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala
@@ -82,7 +82,7 @@ private[spark] trait ResourceAllocator {
     }
     val isAvailable = addressAvailabilityMap(address)
     if (isAvailable > 0) {
-      addressAvailabilityMap(address) = addressAvailabilityMap(address) - 1
+      addressAvailabilityMap(address) -= 1
     } else {
       throw new SparkException("Try to acquire an address that is not available. " +
         s"$resourceName address $address is not available.")
@@ -103,7 +103,7 @@ private[spark] trait ResourceAllocator {
     }
     val isAvailable = addressAvailabilityMap(address)
    if (isAvailable < slotsPerAddress) {
-      addressAvailabilityMap(address) = addressAvailabilityMap(address) + 1
+      addressAvailabilityMap(address) += 1
     } else {
       throw new SparkException(s"Try to release an address that is not assigned. $resourceName " +
         s"address $address is not assigned.")
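A side note on the rewrites above: for a Scala mutable map, `m(k) += v` is compiler sugar for `m(k) = m(k) + v`, i.e. `m.update(k, m(k) + v)`, so these edits are behavior-preserving. The sugar only requires a `+`/`-` method on the map's value type, which is why it also applies to the resource values in Worker.scala. Below is a minimal, self-contained sketch of that desugaring; the `Amount` class is a hypothetical stand-in for illustration, not Spark's resource type.

    import scala.collection.mutable

    // Toy value type: only `+` and `-` are needed for the `+=`/`-=` sugar
    // on map entries to compile. Hypothetical, for illustration only.
    final case class Amount(n: Int) {
      def +(other: Amount): Amount = Amount(n + other.n)
      def -(other: Amount): Amount = Amount(n - other.n)
    }

    object ResourceMapSketch {
      def main(args: Array[String]): Unit = {
        val resourcesUsed = mutable.Map("gpu" -> Amount(2))

        // Desugars to: resourcesUsed.update("gpu", resourcesUsed("gpu") + Amount(1))
        resourcesUsed("gpu") += Amount(1)
        resourcesUsed("gpu") -= Amount(2)

        println(resourcesUsed("gpu")) // Amount(1)
      }
    }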
$resourceName " + s"address $address is not assigned.") diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala index 230ec7efdb1..15a9ddd40e6 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala @@ -317,7 +317,7 @@ private[spark] class ShuffleBlockPusher(conf: SparkConf) extends Logging { pushResult: PushResult): Boolean = synchronized { remainingBlocks -= pushResult.blockId bytesInFlight -= bytesPushed - numBlocksInFlightPerAddress(address) = numBlocksInFlightPerAddress(address) - 1 + numBlocksInFlightPerAddress(address) -= 1 if (remainingBlocks.isEmpty) { reqsInFlight -= 1 } diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala index e2fc5389091..c91aaa8ddb7 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala @@ -763,7 +763,7 @@ final class ShuffleBlockFetcherIterator( shuffleMetrics.incLocalBlocksFetched(1) shuffleMetrics.incLocalBytesRead(buf.size) } else { - numBlocksInFlightPerAddress(address) = numBlocksInFlightPerAddress(address) - 1 + numBlocksInFlightPerAddress(address) -= 1 shuffleMetrics.incRemoteBytesRead(buf.size) if (buf.isInstanceOf[FileSegmentManagedBuffer]) { shuffleMetrics.incRemoteBytesReadToDisk(buf.size) @@ -905,8 +905,7 @@ final class ShuffleBlockFetcherIterator( case DeferFetchRequestResult(request) => val address = request.address - numBlocksInFlightPerAddress(address) = - numBlocksInFlightPerAddress(address) - request.blocks.size + numBlocksInFlightPerAddress(address) -= request.blocks.size bytesInFlight -= request.size reqsInFlight -= 1 logDebug("Number of requests in flight " + reqsInFlight) @@ -924,7 +923,7 @@ final class ShuffleBlockFetcherIterator( // 3. Failure to get the push-merged-local directories from the external shuffle service. // In this case, the blockId is ShuffleBlockId. if (pushBasedFetchHelper.isRemotePushMergedBlockAddress(address)) { - numBlocksInFlightPerAddress(address) = numBlocksInFlightPerAddress(address) - 1 + numBlocksInFlightPerAddress(address) -= 1 bytesInFlight -= size } if (isNetworkReqDone) { @@ -975,7 +974,7 @@ final class ShuffleBlockFetcherIterator( // The original meta request is processed so we decrease numBlocksToFetch and // numBlocksInFlightPerAddress by 1. We will collect new shuffle chunks request and the // count of this is added to numBlocksToFetch in collectFetchReqsFromMergedBlocks. - numBlocksInFlightPerAddress(address) = numBlocksInFlightPerAddress(address) - 1 + numBlocksInFlightPerAddress(address) -= 1 numBlocksToFetch -= 1 val blocksToFetch = pushBasedFetchHelper.createChunkBlockInfosFromMetaResponse( shuffleId, shuffleMergeId, reduceId, blockSize, bitmaps) @@ -988,7 +987,7 @@ final class ShuffleBlockFetcherIterator( case PushMergedRemoteMetaFailedFetchResult( shuffleId, shuffleMergeId, reduceId, address) => // The original meta request failed so we decrease numBlocksInFlightPerAddress by 1. - numBlocksInFlightPerAddress(address) = numBlocksInFlightPerAddress(address) - 1 + numBlocksInFlightPerAddress(address) -= 1 // If we fail to fetch the meta of a push-merged block, we fall back to fetching the // original blocks. 
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala
index dde4234a2e8..16176136a7e 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala
@@ -502,7 +502,7 @@ class NaiveBayesModel private[ml] (
 
       j = 0
       while (j < probArray.length) {
-        probArray(j) = probArray(j) - logSumExp
+        probArray(j) -= logSumExp
         j += 1
       }
       Vectors.dense(probArray)
diff --git a/mllib/src/main/scala/org/apache/spark/ml/stat/Summarizer.scala b/mllib/src/main/scala/org/apache/spark/ml/stat/Summarizer.scala
index a3dd133a4ce..7fd99faf0c8 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/stat/Summarizer.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/stat/Summarizer.scala
@@ -596,7 +596,7 @@ private[spark] class SummarizerBuffer(
         // merge max and min
         if (currMax != null) { currMax(i) = math.max(currMax(i), other.currMax(i)) }
         if (currMin != null) { currMin(i) = math.min(currMin(i), other.currMin(i)) }
-        if (nnz != null) { nnz(i) = nnz(i) + other.nnz(i) }
+        if (nnz != null) { nnz(i) += other.nnz(i) }
         i += 1
       }
     } else if (totalWeightSum == 0.0 && other.totalWeightSum != 0.0) {
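The two mllib edits apply the same sugar to arrays: `arr(j) -= x` compiles to `arr.update(j, arr(j) - x)`. For context, a standalone sketch of the log-probability normalization that the NaiveBayes hunk performs, with a simple `logSumExp` helper written here for illustration (the model computes it elsewhere):

    object LogNormalizeSketch {
      // Numerically stable log(sum(exp(x_i))): shift by the max before exponentiating.
      def logSumExp(logProbs: Array[Double]): Double = {
        val maxLog = logProbs.max
        maxLog + math.log(logProbs.map(lp => math.exp(lp - maxLog)).sum)
      }

      def main(args: Array[String]): Unit = {
        val probArray = Array(math.log(0.2), math.log(0.3), math.log(0.5))
        val lse = logSumExp(probArray)

        var j = 0
        while (j < probArray.length) {
          probArray(j) -= lse // same as probArray(j) = probArray(j) - lse
          j += 1
        }
        println(probArray.map(math.exp).sum) // ~1.0: normalized log-probabilities
      }
    }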