This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 262519e5b [#2540] feat(spark): Show compression ratio into spark UI
tab (#2542)
262519e5b is described below
commit 262519e5b5e43231b815d8ebe100c1d4168f169f
Author: Junfan Zhang <[email protected]>
AuthorDate: Wed Jul 9 11:12:49 2025 +0800
[#2540] feat(spark): Show compression ratio into spark UI tab (#2542)
### What changes were proposed in this pull request?
Show the compression ratio in the Spark UI tab.
### Why are the changes needed?
for #2540
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Not needed — this is a UI-only display change with no behavioral impact.
---
.../spark/shuffle/events/TaskShuffleWriteInfoEvent.java | 9 ++++++++-
.../org/apache/spark/shuffle/writer/WriteBufferManager.java | 4 ++++
.../uniffle/shuffle/manager/ShuffleManagerGrpcService.java | 3 ++-
.../src/main/scala/org/apache/spark/UniffleListener.scala | 9 ++++++++-
.../main/scala/org/apache/spark/UniffleStatusStore.scala | 5 +++--
.../src/main/scala/org/apache/spark/ui/ShufflePage.scala | 13 ++++++++++++-
.../org/apache/spark/shuffle/writer/RssShuffleWriter.java | 3 ++-
.../client/request/RssReportShuffleWriteMetricRequest.java | 7 ++++++-
proto/src/main/proto/Rss.proto | 1 +
9 files changed, 46 insertions(+), 8 deletions(-)
diff --git
a/client-spark/common/src/main/java/org/apache/spark/shuffle/events/TaskShuffleWriteInfoEvent.java
b/client-spark/common/src/main/java/org/apache/spark/shuffle/events/TaskShuffleWriteInfoEvent.java
index 760b63651..77fff3eda 100644
---
a/client-spark/common/src/main/java/org/apache/spark/shuffle/events/TaskShuffleWriteInfoEvent.java
+++
b/client-spark/common/src/main/java/org/apache/spark/shuffle/events/TaskShuffleWriteInfoEvent.java
@@ -27,6 +27,7 @@ public class TaskShuffleWriteInfoEvent extends UniffleEvent {
private ShuffleWriteTimes writeTimes;
private boolean isShuffleWriteFailed;
private String failureReason;
+ private long uncompressedByteSize;
public TaskShuffleWriteInfoEvent(
int stageId,
@@ -35,7 +36,8 @@ public class TaskShuffleWriteInfoEvent extends UniffleEvent {
Map<String, ShuffleWriteMetric> metrics,
ShuffleWriteTimes writeTimes,
boolean isShuffleWriteFailed,
- String failureReason) {
+ String failureReason,
+ long uncompressedByteSize) {
this.stageId = stageId;
this.shuffleId = shuffleId;
this.taskId = taskId;
@@ -43,6 +45,7 @@ public class TaskShuffleWriteInfoEvent extends UniffleEvent {
this.writeTimes = writeTimes;
this.isShuffleWriteFailed = isShuffleWriteFailed;
this.failureReason = failureReason;
+ this.uncompressedByteSize = uncompressedByteSize;
}
public int getStageId() {
@@ -72,4 +75,8 @@ public class TaskShuffleWriteInfoEvent extends UniffleEvent {
public String getFailureReason() {
return failureReason;
}
+
+ public long getUncompressedByteSize() {
+ return uncompressedByteSize;
+ }
}
diff --git
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
index 31d32e9b9..3da4b147b 100644
---
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
+++
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
@@ -796,4 +796,8 @@ public class WriteBufferManager extends MemoryConsumer {
LOG.error("Errors on closing buffer manager", e);
}
}
+
+ public long getUncompressedDataLen() {
+ return uncompressedDataLen;
+ }
}
diff --git
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
index 142d80280..b2ce7fbc9 100644
---
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
+++
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
@@ -733,7 +733,8 @@ public class ShuffleManagerGrpcService extends
ShuffleManagerImplBase {
Map.Entry::getKey, x ->
ShuffleWriteMetric.from(x.getValue()))),
ShuffleWriteTimes.fromProto(request.getShuffleWriteTimes()),
request.getIsTaskWriteFailed(),
- request.getShuffleWriteFailureReason());
+ request.getShuffleWriteFailureReason(),
+ request.getUncompressedByteSize());
RssSparkShuffleUtils.getActiveSparkContext().listenerBus().post(event);
RssProtos.ReportShuffleWriteMetricResponse reply =
RssProtos.ReportShuffleWriteMetricResponse.newBuilder()
diff --git
a/client-spark/extension/src/main/scala/org/apache/spark/UniffleListener.scala
b/client-spark/extension/src/main/scala/org/apache/spark/UniffleListener.scala
index 7175257ae..8f4aeb529 100644
---
a/client-spark/extension/src/main/scala/org/apache/spark/UniffleListener.scala
+++
b/client-spark/extension/src/main/scala/org/apache/spark/UniffleListener.scala
@@ -38,6 +38,7 @@ class UniffleListener(conf: SparkConf, kvstore:
ElementTrackingStore)
private val totalShuffleReadTime = new AtomicLong(0)
private val totalShuffleWriteTime = new AtomicLong(0)
private val totalShuffleBytes = new AtomicLong(0)
+ private val totalUncompressedShuffleBytes = new AtomicLong(0)
private val updateIntervalMillis = 5000
private var updateLastTimeMillis: Long = -1
@@ -54,7 +55,12 @@ class UniffleListener(conf: SparkConf, kvstore:
ElementTrackingStore)
new
AggregatedShuffleReadMetricsUIData(this.aggregatedShuffleReadMetric)
)
kvstore.write(
- AggregatedTaskInfoUIData(totalTaskCpuTime.get(),
totalShuffleWriteTime.get(), totalShuffleReadTime.get(),
totalShuffleBytes.get())
+ AggregatedTaskInfoUIData(
+ totalTaskCpuTime.get(),
+ totalShuffleWriteTime.get(),
+ totalShuffleReadTime.get(),
+ totalShuffleBytes.get(),
+ totalUncompressedShuffleBytes.get())
)
kvstore.write(
AggregatedShuffleWriteTimesUIData(aggregatedShuffleWriteTimes)
@@ -110,6 +116,7 @@ class UniffleListener(conf: SparkConf, kvstore:
ElementTrackingStore)
}
}
aggregatedShuffleWriteTimes.inc(event.getWriteTimes)
+ totalUncompressedShuffleBytes.addAndGet(event.getUncompressedByteSize)
}
private def onTaskShuffleReadInfo(event: TaskShuffleReadInfoEvent): Unit = {
diff --git
a/client-spark/extension/src/main/scala/org/apache/spark/UniffleStatusStore.scala
b/client-spark/extension/src/main/scala/org/apache/spark/UniffleStatusStore.scala
index 2eabbd06a..e7aa3240f 100644
---
a/client-spark/extension/src/main/scala/org/apache/spark/UniffleStatusStore.scala
+++
b/client-spark/extension/src/main/scala/org/apache/spark/UniffleStatusStore.scala
@@ -87,7 +87,7 @@ class UniffleStatusStore(store: KVStore) {
try {
store.read(kClass, kClass.getName)
} catch {
- case _: Exception => AggregatedTaskInfoUIData(0, 0, 0, 0)
+ case _: Exception => AggregatedTaskInfoUIData(0, 0, 0, 0, 0)
}
}
@@ -151,7 +151,8 @@ class AggregatedShuffleReadMetric(durationMillis: Long,
case class AggregatedTaskInfoUIData(cpuTimeMillis: Long,
shuffleWriteMillis: Long,
shuffleReadMillis: Long,
- shuffleBytes: Long) {
+ shuffleBytes: Long,
+ uncompressedShuffleBytes: Long) {
@JsonIgnore
@KVIndex
def id: String = classOf[AggregatedTaskInfoUIData].getName()
diff --git
a/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
b/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
index 25413d0cb..4efa5d151 100644
---
a/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
+++
b/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
@@ -99,7 +99,7 @@ class ShufflePage(parent: ShuffleTab) extends WebUIPage("")
with Logging {
val aggTaskInfo = runtimeStatusStore.aggregatedTaskInfo
val taskInfo =
if (aggTaskInfo == null)
- AggregatedTaskInfoUIData(0, 0, 0, 0)
+ AggregatedTaskInfoUIData(0, 0, 0, 0, 0)
else
aggTaskInfo
val percent = {
@@ -109,6 +109,11 @@ class ShufflePage(parent: ShuffleTab) extends
WebUIPage("") with Logging {
(taskInfo.shuffleWriteMillis + taskInfo.shuffleReadMillis).toDouble /
taskInfo.cpuTimeMillis
}
}
+ // compression ratio
+ val compressionRatio = if (taskInfo.shuffleBytes == 0) 0 else {
+ taskInfo.uncompressedShuffleBytes / taskInfo.shuffleBytes
+ }
+
// speed unit is MB/sec
val clientObservedWriteAvgSpeed = if (aggTaskInfo.shuffleWriteMillis == 0)
0 else {
roundToTwoDecimals(aggTaskInfo.shuffleBytes.toDouble /
aggTaskInfo.shuffleWriteMillis / 1000)
@@ -265,6 +270,12 @@ class ShufflePage(parent: ShuffleTab) extends
WebUIPage("") with Logging {
</a>
{Utils.bytesToString(taskInfo.shuffleBytes)}
</li>
+ <li>
+ <a>
+ <strong>CompressionRatio: </strong>
+ </a>
+
{Utils.bytesToString(taskInfo.uncompressedShuffleBytes)}/{Utils.bytesToString(taskInfo.shuffleBytes)}={roundToTwoDecimals(compressionRatio)}
+ </li>
<li>
<a>
<strong>Shuffle Duration (write+read) / Task Duration:</strong>
diff --git
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index 761ca368b..e2c29b324 100644
---
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -969,7 +969,8 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
bufferManager.getShuffleServerPushCostTracker().toMetric(),
writeTimes,
isShuffleWriteFailed,
- shuffleWriteFailureReason));
+ shuffleWriteFailureReason,
+ bufferManager.getUncompressedDataLen()));
if (response.getStatusCode() != StatusCode.SUCCESS) {
LOG.error("Errors on reporting shuffle write metrics to driver");
}
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleWriteMetricRequest.java
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleWriteMetricRequest.java
index 8867100de..5c3092ca1 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleWriteMetricRequest.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleWriteMetricRequest.java
@@ -33,6 +33,8 @@ public class RssReportShuffleWriteMetricRequest {
private boolean isShuffleWriteFailed;
private Optional<String> shuffleWriteFailureReason;
+ private long uncompressedByteSize;
+
public RssReportShuffleWriteMetricRequest(
int stageId,
int shuffleId,
@@ -40,7 +42,8 @@ public class RssReportShuffleWriteMetricRequest {
Map<String, TaskShuffleWriteMetric> metrics,
TaskShuffleWriteTimes writeTimes,
boolean isShuffleWriteFailed,
- Optional<String> shuffleWriteFailureReason) {
+ Optional<String> shuffleWriteFailureReason,
+ long uncompressedByteSize) {
this.stageId = stageId;
this.shuffleId = shuffleId;
this.taskId = taskId;
@@ -48,6 +51,7 @@ public class RssReportShuffleWriteMetricRequest {
this.writeTimes = writeTimes;
this.isShuffleWriteFailed = isShuffleWriteFailed;
this.shuffleWriteFailureReason = shuffleWriteFailureReason;
+ this.uncompressedByteSize = uncompressedByteSize;
}
public RssProtos.ReportShuffleWriteMetricRequest toProto() {
@@ -61,6 +65,7 @@ public class RssReportShuffleWriteMetricRequest {
.setShuffleWriteTimes(writeTimes.toProto())
.setIsTaskWriteFailed(isShuffleWriteFailed)
.setShuffleWriteFailureReason(shuffleWriteFailureReason.orElse(""))
+ .setUncompressedByteSize(request.uncompressedByteSize)
.putAllMetrics(
request.metrics.entrySet().stream()
.collect(
diff --git a/proto/src/main/proto/Rss.proto b/proto/src/main/proto/Rss.proto
index 9893fe997..be1589952 100644
--- a/proto/src/main/proto/Rss.proto
+++ b/proto/src/main/proto/Rss.proto
@@ -593,6 +593,7 @@ message ReportShuffleWriteMetricRequest {
ShuffleWriteTimes shuffleWriteTimes = 5;
bool isTaskWriteFailed = 6;
string shuffleWriteFailureReason = 7;
+ int64 uncompressedByteSize = 8;
}
message ShuffleWriteTimes {