This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 6c636ac85 [#2460] feat(spark3): Add fine-grained shuffle write times 
with different parts (#2498)
6c636ac85 is described below

commit 6c636ac8535768d657356a8d31e4e5a18814292c
Author: Junfan Zhang <[email protected]>
AuthorDate: Thu Jun 12 14:54:02 2025 +0800

    [#2460] feat(spark3): Add fine-grained shuffle write times with different 
parts (#2498)
    
    ### What changes were proposed in this pull request?
    
     Add fine-grained shuffle write times with different parts
    
    ### Why are the changes needed?
    
    followup #2460 . After this, we could inspect the different parts duration 
ratio for the whole shuffle process.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Neen't
---
 .../spark/shuffle/events/ShuffleWriteTimes.java    | 85 ++++++++++++++++++++++
 .../shuffle/events/TaskShuffleWriteInfoEvent.java  | 12 ++-
 .../spark/shuffle/writer/WriteBufferManager.java   | 20 +++++
 .../shuffle/manager/ShuffleManagerGrpcService.java |  4 +-
 .../scala/org/apache/spark/UniffleListener.scala   |  7 +-
 .../org/apache/spark/UniffleStatusStore.scala      | 16 ++++
 .../scala/org/apache/spark/ui/ShufflePage.scala    | 53 ++++++++++++++
 .../spark/shuffle/writer/RssShuffleWriter.java     | 17 ++++-
 .../RssReportShuffleWriteMetricRequest.java        | 77 +++++++++++++++++++-
 proto/src/main/proto/Rss.proto                     | 11 +++
 10 files changed, 297 insertions(+), 5 deletions(-)

diff --git 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/events/ShuffleWriteTimes.java
 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/events/ShuffleWriteTimes.java
new file mode 100644
index 000000000..0c12d04c8
--- /dev/null
+++ 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/events/ShuffleWriteTimes.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.events;
+
+import org.apache.uniffle.proto.RssProtos;
+
+public class ShuffleWriteTimes {
+  private long total;
+
+  private long copy = 0;
+  private long serialize = 0;
+  private long compress = 0;
+  private long sort = 0;
+  private long requireMemory = 0;
+  private long waitFinish = 0;
+
+  public static ShuffleWriteTimes fromProto(RssProtos.ShuffleWriteTimes times) 
{
+    ShuffleWriteTimes writeTimes = new ShuffleWriteTimes();
+    writeTimes.copy = times.getCopy();
+    writeTimes.serialize = times.getSerialize();
+    writeTimes.compress = times.getCompress();
+    writeTimes.sort = times.getSort();
+    writeTimes.requireMemory = times.getRequireMemory();
+    writeTimes.waitFinish = times.getWaitFinish();
+    writeTimes.total = times.getTotal();
+    return writeTimes;
+  }
+
+  public long getTotal() {
+    return total;
+  }
+
+  public long getCopy() {
+    return copy;
+  }
+
+  public long getSerialize() {
+    return serialize;
+  }
+
+  public long getCompress() {
+    return compress;
+  }
+
+  public long getSort() {
+    return sort;
+  }
+
+  public long getRequireMemory() {
+    return requireMemory;
+  }
+
+  public long getWaitFinish() {
+    return waitFinish;
+  }
+
+  public void inc(ShuffleWriteTimes times) {
+    if (times == null) {
+      return;
+    }
+    total += times.getTotal();
+    copy += times.getCopy();
+    serialize += times.getSerialize();
+    compress += times.getCompress();
+    sort += times.getSort();
+    requireMemory += times.getRequireMemory();
+    waitFinish += times.getWaitFinish();
+    total += times.getTotal();
+  }
+}
diff --git 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/events/TaskShuffleWriteInfoEvent.java
 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/events/TaskShuffleWriteInfoEvent.java
index 6368566d1..aecfd9a5d 100644
--- 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/events/TaskShuffleWriteInfoEvent.java
+++ 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/events/TaskShuffleWriteInfoEvent.java
@@ -24,13 +24,19 @@ public class TaskShuffleWriteInfoEvent extends UniffleEvent 
{
   private int shuffleId;
   private long taskId;
   private Map<String, ShuffleWriteMetric> metrics;
+  private ShuffleWriteTimes writeTimes;
 
   public TaskShuffleWriteInfoEvent(
-      int stageId, int shuffleId, long taskId, Map<String, ShuffleWriteMetric> 
metrics) {
+      int stageId,
+      int shuffleId,
+      long taskId,
+      Map<String, ShuffleWriteMetric> metrics,
+      ShuffleWriteTimes writeTimes) {
     this.stageId = stageId;
     this.shuffleId = shuffleId;
     this.taskId = taskId;
     this.metrics = metrics;
+    this.writeTimes = writeTimes;
   }
 
   public int getStageId() {
@@ -48,4 +54,8 @@ public class TaskShuffleWriteInfoEvent extends UniffleEvent {
   public Map<String, ShuffleWriteMetric> getMetrics() {
     return metrics;
   }
+
+  public ShuffleWriteTimes getWriteTimes() {
+    return writeTimes;
+  }
 }
diff --git 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
index f2a980cb3..adfdaa8c7 100644
--- 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
+++ 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
@@ -645,6 +645,26 @@ public class WriteBufferManager extends MemoryConsumer {
     return writeTime;
   }
 
+  public long getCopyTime() {
+    return copyTime;
+  }
+
+  public long getSerializeTime() {
+    return serializeTime;
+  }
+
+  public long getCompressTime() {
+    return compressTime;
+  }
+
+  public long getSortTime() {
+    return sortTime;
+  }
+
+  public long getRequireMemoryTime() {
+    return requireMemoryTime;
+  }
+
   public String getManagerCostInfo() {
     return "WriteBufferManager cost copyTime["
         + copyTime
diff --git 
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
 
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
index 03998889f..db3a956cb 100644
--- 
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
+++ 
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
@@ -34,6 +34,7 @@ import org.apache.spark.SparkException;
 import org.apache.spark.shuffle.RssSparkShuffleUtils;
 import org.apache.spark.shuffle.events.ShuffleReadMetric;
 import org.apache.spark.shuffle.events.ShuffleWriteMetric;
+import org.apache.spark.shuffle.events.ShuffleWriteTimes;
 import org.apache.spark.shuffle.events.TaskShuffleReadInfoEvent;
 import org.apache.spark.shuffle.events.TaskShuffleWriteInfoEvent;
 import org.apache.spark.shuffle.handle.MutableShuffleHandleInfo;
@@ -729,7 +730,8 @@ public class ShuffleManagerGrpcService extends 
ShuffleManagerImplBase {
             request.getMetricsMap().entrySet().stream()
                 .collect(
                     Collectors.toMap(
-                        Map.Entry::getKey, x -> 
ShuffleWriteMetric.from(x.getValue()))));
+                        Map.Entry::getKey, x -> 
ShuffleWriteMetric.from(x.getValue()))),
+            ShuffleWriteTimes.fromProto(request.getShuffleWriteTimes()));
     RssSparkShuffleUtils.getActiveSparkContext().listenerBus().post(event);
     RssProtos.ReportShuffleWriteMetricResponse reply =
         RssProtos.ReportShuffleWriteMetricResponse.newBuilder()
diff --git 
a/client-spark/extension/src/main/scala/org/apache/spark/UniffleListener.scala 
b/client-spark/extension/src/main/scala/org/apache/spark/UniffleListener.scala
index f290bd09a..ae974982b 100644
--- 
a/client-spark/extension/src/main/scala/org/apache/spark/UniffleListener.scala
+++ 
b/client-spark/extension/src/main/scala/org/apache/spark/UniffleListener.scala
@@ -20,7 +20,7 @@ package org.apache.spark
 import org.apache.commons.lang3.StringUtils
 import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, 
SparkListenerJobEnd, SparkListenerJobStart, SparkListenerTaskEnd}
-import org.apache.spark.shuffle.events.{ShuffleAssignmentInfoEvent, 
TaskShuffleReadInfoEvent, TaskShuffleWriteInfoEvent}
+import org.apache.spark.shuffle.events.{ShuffleAssignmentInfoEvent, 
ShuffleWriteTimes, TaskShuffleReadInfoEvent, TaskShuffleWriteInfoEvent}
 import org.apache.spark.status.ElementTrackingStore
 
 import java.util.concurrent.ConcurrentHashMap
@@ -30,6 +30,7 @@ import scala.collection.JavaConverters.mapAsScalaMapConverter
 class UniffleListener(conf: SparkConf, kvstore: ElementTrackingStore)
   extends SparkListener with Logging {
 
+  private val aggregatedShuffleWriteTimes = new ShuffleWriteTimes()
   private val aggregatedShuffleWriteMetric = new ConcurrentHashMap[String, 
AggregatedShuffleWriteMetric]
   private val aggregatedShuffleReadMetric = new ConcurrentHashMap[String, 
AggregatedShuffleReadMetric]
 
@@ -55,6 +56,9 @@ class UniffleListener(conf: SparkConf, kvstore: 
ElementTrackingStore)
       kvstore.write(
         AggregatedTaskInfoUIData(totalTaskCpuTime.get(), 
totalShuffleWriteTime.get(), totalShuffleReadTime.get(), 
totalShuffleBytes.get())
       )
+      kvstore.write(
+        AggregatedShuffleWriteTimesUIData(aggregatedShuffleWriteTimes)
+      )
     }
   }
 
@@ -105,6 +109,7 @@ class UniffleListener(conf: SparkConf, kvstore: 
ElementTrackingStore)
         agg_metric.lastPushFailureReason = metric._2.getLastFailureReason
       }
     }
+    aggregatedShuffleWriteTimes.inc(event.getWriteTimes)
   }
 
   private def onTaskShuffleReadInfo(event: TaskShuffleReadInfoEvent): Unit = {
diff --git 
a/client-spark/extension/src/main/scala/org/apache/spark/UniffleStatusStore.scala
 
b/client-spark/extension/src/main/scala/org/apache/spark/UniffleStatusStore.scala
index 17063528f..5176048d4 100644
--- 
a/client-spark/extension/src/main/scala/org/apache/spark/UniffleStatusStore.scala
+++ 
b/client-spark/extension/src/main/scala/org/apache/spark/UniffleStatusStore.scala
@@ -18,6 +18,7 @@
 package org.apache.spark
 
 import com.fasterxml.jackson.annotation.JsonIgnore
+import org.apache.spark.shuffle.events.ShuffleWriteTimes
 import org.apache.spark.status.KVUtils.KVIndexParam
 import org.apache.spark.util.Utils
 import org.apache.spark.util.kvstore.{KVIndex, KVStore, KVStoreView}
@@ -48,6 +49,15 @@ class UniffleStatusStore(store: KVStore) {
     }
   }
 
+  def shuffleWriteTimes(): AggregatedShuffleWriteTimesUIData = {
+    val kClass = classOf[AggregatedShuffleWriteTimesUIData]
+    try {
+      store.read(kClass, kClass.getName)
+    } catch {
+      case _: NoSuchElementException => AggregatedShuffleWriteTimesUIData(null)
+    }
+  }
+
   def assignmentInfos(): Seq[ShuffleAssignmentUIData] = {
     viewToSeq(store.view(classOf[ShuffleAssignmentUIData]))
   }
@@ -136,4 +146,10 @@ case class AggregatedTaskInfoUIData(cpuTimeMillis: Long,
   @JsonIgnore
   @KVIndex
   def id: String = classOf[AggregatedTaskInfoUIData].getName()
+}
+
+case class AggregatedShuffleWriteTimesUIData(times: ShuffleWriteTimes) {
+  @JsonIgnore
+  @KVIndex
+  def id: String = classOf[AggregatedShuffleWriteTimesUIData].getName()
 }
\ No newline at end of file
diff --git 
a/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala 
b/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
index d3e811243..22422d423 100644
--- 
a/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
+++ 
b/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.ui
 
 import org.apache.spark.internal.Logging
+import org.apache.spark.shuffle.events.ShuffleWriteTimes
 import org.apache.spark.util.Utils
 import org.apache.spark.{AggregatedShuffleMetric, AggregatedShuffleReadMetric, 
AggregatedShuffleWriteMetric, AggregatedTaskInfoUIData}
 
@@ -39,6 +40,16 @@ class ShufflePage(parent: ShuffleTab) extends WebUIPage("") 
with Logging {
     </td>
   </tr>
 
+  private def shuffleWriteTimesRow(kv: Seq[String]) = <tr>
+    <td>{kv(0)}</td>
+    <td>{kv(1)}</td>
+    <td>{kv(2)}</td>
+    <td>{kv(3)}</td>
+    <td>{kv(4)}</td>
+    <td>{kv(5)}</td>
+    <td>{kv(6)}</td>
+  </tr>
+
   private def allServerRow(kv: (String, String, String, Double, Long, Long, 
String, String, String, Double)) = <tr>
     <td>{kv._1}</td>
     <td>{kv._2}</td>
@@ -115,6 +126,35 @@ class ShufflePage(parent: ShuffleTab) extends 
WebUIPage("") with Logging {
       fixedWidth = true
     )
 
+    // render shuffle write times
+    val writeTimes = 
Option(runtimeStatusStore.shuffleWriteTimes()).map(_.times).getOrElse(new 
ShuffleWriteTimes())
+    val total = if (writeTimes.getTotal <= 0) -1 else writeTimes.getTotal
+    val writeTimesUI = UIUtils.listingTable(
+      Seq("Total Time", "Wait Finish Time", "Copy Time", "Serialize Time", 
"Compress Time", "Sort Time", "Require Memory Time"),
+      shuffleWriteTimesRow,
+      Seq(
+        Seq(
+          UIUtils.formatDuration(writeTimes.getTotal),
+          UIUtils.formatDuration(writeTimes.getWaitFinish),
+          UIUtils.formatDuration(writeTimes.getCopy),
+          UIUtils.formatDuration(writeTimes.getSerialize),
+          UIUtils.formatDuration(writeTimes.getCompress),
+          UIUtils.formatDuration(writeTimes.getSort),
+          UIUtils.formatDuration(writeTimes.getRequireMemory),
+        ),
+        Seq(
+          1.toDouble,
+          writeTimes.getWaitFinish.toDouble / total,
+          writeTimes.getCopy.toDouble / total,
+          writeTimes.getSerialize.toDouble / total,
+          writeTimes.getCompress.toDouble / total,
+          writeTimes.getSort.toDouble / total,
+          writeTimes.getRequireMemory.toDouble / total,
+        ).map(x => roundToTwoDecimals(x).toString)
+      ),
+      fixedWidth = true
+    )
+
     // render shuffle-servers write+read statistics
     val shuffleWriteMetrics = 
shuffleSpeedStatistics(originWriteMetric.metrics.asScala.toSeq)
     val shuffleReadMetrics = 
shuffleSpeedStatistics(originReadMetric.metrics.asScala.toSeq)
@@ -297,6 +337,19 @@ class ShufflePage(parent: ShuffleTab) extends 
WebUIPage("") with Logging {
             {assignmentTableUI}
           </div>
         </div>
+
+        <div>
+          <span class="collapse-write-times-properties collapse-table"
+                onClick="collapseTable('collapse-write-times-properties', 
'write-times-table')">
+            <h4>
+              <span class="collapse-table-arrow arrow-closed"></span>
+              <a>Shuffle Write Times</a>
+            </h4>
+          </span>
+          <div class="write-times-table collapsible-table collapsed">
+            {writeTimesUI}
+          </div>
+        </div>
       </div>
     }
 
diff --git 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index a4e538e00..396de414a 100644
--- 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++ 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -143,6 +143,9 @@ public class RssShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
   private boolean enableWriteFailureRetry;
   private Set<ShuffleServerInfo> recordReportFailedShuffleservers;
 
+  private long totalShuffleWriteMills = 0L;
+  private long checkSendResultMills = 0L;
+
   // Only for tests
   @VisibleForTesting
   public RssShuffleWriter(
@@ -377,6 +380,8 @@ public class RssShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
       sendCommit();
     }
     long writeDurationMs = bufferManager.getWriteTime() + 
(System.currentTimeMillis() - start);
+    this.totalShuffleWriteMills = writeDurationMs;
+    this.checkSendResultMills = checkDuration;
     
shuffleWriteMetrics.incWriteTime(TimeUnit.MILLISECONDS.toNanos(writeDurationMs));
     LOG.info(
         "Finish write shuffle for appId["
@@ -930,13 +935,23 @@ public class RssShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
       if (managerClientSupplier != null) {
         ShuffleManagerClient shuffleManagerClient = 
managerClientSupplier.get();
         if (shuffleManagerClient != null) {
+          RssReportShuffleWriteMetricRequest.TaskShuffleWriteTimes writeTimes =
+              new RssReportShuffleWriteMetricRequest.TaskShuffleWriteTimes(
+                  totalShuffleWriteMills,
+                  bufferManager.getCopyTime(),
+                  bufferManager.getSerializeTime(),
+                  bufferManager.getCompressTime(),
+                  bufferManager.getSortTime(),
+                  bufferManager.getRequireMemoryTime(),
+                  checkSendResultMills);
           RssReportShuffleWriteMetricResponse response =
               shuffleManagerClient.reportShuffleWriteMetric(
                   new RssReportShuffleWriteMetricRequest(
                       taskContext.stageId(),
                       shuffleId,
                       taskContext.taskAttemptId(),
-                      
bufferManager.getShuffleServerPushCostTracker().toMetric()));
+                      
bufferManager.getShuffleServerPushCostTracker().toMetric(),
+                      writeTimes));
           if (response.getStatusCode() != StatusCode.SUCCESS) {
             LOG.error("Errors on reporting shuffle write metrics to driver");
           }
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleWriteMetricRequest.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleWriteMetricRequest.java
index d07ac9536..612cc181e 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleWriteMetricRequest.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleWriteMetricRequest.java
@@ -28,13 +28,19 @@ public class RssReportShuffleWriteMetricRequest {
   private int shuffleId;
   private long taskId;
   private Map<String, TaskShuffleWriteMetric> metrics;
+  private TaskShuffleWriteTimes writeTimes;
 
   public RssReportShuffleWriteMetricRequest(
-      int stageId, int shuffleId, long taskId, Map<String, 
TaskShuffleWriteMetric> metrics) {
+      int stageId,
+      int shuffleId,
+      long taskId,
+      Map<String, TaskShuffleWriteMetric> metrics,
+      TaskShuffleWriteTimes writeTimes) {
     this.stageId = stageId;
     this.shuffleId = shuffleId;
     this.taskId = taskId;
     this.metrics = metrics;
+    this.writeTimes = writeTimes;
   }
 
   public RssProtos.ReportShuffleWriteMetricRequest toProto() {
@@ -45,6 +51,7 @@ public class RssReportShuffleWriteMetricRequest {
         .setShuffleId(request.shuffleId)
         .setStageId(request.stageId)
         .setTaskId(request.taskId)
+        .setShuffleWriteTimes(writeTimes.toProto())
         .putAllMetrics(
             request.metrics.entrySet().stream()
                 .collect(
@@ -64,6 +71,74 @@ public class RssReportShuffleWriteMetricRequest {
     return builder.build();
   }
 
+  public static class TaskShuffleWriteTimes {
+    private long totalTime;
+
+    private long copyTime = 0;
+    private long serializeTime = 0;
+    private long compressTime = 0;
+    private long sortTime = 0;
+    private long requireMemoryTime = 0;
+    private long waitFinishTime = 0;
+
+    public TaskShuffleWriteTimes(
+        long totalTime,
+        long copyTime,
+        long serializeTime,
+        long compressTime,
+        long sortTime,
+        long requireMemoryTime,
+        long waitFinishTime) {
+      this.totalTime = totalTime;
+      this.copyTime = copyTime;
+      this.serializeTime = serializeTime;
+      this.compressTime = compressTime;
+      this.sortTime = sortTime;
+      this.requireMemoryTime = requireMemoryTime;
+      this.waitFinishTime = waitFinishTime;
+    }
+
+    public long getTotalTime() {
+      return totalTime;
+    }
+
+    public long getCopyTime() {
+      return copyTime;
+    }
+
+    public long getSerializeTime() {
+      return serializeTime;
+    }
+
+    public long getCompressTime() {
+      return compressTime;
+    }
+
+    public long getSortTime() {
+      return sortTime;
+    }
+
+    public long getRequireMemoryTime() {
+      return requireMemoryTime;
+    }
+
+    public long getWaitFinishTime() {
+      return waitFinishTime;
+    }
+
+    public RssProtos.ShuffleWriteTimes toProto() {
+      return RssProtos.ShuffleWriteTimes.newBuilder()
+          .setTotal(this.totalTime)
+          .setCopy(this.copyTime)
+          .setSerialize(this.serializeTime)
+          .setCompress(this.compressTime)
+          .setSort(this.sortTime)
+          .setRequireMemory(this.requireMemoryTime)
+          .setWaitFinish(this.waitFinishTime)
+          .build();
+    }
+  }
+
   public static class TaskShuffleWriteMetric {
     private long durationMillis;
     private long byteSize;
diff --git a/proto/src/main/proto/Rss.proto b/proto/src/main/proto/Rss.proto
index 0df3e0ffd..266b903a9 100644
--- a/proto/src/main/proto/Rss.proto
+++ b/proto/src/main/proto/Rss.proto
@@ -590,6 +590,17 @@ message ReportShuffleWriteMetricRequest {
   int32 stageId = 2;
   int64 taskId = 3;
   map<string, ShuffleWriteMetric> metrics = 4;
+  ShuffleWriteTimes shuffleWriteTimes = 5;
+}
+
+message ShuffleWriteTimes {
+  int64 total = 1;
+  int64 copy = 2;
+  int64 serialize = 3;
+  int64 compress = 4;
+  int64 sort = 5;
+  int64 requireMemory = 6;
+  int64 waitFinish = 7;
 }
 
 message ShuffleWriteMetric {

Reply via email to