This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new bc621c36 [Improvement] Introduce a new class ShuffleTaskInfo (#158)
bc621c36 is described below
commit bc621c36960b9743465080519dfaf27a74cab762
Author: jokercurry <[email protected]>
AuthorDate: Thu Aug 18 15:29:57 2022 +0800
[Improvement] Introduce a new class ShuffleTaskInfo (#158)
### What changes were proposed in this pull request?
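There is no need to keep four separate maps keyed by application.
Introduce a new class, `ShuffleTaskInfo`, that holds `currentTimes`,
`commitCounts`, `commitLocks` and `cachedBlockIds` for a single application,
and replace the four maps with one map of `ShuffleTaskInfo` objects.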
### Why are the changes needed?
This makes the code clearer and saves space.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Passed the unit tests (UT).
---
.../apache/uniffle/test/ShuffleServerGrpcTest.java | 4 +-
.../uniffle/server/ShuffleServerGrpcService.java | 2 +-
.../org/apache/uniffle/server/ShuffleTaskInfo.java | 69 ++++++++++++++++++++++
.../apache/uniffle/server/ShuffleTaskManager.java | 55 +++++++----------
.../uniffle/server/ShuffleTaskManagerTest.java | 6 +-
5 files changed, 95 insertions(+), 41 deletions(-)
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
index 52bcc9cd..6b14bde0 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
@@ -120,7 +120,7 @@ public class ShuffleServerGrpcTest extends
IntegrationTestBase {
Lists.newArrayList(new PartitionRange(0, 1)), "");
shuffleServerClient.registerShuffle(rrsr);
assertEquals(Sets.newHashSet("clearResourceTest1", "clearResourceTest2"),
- shuffleServers.get(0).getShuffleTaskManager().getAppIds().keySet());
+ shuffleServers.get(0).getShuffleTaskManager().getAppIds());
// Thread will keep refresh clearResourceTest1 in coordinator
Thread t = new Thread(() -> {
@@ -146,7 +146,7 @@ public class ShuffleServerGrpcTest extends
IntegrationTestBase {
// clearResourceTest2 will be removed because of
rss.server.app.expired.withoutHeartbeat
Thread.sleep(2000);
assertEquals(Sets.newHashSet("clearResourceTest1"),
- shuffleServers.get(0).getShuffleTaskManager().getAppIds().keySet());
+ shuffleServers.get(0).getShuffleTaskManager().getAppIds());
// clearResourceTest1 will be removed because of
rss.server.app.expired.withoutHeartbeat
t.interrupt();
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
index 7fd21853..a248329c 100644
---
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
+++
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
@@ -213,7 +213,7 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
int commitCount = 0;
try {
- if
(!shuffleServer.getShuffleTaskManager().getAppIds().containsKey(appId)) {
+ if (!shuffleServer.getShuffleTaskManager().getAppIds().contains(appId)) {
throw new IllegalStateException("AppId " + appId + " was removed
already");
}
commitCount =
shuffleServer.getShuffleTaskManager().updateAndGetCommitCount(appId, shuffleId);
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
new file mode 100644
index 00000000..a25dddf5
--- /dev/null
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.collect.Maps;
+import org.roaringbitmap.longlong.Roaring64NavigableMap;
+
+/**
+ * ShuffleTaskInfo contains the information of submitting the shuffle,
+ * the information of the cache block, and the timestamp corresponding to the
app
+ */
+public class ShuffleTaskInfo {
+
+ private Long currentTimes;
+ /**
+ * shuffleId -> commit count
+ */
+ private Map<Integer, AtomicInteger> commitCounts;
+ private Map<Integer, Object> commitLocks;
+ /**
+ * shuffleId -> blockIds
+ */
+ private Map<Integer, Roaring64NavigableMap> cachedBlockIds;
+
+ public ShuffleTaskInfo() {
+ this.currentTimes = System.currentTimeMillis();
+ this.commitCounts = Maps.newConcurrentMap();
+ this.commitLocks = Maps.newConcurrentMap();
+ this.cachedBlockIds = Maps.newConcurrentMap();
+ }
+
+ public Long getCurrentTimes() {
+ return currentTimes;
+ }
+
+ public void setCurrentTimes(Long currentTimes) {
+ this.currentTimes = currentTimes;
+ }
+
+ public Map<Integer, AtomicInteger> getCommitCounts() {
+ return commitCounts;
+ }
+
+ public Map<Integer, Object> getCommitLocks() {
+ return commitLocks;
+ }
+
+ public Map<Integer, Roaring64NavigableMap> getCachedBlockIds() {
+ return cachedBlockIds;
+ }
+}
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index 3243c2d7..655bf5be 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -74,12 +74,7 @@ public class ShuffleTaskManager {
// but when get blockId, performance will degrade a little which can be
optimized by client configuration
private Map<String, Map<Integer, Roaring64NavigableMap[]>>
partitionsToBlockIds;
private ShuffleBufferManager shuffleBufferManager;
- private Map<String, Long> appIds = Maps.newConcurrentMap();
- // appId -> shuffleId -> commit count
- private Map<String, Map<Integer, AtomicInteger>> commitCounts =
Maps.newConcurrentMap();
- private Map<String, Map<Integer, Object>> commitLocks =
Maps.newConcurrentMap();
- // appId -> shuffleId -> blockIds
- private Map<String, Map<Integer, Roaring64NavigableMap>> cachedBlockIds =
Maps.newConcurrentMap();
+ private Map<String, ShuffleTaskInfo> shuffleTaskInfos =
Maps.newConcurrentMap();
private Map<Long, PreAllocatedBufferInfo> requireBufferIds =
Maps.newConcurrentMap();
private Runnable clearResourceThread;
private BlockingQueue<String> expiredAppIdQueue =
Queues.newLinkedBlockingQueue();
@@ -161,10 +156,8 @@ public class ShuffleTaskManager {
refreshAppId(appId);
Roaring64NavigableMap cachedBlockIds = getCachedBlockIds(appId, shuffleId);
Roaring64NavigableMap cloneBlockIds;
- commitLocks.putIfAbsent(appId, Maps.newConcurrentMap());
- Map<Integer, Object> shuffleLevelLocks = commitLocks.get(appId);
- shuffleLevelLocks.putIfAbsent(shuffleId, new Object());
- Object lock = shuffleLevelLocks.get(shuffleId);
+ ShuffleTaskInfo shuffleTaskInfo = shuffleTaskInfos.computeIfAbsent(appId,
x -> new ShuffleTaskInfo());
+ Object lock = shuffleTaskInfo.getCommitLocks().computeIfAbsent(shuffleId,
x -> new Object());
synchronized (lock) {
long commitTimeout = conf.get(ShuffleServerConf.SERVER_COMMIT_TIMEOUT);
if (System.currentTimeMillis() - start > commitTimeout) {
@@ -227,10 +220,9 @@ public class ShuffleTaskManager {
}
public int updateAndGetCommitCount(String appId, int shuffleId) {
- commitCounts.putIfAbsent(appId, Maps.newConcurrentMap());
- Map<Integer, AtomicInteger> shuffleCommit = commitCounts.get(appId);
- shuffleCommit.putIfAbsent(shuffleId, new AtomicInteger(0));
- AtomicInteger commitNum = shuffleCommit.get(shuffleId);
+ ShuffleTaskInfo shuffleTaskInfo = shuffleTaskInfos.computeIfAbsent(appId,
x -> new ShuffleTaskInfo());
+ AtomicInteger commitNum = shuffleTaskInfo.getCommitCounts()
+ .computeIfAbsent(shuffleId, x -> new AtomicInteger(0));
return commitNum.incrementAndGet();
}
@@ -238,10 +230,9 @@ public class ShuffleTaskManager {
if (spbs == null || spbs.length == 0) {
return;
}
- cachedBlockIds.putIfAbsent(appId, Maps.newConcurrentMap());
- Map<Integer, Roaring64NavigableMap> shuffleToBlockIds =
cachedBlockIds.get(appId);
- shuffleToBlockIds.putIfAbsent(shuffleId, Roaring64NavigableMap.bitmapOf());
- Roaring64NavigableMap bitmap = shuffleToBlockIds.get(shuffleId);
+ ShuffleTaskInfo shuffleTaskInfo = shuffleTaskInfos.computeIfAbsent(appId,
x -> new ShuffleTaskInfo());
+ Roaring64NavigableMap bitmap = shuffleTaskInfo.getCachedBlockIds()
+ .computeIfAbsent(shuffleId, x -> Roaring64NavigableMap.bitmapOf());
synchronized (bitmap) {
for (ShufflePartitionedBlock spb : spbs) {
bitmap.addLong(spb.getBlockId());
@@ -250,11 +241,8 @@ public class ShuffleTaskManager {
}
public Roaring64NavigableMap getCachedBlockIds(String appId, int shuffleId) {
- Map<Integer, Roaring64NavigableMap> shuffleIdToBlockIds =
cachedBlockIds.get(appId);
- if (shuffleIdToBlockIds == null) {
- LOG.warn("Unexpected value when getCachedBlockIds for appId[" + appId +
"]");
- return Roaring64NavigableMap.bitmapOf();
- }
+ Map<Integer, Roaring64NavigableMap> shuffleIdToBlockIds = shuffleTaskInfos
+ .getOrDefault(appId, new ShuffleTaskInfo()).getCachedBlockIds();
Roaring64NavigableMap blockIds = shuffleIdToBlockIds.get(shuffleId);
if (blockIds == null) {
LOG.warn("Unexpected value when getCachedBlockIds for appId[" + appId +
"], shuffleId[" + shuffleId + "]");
@@ -368,16 +356,16 @@ public class ShuffleTaskManager {
public void checkResourceStatus() {
try {
- Set<String> appNames = Sets.newHashSet(appIds.keySet());
+ Set<String> appNames = Sets.newHashSet(shuffleTaskInfos.keySet());
// remove applications which is timeout according to
rss.server.app.expired.withoutHeartbeat
for (String appId : appNames) {
- if (System.currentTimeMillis() - appIds.get(appId) >
appExpiredWithoutHB) {
+ if (System.currentTimeMillis() -
shuffleTaskInfos.get(appId).getCurrentTimes() > appExpiredWithoutHB) {
LOG.info("Detect expired appId[" + appId + "] according "
+ "to rss.server.app.expired.withoutHeartbeat");
expiredAppIdQueue.add(appId);
}
}
- ShuffleServerMetrics.gaugeAppNum.set(appIds.size());
+ ShuffleServerMetrics.gaugeAppNum.set(shuffleTaskInfos.size());
} catch (Exception e) {
LOG.warn("Error happened in checkResourceStatus", e);
}
@@ -387,22 +375,19 @@ public class ShuffleTaskManager {
public void removeResources(String appId) {
LOG.info("Start remove resource for appId[" + appId + "]");
final long start = System.currentTimeMillis();
- final Map<Integer, Roaring64NavigableMap> shuffleToCachedBlockIds =
cachedBlockIds.get(appId);
- appIds.remove(appId);
+ final Map<Integer, Roaring64NavigableMap> shuffleToCachedBlockIds =
shuffleTaskInfos.get(appId).getCachedBlockIds();
partitionsToBlockIds.remove(appId);
- cachedBlockIds.remove(appId);
- commitCounts.remove(appId);
- commitLocks.remove(appId);
shuffleBufferManager.removeBuffer(appId);
shuffleFlushManager.removeResources(appId);
- if (shuffleToCachedBlockIds != null) {
+ if (!shuffleToCachedBlockIds.isEmpty()) {
storageManager.removeResources(appId, shuffleToCachedBlockIds.keySet());
}
+ shuffleTaskInfos.remove(appId);
LOG.info("Finish remove resource for appId[" + appId + "] cost " +
(System.currentTimeMillis() - start) + " ms");
}
public void refreshAppId(String appId) {
- appIds.put(appId, System.currentTimeMillis());
+ shuffleTaskInfos.computeIfAbsent(appId, x -> new
ShuffleTaskInfo()).setCurrentTimes(System.currentTimeMillis());
}
// check pre allocated buffer, release the memory if it expired
@@ -434,8 +419,8 @@ public class ShuffleTaskManager {
}
@VisibleForTesting
- public Map<String, Long> getAppIds() {
- return appIds;
+ public Set<String> getAppIds() {
+ return shuffleTaskInfos.keySet();
}
@VisibleForTesting
diff --git
a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
index 46c762f6..13b2e509 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
@@ -268,7 +268,7 @@ public class ShuffleTaskManagerTest extends HdfsTestBase {
retry++;
}
// application "clearTest2" was removed according to
rss.server.app.expired.withoutHeartbeat
- assertEquals(Sets.newHashSet("clearTest1"),
shuffleTaskManager.getAppIds().keySet());
+ assertEquals(Sets.newHashSet("clearTest1"),
shuffleTaskManager.getAppIds());
assertEquals(1, shuffleTaskManager.getCachedBlockIds("clearTest1",
shuffleId).getLongCardinality());
// register again
@@ -277,12 +277,12 @@ public class ShuffleTaskManagerTest extends HdfsTestBase {
Lists.newArrayList(new PartitionRange(0, 1)),
RemoteStorageInfo.EMPTY_REMOTE_STORAGE);
shuffleTaskManager.refreshAppId("clearTest2");
shuffleTaskManager.checkResourceStatus();
- assertEquals(Sets.newHashSet("clearTest1", "clearTest2"),
shuffleTaskManager.getAppIds().keySet());
+ assertEquals(Sets.newHashSet("clearTest1", "clearTest2"),
shuffleTaskManager.getAppIds());
Thread.sleep(5000);
shuffleTaskManager.checkResourceStatus();
// wait resource delete
Thread.sleep(3000);
- assertEquals(Sets.newHashSet(), shuffleTaskManager.getAppIds().keySet());
+ assertEquals(Sets.newHashSet(), shuffleTaskManager.getAppIds());
assertTrue(shuffleTaskManager.getCachedBlockIds("clearTest1",
shuffleId).isEmpty());
}