This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 2a667fbff7e [MINOR] Improve updates to some mutable hash maps
2a667fbff7e is described below

commit 2a667fbff7e7fd21bf69668ef78b175f60a24dba
Author: weixiuli <weixi...@jd.com>
AuthorDate: Mon Apr 18 09:22:24 2022 -0500

    [MINOR] Improve updates to some mutable hash maps
    
    ### What changes were proposed in this pull request?
     Use the compound assignment operators += and -= to update some mutable hash maps.
    
    ### Why are the changes needed?
    
     Reduce some repeated mutable hash map calls and clean up the code.
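
    Below is a minimal sketch of the pattern, using a hypothetical counts
    map (not from this patch). For a scala.collection.mutable.Map, the
    compound assignment desugars to the same apply/update pair, so the
    rewrite is a source-level cleanup: the map and key are written once
    and behavior is unchanged.

        import scala.collection.mutable

        // Hypothetical counter map, for illustration only.
        val counts = mutable.Map("a" -> 1)

        // Before: map and key spelled out twice.
        counts("a") = counts("a") + 1

        // After: desugars to counts.update("a", counts("a") + 1).
        counts("a") += 1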
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Existing unit tests.
    
    Closes #36179 from weixiuli/update-numBlocksInFlightPerAddress.
    
    Authored-by: weixiuli <weixi...@jd.com>
    Signed-off-by: Sean Owen <sro...@gmail.com>
---
 .../main/scala/org/apache/spark/deploy/worker/Worker.scala    |  4 ++--
 .../scala/org/apache/spark/resource/ResourceAllocator.scala   |  4 ++--
 .../scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala   |  2 +-
 .../apache/spark/storage/ShuffleBlockFetcherIterator.scala    | 11 +++++------
 .../scala/org/apache/spark/ml/classification/NaiveBayes.scala |  2 +-
 .../src/main/scala/org/apache/spark/ml/stat/Summarizer.scala  |  2 +-
 6 files changed, 12 insertions(+), 13 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 2c7021bdcb9..b26a0f82d69 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -264,13 +264,13 @@ private[deploy] class Worker(
 
   private def addResourcesUsed(deltaInfo: Map[String, ResourceInformation]): Unit = {
     deltaInfo.foreach { case (rName, rInfo) =>
-      resourcesUsed(rName) = resourcesUsed(rName) + rInfo
+      resourcesUsed(rName) += rInfo
     }
   }
 
   private def removeResourcesUsed(deltaInfo: Map[String, ResourceInformation]): Unit = {
     deltaInfo.foreach { case (rName, rInfo) =>
-      resourcesUsed(rName) = resourcesUsed(rName) - rInfo
+      resourcesUsed(rName) -= rInfo
     }
   }
 
diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala b/core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala
index 7605e8c44b9..10cf0402d5f 100644
--- a/core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala
@@ -82,7 +82,7 @@ private[spark] trait ResourceAllocator {
       }
       val isAvailable = addressAvailabilityMap(address)
       if (isAvailable > 0) {
-        addressAvailabilityMap(address) = addressAvailabilityMap(address) - 1
+        addressAvailabilityMap(address) -= 1
       } else {
         throw new SparkException("Try to acquire an address that is not available. " +
           s"$resourceName address $address is not available.")
@@ -103,7 +103,7 @@ private[spark] trait ResourceAllocator {
       }
       val isAvailable = addressAvailabilityMap(address)
       if (isAvailable < slotsPerAddress) {
-        addressAvailabilityMap(address) = addressAvailabilityMap(address) + 1
+        addressAvailabilityMap(address) += 1
       } else {
         throw new SparkException(s"Try to release an address that is not assigned. $resourceName " +
           s"address $address is not assigned.")
diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala
index 230ec7efdb1..15a9ddd40e6 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala
@@ -317,7 +317,7 @@ private[spark] class ShuffleBlockPusher(conf: SparkConf) extends Logging {
       pushResult: PushResult): Boolean = synchronized {
     remainingBlocks -= pushResult.blockId
     bytesInFlight -= bytesPushed
-    numBlocksInFlightPerAddress(address) = numBlocksInFlightPerAddress(address) - 1
+    numBlocksInFlightPerAddress(address) -= 1
     if (remainingBlocks.isEmpty) {
       reqsInFlight -= 1
     }
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index e2fc5389091..c91aaa8ddb7 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -763,7 +763,7 @@ final class ShuffleBlockFetcherIterator(
               shuffleMetrics.incLocalBlocksFetched(1)
               shuffleMetrics.incLocalBytesRead(buf.size)
             } else {
-              numBlocksInFlightPerAddress(address) = numBlocksInFlightPerAddress(address) - 1
+              numBlocksInFlightPerAddress(address) -= 1
               shuffleMetrics.incRemoteBytesRead(buf.size)
               if (buf.isInstanceOf[FileSegmentManagedBuffer]) {
                 shuffleMetrics.incRemoteBytesReadToDisk(buf.size)
@@ -905,8 +905,7 @@ final class ShuffleBlockFetcherIterator(
 
         case DeferFetchRequestResult(request) =>
           val address = request.address
-          numBlocksInFlightPerAddress(address) =
-            numBlocksInFlightPerAddress(address) - request.blocks.size
+          numBlocksInFlightPerAddress(address) -= request.blocks.size
           bytesInFlight -= request.size
           reqsInFlight -= 1
           logDebug("Number of requests in flight " + reqsInFlight)
@@ -924,7 +923,7 @@ final class ShuffleBlockFetcherIterator(
           // 3. Failure to get the push-merged-local directories from the external shuffle service.
           //    In this case, the blockId is ShuffleBlockId.
           if (pushBasedFetchHelper.isRemotePushMergedBlockAddress(address)) {
-            numBlocksInFlightPerAddress(address) = numBlocksInFlightPerAddress(address) - 1
+            numBlocksInFlightPerAddress(address) -= 1
             bytesInFlight -= size
           }
           if (isNetworkReqDone) {
@@ -975,7 +974,7 @@ final class ShuffleBlockFetcherIterator(
           // The original meta request is processed so we decrease numBlocksToFetch and
           // numBlocksInFlightPerAddress by 1. We will collect new shuffle chunks request and the
           // count of this is added to numBlocksToFetch in collectFetchReqsFromMergedBlocks.
-          numBlocksInFlightPerAddress(address) = numBlocksInFlightPerAddress(address) - 1
+          numBlocksInFlightPerAddress(address) -= 1
           numBlocksToFetch -= 1
           val blocksToFetch = pushBasedFetchHelper.createChunkBlockInfosFromMetaResponse(
             shuffleId, shuffleMergeId, reduceId, blockSize, bitmaps)
@@ -988,7 +987,7 @@ final class ShuffleBlockFetcherIterator(
         case PushMergedRemoteMetaFailedFetchResult(
           shuffleId, shuffleMergeId, reduceId, address) =>
           // The original meta request failed so we decrease numBlocksInFlightPerAddress by 1.
-          numBlocksInFlightPerAddress(address) = numBlocksInFlightPerAddress(address) - 1
+          numBlocksInFlightPerAddress(address) -= 1
           // If we fail to fetch the meta of a push-merged block, we fall back to fetching the
           // original blocks.
           pushBasedFetchHelper.initiateFallbackFetchForPushMergedBlock(
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala
index dde4234a2e8..16176136a7e 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala
@@ -502,7 +502,7 @@ class NaiveBayesModel private[ml] (
 
     j = 0
     while (j < probArray.length) {
-      probArray(j) = probArray(j) - logSumExp
+      probArray(j) -= logSumExp
       j += 1
     }
     Vectors.dense(probArray)
diff --git a/mllib/src/main/scala/org/apache/spark/ml/stat/Summarizer.scala b/mllib/src/main/scala/org/apache/spark/ml/stat/Summarizer.scala
index a3dd133a4ce..7fd99faf0c8 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/stat/Summarizer.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/stat/Summarizer.scala
@@ -596,7 +596,7 @@ private[spark] class SummarizerBuffer(
         // merge max and min
         if (currMax != null) { currMax(i) = math.max(currMax(i), other.currMax(i)) }
         if (currMin != null) { currMin(i) = math.min(currMin(i), other.currMin(i)) }
-        if (nnz != null) { nnz(i) = nnz(i) + other.nnz(i) }
+        if (nnz != null) { nnz(i) += other.nnz(i) }
         i += 1
       }
     } else if (totalWeightSum == 0.0 && other.totalWeightSum != 0.0) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
