This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 5a81c83c9 [#2198]fix(server) reduce the redundant appId info for
partitionKey in `AbstractStorage` (#2199)
5a81c83c9 is described below
commit 5a81c83c9e161d98bf1790c2a9d026a978b4e4e9
Author: leewish <[email protected]>
AuthorDate: Thu Oct 24 11:32:10 2024 +0800
[#2198]fix(server) reduce the redundant appId info for partitionKey in
`AbstractStorage` (#2199)
### What changes were proposed in this pull request?
AbstractStorage has three private variables, each a nested Map whose inner
Map uses partitionKey as its key.
The partitionKey includes the appId, which is redundant because appId is
already the first (outer) key of each of these three variables.
### Why are the changes needed?
Fix: #2198
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Tested locally
Co-authored-by: wenlongwlli <[email protected]>
---
.../uniffle/storage/common/AbstractStorage.java | 35 +++++++++++++---------
1 file changed, 21 insertions(+), 14 deletions(-)
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/common/AbstractStorage.java
b/storage/src/main/java/org/apache/uniffle/storage/common/AbstractStorage.java
index 90c1c3b2a..dcff1d9d9 100644
---
a/storage/src/main/java/org/apache/uniffle/storage/common/AbstractStorage.java
+++
b/storage/src/main/java/org/apache/uniffle/storage/common/AbstractStorage.java
@@ -21,8 +21,8 @@ import java.util.Map;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.common.util.JavaUtils;
-import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.storage.handler.api.ServerReadHandler;
import org.apache.uniffle.storage.handler.api.ShuffleWriteHandler;
import org.apache.uniffle.storage.handler.api.ShuffleWriteHandlerWrapper;
@@ -32,10 +32,13 @@ import org.apache.uniffle.storage.util.ShuffleStorageUtils;
public abstract class AbstractStorage implements Storage {
+ // appId -> partitionKey -> ShuffleWriteHandler
private Map<String, Map<String, ShuffleWriteHandler>> writerHandlers =
JavaUtils.newConcurrentMap();
+ // appId -> partitionKey -> CreateShuffleWriteHandlerRequest
private Map<String, Map<String, CreateShuffleWriteHandlerRequest>> requests =
JavaUtils.newConcurrentMap();
+ // appId -> partitionKey -> ServerReadHandler
private Map<String, Map<String, ServerReadHandler>> readerHandlers =
JavaUtils.newConcurrentMap();
abstract ShuffleWriteHandler
newWriteHandler(CreateShuffleWriteHandlerRequest request);
@@ -46,17 +49,16 @@ public abstract class AbstractStorage implements Storage {
writerHandlers.computeIfAbsent(request.getAppId(), key ->
JavaUtils.newConcurrentMap());
requests.computeIfAbsent(request.getAppId(), key ->
JavaUtils.newConcurrentMap());
Map<String, ShuffleWriteHandler> map =
writerHandlers.get(request.getAppId());
- String partitionKey =
- RssUtils.generatePartitionKey(
- request.getAppId(), request.getShuffleId(),
request.getStartPartition());
+ String partitionKeyExceptAppId =
+ generatePartitionKeyExceptAppId(request.getShuffleId(),
request.getStartPartition());
boolean isNewlyCreated = false;
- if (!map.containsKey(partitionKey)) {
+ if (!map.containsKey(partitionKeyExceptAppId)) {
isNewlyCreated = true;
}
- map.computeIfAbsent(partitionKey, key -> newWriteHandler(request));
+ map.computeIfAbsent(partitionKeyExceptAppId, key ->
newWriteHandler(request));
Map<String, CreateShuffleWriteHandlerRequest> requestMap =
requests.get(request.getAppId());
- requestMap.putIfAbsent(partitionKey, request);
- return new ShuffleWriteHandlerWrapper(map.get(partitionKey),
isNewlyCreated);
+ requestMap.putIfAbsent(partitionKeyExceptAppId, request);
+ return new ShuffleWriteHandlerWrapper(map.get(partitionKeyExceptAppId),
isNewlyCreated);
}
@Override
@@ -66,10 +68,10 @@ public abstract class AbstractStorage implements Storage {
int[] range =
ShuffleStorageUtils.getPartitionRange(
request.getPartitionId(), request.getPartitionNumPerRange(),
request.getPartitionNum());
- String partitionKey =
- RssUtils.generatePartitionKey(request.getAppId(),
request.getShuffleId(), range[0]);
- map.computeIfAbsent(partitionKey, key -> newReadHandler(request));
- return map.get(partitionKey);
+ String partitionKeyExceptAppId =
+ generatePartitionKeyExceptAppId(request.getShuffleId(), range[0]);
+ map.computeIfAbsent(partitionKeyExceptAppId, key ->
newReadHandler(request));
+ return map.get(partitionKeyExceptAppId);
}
protected abstract ServerReadHandler
newReadHandler(CreateShuffleReadHandlerRequest request);
@@ -84,8 +86,13 @@ public abstract class AbstractStorage implements Storage {
if (map == null || map.isEmpty()) {
return false;
}
- String partitionKey = RssUtils.generatePartitionKey(appId, shuffleId,
partition);
- return map.containsKey(partitionKey);
+ String partitionKeyExceptAppId =
generatePartitionKeyExceptAppId(shuffleId, partition);
+ return map.containsKey(partitionKeyExceptAppId);
+ }
+
+ public static String generatePartitionKeyExceptAppId(Integer shuffleId,
Integer partition) {
+ return String.join(
+ Constants.KEY_SPLIT_CHAR, String.valueOf(shuffleId),
String.valueOf(partition));
}
@Override