This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 5f8d6de1b [#2133][FOLLOWUP] improvement(server): Support output the
shuffle data size and block count before purge app (#2180)
5f8d6de1b is described below
commit 5f8d6de1b1a278bd42e6b7cf4f79e0721f67dbb9
Author: maobaolong <[email protected]>
AuthorDate: Thu Oct 17 21:09:51 2024 +0800
[#2133][FOLLOWUP] improvement(server): Support output the shuffle data size
and block count before purge app (#2180)
### What changes were proposed in this pull request?
Output the shuffle data size and block count before purging an app.
### Why are the changes needed?
To make the shuffle block info clear and easy to view.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Run locally.
```
[2024-10-15 11:25:31.438] [clearResourceThread] [INFO] ShuffleTaskManager -
Removing app summary info: appId: app-20241015112429-0069_1728962668484
The app task info:
ShuffleTaskInfo{appId='app-20241015112429-0069_1728962668484',
totalDataSize=741B, inMemoryDataSize=0B, onLocalFileDataSize=741B,
onHadoopDataSize=0B, maxSizePartitionInfo=[id=1, shuffleId=0, size=663B,
blockCount=17], shuffleDetailInfo={0=ShuffleDetail [id=0, dataSize=741B,
blockCount=19, startTime=2024-10-15 11:24:36]}}
```
---
.../org/apache/uniffle/common/PartitionInfo.java | 5 +-
.../org/apache/uniffle/common/util/ByteUnit.java | 4 ++
.../org/apache/uniffle/common/util/Constants.java | 2 +
.../apache/uniffle/common/util/UnitConverter.java | 22 ++++++
.../uniffle/common/util/UnitConverterTest.java | 11 +++
.../apache/uniffle/server/ShuffleDetailInfo.java | 80 ++++++++++++++++++++++
.../org/apache/uniffle/server/ShuffleTaskInfo.java | 36 ++++++++--
.../apache/uniffle/server/ShuffleTaskManager.java | 22 +-----
8 files changed, 155 insertions(+), 27 deletions(-)
diff --git a/common/src/main/java/org/apache/uniffle/common/PartitionInfo.java
b/common/src/main/java/org/apache/uniffle/common/PartitionInfo.java
index ee9e64088..ea0a5976e 100644
--- a/common/src/main/java/org/apache/uniffle/common/PartitionInfo.java
+++ b/common/src/main/java/org/apache/uniffle/common/PartitionInfo.java
@@ -17,6 +17,8 @@
package org.apache.uniffle.common;
+import org.apache.uniffle.common.util.UnitConverter;
+
public class PartitionInfo {
private int id;
private int shuffleId;
@@ -49,7 +51,8 @@ public class PartitionInfo {
@Override
public String toString() {
return String.format(
- "[id=%s, shuffleId=%s, size=%s, blockCount=%s]", id, shuffleId, size,
blockCount);
+ "[id=%s, shuffleId=%s, size=%s, blockCount=%s]",
+ id, shuffleId, UnitConverter.formatSize(size), blockCount);
}
public boolean isCurrentPartition(int shuffleId, int partitionId) {
diff --git a/common/src/main/java/org/apache/uniffle/common/util/ByteUnit.java
b/common/src/main/java/org/apache/uniffle/common/util/ByteUnit.java
index c3866201e..ffef7dcbc 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/ByteUnit.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/ByteUnit.java
@@ -83,5 +83,9 @@ public enum ByteUnit {
return convertTo(d, PiB);
}
+ public long getMultiplier() {
+ return multiplier;
+ }
+
private final long multiplier;
}
diff --git a/common/src/main/java/org/apache/uniffle/common/util/Constants.java
b/common/src/main/java/org/apache/uniffle/common/util/Constants.java
index 2d1de3551..d63c2e46e 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/Constants.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/Constants.java
@@ -89,4 +89,6 @@ public final class Constants {
// We are accessing this configuration through RssConf, the spark prefix is
stripped, hence, this
// field.
public static final String DRIVER_HOST = "driver.host";
+
+ public static final String DATE_PATTERN = "yyyy-MM-dd HH:mm:ss";
}
diff --git
a/common/src/main/java/org/apache/uniffle/common/util/UnitConverter.java
b/common/src/main/java/org/apache/uniffle/common/util/UnitConverter.java
index bbef038cb..f3b4d5e0f 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/UnitConverter.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/UnitConverter.java
@@ -148,4 +148,26 @@ public final class UnitConverter {
throw new NumberFormatException(timeError + "\n" + e.getMessage());
}
}
+
+ /**
+ * Returns a human-readable version of bytes 10GiB 2048KiB etc.
+ *
+ * @param bytes the number of bytes
+ * @return human-readable version
+ */
+ public static String formatSize(long bytes) {
+ if (bytes < ByteUnit.KiB.getMultiplier()) {
+ return bytes + "B";
+ } else if (bytes < ByteUnit.MiB.getMultiplier()) {
+ return String.format("%.2fKiB", bytes / (double)
ByteUnit.KiB.getMultiplier());
+ } else if (bytes < ByteUnit.GiB.getMultiplier()) {
+ return String.format("%.2fMiB", bytes / (double)
ByteUnit.MiB.getMultiplier());
+ } else if (bytes < ByteUnit.TiB.getMultiplier()) {
+ return String.format("%.2fGiB", bytes / (double)
ByteUnit.GiB.getMultiplier());
+ } else if (bytes < ByteUnit.PiB.getMultiplier()) {
+ return String.format("%.2fTiB", bytes / (double)
ByteUnit.TiB.getMultiplier());
+ } else {
+ return String.format("%.2fPiB", bytes / (double)
ByteUnit.PiB.getMultiplier());
+ }
+ }
}
diff --git
a/common/src/test/java/org/apache/uniffle/common/util/UnitConverterTest.java
b/common/src/test/java/org/apache/uniffle/common/util/UnitConverterTest.java
index f3ad5d1e8..89f215727 100644
--- a/common/src/test/java/org/apache/uniffle/common/util/UnitConverterTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/util/UnitConverterTest.java
@@ -20,6 +20,7 @@ package org.apache.uniffle.common.util;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
@@ -115,4 +116,14 @@ public class UnitConverterTest {
assertEquals(expected, UnitConverter.timeStringAs(value, unit));
}
}
+
+ @Test
+ public void testFormatSize() {
+ assertEquals("500B", UnitConverter.formatSize(500), "Should display in
bytes");
+ assertEquals("1.90KiB", UnitConverter.formatSize(1946), "Should display in
KiB");
+ assertEquals("11.77MiB", UnitConverter.formatSize(12345678), "Should
display in MiB");
+ assertEquals("11.50GiB", UnitConverter.formatSize(12345678901L), "Should
display in GiB");
+ assertEquals("1.00TiB", UnitConverter.formatSize(1099511627776L), "Should
display in TiB");
+ assertEquals("9.77PiB", UnitConverter.formatSize(10995116277760000L),
"Should display in PiB");
+ }
}
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleDetailInfo.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleDetailInfo.java
new file mode 100644
index 000000000..5f0c27ed1
--- /dev/null
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleDetailInfo.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.lang3.time.DateFormatUtils;
+
+import org.apache.uniffle.common.util.Constants;
+import org.apache.uniffle.common.util.UnitConverter;
+
+public class ShuffleDetailInfo {
+ private int id;
+ private AtomicLong dataSize;
+ private AtomicLong blockCount;
+ private AtomicLong partitionCount;
+ private long startTime;
+
+ public ShuffleDetailInfo(int id, long startTime) {
+ this.id = id;
+ this.dataSize = new AtomicLong();
+ this.blockCount = new AtomicLong();
+ this.partitionCount = new AtomicLong();
+ this.startTime = startTime;
+ }
+
+ public int getId() {
+ return id;
+ }
+
+ public long getStartTime() {
+ return startTime;
+ }
+
+ public long getBlockCount() {
+ return blockCount.get();
+ }
+
+ public long getDataSize() {
+ return dataSize.get();
+ }
+
+ public void incrDataSize(long size) {
+ dataSize.addAndGet(size);
+ }
+
+ public void incrBlockCount(long count) {
+ blockCount.addAndGet(count);
+ }
+
+ public void incrPartitionCount() {
+ partitionCount.addAndGet(1);
+ }
+
+ @Override
+ public String toString() {
+ return String.format(
+ "ShuffleDetail [%d: partitionCount=%s, blockCount=%s, size=%s,
startTime=%s]",
+ id,
+ partitionCount,
+ blockCount,
+ UnitConverter.formatSize(dataSize.get()),
+ DateFormatUtils.format(startTime, Constants.DATE_PATTERN));
+ }
+}
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
index 2cbbf59c2..d4e6eeb32 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
@@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.PartitionInfo;
import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.util.JavaUtils;
+import org.apache.uniffle.common.util.UnitConverter;
/**
* ShuffleTaskInfo contains the information of submitting the shuffle, the
information of the cache
@@ -67,7 +68,10 @@ public class ShuffleTaskInfo {
private final AtomicReference<ShuffleSpecification> specification;
+ /** shuffleId -> partitionId -> block counter */
private final Map<Integer, Map<Integer, AtomicLong>> partitionBlockCounters;
+ /** shuffleId -> shuffleDetailInfo */
+ private final Map<Integer, ShuffleDetailInfo> shuffleDetailInfos;
private final Map<Integer, Integer> latestStageAttemptNumbers;
@@ -84,6 +88,7 @@ public class ShuffleTaskInfo {
this.specification = new AtomicReference<>();
this.partitionBlockCounters = JavaUtils.newConcurrentMap();
this.latestStageAttemptNumbers = JavaUtils.newConcurrentMap();
+ this.shuffleDetailInfos = JavaUtils.newConcurrentMap();
}
public Long getCurrentTimes() {
@@ -129,9 +134,18 @@ public class ShuffleTaskInfo {
public long addPartitionDataSize(int shuffleId, int partitionId, long delta)
{
totalDataSize.addAndGet(delta);
inMemoryDataSize.addAndGet(delta);
+ ShuffleDetailInfo shuffleDetailInfo =
+ shuffleDetailInfos.computeIfAbsent(
+ shuffleId, key -> new ShuffleDetailInfo(shuffleId,
System.currentTimeMillis()));
+ shuffleDetailInfo.incrDataSize(delta);
partitionDataSizes.computeIfAbsent(shuffleId, key ->
JavaUtils.newConcurrentMap());
Map<Integer, Long> partitions = partitionDataSizes.get(shuffleId);
- partitions.putIfAbsent(partitionId, 0L);
+ partitions.computeIfAbsent(
+ partitionId,
+ k -> {
+ shuffleDetailInfo.incrPartitionCount();
+ return 0L;
+ });
return partitions.computeIfPresent(
partitionId,
(k, v) -> {
@@ -235,6 +249,10 @@ public class ShuffleTaskInfo {
if (maxSizePartitionInfo.isCurrentPartition(shuffleId, partitionId)) {
maxSizePartitionInfo.setBlockCount(blockCount);
}
+ shuffleDetailInfos
+ .computeIfAbsent(
+ shuffleId, key -> new ShuffleDetailInfo(shuffleId,
System.currentTimeMillis()))
+ .incrBlockCount(delta);
}
public long getBlockNumber(int shuffleId, int partitionId) {
@@ -261,6 +279,10 @@ public class ShuffleTaskInfo {
return maxSizePartitionInfo;
}
+ public ShuffleDetailInfo getShuffleDetailInfo(int shuffleId) {
+ return shuffleDetailInfos.get(shuffleId);
+ }
+
@Override
public String toString() {
return "ShuffleTaskInfo{"
@@ -268,17 +290,17 @@ public class ShuffleTaskInfo {
+ appId
+ '\''
+ ", totalDataSize="
- + totalDataSize
+ + UnitConverter.formatSize(totalDataSize.get())
+ ", inMemoryDataSize="
- + inMemoryDataSize
+ + UnitConverter.formatSize(inMemoryDataSize.get())
+ ", onLocalFileDataSize="
- + onLocalFileDataSize
+ + UnitConverter.formatSize(onLocalFileDataSize.get())
+ ", onHadoopDataSize="
- + onHadoopDataSize
- + ", partitionDataSizes="
- + partitionDataSizes
+ + UnitConverter.formatSize(onHadoopDataSize.get())
+ ", maxSizePartitionInfo="
+ maxSizePartitionInfo
+ + ", shuffleDetailInfo="
+ + shuffleDetailInfos
+ '}';
}
}
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index e51ab5352..fa531ba02 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -72,6 +72,7 @@ import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.common.util.JavaUtils;
import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.common.util.ThreadUtils;
+import org.apache.uniffle.common.util.UnitConverter;
import org.apache.uniffle.server.buffer.PreAllocatedBufferInfo;
import org.apache.uniffle.server.buffer.ShuffleBuffer;
import org.apache.uniffle.server.buffer.ShuffleBufferManager;
@@ -866,13 +867,6 @@ public class ShuffleTaskManager {
StringBuilder partitionInfoSummary = new StringBuilder();
partitionInfoSummary.append("appId: ").append(appId).append("\n");
for (int shuffleId : shuffleTaskInfo.getShuffleIds()) {
- Roaring64NavigableMap[] bitmaps =
partitionsToBlockIds.get(appId).get(shuffleId);
- long shuffleBlockCount = 0L;
- if (bitmaps != null) {
- for (Roaring64NavigableMap bitmap : bitmaps) {
- shuffleBlockCount += bitmap.getLongCardinality();
- }
- }
if
(conf.getBoolean(ShuffleServerConf.SERVER_LOG_APP_DETAIL_WHILE_REMOVE_ENABLED))
{
for (int partitionId : shuffleTaskInfo.getPartitionIds(shuffleId)) {
long partitionSize =
shuffleTaskInfo.getPartitionDataSize(shuffleId, partitionId);
@@ -883,21 +877,11 @@ public class ShuffleTaskManager {
shuffleId,
partitionId,
partitionBlockCount,
- partitionSize);
+ UnitConverter.formatSize(partitionSize));
}
}
- partitionInfoSummary
- .append(" shuffleId: ")
- .append(shuffleId)
- .append(" contains partition/block: ")
- .append(shuffleTaskInfo.getPartitionIds(shuffleId).size())
- .append("/")
- .append(shuffleBlockCount)
- .append("\n");
}
- partitionInfoSummary
- .append("The maxSizePartitionInfo: ")
- .append(shuffleTaskInfo.getMaxSizePartitionInfo());
+ partitionInfoSummary.append("The app task info:
").append(shuffleTaskInfo);
LOG.info("Removing app summary info: {}", partitionInfoSummary);
partitionsToBlockIds.remove(appId);