This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new bc621c36 [Improvement] Introduce a new class ShuffleTaskInfo (#158)
bc621c36 is described below

commit bc621c36960b9743465080519dfaf27a74cab762
Author: jokercurry <[email protected]>
AuthorDate: Thu Aug 18 15:29:57 2022 +0800

    [Improvement] Introduce a new class ShuffleTaskInfo (#158)
    
    ### What changes were proposed in this pull request?
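    There is no need to keep four separate maps keyed by appId. Introduce a
    new class, `ShuffleTaskInfo`, whose fields `currentTimes`, `commitCounts`,
    `commitLocks` and `cachedBlockIds` replace the `appIds`, `commitCounts`,
    `commitLocks` and `cachedBlockIds` maps in `ShuffleTaskManager`.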
    
    ### Why are the changes needed?
    This makes the code clearer and saves space.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Passed the unit tests (UT).
---
 .../apache/uniffle/test/ShuffleServerGrpcTest.java |  4 +-
 .../uniffle/server/ShuffleServerGrpcService.java   |  2 +-
 .../org/apache/uniffle/server/ShuffleTaskInfo.java | 69 ++++++++++++++++++++++
 .../apache/uniffle/server/ShuffleTaskManager.java  | 55 +++++++----------
 .../uniffle/server/ShuffleTaskManagerTest.java     |  6 +-
 5 files changed, 95 insertions(+), 41 deletions(-)

diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
 
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
index 52bcc9cd..6b14bde0 100644
--- 
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
@@ -120,7 +120,7 @@ public class ShuffleServerGrpcTest extends 
IntegrationTestBase {
         Lists.newArrayList(new PartitionRange(0, 1)), "");
     shuffleServerClient.registerShuffle(rrsr);
     assertEquals(Sets.newHashSet("clearResourceTest1", "clearResourceTest2"),
-        shuffleServers.get(0).getShuffleTaskManager().getAppIds().keySet());
+        shuffleServers.get(0).getShuffleTaskManager().getAppIds());
 
     // Thread will keep refresh clearResourceTest1 in coordinator
     Thread t = new Thread(() -> {
@@ -146,7 +146,7 @@ public class ShuffleServerGrpcTest extends 
IntegrationTestBase {
     // clearResourceTest2 will be removed because of 
rss.server.app.expired.withoutHeartbeat
     Thread.sleep(2000);
     assertEquals(Sets.newHashSet("clearResourceTest1"),
-        shuffleServers.get(0).getShuffleTaskManager().getAppIds().keySet());
+        shuffleServers.get(0).getShuffleTaskManager().getAppIds());
 
     // clearResourceTest1 will be removed because of 
rss.server.app.expired.withoutHeartbeat
     t.interrupt();
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
index 7fd21853..a248329c 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
@@ -213,7 +213,7 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
     int commitCount = 0;
 
     try {
-      if 
(!shuffleServer.getShuffleTaskManager().getAppIds().containsKey(appId)) {
+      if (!shuffleServer.getShuffleTaskManager().getAppIds().contains(appId)) {
         throw new IllegalStateException("AppId " + appId + " was removed 
already");
       }
       commitCount = 
shuffleServer.getShuffleTaskManager().updateAndGetCommitCount(appId, shuffleId);
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
new file mode 100644
index 00000000..a25dddf5
--- /dev/null
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.collect.Maps;
+import org.roaringbitmap.longlong.Roaring64NavigableMap;
+
+/**
+ * ShuffleTaskInfo contains the information of submitting the shuffle,
+ * the information of the cache block, and the timestamp corresponding to the 
app
+ */
+public class ShuffleTaskInfo {
+
+  private Long currentTimes;
+  /**
+   * shuffleId -> commit count
+   */
+  private Map<Integer, AtomicInteger> commitCounts;
+  private Map<Integer, Object> commitLocks;
+  /**
+   * shuffleId -> blockIds
+    */
+  private Map<Integer, Roaring64NavigableMap> cachedBlockIds;
+
+  public ShuffleTaskInfo() {
+    this.currentTimes = System.currentTimeMillis();
+    this.commitCounts = Maps.newConcurrentMap();
+    this.commitLocks = Maps.newConcurrentMap();
+    this.cachedBlockIds = Maps.newConcurrentMap();
+  }
+
+  public Long getCurrentTimes() {
+    return currentTimes;
+  }
+
+  public void setCurrentTimes(Long currentTimes) {
+    this.currentTimes = currentTimes;
+  }
+
+  public Map<Integer, AtomicInteger> getCommitCounts() {
+    return commitCounts;
+  }
+
+  public Map<Integer, Object> getCommitLocks() {
+    return commitLocks;
+  }
+
+  public Map<Integer, Roaring64NavigableMap> getCachedBlockIds() {
+    return cachedBlockIds;
+  }
+}
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index 3243c2d7..655bf5be 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -74,12 +74,7 @@ public class ShuffleTaskManager {
   // but when get blockId, performance will degrade a little which can be 
optimized by client configuration
   private Map<String, Map<Integer, Roaring64NavigableMap[]>> 
partitionsToBlockIds;
   private ShuffleBufferManager shuffleBufferManager;
-  private Map<String, Long> appIds = Maps.newConcurrentMap();
-  // appId -> shuffleId -> commit count
-  private Map<String, Map<Integer, AtomicInteger>> commitCounts = 
Maps.newConcurrentMap();
-  private Map<String, Map<Integer, Object>> commitLocks = 
Maps.newConcurrentMap();
-  // appId -> shuffleId -> blockIds
-  private Map<String, Map<Integer, Roaring64NavigableMap>> cachedBlockIds = 
Maps.newConcurrentMap();
+  private Map<String, ShuffleTaskInfo> shuffleTaskInfos = 
Maps.newConcurrentMap();
   private Map<Long, PreAllocatedBufferInfo> requireBufferIds = 
Maps.newConcurrentMap();
   private Runnable clearResourceThread;
   private BlockingQueue<String> expiredAppIdQueue = 
Queues.newLinkedBlockingQueue();
@@ -161,10 +156,8 @@ public class ShuffleTaskManager {
     refreshAppId(appId);
     Roaring64NavigableMap cachedBlockIds = getCachedBlockIds(appId, shuffleId);
     Roaring64NavigableMap cloneBlockIds;
-    commitLocks.putIfAbsent(appId, Maps.newConcurrentMap());
-    Map<Integer, Object> shuffleLevelLocks = commitLocks.get(appId);
-    shuffleLevelLocks.putIfAbsent(shuffleId, new Object());
-    Object lock = shuffleLevelLocks.get(shuffleId);
+    ShuffleTaskInfo shuffleTaskInfo = shuffleTaskInfos.computeIfAbsent(appId, 
x -> new ShuffleTaskInfo());
+    Object lock = shuffleTaskInfo.getCommitLocks().computeIfAbsent(shuffleId, 
x -> new Object());
     synchronized (lock) {
       long commitTimeout = conf.get(ShuffleServerConf.SERVER_COMMIT_TIMEOUT);
       if (System.currentTimeMillis() - start > commitTimeout) {
@@ -227,10 +220,9 @@ public class ShuffleTaskManager {
   }
 
   public int updateAndGetCommitCount(String appId, int shuffleId) {
-    commitCounts.putIfAbsent(appId, Maps.newConcurrentMap());
-    Map<Integer, AtomicInteger> shuffleCommit = commitCounts.get(appId);
-    shuffleCommit.putIfAbsent(shuffleId, new AtomicInteger(0));
-    AtomicInteger commitNum = shuffleCommit.get(shuffleId);
+    ShuffleTaskInfo shuffleTaskInfo = shuffleTaskInfos.computeIfAbsent(appId, 
x -> new ShuffleTaskInfo());
+    AtomicInteger commitNum = shuffleTaskInfo.getCommitCounts()
+        .computeIfAbsent(shuffleId, x -> new AtomicInteger(0));
     return commitNum.incrementAndGet();
   }
 
@@ -238,10 +230,9 @@ public class ShuffleTaskManager {
     if (spbs == null || spbs.length == 0) {
       return;
     }
-    cachedBlockIds.putIfAbsent(appId, Maps.newConcurrentMap());
-    Map<Integer, Roaring64NavigableMap> shuffleToBlockIds = 
cachedBlockIds.get(appId);
-    shuffleToBlockIds.putIfAbsent(shuffleId, Roaring64NavigableMap.bitmapOf());
-    Roaring64NavigableMap bitmap = shuffleToBlockIds.get(shuffleId);
+    ShuffleTaskInfo shuffleTaskInfo = shuffleTaskInfos.computeIfAbsent(appId, 
x -> new ShuffleTaskInfo());
+    Roaring64NavigableMap bitmap = shuffleTaskInfo.getCachedBlockIds()
+        .computeIfAbsent(shuffleId, x -> Roaring64NavigableMap.bitmapOf());
     synchronized (bitmap) {
       for (ShufflePartitionedBlock spb : spbs) {
         bitmap.addLong(spb.getBlockId());
@@ -250,11 +241,8 @@ public class ShuffleTaskManager {
   }
 
   public Roaring64NavigableMap getCachedBlockIds(String appId, int shuffleId) {
-    Map<Integer, Roaring64NavigableMap> shuffleIdToBlockIds = 
cachedBlockIds.get(appId);
-    if (shuffleIdToBlockIds == null) {
-      LOG.warn("Unexpected value when getCachedBlockIds for appId[" + appId + 
"]");
-      return Roaring64NavigableMap.bitmapOf();
-    }
+    Map<Integer, Roaring64NavigableMap> shuffleIdToBlockIds = shuffleTaskInfos
+        .getOrDefault(appId, new ShuffleTaskInfo()).getCachedBlockIds();
     Roaring64NavigableMap blockIds = shuffleIdToBlockIds.get(shuffleId);
     if (blockIds == null) {
       LOG.warn("Unexpected value when getCachedBlockIds for appId[" + appId + 
"], shuffleId[" + shuffleId + "]");
@@ -368,16 +356,16 @@ public class ShuffleTaskManager {
 
   public void checkResourceStatus() {
     try {
-      Set<String> appNames = Sets.newHashSet(appIds.keySet());
+      Set<String> appNames = Sets.newHashSet(shuffleTaskInfos.keySet());
       // remove applications which is timeout according to 
rss.server.app.expired.withoutHeartbeat
       for (String appId : appNames) {
-        if (System.currentTimeMillis() - appIds.get(appId) > 
appExpiredWithoutHB) {
+        if (System.currentTimeMillis() - 
shuffleTaskInfos.get(appId).getCurrentTimes() > appExpiredWithoutHB) {
           LOG.info("Detect expired appId[" + appId + "] according "
               + "to rss.server.app.expired.withoutHeartbeat");
           expiredAppIdQueue.add(appId);
         }
       }
-      ShuffleServerMetrics.gaugeAppNum.set(appIds.size());
+      ShuffleServerMetrics.gaugeAppNum.set(shuffleTaskInfos.size());
     } catch (Exception e) {
       LOG.warn("Error happened in checkResourceStatus", e);
     }
@@ -387,22 +375,19 @@ public class ShuffleTaskManager {
   public void removeResources(String appId) {
     LOG.info("Start remove resource for appId[" + appId + "]");
     final long start = System.currentTimeMillis();
-    final Map<Integer, Roaring64NavigableMap> shuffleToCachedBlockIds = 
cachedBlockIds.get(appId);
-    appIds.remove(appId);
+    final Map<Integer, Roaring64NavigableMap> shuffleToCachedBlockIds = 
shuffleTaskInfos.get(appId).getCachedBlockIds();
     partitionsToBlockIds.remove(appId);
-    cachedBlockIds.remove(appId);
-    commitCounts.remove(appId);
-    commitLocks.remove(appId);
     shuffleBufferManager.removeBuffer(appId);
     shuffleFlushManager.removeResources(appId);
-    if (shuffleToCachedBlockIds != null) {
+    if (!shuffleToCachedBlockIds.isEmpty()) {
       storageManager.removeResources(appId, shuffleToCachedBlockIds.keySet());
     }
+    shuffleTaskInfos.remove(appId);
     LOG.info("Finish remove resource for appId[" + appId + "] cost " + 
(System.currentTimeMillis() - start) + " ms");
   }
 
   public void refreshAppId(String appId) {
-    appIds.put(appId, System.currentTimeMillis());
+    shuffleTaskInfos.computeIfAbsent(appId, x -> new 
ShuffleTaskInfo()).setCurrentTimes(System.currentTimeMillis());
   }
 
   // check pre allocated buffer, release the memory if it expired
@@ -434,8 +419,8 @@ public class ShuffleTaskManager {
   }
 
   @VisibleForTesting
-  public Map<String, Long> getAppIds() {
-    return appIds;
+  public Set<String> getAppIds() {
+    return shuffleTaskInfos.keySet();
   }
 
   @VisibleForTesting
diff --git 
a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java 
b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
index 46c762f6..13b2e509 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
@@ -268,7 +268,7 @@ public class ShuffleTaskManagerTest extends HdfsTestBase {
       retry++;
     }
     // application "clearTest2" was removed according to 
rss.server.app.expired.withoutHeartbeat
-    assertEquals(Sets.newHashSet("clearTest1"), 
shuffleTaskManager.getAppIds().keySet());
+    assertEquals(Sets.newHashSet("clearTest1"), 
shuffleTaskManager.getAppIds());
     assertEquals(1, shuffleTaskManager.getCachedBlockIds("clearTest1", 
shuffleId).getLongCardinality());
 
     // register again
@@ -277,12 +277,12 @@ public class ShuffleTaskManagerTest extends HdfsTestBase {
         Lists.newArrayList(new PartitionRange(0, 1)), 
RemoteStorageInfo.EMPTY_REMOTE_STORAGE);
     shuffleTaskManager.refreshAppId("clearTest2");
     shuffleTaskManager.checkResourceStatus();
-    assertEquals(Sets.newHashSet("clearTest1", "clearTest2"), 
shuffleTaskManager.getAppIds().keySet());
+    assertEquals(Sets.newHashSet("clearTest1", "clearTest2"), 
shuffleTaskManager.getAppIds());
     Thread.sleep(5000);
     shuffleTaskManager.checkResourceStatus();
     // wait resource delete
     Thread.sleep(3000);
-    assertEquals(Sets.newHashSet(), shuffleTaskManager.getAppIds().keySet());
+    assertEquals(Sets.newHashSet(), shuffleTaskManager.getAppIds());
     assertTrue(shuffleTaskManager.getCachedBlockIds("clearTest1", 
shuffleId).isEmpty());
   }
 

Reply via email to