This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 6c636ac85 [#2460] feat(spark3): Add fine-grained shuffle write times
with different parts (#2498)
6c636ac85 is described below
commit 6c636ac8535768d657356a8d31e4e5a18814292c
Author: Junfan Zhang <[email protected]>
AuthorDate: Thu Jun 12 14:54:02 2025 +0800
[#2460] feat(spark3): Add fine-grained shuffle write times with different
parts (#2498)
### What changes were proposed in this pull request?
Add fine-grained shuffle write times with different parts
### Why are the changes needed?
Follow-up to #2460. After this, we can inspect each part's share of the total
shuffle write time.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Not needed.
---
.../spark/shuffle/events/ShuffleWriteTimes.java | 85 ++++++++++++++++++++++
.../shuffle/events/TaskShuffleWriteInfoEvent.java | 12 ++-
.../spark/shuffle/writer/WriteBufferManager.java | 20 +++++
.../shuffle/manager/ShuffleManagerGrpcService.java | 4 +-
.../scala/org/apache/spark/UniffleListener.scala | 7 +-
.../org/apache/spark/UniffleStatusStore.scala | 16 ++++
.../scala/org/apache/spark/ui/ShufflePage.scala | 53 ++++++++++++++
.../spark/shuffle/writer/RssShuffleWriter.java | 17 ++++-
.../RssReportShuffleWriteMetricRequest.java | 77 +++++++++++++++++++-
proto/src/main/proto/Rss.proto | 11 +++
10 files changed, 297 insertions(+), 5 deletions(-)
diff --git
a/client-spark/common/src/main/java/org/apache/spark/shuffle/events/ShuffleWriteTimes.java
b/client-spark/common/src/main/java/org/apache/spark/shuffle/events/ShuffleWriteTimes.java
new file mode 100644
index 000000000..0c12d04c8
--- /dev/null
+++
b/client-spark/common/src/main/java/org/apache/spark/shuffle/events/ShuffleWriteTimes.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.events;
+
+import org.apache.uniffle.proto.RssProtos;
+
+public class ShuffleWriteTimes {
+ private long total;
+
+ private long copy = 0;
+ private long serialize = 0;
+ private long compress = 0;
+ private long sort = 0;
+ private long requireMemory = 0;
+ private long waitFinish = 0;
+
+ public static ShuffleWriteTimes fromProto(RssProtos.ShuffleWriteTimes times)
{
+ ShuffleWriteTimes writeTimes = new ShuffleWriteTimes();
+ writeTimes.copy = times.getCopy();
+ writeTimes.serialize = times.getSerialize();
+ writeTimes.compress = times.getCompress();
+ writeTimes.sort = times.getSort();
+ writeTimes.requireMemory = times.getRequireMemory();
+ writeTimes.waitFinish = times.getWaitFinish();
+ writeTimes.total = times.getTotal();
+ return writeTimes;
+ }
+
+ public long getTotal() {
+ return total;
+ }
+
+ public long getCopy() {
+ return copy;
+ }
+
+ public long getSerialize() {
+ return serialize;
+ }
+
+ public long getCompress() {
+ return compress;
+ }
+
+ public long getSort() {
+ return sort;
+ }
+
+ public long getRequireMemory() {
+ return requireMemory;
+ }
+
+ public long getWaitFinish() {
+ return waitFinish;
+ }
+
+ public void inc(ShuffleWriteTimes times) {
+ if (times == null) {
+ return;
+ }
+ total += times.getTotal();
+ copy += times.getCopy();
+ serialize += times.getSerialize();
+ compress += times.getCompress();
+ sort += times.getSort();
+ requireMemory += times.getRequireMemory();
+ waitFinish += times.getWaitFinish();
+ total += times.getTotal();
+ }
+}
diff --git
a/client-spark/common/src/main/java/org/apache/spark/shuffle/events/TaskShuffleWriteInfoEvent.java
b/client-spark/common/src/main/java/org/apache/spark/shuffle/events/TaskShuffleWriteInfoEvent.java
index 6368566d1..aecfd9a5d 100644
---
a/client-spark/common/src/main/java/org/apache/spark/shuffle/events/TaskShuffleWriteInfoEvent.java
+++
b/client-spark/common/src/main/java/org/apache/spark/shuffle/events/TaskShuffleWriteInfoEvent.java
@@ -24,13 +24,19 @@ public class TaskShuffleWriteInfoEvent extends UniffleEvent
{
private int shuffleId;
private long taskId;
private Map<String, ShuffleWriteMetric> metrics;
+ private ShuffleWriteTimes writeTimes;
public TaskShuffleWriteInfoEvent(
- int stageId, int shuffleId, long taskId, Map<String, ShuffleWriteMetric>
metrics) {
+ int stageId,
+ int shuffleId,
+ long taskId,
+ Map<String, ShuffleWriteMetric> metrics,
+ ShuffleWriteTimes writeTimes) {
this.stageId = stageId;
this.shuffleId = shuffleId;
this.taskId = taskId;
this.metrics = metrics;
+ this.writeTimes = writeTimes;
}
public int getStageId() {
@@ -48,4 +54,8 @@ public class TaskShuffleWriteInfoEvent extends UniffleEvent {
public Map<String, ShuffleWriteMetric> getMetrics() {
return metrics;
}
+
+ public ShuffleWriteTimes getWriteTimes() {
+ return writeTimes;
+ }
}
diff --git
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
index f2a980cb3..adfdaa8c7 100644
---
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
+++
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
@@ -645,6 +645,26 @@ public class WriteBufferManager extends MemoryConsumer {
return writeTime;
}
+ public long getCopyTime() {
+ return copyTime;
+ }
+
+ public long getSerializeTime() {
+ return serializeTime;
+ }
+
+ public long getCompressTime() {
+ return compressTime;
+ }
+
+ public long getSortTime() {
+ return sortTime;
+ }
+
+ public long getRequireMemoryTime() {
+ return requireMemoryTime;
+ }
+
public String getManagerCostInfo() {
return "WriteBufferManager cost copyTime["
+ copyTime
diff --git
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
index 03998889f..db3a956cb 100644
---
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
+++
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
@@ -34,6 +34,7 @@ import org.apache.spark.SparkException;
import org.apache.spark.shuffle.RssSparkShuffleUtils;
import org.apache.spark.shuffle.events.ShuffleReadMetric;
import org.apache.spark.shuffle.events.ShuffleWriteMetric;
+import org.apache.spark.shuffle.events.ShuffleWriteTimes;
import org.apache.spark.shuffle.events.TaskShuffleReadInfoEvent;
import org.apache.spark.shuffle.events.TaskShuffleWriteInfoEvent;
import org.apache.spark.shuffle.handle.MutableShuffleHandleInfo;
@@ -729,7 +730,8 @@ public class ShuffleManagerGrpcService extends
ShuffleManagerImplBase {
request.getMetricsMap().entrySet().stream()
.collect(
Collectors.toMap(
- Map.Entry::getKey, x ->
ShuffleWriteMetric.from(x.getValue()))));
+ Map.Entry::getKey, x ->
ShuffleWriteMetric.from(x.getValue()))),
+ ShuffleWriteTimes.fromProto(request.getShuffleWriteTimes()));
RssSparkShuffleUtils.getActiveSparkContext().listenerBus().post(event);
RssProtos.ReportShuffleWriteMetricResponse reply =
RssProtos.ReportShuffleWriteMetricResponse.newBuilder()
diff --git
a/client-spark/extension/src/main/scala/org/apache/spark/UniffleListener.scala
b/client-spark/extension/src/main/scala/org/apache/spark/UniffleListener.scala
index f290bd09a..ae974982b 100644
---
a/client-spark/extension/src/main/scala/org/apache/spark/UniffleListener.scala
+++
b/client-spark/extension/src/main/scala/org/apache/spark/UniffleListener.scala
@@ -20,7 +20,7 @@ package org.apache.spark
import org.apache.commons.lang3.StringUtils
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent,
SparkListenerJobEnd, SparkListenerJobStart, SparkListenerTaskEnd}
-import org.apache.spark.shuffle.events.{ShuffleAssignmentInfoEvent,
TaskShuffleReadInfoEvent, TaskShuffleWriteInfoEvent}
+import org.apache.spark.shuffle.events.{ShuffleAssignmentInfoEvent,
ShuffleWriteTimes, TaskShuffleReadInfoEvent, TaskShuffleWriteInfoEvent}
import org.apache.spark.status.ElementTrackingStore
import java.util.concurrent.ConcurrentHashMap
@@ -30,6 +30,7 @@ import scala.collection.JavaConverters.mapAsScalaMapConverter
class UniffleListener(conf: SparkConf, kvstore: ElementTrackingStore)
extends SparkListener with Logging {
+ private val aggregatedShuffleWriteTimes = new ShuffleWriteTimes()
private val aggregatedShuffleWriteMetric = new ConcurrentHashMap[String,
AggregatedShuffleWriteMetric]
private val aggregatedShuffleReadMetric = new ConcurrentHashMap[String,
AggregatedShuffleReadMetric]
@@ -55,6 +56,9 @@ class UniffleListener(conf: SparkConf, kvstore:
ElementTrackingStore)
kvstore.write(
AggregatedTaskInfoUIData(totalTaskCpuTime.get(),
totalShuffleWriteTime.get(), totalShuffleReadTime.get(),
totalShuffleBytes.get())
)
+ kvstore.write(
+ AggregatedShuffleWriteTimesUIData(aggregatedShuffleWriteTimes)
+ )
}
}
@@ -105,6 +109,7 @@ class UniffleListener(conf: SparkConf, kvstore:
ElementTrackingStore)
agg_metric.lastPushFailureReason = metric._2.getLastFailureReason
}
}
+ aggregatedShuffleWriteTimes.inc(event.getWriteTimes)
}
private def onTaskShuffleReadInfo(event: TaskShuffleReadInfoEvent): Unit = {
diff --git
a/client-spark/extension/src/main/scala/org/apache/spark/UniffleStatusStore.scala
b/client-spark/extension/src/main/scala/org/apache/spark/UniffleStatusStore.scala
index 17063528f..5176048d4 100644
---
a/client-spark/extension/src/main/scala/org/apache/spark/UniffleStatusStore.scala
+++
b/client-spark/extension/src/main/scala/org/apache/spark/UniffleStatusStore.scala
@@ -18,6 +18,7 @@
package org.apache.spark
import com.fasterxml.jackson.annotation.JsonIgnore
+import org.apache.spark.shuffle.events.ShuffleWriteTimes
import org.apache.spark.status.KVUtils.KVIndexParam
import org.apache.spark.util.Utils
import org.apache.spark.util.kvstore.{KVIndex, KVStore, KVStoreView}
@@ -48,6 +49,15 @@ class UniffleStatusStore(store: KVStore) {
}
}
+ def shuffleWriteTimes(): AggregatedShuffleWriteTimesUIData = {
+ val kClass = classOf[AggregatedShuffleWriteTimesUIData]
+ try {
+ store.read(kClass, kClass.getName)
+ } catch {
+ case _: NoSuchElementException => AggregatedShuffleWriteTimesUIData(null)
+ }
+ }
+
def assignmentInfos(): Seq[ShuffleAssignmentUIData] = {
viewToSeq(store.view(classOf[ShuffleAssignmentUIData]))
}
@@ -136,4 +146,10 @@ case class AggregatedTaskInfoUIData(cpuTimeMillis: Long,
@JsonIgnore
@KVIndex
def id: String = classOf[AggregatedTaskInfoUIData].getName()
+}
+
+case class AggregatedShuffleWriteTimesUIData(times: ShuffleWriteTimes) {
+ @JsonIgnore
+ @KVIndex
+ def id: String = classOf[AggregatedShuffleWriteTimesUIData].getName()
}
\ No newline at end of file
diff --git
a/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
b/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
index d3e811243..22422d423 100644
---
a/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
+++
b/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
@@ -18,6 +18,7 @@
package org.apache.spark.ui
import org.apache.spark.internal.Logging
+import org.apache.spark.shuffle.events.ShuffleWriteTimes
import org.apache.spark.util.Utils
import org.apache.spark.{AggregatedShuffleMetric, AggregatedShuffleReadMetric,
AggregatedShuffleWriteMetric, AggregatedTaskInfoUIData}
@@ -39,6 +40,16 @@ class ShufflePage(parent: ShuffleTab) extends WebUIPage("")
with Logging {
</td>
</tr>
+ private def shuffleWriteTimesRow(kv: Seq[String]) = <tr>
+ <td>{kv(0)}</td>
+ <td>{kv(1)}</td>
+ <td>{kv(2)}</td>
+ <td>{kv(3)}</td>
+ <td>{kv(4)}</td>
+ <td>{kv(5)}</td>
+ <td>{kv(6)}</td>
+ </tr>
+
private def allServerRow(kv: (String, String, String, Double, Long, Long,
String, String, String, Double)) = <tr>
<td>{kv._1}</td>
<td>{kv._2}</td>
@@ -115,6 +126,35 @@ class ShufflePage(parent: ShuffleTab) extends
WebUIPage("") with Logging {
fixedWidth = true
)
+ // render shuffle write times
+ val writeTimes =
Option(runtimeStatusStore.shuffleWriteTimes()).map(_.times).getOrElse(new
ShuffleWriteTimes())
+ val total = if (writeTimes.getTotal <= 0) -1 else writeTimes.getTotal
+ val writeTimesUI = UIUtils.listingTable(
+ Seq("Total Time", "Wait Finish Time", "Copy Time", "Serialize Time",
"Compress Time", "Sort Time", "Require Memory Time"),
+ shuffleWriteTimesRow,
+ Seq(
+ Seq(
+ UIUtils.formatDuration(writeTimes.getTotal),
+ UIUtils.formatDuration(writeTimes.getWaitFinish),
+ UIUtils.formatDuration(writeTimes.getCopy),
+ UIUtils.formatDuration(writeTimes.getSerialize),
+ UIUtils.formatDuration(writeTimes.getCompress),
+ UIUtils.formatDuration(writeTimes.getSort),
+ UIUtils.formatDuration(writeTimes.getRequireMemory),
+ ),
+ Seq(
+ 1.toDouble,
+ writeTimes.getWaitFinish.toDouble / total,
+ writeTimes.getCopy.toDouble / total,
+ writeTimes.getSerialize.toDouble / total,
+ writeTimes.getCompress.toDouble / total,
+ writeTimes.getSort.toDouble / total,
+ writeTimes.getRequireMemory.toDouble / total,
+ ).map(x => roundToTwoDecimals(x).toString)
+ ),
+ fixedWidth = true
+ )
+
// render shuffle-servers write+read statistics
val shuffleWriteMetrics =
shuffleSpeedStatistics(originWriteMetric.metrics.asScala.toSeq)
val shuffleReadMetrics =
shuffleSpeedStatistics(originReadMetric.metrics.asScala.toSeq)
@@ -297,6 +337,19 @@ class ShufflePage(parent: ShuffleTab) extends
WebUIPage("") with Logging {
{assignmentTableUI}
</div>
</div>
+
+ <div>
+ <span class="collapse-write-times-properties collapse-table"
+ onClick="collapseTable('collapse-write-times-properties',
'write-times-table')">
+ <h4>
+ <span class="collapse-table-arrow arrow-closed"></span>
+ <a>Shuffle Write Times</a>
+ </h4>
+ </span>
+ <div class="write-times-table collapsible-table collapsed">
+ {writeTimesUI}
+ </div>
+ </div>
</div>
}
diff --git
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index a4e538e00..396de414a 100644
---
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -143,6 +143,9 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
private boolean enableWriteFailureRetry;
private Set<ShuffleServerInfo> recordReportFailedShuffleservers;
+ private long totalShuffleWriteMills = 0L;
+ private long checkSendResultMills = 0L;
+
// Only for tests
@VisibleForTesting
public RssShuffleWriter(
@@ -377,6 +380,8 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
sendCommit();
}
long writeDurationMs = bufferManager.getWriteTime() +
(System.currentTimeMillis() - start);
+ this.totalShuffleWriteMills = writeDurationMs;
+ this.checkSendResultMills = checkDuration;
shuffleWriteMetrics.incWriteTime(TimeUnit.MILLISECONDS.toNanos(writeDurationMs));
LOG.info(
"Finish write shuffle for appId["
@@ -930,13 +935,23 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
if (managerClientSupplier != null) {
ShuffleManagerClient shuffleManagerClient =
managerClientSupplier.get();
if (shuffleManagerClient != null) {
+ RssReportShuffleWriteMetricRequest.TaskShuffleWriteTimes writeTimes =
+ new RssReportShuffleWriteMetricRequest.TaskShuffleWriteTimes(
+ totalShuffleWriteMills,
+ bufferManager.getCopyTime(),
+ bufferManager.getSerializeTime(),
+ bufferManager.getCompressTime(),
+ bufferManager.getSortTime(),
+ bufferManager.getRequireMemoryTime(),
+ checkSendResultMills);
RssReportShuffleWriteMetricResponse response =
shuffleManagerClient.reportShuffleWriteMetric(
new RssReportShuffleWriteMetricRequest(
taskContext.stageId(),
shuffleId,
taskContext.taskAttemptId(),
-
bufferManager.getShuffleServerPushCostTracker().toMetric()));
+
bufferManager.getShuffleServerPushCostTracker().toMetric(),
+ writeTimes));
if (response.getStatusCode() != StatusCode.SUCCESS) {
LOG.error("Errors on reporting shuffle write metrics to driver");
}
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleWriteMetricRequest.java
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleWriteMetricRequest.java
index d07ac9536..612cc181e 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleWriteMetricRequest.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleWriteMetricRequest.java
@@ -28,13 +28,19 @@ public class RssReportShuffleWriteMetricRequest {
private int shuffleId;
private long taskId;
private Map<String, TaskShuffleWriteMetric> metrics;
+ private TaskShuffleWriteTimes writeTimes;
public RssReportShuffleWriteMetricRequest(
- int stageId, int shuffleId, long taskId, Map<String,
TaskShuffleWriteMetric> metrics) {
+ int stageId,
+ int shuffleId,
+ long taskId,
+ Map<String, TaskShuffleWriteMetric> metrics,
+ TaskShuffleWriteTimes writeTimes) {
this.stageId = stageId;
this.shuffleId = shuffleId;
this.taskId = taskId;
this.metrics = metrics;
+ this.writeTimes = writeTimes;
}
public RssProtos.ReportShuffleWriteMetricRequest toProto() {
@@ -45,6 +51,7 @@ public class RssReportShuffleWriteMetricRequest {
.setShuffleId(request.shuffleId)
.setStageId(request.stageId)
.setTaskId(request.taskId)
+ .setShuffleWriteTimes(writeTimes.toProto())
.putAllMetrics(
request.metrics.entrySet().stream()
.collect(
@@ -64,6 +71,74 @@ public class RssReportShuffleWriteMetricRequest {
return builder.build();
}
+ public static class TaskShuffleWriteTimes {
+ private long totalTime;
+
+ private long copyTime = 0;
+ private long serializeTime = 0;
+ private long compressTime = 0;
+ private long sortTime = 0;
+ private long requireMemoryTime = 0;
+ private long waitFinishTime = 0;
+
+ public TaskShuffleWriteTimes(
+ long totalTime,
+ long copyTime,
+ long serializeTime,
+ long compressTime,
+ long sortTime,
+ long requireMemoryTime,
+ long waitFinishTime) {
+ this.totalTime = totalTime;
+ this.copyTime = copyTime;
+ this.serializeTime = serializeTime;
+ this.compressTime = compressTime;
+ this.sortTime = sortTime;
+ this.requireMemoryTime = requireMemoryTime;
+ this.waitFinishTime = waitFinishTime;
+ }
+
+ public long getTotalTime() {
+ return totalTime;
+ }
+
+ public long getCopyTime() {
+ return copyTime;
+ }
+
+ public long getSerializeTime() {
+ return serializeTime;
+ }
+
+ public long getCompressTime() {
+ return compressTime;
+ }
+
+ public long getSortTime() {
+ return sortTime;
+ }
+
+ public long getRequireMemoryTime() {
+ return requireMemoryTime;
+ }
+
+ public long getWaitFinishTime() {
+ return waitFinishTime;
+ }
+
+ public RssProtos.ShuffleWriteTimes toProto() {
+ return RssProtos.ShuffleWriteTimes.newBuilder()
+ .setTotal(this.totalTime)
+ .setCopy(this.copyTime)
+ .setSerialize(this.serializeTime)
+ .setCompress(this.compressTime)
+ .setSort(this.sortTime)
+ .setRequireMemory(this.requireMemoryTime)
+ .setWaitFinish(this.waitFinishTime)
+ .build();
+ }
+ }
+
public static class TaskShuffleWriteMetric {
private long durationMillis;
private long byteSize;
diff --git a/proto/src/main/proto/Rss.proto b/proto/src/main/proto/Rss.proto
index 0df3e0ffd..266b903a9 100644
--- a/proto/src/main/proto/Rss.proto
+++ b/proto/src/main/proto/Rss.proto
@@ -590,6 +590,17 @@ message ReportShuffleWriteMetricRequest {
int32 stageId = 2;
int64 taskId = 3;
map<string, ShuffleWriteMetric> metrics = 4;
+ ShuffleWriteTimes shuffleWriteTimes = 5;
+}
+
+message ShuffleWriteTimes {
+ int64 total = 1;
+ int64 copy = 2;
+ int64 serialize = 3;
+ int64 compress = 4;
+ int64 sort = 5;
+ int64 requireMemory = 6;
+ int64 waitFinish = 7;
}
message ShuffleWriteMetric {