This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 2a32171b9 [#2569] feat(spark): Add statistic of shuffle read times
(#2598)
2a32171b9 is described below
commit 2a32171b9724272276797ede17eb057027df5767
Author: Junfan Zhang <[email protected]>
AuthorDate: Thu Sep 4 17:26:41 2025 +0800
[#2569] feat(spark): Add statistic of shuffle read times (#2598)
### What changes were proposed in this pull request?
Add statistics on shuffle read times (fetch, copy, CRC, decompress,
deserialize) to help find the bottleneck in shuffle reading
### Why are the changes needed?
for #2569
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Tested on a cluster.
---
.../hadoop/mapreduce/task/reduce/FetcherTest.java | 6 ++
.../shuffle/events/TaskShuffleReadInfoEvent.java | 11 ++-
.../shuffle/reader/RssShuffleDataIterator.java | 8 ++
.../shuffle/manager/ShuffleManagerGrpcService.java | 4 +-
.../scala/org/apache/spark/UniffleListener.scala | 6 ++
.../org/apache/spark/UniffleStatusStore.scala | 16 ++++
.../scala/org/apache/spark/ui/ShufflePage.scala | 63 ++++++++++++--
.../spark/shuffle/reader/RssShuffleReader.java | 7 +-
.../common/shuffle/impl/RssTezFetcherTest.java | 6 ++
.../RssTezShuffleDataFetcherTest.java | 6 ++
.../uniffle/client/api/ShuffleReadClient.java | 3 +
.../uniffle/client/impl/ShuffleReadClientImpl.java | 6 ++
.../apache/uniffle/common/ShuffleReadTimes.java | 97 ++++++++++++++++++++++
.../request/RssReportShuffleReadMetricRequest.java | 7 +-
proto/src/main/proto/Rss.proto | 9 ++
15 files changed, 244 insertions(+), 11 deletions(-)
diff --git
a/client-mr/core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/FetcherTest.java
b/client-mr/core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/FetcherTest.java
index 871f9e58f..85cc09614 100644
---
a/client-mr/core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/FetcherTest.java
+++
b/client-mr/core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/FetcherTest.java
@@ -69,6 +69,7 @@ import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleAssignmentsInfo;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleDataDistributionType;
+import org.apache.uniffle.common.ShuffleReadTimes;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.compression.Codec;
import org.apache.uniffle.common.compression.Lz4Codec;
@@ -621,6 +622,11 @@ public class FetcherTest {
@Override
public void logStatics() {}
+
+ @Override
+ public ShuffleReadTimes getShuffleReadTimes() {
+ return new ShuffleReadTimes();
+ }
}
static class MockedReporter implements Reporter {
diff --git
a/client-spark/common/src/main/java/org/apache/spark/shuffle/events/TaskShuffleReadInfoEvent.java
b/client-spark/common/src/main/java/org/apache/spark/shuffle/events/TaskShuffleReadInfoEvent.java
index 6e999d898..fb897ee72 100644
---
a/client-spark/common/src/main/java/org/apache/spark/shuffle/events/TaskShuffleReadInfoEvent.java
+++
b/client-spark/common/src/main/java/org/apache/spark/shuffle/events/TaskShuffleReadInfoEvent.java
@@ -19,6 +19,8 @@ package org.apache.spark.shuffle.events;
import java.util.Map;
+import org.apache.uniffle.common.ShuffleReadTimes;
+
public class TaskShuffleReadInfoEvent extends UniffleEvent {
private int stageId;
private int shuffleId;
@@ -26,6 +28,7 @@ public class TaskShuffleReadInfoEvent extends UniffleEvent {
private Map<String, ShuffleReadMetric> metrics;
private boolean isShuffleReadFailed;
private String failureReason;
+ private ShuffleReadTimes shuffleReadTimes;
public TaskShuffleReadInfoEvent(
int stageId,
@@ -33,13 +36,15 @@ public class TaskShuffleReadInfoEvent extends UniffleEvent {
long taskId,
Map<String, ShuffleReadMetric> metrics,
boolean isShuffleReadFailed,
- String failureReason) {
+ String failureReason,
+ ShuffleReadTimes shuffleReadTimes) {
this.stageId = stageId;
this.shuffleId = shuffleId;
this.taskId = taskId;
this.metrics = metrics;
this.isShuffleReadFailed = isShuffleReadFailed;
this.failureReason = failureReason;
+ this.shuffleReadTimes = shuffleReadTimes;
}
public int getStageId() {
@@ -65,4 +70,8 @@ public class TaskShuffleReadInfoEvent extends UniffleEvent {
public String getFailureReason() {
return failureReason;
}
+
+ public ShuffleReadTimes getShuffleReadTimes() {
+ return shuffleReadTimes;
+ }
}
diff --git
a/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java
b/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java
index bd0ae0ccb..e6bec30e8 100644
---
a/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java
+++
b/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java
@@ -40,6 +40,7 @@ import org.slf4j.LoggerFactory;
import org.apache.uniffle.client.api.ShuffleReadClient;
import org.apache.uniffle.client.response.CompressedShuffleBlock;
+import org.apache.uniffle.common.ShuffleReadTimes;
import org.apache.uniffle.common.compression.Codec;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.util.RssUtils;
@@ -230,4 +231,11 @@ public class RssShuffleDataIterator<K, C> extends
AbstractIterator<Product2<K, C
protected ShuffleReadMetrics getShuffleReadMetrics() {
return shuffleReadMetrics;
}
+
+ public ShuffleReadTimes getReadTimes() {
+ ShuffleReadTimes times = shuffleReadClient.getShuffleReadTimes();
+ times.withDecompressed(decompressTime);
+ times.withDeserialized(serializeTime);
+ return times;
+ }
}
diff --git
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
index b2ce7fbc9..9a7b48eb4 100644
---
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
+++
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
@@ -44,6 +44,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.ReceivingFailureServer;
+import org.apache.uniffle.common.ShuffleReadTimes;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.util.JavaUtils;
@@ -768,7 +769,8 @@ public class ShuffleManagerGrpcService extends
ShuffleManagerImplBase {
x.getValue().getHadoopDurationMillis(),
x.getValue().getHadoopByteSize()))),
request.getIsTaskReadFailed(),
- request.getShuffleReadFailureReason());
+ request.getShuffleReadFailureReason(),
+ ShuffleReadTimes.fromProto(request.getShuffleReadTimes()));
RssSparkShuffleUtils.getActiveSparkContext().listenerBus().post(event);
RssProtos.ReportShuffleReadMetricResponse reply =
RssProtos.ReportShuffleReadMetricResponse.newBuilder()
diff --git
a/client-spark/extension/src/main/scala/org/apache/spark/UniffleListener.scala
b/client-spark/extension/src/main/scala/org/apache/spark/UniffleListener.scala
index 8f4aeb529..94fd0cc81 100644
---
a/client-spark/extension/src/main/scala/org/apache/spark/UniffleListener.scala
+++
b/client-spark/extension/src/main/scala/org/apache/spark/UniffleListener.scala
@@ -22,6 +22,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent,
SparkListenerJobEnd, SparkListenerJobStart, SparkListenerTaskEnd}
import org.apache.spark.shuffle.events.{ShuffleAssignmentInfoEvent,
ShuffleWriteTimes, TaskReassignInfoEvent, TaskShuffleReadInfoEvent,
TaskShuffleWriteInfoEvent}
import org.apache.spark.status.ElementTrackingStore
+import org.apache.uniffle.common.ShuffleReadTimes
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong
@@ -30,6 +31,7 @@ import scala.collection.JavaConverters.mapAsScalaMapConverter
class UniffleListener(conf: SparkConf, kvstore: ElementTrackingStore)
extends SparkListener with Logging {
+ private val aggregatedShuffleReadTimes = new ShuffleReadTimes()
private val aggregatedShuffleWriteTimes = new ShuffleWriteTimes()
private val aggregatedShuffleWriteMetric = new ConcurrentHashMap[String,
AggregatedShuffleWriteMetric]
private val aggregatedShuffleReadMetric = new ConcurrentHashMap[String,
AggregatedShuffleReadMetric]
@@ -65,6 +67,9 @@ class UniffleListener(conf: SparkConf, kvstore:
ElementTrackingStore)
kvstore.write(
AggregatedShuffleWriteTimesUIData(aggregatedShuffleWriteTimes)
)
+ kvstore.write(
+ AggregatedShuffleReadTimesUIData(aggregatedShuffleReadTimes)
+ )
}
}
@@ -137,6 +142,7 @@ class UniffleListener(conf: SparkConf, kvstore:
ElementTrackingStore)
agg_metric.hadoopByteSize += rmetric.getHadoopByteSize
agg_metric.hadoopDurationMillis += rmetric.getHadoopDurationMillis
}
+ aggregatedShuffleReadTimes.merge(event.getShuffleReadTimes)
}
override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
diff --git
a/client-spark/extension/src/main/scala/org/apache/spark/UniffleStatusStore.scala
b/client-spark/extension/src/main/scala/org/apache/spark/UniffleStatusStore.scala
index 931a1ce53..098013e05 100644
---
a/client-spark/extension/src/main/scala/org/apache/spark/UniffleStatusStore.scala
+++
b/client-spark/extension/src/main/scala/org/apache/spark/UniffleStatusStore.scala
@@ -22,6 +22,7 @@ import org.apache.spark.shuffle.events.{ShuffleWriteTimes,
TaskReassignInfoEvent
import org.apache.spark.status.KVUtils.KVIndexParam
import org.apache.spark.util.Utils
import org.apache.spark.util.kvstore.{KVIndex, KVStore, KVStoreView}
+import org.apache.uniffle.common.ShuffleReadTimes
import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters.asScalaIteratorConverter
@@ -49,6 +50,15 @@ class UniffleStatusStore(store: KVStore) {
}
}
+ def shuffleReadTimes(): AggregatedShuffleReadTimesUIData = {
+ val kClass = classOf[AggregatedShuffleReadTimesUIData]
+ try {
+ store.read(kClass, kClass.getName)
+ } catch {
+ case _: NoSuchElementException => AggregatedShuffleReadTimesUIData(new
ShuffleReadTimes())
+ }
+ }
+
def shuffleWriteTimes(): AggregatedShuffleWriteTimesUIData = {
val kClass = classOf[AggregatedShuffleWriteTimesUIData]
try {
@@ -164,6 +174,12 @@ case class AggregatedShuffleWriteTimesUIData(times:
ShuffleWriteTimes) {
def id: String = classOf[AggregatedShuffleWriteTimesUIData].getName()
}
+case class AggregatedShuffleReadTimesUIData(times: ShuffleReadTimes) {
+ @JsonIgnore
+ @KVIndex
+ def id: String = classOf[AggregatedShuffleReadTimesUIData].getName()
+}
+
case class ReassignInfoUIData(event: TaskReassignInfoEvent) {
@JsonIgnore
@KVIndex
diff --git
a/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
b/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
index e4b32bdb8..3fadde1e9 100644
---
a/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
+++
b/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
@@ -40,6 +40,15 @@ class ShufflePage(parent: ShuffleTab) extends WebUIPage("")
with Logging {
</td>
</tr>
+ private def shuffleReadTimesRow(kv: Seq[String]) = <tr>
+ <td>{kv(0)}</td>
+ <td>{kv(1)}</td>
+ <td>{kv(2)}</td>
+ <td>{kv(3)}</td>
+ <td>{kv(4)}</td>
+ <td>{kv(5)}</td>
+ </tr>
+
private def shuffleWriteTimesRow(kv: Seq[String]) = <tr>
<td>{kv(0)}</td>
<td>{kv(1)}</td>
@@ -146,9 +155,36 @@ class ShufflePage(parent: ShuffleTab) extends
WebUIPage("") with Logging {
fixedWidth = true
)
+ // render shuffle read times
+ val readTimes = runtimeStatusStore.shuffleReadTimes().times
+ val readTotal = if (readTimes.getTotal <= 0) -1 else readTimes.getTotal
+ val readTimesUI = UIUtils.listingTable(
+ Seq("Total", "Fetch", "Copy", "CRC", "Decompress", "Deserialize"),
+ shuffleReadTimesRow,
+ Seq(
+ Seq(
+ UIUtils.formatDuration(readTotal),
+ UIUtils.formatDuration(readTimes.getFetch),
+ UIUtils.formatDuration(readTimes.getCopy),
+ UIUtils.formatDuration(readTimes.getCrc),
+ UIUtils.formatDuration(readTimes.getDecompress),
+ UIUtils.formatDuration(readTimes.getDeserialize),
+ ),
+ Seq(
+ 1,
+ readTimes.getFetch.toDouble / readTotal,
+ readTimes.getCopy.toDouble / readTotal,
+ readTimes.getCrc.toDouble / readTotal,
+ readTimes.getDecompress.toDouble / readTotal,
+ readTimes.getDeserialize.toDouble / readTotal,
+ ).map(x => roundToTwoDecimals(x).toString)
+ ),
+ fixedWidth = true
+ )
+
// render shuffle write times
val writeTimes = runtimeStatusStore.shuffleWriteTimes().times
- val total = if (writeTimes.getTotal <= 0) -1 else writeTimes.getTotal
+ val writeTotal = if (writeTimes.getTotal <= 0) -1 else writeTimes.getTotal
val writeTimesUI = UIUtils.listingTable(
Seq("Total Time", "Wait Finish Time", "Copy Time", "Serialize Time",
"Compress Time", "Sort Time", "Require Memory Time"),
shuffleWriteTimesRow,
@@ -164,12 +200,12 @@ class ShufflePage(parent: ShuffleTab) extends
WebUIPage("") with Logging {
),
Seq(
1.toDouble,
- writeTimes.getWaitFinish.toDouble / total,
- writeTimes.getCopy.toDouble / total,
- writeTimes.getSerialize.toDouble / total,
- writeTimes.getCompress.toDouble / total,
- writeTimes.getSort.toDouble / total,
- writeTimes.getRequireMemory.toDouble / total,
+ writeTimes.getWaitFinish.toDouble / writeTotal,
+ writeTimes.getCopy.toDouble / writeTotal,
+ writeTimes.getSerialize.toDouble / writeTotal,
+ writeTimes.getCompress.toDouble / writeTotal,
+ writeTimes.getSort.toDouble / writeTotal,
+ writeTimes.getRequireMemory.toDouble / writeTotal,
).map(x => roundToTwoDecimals(x).toString)
),
fixedWidth = true
@@ -407,6 +443,19 @@ class ShufflePage(parent: ShuffleTab) extends
WebUIPage("") with Logging {
{writeTimesUI}
</div>
</div>
+
+ <div>
+ <span class="collapse-read-times-properties collapse-table"
+ onClick="collapseTable('collapse-read-times-properties',
'read-times-table')">
+ <h4>
+ <span class="collapse-table-arrow arrow-closed"></span>
+ <a>Shuffle Read Times</a>
+ </h4>
+ </span>
+ <div class="read-times-table collapsible-table collapsed">
+ {readTimesUI}
+ </div>
+ </div>
</div>
}
diff --git
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
index 4113f0627..d6bb9b210 100644
---
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
+++
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
@@ -60,6 +60,7 @@ import
org.apache.uniffle.client.request.RssReportShuffleReadMetricRequest;
import org.apache.uniffle.client.response.RssReportShuffleReadMetricResponse;
import org.apache.uniffle.client.util.RssClientConfig;
import org.apache.uniffle.common.ShuffleDataDistributionType;
+import org.apache.uniffle.common.ShuffleReadTimes;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.RssClientConf;
import org.apache.uniffle.common.config.RssConf;
@@ -99,6 +100,8 @@ public class RssShuffleReader<K, C> implements
ShuffleReader<K, C> {
private boolean isShuffleReadFailed = false;
private Optional<String> shuffleReadReason = Optional.empty();
+ private ShuffleReadTimes shuffleReadTimes = new ShuffleReadTimes();
+
public RssShuffleReader(
int startPartition,
int endPartition,
@@ -314,6 +317,7 @@ public class RssShuffleReader<K, C> implements
ShuffleReader<K, C> {
iterator,
FunctionUtils.once(
() -> {
+ shuffleReadTimes.merge(iterator.getReadTimes());
context.taskMetrics().mergeShuffleReadMetrics();
return iterator.cleanup();
}));
@@ -389,7 +393,8 @@ public class RssShuffleReader<K, C> implements
ShuffleReader<K, C> {
x.getValue().getHadoopReadLocalFileDurationMillis(),
x.getValue().getHadoopReadLocalFileBytes()))),
isShuffleReadFailed,
- shuffleReadReason));
+ shuffleReadReason,
+ shuffleReadTimes));
if (response != null && response.getStatusCode() !=
StatusCode.SUCCESS) {
LOG.error("Errors on reporting shuffle read metrics to driver");
}
diff --git
a/client-tez/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/RssTezFetcherTest.java
b/client-tez/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/RssTezFetcherTest.java
index c36c2f297..fdf1f84cd 100644
---
a/client-tez/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/RssTezFetcherTest.java
+++
b/client-tez/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/RssTezFetcherTest.java
@@ -55,6 +55,7 @@ import org.slf4j.LoggerFactory;
import org.apache.uniffle.client.api.ShuffleReadClient;
import org.apache.uniffle.client.response.CompressedShuffleBlock;
+import org.apache.uniffle.common.ShuffleReadTimes;
import org.apache.uniffle.common.compression.Codec;
import org.apache.uniffle.common.compression.Lz4Codec;
import org.apache.uniffle.common.config.RssConf;
@@ -359,6 +360,11 @@ public class RssTezFetcherTest {
@Override
public void logStatics() {}
+
+ @Override
+ public ShuffleReadTimes getShuffleReadTimes() {
+ return new ShuffleReadTimes();
+ }
}
/**
diff --git
a/client-tez/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssTezShuffleDataFetcherTest.java
b/client-tez/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssTezShuffleDataFetcherTest.java
index c6b2efd77..2ea90a923 100644
---
a/client-tez/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssTezShuffleDataFetcherTest.java
+++
b/client-tez/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssTezShuffleDataFetcherTest.java
@@ -66,6 +66,7 @@ import org.slf4j.LoggerFactory;
import org.apache.uniffle.client.api.ShuffleReadClient;
import org.apache.uniffle.client.response.CompressedShuffleBlock;
+import org.apache.uniffle.common.ShuffleReadTimes;
import org.apache.uniffle.common.compression.Codec;
import org.apache.uniffle.common.compression.Lz4Codec;
@@ -397,5 +398,10 @@ public class RssTezShuffleDataFetcherTest {
@Override
public void logStatics() {}
+
+ @Override
+ public ShuffleReadTimes getShuffleReadTimes() {
+ return new ShuffleReadTimes();
+ }
}
}
diff --git
a/client/src/main/java/org/apache/uniffle/client/api/ShuffleReadClient.java
b/client/src/main/java/org/apache/uniffle/client/api/ShuffleReadClient.java
index 33270dfc4..f4e7fe1f8 100644
--- a/client/src/main/java/org/apache/uniffle/client/api/ShuffleReadClient.java
+++ b/client/src/main/java/org/apache/uniffle/client/api/ShuffleReadClient.java
@@ -18,6 +18,7 @@
package org.apache.uniffle.client.api;
import org.apache.uniffle.client.response.CompressedShuffleBlock;
+import org.apache.uniffle.common.ShuffleReadTimes;
public interface ShuffleReadClient {
@@ -28,4 +29,6 @@ public interface ShuffleReadClient {
void close();
void logStatics();
+
+ ShuffleReadTimes getShuffleReadTimes();
}
diff --git
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
index 44e8a3f67..20497cb99 100644
---
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
+++
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
@@ -40,6 +40,7 @@ import org.apache.uniffle.client.util.DefaultIdHelper;
import org.apache.uniffle.common.BufferSegment;
import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.ShuffleDataResult;
+import org.apache.uniffle.common.ShuffleReadTimes;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.RssClientConf;
import org.apache.uniffle.common.config.RssConf;
@@ -356,4 +357,9 @@ public class ShuffleReadClientImpl implements
ShuffleReadClient {
+ " ms");
clientReadHandler.logConsumedBlockInfo();
}
+
+ @Override
+ public ShuffleReadTimes getShuffleReadTimes() {
+ // Argument order must match ShuffleReadTimes(long fetch, long crc, long copy).
+ return new ShuffleReadTimes(readDataTime.get(), crcCheckTime.get(), copyTime.get());
+ }
}
diff --git
a/common/src/main/java/org/apache/uniffle/common/ShuffleReadTimes.java
b/common/src/main/java/org/apache/uniffle/common/ShuffleReadTimes.java
new file mode 100644
index 000000000..7be409fc0
--- /dev/null
+++ b/common/src/main/java/org/apache/uniffle/common/ShuffleReadTimes.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common;
+
+import org.apache.uniffle.proto.RssProtos;
+
+/** The unit is millis */
+public class ShuffleReadTimes {
+ private long fetch;
+ private long crc;
+ private long copy;
+ private long deserialize;
+ private long decompress;
+
+ public ShuffleReadTimes() {}
+
+ public ShuffleReadTimes(long fetch, long crc, long copy) {
+ this.fetch = fetch;
+ this.crc = crc;
+ this.copy = copy;
+ }
+
+ public long getFetch() {
+ return fetch;
+ }
+
+ public long getCrc() {
+ return crc;
+ }
+
+ public long getCopy() {
+ return copy;
+ }
+
+ public void withDeserialized(long deserialized) {
+ this.deserialize = deserialized;
+ }
+
+ public void withDecompressed(long decompressed) {
+ this.decompress = decompressed;
+ }
+
+ public long getDeserialize() {
+ return deserialize;
+ }
+
+ public long getDecompress() {
+ return decompress;
+ }
+
+ public void merge(ShuffleReadTimes other) {
+ this.fetch += other.fetch;
+ this.crc += other.crc;
+ this.copy += other.copy;
+ this.deserialize += other.deserialize;
+ this.decompress += other.decompress;
+ }
+
+ public long getTotal() {
+ return fetch + crc + copy + deserialize + decompress;
+ }
+
+ public RssProtos.ShuffleReadTimes toProto() {
+ return RssProtos.ShuffleReadTimes.newBuilder()
+ .setFetch(fetch)
+ .setCrc(crc)
+ .setCopy(copy)
+ .setDecompress(decompress)
+ .setDeserialize(deserialize)
+ .build();
+ }
+
+ public static ShuffleReadTimes fromProto(RssProtos.ShuffleReadTimes proto) {
+ ShuffleReadTimes time = new ShuffleReadTimes();
+ time.fetch = proto.getFetch();
+ time.crc = proto.getCrc();
+ time.copy = proto.getCopy();
+ time.decompress = proto.getDecompress();
+ time.deserialize = proto.getDeserialize();
+ return time;
+ }
+}
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleReadMetricRequest.java
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleReadMetricRequest.java
index d536d49eb..e88ea097d 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleReadMetricRequest.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleReadMetricRequest.java
@@ -21,6 +21,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
+import org.apache.uniffle.common.ShuffleReadTimes;
import org.apache.uniffle.proto.RssProtos;
public class RssReportShuffleReadMetricRequest {
@@ -30,6 +31,7 @@ public class RssReportShuffleReadMetricRequest {
private Map<String, TaskShuffleReadMetric> metrics;
private boolean isShuffleReadFailed;
private Optional<String> shuffleReadReason;
+ private ShuffleReadTimes shuffleReadTimes;
public RssReportShuffleReadMetricRequest(
int stageId,
@@ -37,13 +39,15 @@ public class RssReportShuffleReadMetricRequest {
long taskId,
Map<String, TaskShuffleReadMetric> metrics,
boolean isShuffleReadFailed,
- Optional<String> shuffleReadReason) {
+ Optional<String> shuffleReadReason,
+ ShuffleReadTimes shuffleReadTimes) {
this.stageId = stageId;
this.shuffleId = shuffleId;
this.taskId = taskId;
this.metrics = metrics;
this.isShuffleReadFailed = isShuffleReadFailed;
this.shuffleReadReason = shuffleReadReason;
+ this.shuffleReadTimes = shuffleReadTimes;
}
public RssProtos.ReportShuffleReadMetricRequest toProto() {
@@ -56,6 +60,7 @@ public class RssReportShuffleReadMetricRequest {
.setTaskId(request.taskId)
.setIsTaskReadFailed(request.isShuffleReadFailed)
.setShuffleReadFailureReason(request.shuffleReadReason.orElse(""))
+ .setShuffleReadTimes(shuffleReadTimes.toProto())
.putAllMetrics(
request.metrics.entrySet().stream()
.collect(
diff --git a/proto/src/main/proto/Rss.proto b/proto/src/main/proto/Rss.proto
index 1ad6f2a5d..2967d98c0 100644
--- a/proto/src/main/proto/Rss.proto
+++ b/proto/src/main/proto/Rss.proto
@@ -641,6 +641,15 @@ message ReportShuffleReadMetricRequest {
map<string, ShuffleReadMetric> metrics = 4;
bool isTaskReadFailed = 5;
string shuffleReadFailureReason = 6;
+ ShuffleReadTimes shuffleReadTimes = 7;
+}
+
+message ShuffleReadTimes {
+ int64 fetch = 1;
+ int64 crc = 2;
+ int64 copy = 3;
+ int64 deserialize = 4;
+ int64 decompress = 5;
}
message ReportShuffleReadMetricResponse {