This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 34bf686c8 [#2203] feat(server) Add write file quantity info for
application. (#2204)
34bf686c8 is described below
commit 34bf686c8440a1a86738176b5d67440ef16a2134
Author: leewish <[email protected]>
AuthorDate: Wed Oct 23 19:44:26 2024 +0800
[#2203] feat(server) Add write file quantity info for application. (#2204)
### What changes were proposed in this pull request?
Add write file quantity info for application.
### Why are the changes needed?
Fix: #2203
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Locally
Co-authored-by: wenlongwlli <[email protected]>
---
.../apache/uniffle/server/ShuffleFlushManager.java | 16 +++++-----
.../org/apache/uniffle/server/ShuffleTaskInfo.java | 12 ++++++--
.../uniffle/storage/common/AbstractStorage.java | 10 ++++--
.../org/apache/uniffle/storage/common/Storage.java | 4 +--
.../handler/api/ShuffleWriteHandlerWrapper.java | 36 ++++++++++++++++++++++
5 files changed, 65 insertions(+), 13 deletions(-)
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
index 574b9ef0a..7d62bb96c 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
@@ -44,7 +44,7 @@ import org.apache.uniffle.server.flush.EventRetryException;
import org.apache.uniffle.server.storage.StorageManager;
import org.apache.uniffle.storage.common.LocalStorage;
import org.apache.uniffle.storage.common.Storage;
-import org.apache.uniffle.storage.handler.api.ShuffleWriteHandler;
+import org.apache.uniffle.storage.handler.api.ShuffleWriteHandlerWrapper;
import org.apache.uniffle.storage.request.CreateShuffleWriteHandlerRequest;
import static
org.apache.uniffle.server.ShuffleServerConf.SERVER_MAX_CONCURRENCY_OF_ONE_PARTITION;
@@ -170,16 +170,16 @@ public class ShuffleFlushManager {
storageDataReplica,
user,
maxConcurrencyPerPartitionToWrite);
- ShuffleWriteHandler handler;
+ ShuffleWriteHandlerWrapper handlerWrapper;
try {
- handler = storage.getOrCreateWriteHandler(request);
+ handlerWrapper = storage.getOrCreateWriteHandler(request);
} catch (Exception e) {
- LOG.warn("Failed to create write handler for event: {}", event, e);
+ LOG.warn("Failed to create write handlerWrapper for event: {}", event,
e);
throw new EventRetryException(e);
}
long startTime = System.currentTimeMillis();
- boolean writeSuccess = storageManager.write(storage, handler, event);
+ boolean writeSuccess = storageManager.write(storage,
handlerWrapper.getHandler(), event);
if (!writeSuccess) {
throw new EventRetryException();
}
@@ -207,9 +207,11 @@ public class ShuffleFlushManager {
if (null != shuffleTaskInfo) {
String storageHost = event.getUnderStorage().getStorageHost();
if (LocalStorage.STORAGE_HOST.equals(storageHost)) {
- shuffleTaskInfo.addOnLocalFileDataSize(event.getEncodedLength());
+ shuffleTaskInfo.addOnLocalFileDataSize(
+ event.getEncodedLength(), handlerWrapper.isNewlyCreated());
} else {
- shuffleTaskInfo.addOnHadoopDataSize(event.getEncodedLength());
+ shuffleTaskInfo.addOnHadoopDataSize(
+ event.getEncodedLength(), handlerWrapper.isNewlyCreated());
}
}
} finally {
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
index 94987d661..be030769f 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
@@ -54,7 +54,9 @@ public class ShuffleTaskInfo {
private final AtomicLong totalDataSize = new AtomicLong(0);
private final AtomicLong inMemoryDataSize = new AtomicLong(0);
+ private final AtomicLong onLocalFileNum = new AtomicLong(0);
private final AtomicLong onLocalFileDataSize = new AtomicLong(0);
+ private final AtomicLong onHadoopFileNum = new AtomicLong(0);
private final AtomicLong onHadoopDataSize = new AtomicLong(0);
/** shuffleId, partitionId, partitionSize */
@@ -166,7 +168,10 @@ public class ShuffleTaskInfo {
return inMemoryDataSize.get();
}
- public long addOnLocalFileDataSize(long delta) {
+ public long addOnLocalFileDataSize(long delta, boolean isNewlyCreated) {
+ if (isNewlyCreated) {
+ onLocalFileNum.incrementAndGet();
+ }
inMemoryDataSize.addAndGet(-delta);
return onLocalFileDataSize.addAndGet(delta);
}
@@ -175,7 +180,10 @@ public class ShuffleTaskInfo {
return onLocalFileDataSize.get();
}
- public long addOnHadoopDataSize(long delta) {
+ public long addOnHadoopDataSize(long delta, boolean isNewlyCreated) {
+ if (isNewlyCreated) {
+ onHadoopDataSize.incrementAndGet();
+ }
inMemoryDataSize.addAndGet(-delta);
return onHadoopDataSize.addAndGet(delta);
}
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/common/AbstractStorage.java
b/storage/src/main/java/org/apache/uniffle/storage/common/AbstractStorage.java
index ef2b14b80..90c1c3b2a 100644
---
a/storage/src/main/java/org/apache/uniffle/storage/common/AbstractStorage.java
+++
b/storage/src/main/java/org/apache/uniffle/storage/common/AbstractStorage.java
@@ -25,6 +25,7 @@ import org.apache.uniffle.common.util.JavaUtils;
import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.storage.handler.api.ServerReadHandler;
import org.apache.uniffle.storage.handler.api.ShuffleWriteHandler;
+import org.apache.uniffle.storage.handler.api.ShuffleWriteHandlerWrapper;
import org.apache.uniffle.storage.request.CreateShuffleReadHandlerRequest;
import org.apache.uniffle.storage.request.CreateShuffleWriteHandlerRequest;
import org.apache.uniffle.storage.util.ShuffleStorageUtils;
@@ -40,17 +41,22 @@ public abstract class AbstractStorage implements Storage {
abstract ShuffleWriteHandler
newWriteHandler(CreateShuffleWriteHandlerRequest request);
@Override
- public ShuffleWriteHandler
getOrCreateWriteHandler(CreateShuffleWriteHandlerRequest request) {
+ public ShuffleWriteHandlerWrapper getOrCreateWriteHandler(
+ CreateShuffleWriteHandlerRequest request) {
writerHandlers.computeIfAbsent(request.getAppId(), key ->
JavaUtils.newConcurrentMap());
requests.computeIfAbsent(request.getAppId(), key ->
JavaUtils.newConcurrentMap());
Map<String, ShuffleWriteHandler> map =
writerHandlers.get(request.getAppId());
String partitionKey =
RssUtils.generatePartitionKey(
request.getAppId(), request.getShuffleId(),
request.getStartPartition());
+ boolean isNewlyCreated = false;
+ if (!map.containsKey(partitionKey)) {
+ isNewlyCreated = true;
+ }
map.computeIfAbsent(partitionKey, key -> newWriteHandler(request));
Map<String, CreateShuffleWriteHandlerRequest> requestMap =
requests.get(request.getAppId());
requestMap.putIfAbsent(partitionKey, request);
- return map.get(partitionKey);
+ return new ShuffleWriteHandlerWrapper(map.get(partitionKey),
isNewlyCreated);
}
@Override
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/common/Storage.java
b/storage/src/main/java/org/apache/uniffle/storage/common/Storage.java
index 80f36a7bd..d440de534 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/common/Storage.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/common/Storage.java
@@ -20,7 +20,7 @@ package org.apache.uniffle.storage.common;
import java.io.IOException;
import org.apache.uniffle.storage.handler.api.ServerReadHandler;
-import org.apache.uniffle.storage.handler.api.ShuffleWriteHandler;
+import org.apache.uniffle.storage.handler.api.ShuffleWriteHandlerWrapper;
import org.apache.uniffle.storage.request.CreateShuffleReadHandlerRequest;
import org.apache.uniffle.storage.request.CreateShuffleWriteHandlerRequest;
@@ -32,7 +32,7 @@ public interface Storage {
void updateReadMetrics(StorageReadMetrics metrics);
- ShuffleWriteHandler getOrCreateWriteHandler(CreateShuffleWriteHandlerRequest
request)
+ ShuffleWriteHandlerWrapper
getOrCreateWriteHandler(CreateShuffleWriteHandlerRequest request)
throws IOException;
boolean containsWriteHandler(String appId);
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/handler/api/ShuffleWriteHandlerWrapper.java
b/storage/src/main/java/org/apache/uniffle/storage/handler/api/ShuffleWriteHandlerWrapper.java
new file mode 100644
index 000000000..82248ab3a
--- /dev/null
+++
b/storage/src/main/java/org/apache/uniffle/storage/handler/api/ShuffleWriteHandlerWrapper.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.storage.handler.api;
+
+public class ShuffleWriteHandlerWrapper {
+ private final ShuffleWriteHandler handler;
+ private final boolean isNewlyCreated;
+
+ public ShuffleWriteHandlerWrapper(ShuffleWriteHandler handler, boolean
isNewlyCreated) {
+ this.handler = handler;
+ this.isNewlyCreated = isNewlyCreated;
+ }
+
+ public ShuffleWriteHandler getHandler() {
+ return handler;
+ }
+
+ public boolean isNewlyCreated() {
+ return isNewlyCreated;
+ }
+}