This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 5f8d6de1b [#2133][FOLLOWUP] improvement(server): Support output the 
shuffle data size and block count before purge app (#2180)
5f8d6de1b is described below

commit 5f8d6de1b1a278bd42e6b7cf4f79e0721f67dbb9
Author: maobaolong <[email protected]>
AuthorDate: Thu Oct 17 21:09:51 2024 +0800

    [#2133][FOLLOWUP] improvement(server): Support output the shuffle data size 
and block count before purge app (#2180)
    
    ### What changes were proposed in this pull request?
    
    Output the shuffle data size and block count before purging an app.
    
    ### Why are the changes needed?
    
    To make the shuffle block info clear and easy to view.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Run locally.
    
    ```
    [2024-10-15 11:25:31.438] [clearResourceThread] [INFO] ShuffleTaskManager - 
Removing app summary info: appId: app-20241015112429-0069_1728962668484
    The app task info: 
ShuffleTaskInfo{appId='app-20241015112429-0069_1728962668484', 
totalDataSize=741B, inMemoryDataSize=0B, onLocalFileDataSize=741B, 
onHadoopDataSize=0B, maxSizePartitionInfo=[id=1, shuffleId=0, size=663B, 
blockCount=17], shuffleDetailInfo={0=ShuffleDetail [id=0, dataSize=741B, 
blockCount=19, startTime=2024-10-15 11:24:36]}}
    
    ```
---
 .../org/apache/uniffle/common/PartitionInfo.java   |  5 +-
 .../org/apache/uniffle/common/util/ByteUnit.java   |  4 ++
 .../org/apache/uniffle/common/util/Constants.java  |  2 +
 .../apache/uniffle/common/util/UnitConverter.java  | 22 ++++++
 .../uniffle/common/util/UnitConverterTest.java     | 11 +++
 .../apache/uniffle/server/ShuffleDetailInfo.java   | 80 ++++++++++++++++++++++
 .../org/apache/uniffle/server/ShuffleTaskInfo.java | 36 ++++++++--
 .../apache/uniffle/server/ShuffleTaskManager.java  | 22 +-----
 8 files changed, 155 insertions(+), 27 deletions(-)

diff --git a/common/src/main/java/org/apache/uniffle/common/PartitionInfo.java 
b/common/src/main/java/org/apache/uniffle/common/PartitionInfo.java
index ee9e64088..ea0a5976e 100644
--- a/common/src/main/java/org/apache/uniffle/common/PartitionInfo.java
+++ b/common/src/main/java/org/apache/uniffle/common/PartitionInfo.java
@@ -17,6 +17,8 @@
 
 package org.apache.uniffle.common;
 
+import org.apache.uniffle.common.util.UnitConverter;
+
 public class PartitionInfo {
   private int id;
   private int shuffleId;
@@ -49,7 +51,8 @@ public class PartitionInfo {
   @Override
   public String toString() {
     return String.format(
-        "[id=%s, shuffleId=%s, size=%s, blockCount=%s]", id, shuffleId, size, 
blockCount);
+        "[id=%s, shuffleId=%s, size=%s, blockCount=%s]",
+        id, shuffleId, UnitConverter.formatSize(size), blockCount);
   }
 
   public boolean isCurrentPartition(int shuffleId, int partitionId) {
diff --git a/common/src/main/java/org/apache/uniffle/common/util/ByteUnit.java 
b/common/src/main/java/org/apache/uniffle/common/util/ByteUnit.java
index c3866201e..ffef7dcbc 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/ByteUnit.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/ByteUnit.java
@@ -83,5 +83,9 @@ public enum ByteUnit {
     return convertTo(d, PiB);
   }
 
+  public long getMultiplier() {
+    return multiplier;
+  }
+
   private final long multiplier;
 }
diff --git a/common/src/main/java/org/apache/uniffle/common/util/Constants.java 
b/common/src/main/java/org/apache/uniffle/common/util/Constants.java
index 2d1de3551..d63c2e46e 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/Constants.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/Constants.java
@@ -89,4 +89,6 @@ public final class Constants {
   // We are accessing this configuration through RssConf, the spark prefix is 
stripped, hence, this
   // field.
   public static final String DRIVER_HOST = "driver.host";
+
+  public static final String DATE_PATTERN = "yyyy-MM-dd HH:mm:ss";
 }
diff --git 
a/common/src/main/java/org/apache/uniffle/common/util/UnitConverter.java 
b/common/src/main/java/org/apache/uniffle/common/util/UnitConverter.java
index bbef038cb..f3b4d5e0f 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/UnitConverter.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/UnitConverter.java
@@ -148,4 +148,26 @@ public final class UnitConverter {
       throw new NumberFormatException(timeError + "\n" + e.getMessage());
     }
   }
+
+  /**
+   * Returns a human-readable version of bytes 10GiB 2048KiB etc.
+   *
+   * @param bytes the number of bytes
+   * @return human-readable version
+   */
+  public static String formatSize(long bytes) {
+    if (bytes < ByteUnit.KiB.getMultiplier()) {
+      return bytes + "B";
+    } else if (bytes < ByteUnit.MiB.getMultiplier()) {
+      return String.format("%.2fKiB", bytes / (double) 
ByteUnit.KiB.getMultiplier());
+    } else if (bytes < ByteUnit.GiB.getMultiplier()) {
+      return String.format("%.2fMiB", bytes / (double) 
ByteUnit.MiB.getMultiplier());
+    } else if (bytes < ByteUnit.TiB.getMultiplier()) {
+      return String.format("%.2fGiB", bytes / (double) 
ByteUnit.GiB.getMultiplier());
+    } else if (bytes < ByteUnit.PiB.getMultiplier()) {
+      return String.format("%.2fTiB", bytes / (double) 
ByteUnit.TiB.getMultiplier());
+    } else {
+      return String.format("%.2fPiB", bytes / (double) 
ByteUnit.PiB.getMultiplier());
+    }
+  }
 }
diff --git 
a/common/src/test/java/org/apache/uniffle/common/util/UnitConverterTest.java 
b/common/src/test/java/org/apache/uniffle/common/util/UnitConverterTest.java
index f3ad5d1e8..89f215727 100644
--- a/common/src/test/java/org/apache/uniffle/common/util/UnitConverterTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/util/UnitConverterTest.java
@@ -20,6 +20,7 @@ package org.apache.uniffle.common.util;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Stream;
 
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
@@ -115,4 +116,14 @@ public class UnitConverterTest {
       assertEquals(expected, UnitConverter.timeStringAs(value, unit));
     }
   }
+
+  @Test
+  public void testFormatSize() {
+    assertEquals("500B", UnitConverter.formatSize(500), "Should display in 
bytes");
+    assertEquals("1.90KiB", UnitConverter.formatSize(1946), "Should display in 
KiB");
+    assertEquals("11.77MiB", UnitConverter.formatSize(12345678), "Should 
display in MiB");
+    assertEquals("11.50GiB", UnitConverter.formatSize(12345678901L), "Should 
display in GiB");
+    assertEquals("1.00TiB", UnitConverter.formatSize(1099511627776L), "Should 
display in TiB");
+    assertEquals("9.77PiB", UnitConverter.formatSize(10995116277760000L), 
"Should display in PiB");
+  }
 }
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleDetailInfo.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleDetailInfo.java
new file mode 100644
index 000000000..5f0c27ed1
--- /dev/null
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleDetailInfo.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.lang3.time.DateFormatUtils;
+
+import org.apache.uniffle.common.util.Constants;
+import org.apache.uniffle.common.util.UnitConverter;
+
+public class ShuffleDetailInfo {
+  private int id;
+  private AtomicLong dataSize;
+  private AtomicLong blockCount;
+  private AtomicLong partitionCount;
+  private long startTime;
+
+  public ShuffleDetailInfo(int id, long startTime) {
+    this.id = id;
+    this.dataSize = new AtomicLong();
+    this.blockCount = new AtomicLong();
+    this.partitionCount = new AtomicLong();
+    this.startTime = startTime;
+  }
+
+  public int getId() {
+    return id;
+  }
+
+  public long getStartTime() {
+    return startTime;
+  }
+
+  public long getBlockCount() {
+    return blockCount.get();
+  }
+
+  public long getDataSize() {
+    return dataSize.get();
+  }
+
+  public void incrDataSize(long size) {
+    dataSize.addAndGet(size);
+  }
+
+  public void incrBlockCount(long count) {
+    blockCount.addAndGet(count);
+  }
+
+  public void incrPartitionCount() {
+    partitionCount.addAndGet(1);
+  }
+
+  @Override
+  public String toString() {
+    return String.format(
+        "ShuffleDetail [%d: partitionCount=%s, blockCount=%s, size=%s, 
startTime=%s]",
+        id,
+        partitionCount,
+        blockCount,
+        UnitConverter.formatSize(dataSize.get()),
+        DateFormatUtils.format(startTime, Constants.DATE_PATTERN));
+  }
+}
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
index 2cbbf59c2..d4e6eeb32 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
@@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.uniffle.common.PartitionInfo;
 import org.apache.uniffle.common.ShuffleDataDistributionType;
 import org.apache.uniffle.common.util.JavaUtils;
+import org.apache.uniffle.common.util.UnitConverter;
 
 /**
  * ShuffleTaskInfo contains the information of submitting the shuffle, the 
information of the cache
@@ -67,7 +68,10 @@ public class ShuffleTaskInfo {
 
   private final AtomicReference<ShuffleSpecification> specification;
 
+  /** shuffleId -> partitionId -> block counter */
   private final Map<Integer, Map<Integer, AtomicLong>> partitionBlockCounters;
+  /** shuffleId -> shuffleDetailInfo */
+  private final Map<Integer, ShuffleDetailInfo> shuffleDetailInfos;
 
   private final Map<Integer, Integer> latestStageAttemptNumbers;
 
@@ -84,6 +88,7 @@ public class ShuffleTaskInfo {
     this.specification = new AtomicReference<>();
     this.partitionBlockCounters = JavaUtils.newConcurrentMap();
     this.latestStageAttemptNumbers = JavaUtils.newConcurrentMap();
+    this.shuffleDetailInfos = JavaUtils.newConcurrentMap();
   }
 
   public Long getCurrentTimes() {
@@ -129,9 +134,18 @@ public class ShuffleTaskInfo {
   public long addPartitionDataSize(int shuffleId, int partitionId, long delta) 
{
     totalDataSize.addAndGet(delta);
     inMemoryDataSize.addAndGet(delta);
+    ShuffleDetailInfo shuffleDetailInfo =
+        shuffleDetailInfos.computeIfAbsent(
+            shuffleId, key -> new ShuffleDetailInfo(shuffleId, 
System.currentTimeMillis()));
+    shuffleDetailInfo.incrDataSize(delta);
     partitionDataSizes.computeIfAbsent(shuffleId, key -> 
JavaUtils.newConcurrentMap());
     Map<Integer, Long> partitions = partitionDataSizes.get(shuffleId);
-    partitions.putIfAbsent(partitionId, 0L);
+    partitions.computeIfAbsent(
+        partitionId,
+        k -> {
+          shuffleDetailInfo.incrPartitionCount();
+          return 0L;
+        });
     return partitions.computeIfPresent(
         partitionId,
         (k, v) -> {
@@ -235,6 +249,10 @@ public class ShuffleTaskInfo {
     if (maxSizePartitionInfo.isCurrentPartition(shuffleId, partitionId)) {
       maxSizePartitionInfo.setBlockCount(blockCount);
     }
+    shuffleDetailInfos
+        .computeIfAbsent(
+            shuffleId, key -> new ShuffleDetailInfo(shuffleId, 
System.currentTimeMillis()))
+        .incrBlockCount(delta);
   }
 
   public long getBlockNumber(int shuffleId, int partitionId) {
@@ -261,6 +279,10 @@ public class ShuffleTaskInfo {
     return maxSizePartitionInfo;
   }
 
+  public ShuffleDetailInfo getShuffleDetailInfo(int shuffleId) {
+    return shuffleDetailInfos.get(shuffleId);
+  }
+
   @Override
   public String toString() {
     return "ShuffleTaskInfo{"
@@ -268,17 +290,17 @@ public class ShuffleTaskInfo {
         + appId
         + '\''
         + ", totalDataSize="
-        + totalDataSize
+        + UnitConverter.formatSize(totalDataSize.get())
         + ", inMemoryDataSize="
-        + inMemoryDataSize
+        + UnitConverter.formatSize(inMemoryDataSize.get())
         + ", onLocalFileDataSize="
-        + onLocalFileDataSize
+        + UnitConverter.formatSize(onLocalFileDataSize.get())
         + ", onHadoopDataSize="
-        + onHadoopDataSize
-        + ", partitionDataSizes="
-        + partitionDataSizes
+        + UnitConverter.formatSize(onHadoopDataSize.get())
         + ", maxSizePartitionInfo="
         + maxSizePartitionInfo
+        + ", shuffleDetailInfo="
+        + shuffleDetailInfos
         + '}';
   }
 }
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index e51ab5352..fa531ba02 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -72,6 +72,7 @@ import org.apache.uniffle.common.util.Constants;
 import org.apache.uniffle.common.util.JavaUtils;
 import org.apache.uniffle.common.util.RssUtils;
 import org.apache.uniffle.common.util.ThreadUtils;
+import org.apache.uniffle.common.util.UnitConverter;
 import org.apache.uniffle.server.buffer.PreAllocatedBufferInfo;
 import org.apache.uniffle.server.buffer.ShuffleBuffer;
 import org.apache.uniffle.server.buffer.ShuffleBufferManager;
@@ -866,13 +867,6 @@ public class ShuffleTaskManager {
       StringBuilder partitionInfoSummary = new StringBuilder();
       partitionInfoSummary.append("appId: ").append(appId).append("\n");
       for (int shuffleId : shuffleTaskInfo.getShuffleIds()) {
-        Roaring64NavigableMap[] bitmaps = 
partitionsToBlockIds.get(appId).get(shuffleId);
-        long shuffleBlockCount = 0L;
-        if (bitmaps != null) {
-          for (Roaring64NavigableMap bitmap : bitmaps) {
-            shuffleBlockCount += bitmap.getLongCardinality();
-          }
-        }
         if 
(conf.getBoolean(ShuffleServerConf.SERVER_LOG_APP_DETAIL_WHILE_REMOVE_ENABLED)) 
{
           for (int partitionId : shuffleTaskInfo.getPartitionIds(shuffleId)) {
             long partitionSize = 
shuffleTaskInfo.getPartitionDataSize(shuffleId, partitionId);
@@ -883,21 +877,11 @@ public class ShuffleTaskManager {
                 shuffleId,
                 partitionId,
                 partitionBlockCount,
-                partitionSize);
+                UnitConverter.formatSize(partitionSize));
           }
         }
-        partitionInfoSummary
-            .append(" shuffleId: ")
-            .append(shuffleId)
-            .append(" contains partition/block: ")
-            .append(shuffleTaskInfo.getPartitionIds(shuffleId).size())
-            .append("/")
-            .append(shuffleBlockCount)
-            .append("\n");
       }
-      partitionInfoSummary
-          .append("The maxSizePartitionInfo: ")
-          .append(shuffleTaskInfo.getMaxSizePartitionInfo());
+      partitionInfoSummary.append("The app task info: 
").append(shuffleTaskInfo);
       LOG.info("Removing app summary info: {}", partitionInfoSummary);
 
       partitionsToBlockIds.remove(appId);

Reply via email to