This is an automated email from the ASF dual-hosted git repository.

rickyma pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new b4ed85429 [#1903] improvement(server): Add some metrics for shuffle server (#1904)
b4ed85429 is described below

commit b4ed85429c601216508d48b6e6c9b848f032b797
Author: kqhzz <kuangqinghu...@163.com>
AuthorDate: Thu Jul 18 02:38:28 2024 +0800

    [#1903] improvement(server): Add some metrics for shuffle server (#1904)
    
    ### What changes were proposed in this pull request?
    
    Add some metrics for the shuffle server.
    
    ### Why are the changes needed?
    
    This would be more helpful for us to understand the status of the shuffle server.
    
    Fix: #1903
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
---
 .../common/storage/ApplicationStorageInfo.java     | 56 ++++++++++++++++++++++
 .../uniffle/server/ShuffleServerMetrics.java       | 15 ++++++
 .../server/storage/HadoopStorageManager.java       |  1 +
 .../server/storage/LocalStorageManager.java        |  1 +
 .../server/storage/SingleStorageManager.java       | 27 +++++++++++
 .../uniffle/storage/common/AbstractStorage.java    |  5 ++
 .../org/apache/uniffle/storage/common/Storage.java |  2 +
 7 files changed, 107 insertions(+)

diff --git a/common/src/main/java/org/apache/uniffle/common/storage/ApplicationStorageInfo.java b/common/src/main/java/org/apache/uniffle/common/storage/ApplicationStorageInfo.java
new file mode 100644
index 000000000..06ddcdf1f
--- /dev/null
+++ b/common/src/main/java/org/apache/uniffle/common/storage/ApplicationStorageInfo.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.storage;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+public class ApplicationStorageInfo {
+  private String appId;
+  private AtomicLong fileNum;
+  private AtomicLong usedBytes;
+
+  public ApplicationStorageInfo(String appId) {
+    this.appId = appId;
+    this.fileNum = new AtomicLong();
+    this.usedBytes = new AtomicLong();
+  }
+
+  public String getAppId() {
+    return appId;
+  }
+
+  public void setAppId(String appId) {
+    this.appId = appId;
+  }
+
+  public long getFileNum() {
+    return fileNum.get();
+  }
+
+  public void incFileNum(long fileNum) {
+    this.fileNum.addAndGet(fileNum);
+  }
+
+  public long getUsedBytes() {
+    return usedBytes.get();
+  }
+
+  public void incUsedBytes(long usedBytes) {
+    this.usedBytes.addAndGet(usedBytes);
+  }
+}
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
index 8eada73c5..309cee553 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
@@ -39,6 +39,11 @@ public class ShuffleServerMetrics {
 
   private static final String TOTAL_RECEIVED_DATA = "total_received_data";
   private static final String TOTAL_WRITE_DATA = "total_write_data";
+  private static final String TOTAL_DELETE_DATA = "total_delete_data";
+  private static final String TOTAL_FLUSH_FILE_NUM = "total_flush_file_num";
+  private static final String TOTAL_DELETE_FILE_NUM = "total_delete_file_num";
+  private static final String STORAGE_USED_BYTES = "storage_used_bytes";
+  private static final String FLUSH_FILE_NUM = "flush_file_num";
   private static final String TOTAL_WRITE_BLOCK = "total_write_block";
   private static final String WRITE_BLOCK_SIZE = "write_block_size";
   private static final String TOTAL_WRITE_TIME = "total_write_time";
@@ -155,6 +160,11 @@ public class ShuffleServerMetrics {
 
   public static Counter.Child counterTotalReceivedDataSize;
   public static Counter.Child counterTotalWriteDataSize;
+  public static Counter.Child counterTotalDeleteDataSize;
+  public static Counter.Child counterTotalFlushFileNum;
+  public static Counter.Child counterTotalDeleteFileNum;
+  public static Gauge.Child gaugeStorageUsedBytes;
+  public static Gauge.Child gaugeFlushFileNum;
   public static Counter.Child counterTotalWriteBlockSize;
   public static Histogram appHistogramWriteBlockSize;
   public static Counter.Child counterTotalWriteTime;
@@ -337,6 +347,11 @@ public class ShuffleServerMetrics {
   private static void setUpMetrics(ShuffleServerConf serverConf) {
     counterTotalReceivedDataSize = 
metricsManager.addLabeledCounter(TOTAL_RECEIVED_DATA);
     counterTotalWriteDataSize = 
metricsManager.addLabeledCounter(TOTAL_WRITE_DATA);
+    counterTotalDeleteDataSize = 
metricsManager.addLabeledCounter(TOTAL_DELETE_DATA);
+    counterTotalFlushFileNum = 
metricsManager.addLabeledCounter(TOTAL_FLUSH_FILE_NUM);
+    counterTotalDeleteFileNum = 
metricsManager.addLabeledCounter(TOTAL_DELETE_FILE_NUM);
+    gaugeStorageUsedBytes = metricsManager.addLabeledGauge(STORAGE_USED_BYTES);
+    gaugeFlushFileNum = metricsManager.addLabeledGauge(FLUSH_FILE_NUM);
     counterTotalWriteBlockSize = 
metricsManager.addLabeledCounter(TOTAL_WRITE_BLOCK);
     appHistogramWriteBlockSize =
         metricsManager.addHistogram(
diff --git a/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java b/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java
index 90d62a1ed..66c4349cf 100644
--- a/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java
@@ -148,6 +148,7 @@ public class HadoopStorageManager extends SingleStorageManager {
         }
       }
       deleteHandler.delete(deletePaths.toArray(new String[0]), appId, 
event.getUser());
+      removeAppStorageInfo(event);
     } else {
       LOG.warn("Storage gotten is null when removing resources for event: {}", 
event);
     }
diff --git a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
index c55fe9060..488629544 100644
--- a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
@@ -326,6 +326,7 @@ public class LocalStorageManager extends SingleStorageManager {
             .collect(Collectors.toList());
 
     deleteHandler.delete(deletePaths.toArray(new String[deletePaths.size()]), 
appId, user);
+    removeAppStorageInfo(event);
   }
 
   private void cleanupStorageSelectionCache(PurgeEvent event) {
diff --git a/server/src/main/java/org/apache/uniffle/server/storage/SingleStorageManager.java b/server/src/main/java/org/apache/uniffle/server/storage/SingleStorageManager.java
index 6acead608..fb974d5f5 100644
--- a/server/src/main/java/org/apache/uniffle/server/storage/SingleStorageManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/storage/SingleStorageManager.java
@@ -18,6 +18,7 @@
 package org.apache.uniffle.server.storage;
 
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.collect.Lists;
@@ -26,10 +27,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.common.ShufflePartitionedBlock;
+import org.apache.uniffle.common.storage.ApplicationStorageInfo;
+import org.apache.uniffle.common.util.JavaUtils;
 import org.apache.uniffle.common.util.RssUtils;
 import org.apache.uniffle.server.ShuffleDataFlushEvent;
 import org.apache.uniffle.server.ShuffleServerConf;
 import org.apache.uniffle.server.ShuffleServerMetrics;
+import org.apache.uniffle.server.event.PurgeEvent;
 import org.apache.uniffle.storage.common.Storage;
 import org.apache.uniffle.storage.common.StorageWriteMetrics;
 import org.apache.uniffle.storage.handler.api.ShuffleWriteHandler;
@@ -42,6 +46,8 @@ public abstract class SingleStorageManager implements StorageManager {
   private final long eventSizeThresholdL1;
   private final long eventSizeThresholdL2;
   private final long eventSizeThresholdL3;
+  protected final Map<String, ApplicationStorageInfo> appStorageInfoMap =
+      JavaUtils.newConcurrentMap();
 
   public SingleStorageManager(ShuffleServerConf conf) {
     writeSlowThreshold = 
conf.getSizeAsBytes(ShuffleServerConf.SERVER_WRITE_SLOW_THRESHOLD);
@@ -92,6 +98,16 @@ public abstract class SingleStorageManager implements StorageManager {
       } else {
         ShuffleServerMetrics.counterEventSizeThresholdLevel4.inc();
       }
+      String appId = event.getAppId();
+      ApplicationStorageInfo appStorage =
+          appStorageInfoMap.computeIfAbsent(appId, id -> new 
ApplicationStorageInfo(appId));
+      appStorage.incUsedBytes(event.getSize());
+      ShuffleServerMetrics.gaugeStorageUsedBytes.inc(event.getSize());
+      if (event.getUnderStorage().containsWriteHandler(appId)) {
+        appStorage.incFileNum(1);
+        ShuffleServerMetrics.gaugeFlushFileNum.inc();
+        ShuffleServerMetrics.counterTotalFlushFileNum.inc();
+      }
       Storage storage = event.getUnderStorage();
       if (storage != null) {
         storage.updateWriteMetrics(metrics);
@@ -149,4 +165,15 @@ public abstract class SingleStorageManager implements StorageManager {
   public void stop() {
     // do nothing
   }
+
+  public void removeAppStorageInfo(PurgeEvent event) {
+    String appId = event.getAppId();
+    ApplicationStorageInfo info = appStorageInfoMap.remove(appId);
+    if (info != null) {
+      ShuffleServerMetrics.gaugeStorageUsedBytes.dec(info.getUsedBytes());
+      ShuffleServerMetrics.gaugeFlushFileNum.dec(info.getFileNum());
+      ShuffleServerMetrics.counterTotalDeleteDataSize.inc(info.getUsedBytes());
+      ShuffleServerMetrics.counterTotalDeleteFileNum.inc(info.getFileNum());
+    }
+  }
 }
diff --git a/storage/src/main/java/org/apache/uniffle/storage/common/AbstractStorage.java b/storage/src/main/java/org/apache/uniffle/storage/common/AbstractStorage.java
index f8bf8c9b4..ef2b14b80 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/common/AbstractStorage.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/common/AbstractStorage.java
@@ -68,6 +68,11 @@ public abstract class AbstractStorage implements Storage {
 
   protected abstract ServerReadHandler 
newReadHandler(CreateShuffleReadHandlerRequest request);
 
+  @Override
+  public boolean containsWriteHandler(String appId) {
+    return writerHandlers.containsKey(appId);
+  }
+
   public boolean containsWriteHandler(String appId, int shuffleId, int 
partition) {
     Map<String, ShuffleWriteHandler> map = writerHandlers.get(appId);
     if (map == null || map.isEmpty()) {
diff --git a/storage/src/main/java/org/apache/uniffle/storage/common/Storage.java b/storage/src/main/java/org/apache/uniffle/storage/common/Storage.java
index 43168c324..80f36a7bd 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/common/Storage.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/common/Storage.java
@@ -35,6 +35,8 @@ public interface Storage {
   ShuffleWriteHandler getOrCreateWriteHandler(CreateShuffleWriteHandlerRequest 
request)
       throws IOException;
 
+  boolean containsWriteHandler(String appId);
+
   ServerReadHandler getOrCreateReadHandler(CreateShuffleReadHandlerRequest 
request);
 
   void removeHandlers(String appId);

Reply via email to