This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 34bf686c8 [#2203] feat(server) Add write file quantity info for 
application. (#2204)
34bf686c8 is described below

commit 34bf686c8440a1a86738176b5d67440ef16a2134
Author: leewish <[email protected]>
AuthorDate: Wed Oct 23 19:44:26 2024 +0800

    [#2203] feat(server) Add write file quantity info for application. (#2204)
    
    ### What changes were proposed in this pull request?
    
    Track, per application, the number of files written to local storage and to
    Hadoop storage, in addition to the existing written-size counters.
    
    ### Why are the changes needed?
    
    Fix: #2203
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    Tested locally.
    
    Co-authored-by: wenlongwlli <[email protected]>
---
 .../apache/uniffle/server/ShuffleFlushManager.java | 16 +++++-----
 .../org/apache/uniffle/server/ShuffleTaskInfo.java | 12 ++++++--
 .../uniffle/storage/common/AbstractStorage.java    | 10 ++++--
 .../org/apache/uniffle/storage/common/Storage.java |  4 +--
 .../handler/api/ShuffleWriteHandlerWrapper.java    | 36 ++++++++++++++++++++++
 5 files changed, 65 insertions(+), 13 deletions(-)

diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
index 574b9ef0a..7d62bb96c 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
@@ -44,7 +44,7 @@ import org.apache.uniffle.server.flush.EventRetryException;
 import org.apache.uniffle.server.storage.StorageManager;
 import org.apache.uniffle.storage.common.LocalStorage;
 import org.apache.uniffle.storage.common.Storage;
-import org.apache.uniffle.storage.handler.api.ShuffleWriteHandler;
+import org.apache.uniffle.storage.handler.api.ShuffleWriteHandlerWrapper;
 import org.apache.uniffle.storage.request.CreateShuffleWriteHandlerRequest;
 
 import static 
org.apache.uniffle.server.ShuffleServerConf.SERVER_MAX_CONCURRENCY_OF_ONE_PARTITION;
@@ -170,16 +170,16 @@ public class ShuffleFlushManager {
               storageDataReplica,
               user,
               maxConcurrencyPerPartitionToWrite);
-      ShuffleWriteHandler handler;
+      ShuffleWriteHandlerWrapper handlerWrapper;
       try {
-        handler = storage.getOrCreateWriteHandler(request);
+        handlerWrapper = storage.getOrCreateWriteHandler(request);
       } catch (Exception e) {
-        LOG.warn("Failed to create write handler for event: {}", event, e);
+        LOG.warn("Failed to create write handlerWrapper for event: {}", event, 
e);
         throw new EventRetryException(e);
       }
 
       long startTime = System.currentTimeMillis();
-      boolean writeSuccess = storageManager.write(storage, handler, event);
+      boolean writeSuccess = storageManager.write(storage, 
handlerWrapper.getHandler(), event);
       if (!writeSuccess) {
         throw new EventRetryException();
       }
@@ -207,9 +207,11 @@ public class ShuffleFlushManager {
       if (null != shuffleTaskInfo) {
         String storageHost = event.getUnderStorage().getStorageHost();
         if (LocalStorage.STORAGE_HOST.equals(storageHost)) {
-          shuffleTaskInfo.addOnLocalFileDataSize(event.getEncodedLength());
+          shuffleTaskInfo.addOnLocalFileDataSize(
+              event.getEncodedLength(), handlerWrapper.isNewlyCreated());
         } else {
-          shuffleTaskInfo.addOnHadoopDataSize(event.getEncodedLength());
+          shuffleTaskInfo.addOnHadoopDataSize(
+              event.getEncodedLength(), handlerWrapper.isNewlyCreated());
         }
       }
     } finally {
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
index 94987d661..be030769f 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
@@ -54,7 +54,9 @@ public class ShuffleTaskInfo {
 
   private final AtomicLong totalDataSize = new AtomicLong(0);
   private final AtomicLong inMemoryDataSize = new AtomicLong(0);
+  private final AtomicLong onLocalFileNum = new AtomicLong(0);
   private final AtomicLong onLocalFileDataSize = new AtomicLong(0);
+  private final AtomicLong onHadoopFileNum = new AtomicLong(0);
   private final AtomicLong onHadoopDataSize = new AtomicLong(0);
 
   /** shuffleId, partitionId, partitionSize */
@@ -166,7 +168,10 @@ public class ShuffleTaskInfo {
     return inMemoryDataSize.get();
   }
 
-  public long addOnLocalFileDataSize(long delta) {
+  public long addOnLocalFileDataSize(long delta, boolean isNewlyCreated) {
+    if (isNewlyCreated) { // caller reports a newly created write handler: count one more file
+      onLocalFileNum.incrementAndGet();
+    }
     inMemoryDataSize.addAndGet(-delta);
     return onLocalFileDataSize.addAndGet(delta);
   }
@@ -175,7 +180,10 @@ public class ShuffleTaskInfo {
     return onLocalFileDataSize.get();
   }
 
-  public long addOnHadoopDataSize(long delta) {
+  public long addOnHadoopDataSize(long delta, boolean isNewlyCreated) {
+    if (isNewlyCreated) { // count one more Hadoop file; the byte counter is updated below
+      onHadoopFileNum.incrementAndGet();
+    }
     inMemoryDataSize.addAndGet(-delta);
     return onHadoopDataSize.addAndGet(delta);
   }
diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/common/AbstractStorage.java 
b/storage/src/main/java/org/apache/uniffle/storage/common/AbstractStorage.java
index ef2b14b80..90c1c3b2a 100644
--- 
a/storage/src/main/java/org/apache/uniffle/storage/common/AbstractStorage.java
+++ 
b/storage/src/main/java/org/apache/uniffle/storage/common/AbstractStorage.java
@@ -25,6 +25,7 @@ import org.apache.uniffle.common.util.JavaUtils;
 import org.apache.uniffle.common.util.RssUtils;
 import org.apache.uniffle.storage.handler.api.ServerReadHandler;
 import org.apache.uniffle.storage.handler.api.ShuffleWriteHandler;
+import org.apache.uniffle.storage.handler.api.ShuffleWriteHandlerWrapper;
 import org.apache.uniffle.storage.request.CreateShuffleReadHandlerRequest;
 import org.apache.uniffle.storage.request.CreateShuffleWriteHandlerRequest;
 import org.apache.uniffle.storage.util.ShuffleStorageUtils;
@@ -40,17 +41,35 @@ public abstract class AbstractStorage implements Storage {
   abstract ShuffleWriteHandler newWriteHandler(CreateShuffleWriteHandlerRequest request);
 
+  /**
+   * Returns the write handler registered for the request's app/shuffle/start-partition key,
+   * creating it through {@link #newWriteHandler} when none is registered yet.
+   *
+   * @param request describes the handler to look up or create
+   * @return the handler together with a flag that is true only when this call created it
+   */
   @Override
-  public ShuffleWriteHandler getOrCreateWriteHandler(CreateShuffleWriteHandlerRequest request) {
+  public ShuffleWriteHandlerWrapper getOrCreateWriteHandler(
+      CreateShuffleWriteHandlerRequest request) {
     writerHandlers.computeIfAbsent(request.getAppId(), key -> JavaUtils.newConcurrentMap());
     requests.computeIfAbsent(request.getAppId(), key -> JavaUtils.newConcurrentMap());
     Map<String, ShuffleWriteHandler> map = writerHandlers.get(request.getAppId());
     String partitionKey =
         RssUtils.generatePartitionKey(
             request.getAppId(), request.getShuffleId(), request.getStartPartition());
+    // Record creation inside the mapping function rather than with a separate containsKey()
+    // check: check-then-act lets two concurrent callers both report "newly created".
+    // NOTE(review): relies on JavaUtils.newConcurrentMap() giving an atomic computeIfAbsent.
+    boolean[] isNewlyCreated = {false};
-    map.computeIfAbsent(partitionKey, key -> newWriteHandler(request));
+    ShuffleWriteHandler handler =
+        map.computeIfAbsent(
+            partitionKey,
+            key -> {
+              isNewlyCreated[0] = true;
+              return newWriteHandler(request);
+            });
     Map<String, CreateShuffleWriteHandlerRequest> requestMap = requests.get(request.getAppId());
     requestMap.putIfAbsent(partitionKey, request);
-    return map.get(partitionKey);
+    return new ShuffleWriteHandlerWrapper(handler, isNewlyCreated[0]);
   }
 
   @Override
diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/common/Storage.java 
b/storage/src/main/java/org/apache/uniffle/storage/common/Storage.java
index 80f36a7bd..d440de534 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/common/Storage.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/common/Storage.java
@@ -20,7 +20,7 @@ package org.apache.uniffle.storage.common;
 import java.io.IOException;
 
 import org.apache.uniffle.storage.handler.api.ServerReadHandler;
-import org.apache.uniffle.storage.handler.api.ShuffleWriteHandler;
+import org.apache.uniffle.storage.handler.api.ShuffleWriteHandlerWrapper;
 import org.apache.uniffle.storage.request.CreateShuffleReadHandlerRequest;
 import org.apache.uniffle.storage.request.CreateShuffleWriteHandlerRequest;
 
@@ -32,7 +32,7 @@ public interface Storage {
 
   void updateReadMetrics(StorageReadMetrics metrics);
 
-  ShuffleWriteHandler getOrCreateWriteHandler(CreateShuffleWriteHandlerRequest 
request)
+  ShuffleWriteHandlerWrapper 
getOrCreateWriteHandler(CreateShuffleWriteHandlerRequest request)
       throws IOException;
 
   boolean containsWriteHandler(String appId);
diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/handler/api/ShuffleWriteHandlerWrapper.java
 
b/storage/src/main/java/org/apache/uniffle/storage/handler/api/ShuffleWriteHandlerWrapper.java
new file mode 100644
index 000000000..82248ab3a
--- /dev/null
+++ 
b/storage/src/main/java/org/apache/uniffle/storage/handler/api/ShuffleWriteHandlerWrapper.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.storage.handler.api;
+
+/**
+ * Pairs a {@link ShuffleWriteHandler} with a flag recording whether the handler was newly
+ * created. Both values are fixed at construction time.
+ */
+public class ShuffleWriteHandlerWrapper {
+  private final boolean isNewlyCreated;
+  private final ShuffleWriteHandler handler;
+
+  /**
+   * @param handler the write handler to expose through {@link #getHandler()}
+   * @param isNewlyCreated the value to report from {@link #isNewlyCreated()}
+   */
+  public ShuffleWriteHandlerWrapper(ShuffleWriteHandler handler, boolean isNewlyCreated) {
+    this.isNewlyCreated = isNewlyCreated;
+    this.handler = handler;
+  }
+
+  /** Returns the creation flag supplied to the constructor. */
+  public boolean isNewlyCreated() {
+    return isNewlyCreated;
+  }
+
+  /** Returns the wrapped write handler. */
+  public ShuffleWriteHandler getHandler() {
+    return handler;
+  }
+}

Reply via email to