This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 262519e5b [#2540] feat(spark): Show compression ratio into spark UI 
tab (#2542)
262519e5b is described below

commit 262519e5b5e43231b815d8ebe100c1d4168f169f
Author: Junfan Zhang <[email protected]>
AuthorDate: Wed Jul 9 11:12:49 2025 +0800

    [#2540] feat(spark): Show compression ratio into spark UI tab (#2542)
    
    ### What changes were proposed in this pull request?
    
    Show the shuffle compression ratio in the Spark UI shuffle tab.
    
    ### Why are the changes needed?
    
    for #2540
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Not needed (no tests were added).
---
 .../spark/shuffle/events/TaskShuffleWriteInfoEvent.java     |  9 ++++++++-
 .../org/apache/spark/shuffle/writer/WriteBufferManager.java |  4 ++++
 .../uniffle/shuffle/manager/ShuffleManagerGrpcService.java  |  3 ++-
 .../src/main/scala/org/apache/spark/UniffleListener.scala   |  9 ++++++++-
 .../main/scala/org/apache/spark/UniffleStatusStore.scala    |  5 +++--
 .../src/main/scala/org/apache/spark/ui/ShufflePage.scala    | 13 ++++++++++++-
 .../org/apache/spark/shuffle/writer/RssShuffleWriter.java   |  3 ++-
 .../client/request/RssReportShuffleWriteMetricRequest.java  |  7 ++++++-
 proto/src/main/proto/Rss.proto                              |  1 +
 9 files changed, 46 insertions(+), 8 deletions(-)

diff --git 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/events/TaskShuffleWriteInfoEvent.java
 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/events/TaskShuffleWriteInfoEvent.java
index 760b63651..77fff3eda 100644
--- 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/events/TaskShuffleWriteInfoEvent.java
+++ 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/events/TaskShuffleWriteInfoEvent.java
@@ -27,6 +27,7 @@ public class TaskShuffleWriteInfoEvent extends UniffleEvent {
   private ShuffleWriteTimes writeTimes;
   private boolean isShuffleWriteFailed;
   private String failureReason;
+  private long uncompressedByteSize;
 
   public TaskShuffleWriteInfoEvent(
       int stageId,
@@ -35,7 +36,8 @@ public class TaskShuffleWriteInfoEvent extends UniffleEvent {
       Map<String, ShuffleWriteMetric> metrics,
       ShuffleWriteTimes writeTimes,
       boolean isShuffleWriteFailed,
-      String failureReason) {
+      String failureReason,
+      long uncompressedByteSize) {
     this.stageId = stageId;
     this.shuffleId = shuffleId;
     this.taskId = taskId;
@@ -43,6 +45,7 @@ public class TaskShuffleWriteInfoEvent extends UniffleEvent {
     this.writeTimes = writeTimes;
     this.isShuffleWriteFailed = isShuffleWriteFailed;
     this.failureReason = failureReason;
+    this.uncompressedByteSize = uncompressedByteSize;
   }
 
   public int getStageId() {
@@ -72,4 +75,8 @@ public class TaskShuffleWriteInfoEvent extends UniffleEvent {
   public String getFailureReason() {
     return failureReason;
   }
+
+  public long getUncompressedByteSize() {
+    return uncompressedByteSize;
+  }
 }
diff --git 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
index 31d32e9b9..3da4b147b 100644
--- 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
+++ 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
@@ -796,4 +796,8 @@ public class WriteBufferManager extends MemoryConsumer {
       LOG.error("Errors on closing buffer manager", e);
     }
   }
+
+  public long getUncompressedDataLen() {
+    return uncompressedDataLen;
+  }
 }
diff --git 
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
 
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
index 142d80280..b2ce7fbc9 100644
--- 
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
+++ 
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
@@ -733,7 +733,8 @@ public class ShuffleManagerGrpcService extends 
ShuffleManagerImplBase {
                         Map.Entry::getKey, x -> 
ShuffleWriteMetric.from(x.getValue()))),
             ShuffleWriteTimes.fromProto(request.getShuffleWriteTimes()),
             request.getIsTaskWriteFailed(),
-            request.getShuffleWriteFailureReason());
+            request.getShuffleWriteFailureReason(),
+            request.getUncompressedByteSize());
     RssSparkShuffleUtils.getActiveSparkContext().listenerBus().post(event);
     RssProtos.ReportShuffleWriteMetricResponse reply =
         RssProtos.ReportShuffleWriteMetricResponse.newBuilder()
diff --git 
a/client-spark/extension/src/main/scala/org/apache/spark/UniffleListener.scala 
b/client-spark/extension/src/main/scala/org/apache/spark/UniffleListener.scala
index 7175257ae..8f4aeb529 100644
--- 
a/client-spark/extension/src/main/scala/org/apache/spark/UniffleListener.scala
+++ 
b/client-spark/extension/src/main/scala/org/apache/spark/UniffleListener.scala
@@ -38,6 +38,7 @@ class UniffleListener(conf: SparkConf, kvstore: 
ElementTrackingStore)
   private val totalShuffleReadTime = new AtomicLong(0)
   private val totalShuffleWriteTime = new AtomicLong(0)
   private val totalShuffleBytes = new AtomicLong(0)
+  private val totalUncompressedShuffleBytes = new AtomicLong(0)
 
   private val updateIntervalMillis = 5000
   private var updateLastTimeMillis: Long = -1
@@ -54,7 +55,12 @@ class UniffleListener(conf: SparkConf, kvstore: 
ElementTrackingStore)
         new 
AggregatedShuffleReadMetricsUIData(this.aggregatedShuffleReadMetric)
       )
       kvstore.write(
-        AggregatedTaskInfoUIData(totalTaskCpuTime.get(), 
totalShuffleWriteTime.get(), totalShuffleReadTime.get(), 
totalShuffleBytes.get())
+        AggregatedTaskInfoUIData(
+          totalTaskCpuTime.get(),
+          totalShuffleWriteTime.get(),
+          totalShuffleReadTime.get(),
+          totalShuffleBytes.get(),
+          totalUncompressedShuffleBytes.get())
       )
       kvstore.write(
         AggregatedShuffleWriteTimesUIData(aggregatedShuffleWriteTimes)
@@ -110,6 +116,7 @@ class UniffleListener(conf: SparkConf, kvstore: 
ElementTrackingStore)
       }
     }
     aggregatedShuffleWriteTimes.inc(event.getWriteTimes)
+    totalUncompressedShuffleBytes.addAndGet(event.getUncompressedByteSize)
   }
 
   private def onTaskShuffleReadInfo(event: TaskShuffleReadInfoEvent): Unit = {
diff --git 
a/client-spark/extension/src/main/scala/org/apache/spark/UniffleStatusStore.scala
 
b/client-spark/extension/src/main/scala/org/apache/spark/UniffleStatusStore.scala
index 2eabbd06a..e7aa3240f 100644
--- 
a/client-spark/extension/src/main/scala/org/apache/spark/UniffleStatusStore.scala
+++ 
b/client-spark/extension/src/main/scala/org/apache/spark/UniffleStatusStore.scala
@@ -87,7 +87,7 @@ class UniffleStatusStore(store: KVStore) {
     try {
       store.read(kClass, kClass.getName)
     } catch {
-      case _: Exception => AggregatedTaskInfoUIData(0, 0, 0, 0)
+      case _: Exception => AggregatedTaskInfoUIData(0, 0, 0, 0, 0)
     }
   }
 
@@ -151,7 +151,8 @@ class AggregatedShuffleReadMetric(durationMillis: Long,
 case class AggregatedTaskInfoUIData(cpuTimeMillis: Long,
                                     shuffleWriteMillis: Long,
                                     shuffleReadMillis: Long,
-                                    shuffleBytes: Long) {
+                                    shuffleBytes: Long,
+                                    uncompressedShuffleBytes: Long) {
   @JsonIgnore
   @KVIndex
   def id: String = classOf[AggregatedTaskInfoUIData].getName()
diff --git 
a/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala 
b/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
index 25413d0cb..4efa5d151 100644
--- 
a/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
+++ 
b/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
@@ -99,7 +99,7 @@ class ShufflePage(parent: ShuffleTab) extends WebUIPage("") 
with Logging {
     val aggTaskInfo = runtimeStatusStore.aggregatedTaskInfo
     val taskInfo =
       if (aggTaskInfo == null)
-        AggregatedTaskInfoUIData(0, 0, 0, 0)
+        AggregatedTaskInfoUIData(0, 0, 0, 0, 0)
       else
         aggTaskInfo
     val percent = {
@@ -109,6 +109,11 @@ class ShufflePage(parent: ShuffleTab) extends 
WebUIPage("") with Logging {
         (taskInfo.shuffleWriteMillis + taskInfo.shuffleReadMillis).toDouble / 
taskInfo.cpuTimeMillis
       }
     }
+    // compression ratio (uncompressed / shuffle bytes); Double division keeps the fraction
+    val compressionRatio = if (taskInfo.shuffleBytes == 0) 0.0 else {
+      taskInfo.uncompressedShuffleBytes.toDouble / taskInfo.shuffleBytes
+    }
+
     // speed unit is MB/sec
     val clientObservedWriteAvgSpeed = if (aggTaskInfo.shuffleWriteMillis == 0) 
0 else {
       roundToTwoDecimals(aggTaskInfo.shuffleBytes.toDouble / 
aggTaskInfo.shuffleWriteMillis / 1000)
@@ -265,6 +270,12 @@ class ShufflePage(parent: ShuffleTab) extends 
WebUIPage("") with Logging {
               </a>
               {Utils.bytesToString(taskInfo.shuffleBytes)}
             </li>
+            <li>
+              <a>
+                <strong>CompressionRatio: </strong>
+              </a>
+              
{Utils.bytesToString(taskInfo.uncompressedShuffleBytes)}/{Utils.bytesToString(taskInfo.shuffleBytes)}={roundToTwoDecimals(compressionRatio)}
+            </li>
             <li>
               <a>
                 <strong>Shuffle Duration (write+read) / Task Duration:</strong>
diff --git 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index 761ca368b..e2c29b324 100644
--- 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++ 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -969,7 +969,8 @@ public class RssShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
                       
bufferManager.getShuffleServerPushCostTracker().toMetric(),
                       writeTimes,
                       isShuffleWriteFailed,
-                      shuffleWriteFailureReason));
+                      shuffleWriteFailureReason,
+                      bufferManager.getUncompressedDataLen()));
           if (response.getStatusCode() != StatusCode.SUCCESS) {
             LOG.error("Errors on reporting shuffle write metrics to driver");
           }
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleWriteMetricRequest.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleWriteMetricRequest.java
index 8867100de..5c3092ca1 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleWriteMetricRequest.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleWriteMetricRequest.java
@@ -33,6 +33,8 @@ public class RssReportShuffleWriteMetricRequest {
   private boolean isShuffleWriteFailed;
   private Optional<String> shuffleWriteFailureReason;
 
+  private long uncompressedByteSize;
+
   public RssReportShuffleWriteMetricRequest(
       int stageId,
       int shuffleId,
@@ -40,7 +42,8 @@ public class RssReportShuffleWriteMetricRequest {
       Map<String, TaskShuffleWriteMetric> metrics,
       TaskShuffleWriteTimes writeTimes,
       boolean isShuffleWriteFailed,
-      Optional<String> shuffleWriteFailureReason) {
+      Optional<String> shuffleWriteFailureReason,
+      long uncompressedByteSize) {
     this.stageId = stageId;
     this.shuffleId = shuffleId;
     this.taskId = taskId;
@@ -48,6 +51,7 @@ public class RssReportShuffleWriteMetricRequest {
     this.writeTimes = writeTimes;
     this.isShuffleWriteFailed = isShuffleWriteFailed;
     this.shuffleWriteFailureReason = shuffleWriteFailureReason;
+    this.uncompressedByteSize = uncompressedByteSize;
   }
 
   public RssProtos.ReportShuffleWriteMetricRequest toProto() {
@@ -61,6 +65,7 @@ public class RssReportShuffleWriteMetricRequest {
         .setShuffleWriteTimes(writeTimes.toProto())
         .setIsTaskWriteFailed(isShuffleWriteFailed)
         .setShuffleWriteFailureReason(shuffleWriteFailureReason.orElse(""))
+        .setUncompressedByteSize(request.uncompressedByteSize)
         .putAllMetrics(
             request.metrics.entrySet().stream()
                 .collect(
diff --git a/proto/src/main/proto/Rss.proto b/proto/src/main/proto/Rss.proto
index 9893fe997..be1589952 100644
--- a/proto/src/main/proto/Rss.proto
+++ b/proto/src/main/proto/Rss.proto
@@ -593,6 +593,7 @@ message ReportShuffleWriteMetricRequest {
   ShuffleWriteTimes shuffleWriteTimes = 5;
   bool isTaskWriteFailed = 6;
   string shuffleWriteFailureReason = 7;
+  int64 uncompressedByteSize = 8;
 }
 
 message ShuffleWriteTimes {

Reply via email to