This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 4e3dfbc64 [#2460] feat(spark)(part-1): Introducing spark uniffle ui
for better observability (#2459)
4e3dfbc64 is described below
commit 4e3dfbc64e0f9ffd26a818aa38305fbff198484b
Author: Junfan Zhang <[email protected]>
AuthorDate: Fri Apr 25 14:18:44 2025 +0800
[#2460] feat(spark)(part-1): Introducing spark uniffle ui for better
observability (#2459)
### What changes were proposed in this pull request?
Introducing spark uniffle ui for better observability.

### Why are the changes needed?
Through the Spark Uniffle UI, we can find slow shuffle servers more
easily.
### Does this PR introduce _any_ user-facing change?
Yes.
`spark.plugins org.apache.spark.UnifflePlugin`
### How was this patch tested?
Internal tests
---
.../shuffle/events/ShuffleAssignmentInfoEvent.java | 38 ++++
.../apache/spark/shuffle/events/ShuffleMetric.java | 36 ++++
.../spark/shuffle/events/ShuffleReadMetric.java | 24 +++
.../spark/shuffle/events/ShuffleWriteMetric.java | 24 +++
.../shuffle/events/TaskShuffleReadInfoEvent.java | 51 +++++
.../shuffle/events/TaskShuffleWriteInfoEvent.java | 51 +++++
.../apache/spark/shuffle/events/UniffleEvent.java | 27 +++
.../shuffle/manager/ShuffleManagerGrpcService.java | 55 +++++
client-spark/{spark3 => extension}/pom.xml | 81 ++++---
.../scala/org/apache/spark/UniffleListener.scala | 79 +++++++
.../scala/org/apache/spark/UnifflePlugin.scala | 64 ++++++
.../org/apache/spark/UniffleStatusStore.scala | 67 ++++++
.../scala/org/apache/spark/ui/ShufflePage.scala | 240 +++++++++++++++++++++
.../scala/org/apache/spark/ui/ShuffleTab.scala | 35 +++
client-spark/spark3/pom.xml | 5 +
.../apache/spark/shuffle/RssShuffleManager.java | 15 ++
.../spark/shuffle/writer/RssShuffleWriter.java | 19 ++
.../client/impl/ShuffleServerPushCostTracker.java | 13 ++
dev/scripts/checkshade.sh | 2 +-
.../uniffle/client/api/ShuffleManagerClient.java | 10 +
.../client/impl/grpc/ShuffleManagerGrpcClient.java | 20 ++
.../request/RssReportShuffleReadMetricRequest.java | 77 +++++++
.../RssReportShuffleWriteMetricRequest.java | 77 +++++++
.../RssReportShuffleReadMetricResponse.java | 33 +++
.../RssReportShuffleWriteMetricResponse.java | 33 +++
pom.xml | 45 +++-
proto/src/main/proto/Rss.proto | 35 +++
27 files changed, 1209 insertions(+), 47 deletions(-)
diff --git
a/client-spark/common/src/main/java/org/apache/spark/shuffle/events/ShuffleAssignmentInfoEvent.java
b/client-spark/common/src/main/java/org/apache/spark/shuffle/events/ShuffleAssignmentInfoEvent.java
new file mode 100644
index 000000000..add7a1440
--- /dev/null
+++
b/client-spark/common/src/main/java/org/apache/spark/shuffle/events/ShuffleAssignmentInfoEvent.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.events;
+
+import java.util.List;
+
+public class ShuffleAssignmentInfoEvent extends UniffleEvent {
+ private int shuffleId;
+ private List<String> assignedServers;
+
+ public ShuffleAssignmentInfoEvent(int shuffleId, List<String>
assignedServers) {
+ this.shuffleId = shuffleId;
+ this.assignedServers = assignedServers;
+ }
+
+ public int getShuffleId() {
+ return shuffleId;
+ }
+
+ public List<String> getAssignedServers() {
+ return assignedServers;
+ }
+}
diff --git
a/client-spark/common/src/main/java/org/apache/spark/shuffle/events/ShuffleMetric.java
b/client-spark/common/src/main/java/org/apache/spark/shuffle/events/ShuffleMetric.java
new file mode 100644
index 000000000..72cd97d38
--- /dev/null
+++
b/client-spark/common/src/main/java/org/apache/spark/shuffle/events/ShuffleMetric.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.events;
+
+public class ShuffleMetric {
+ private long durationMillis;
+ private long byteSize;
+
+ public ShuffleMetric(long durationMillis, long byteSize) {
+ this.durationMillis = durationMillis;
+ this.byteSize = byteSize;
+ }
+
+ public long getDurationMillis() {
+ return durationMillis;
+ }
+
+ public long getByteSize() {
+ return byteSize;
+ }
+}
diff --git
a/client-spark/common/src/main/java/org/apache/spark/shuffle/events/ShuffleReadMetric.java
b/client-spark/common/src/main/java/org/apache/spark/shuffle/events/ShuffleReadMetric.java
new file mode 100644
index 000000000..1cb54c69f
--- /dev/null
+++
b/client-spark/common/src/main/java/org/apache/spark/shuffle/events/ShuffleReadMetric.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.events;
+
+public class ShuffleReadMetric extends ShuffleMetric {
+ public ShuffleReadMetric(long durationMillis, long byteSize) {
+ super(durationMillis, byteSize);
+ }
+}
diff --git
a/client-spark/common/src/main/java/org/apache/spark/shuffle/events/ShuffleWriteMetric.java
b/client-spark/common/src/main/java/org/apache/spark/shuffle/events/ShuffleWriteMetric.java
new file mode 100644
index 000000000..4b24c1fa8
--- /dev/null
+++
b/client-spark/common/src/main/java/org/apache/spark/shuffle/events/ShuffleWriteMetric.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.events;
+
+public class ShuffleWriteMetric extends ShuffleMetric {
+ public ShuffleWriteMetric(long durationMillis, long byteSize) {
+ super(durationMillis, byteSize);
+ }
+}
diff --git
a/client-spark/common/src/main/java/org/apache/spark/shuffle/events/TaskShuffleReadInfoEvent.java
b/client-spark/common/src/main/java/org/apache/spark/shuffle/events/TaskShuffleReadInfoEvent.java
new file mode 100644
index 000000000..bf847a873
--- /dev/null
+++
b/client-spark/common/src/main/java/org/apache/spark/shuffle/events/TaskShuffleReadInfoEvent.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.events;
+
+import java.util.Map;
+
+public class TaskShuffleReadInfoEvent extends UniffleEvent {
+ private int stageId;
+ private int shuffleId;
+ private long taskId;
+ private Map<String, ShuffleReadMetric> metrics;
+
+ public TaskShuffleReadInfoEvent(
+ int stageId, int shuffleId, long taskId, Map<String, ShuffleReadMetric>
metrics) {
+ this.stageId = stageId;
+ this.shuffleId = shuffleId;
+ this.taskId = taskId;
+ this.metrics = metrics;
+ }
+
+ public int getStageId() {
+ return stageId;
+ }
+
+ public int getShuffleId() {
+ return shuffleId;
+ }
+
+ public long getTaskId() {
+ return taskId;
+ }
+
+ public Map<String, ShuffleReadMetric> getMetrics() {
+ return metrics;
+ }
+}
diff --git
a/client-spark/common/src/main/java/org/apache/spark/shuffle/events/TaskShuffleWriteInfoEvent.java
b/client-spark/common/src/main/java/org/apache/spark/shuffle/events/TaskShuffleWriteInfoEvent.java
new file mode 100644
index 000000000..6368566d1
--- /dev/null
+++
b/client-spark/common/src/main/java/org/apache/spark/shuffle/events/TaskShuffleWriteInfoEvent.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.events;
+
+import java.util.Map;
+
+public class TaskShuffleWriteInfoEvent extends UniffleEvent {
+ private int stageId;
+ private int shuffleId;
+ private long taskId;
+ private Map<String, ShuffleWriteMetric> metrics;
+
+ public TaskShuffleWriteInfoEvent(
+ int stageId, int shuffleId, long taskId, Map<String, ShuffleWriteMetric>
metrics) {
+ this.stageId = stageId;
+ this.shuffleId = shuffleId;
+ this.taskId = taskId;
+ this.metrics = metrics;
+ }
+
+ public int getStageId() {
+ return stageId;
+ }
+
+ public int getShuffleId() {
+ return shuffleId;
+ }
+
+ public long getTaskId() {
+ return taskId;
+ }
+
+ public Map<String, ShuffleWriteMetric> getMetrics() {
+ return metrics;
+ }
+}
diff --git
a/client-spark/common/src/main/java/org/apache/spark/shuffle/events/UniffleEvent.java
b/client-spark/common/src/main/java/org/apache/spark/shuffle/events/UniffleEvent.java
new file mode 100644
index 000000000..a73b3bcaf
--- /dev/null
+++
b/client-spark/common/src/main/java/org/apache/spark/shuffle/events/UniffleEvent.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.events;
+
+import org.apache.spark.scheduler.SparkListenerEvent;
+
+public abstract class UniffleEvent implements SparkListenerEvent {
+
+ public boolean logEvent() {
+ return true;
+ }
+}
diff --git
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
index e01f5f892..2ba39eac1 100644
---
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
+++
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
@@ -31,6 +31,11 @@ import java.util.stream.Collectors;
import com.google.protobuf.UnsafeByteOperations;
import io.grpc.stub.StreamObserver;
import org.apache.spark.SparkException;
+import org.apache.spark.shuffle.RssSparkShuffleUtils;
+import org.apache.spark.shuffle.events.ShuffleReadMetric;
+import org.apache.spark.shuffle.events.ShuffleWriteMetric;
+import org.apache.spark.shuffle.events.TaskShuffleReadInfoEvent;
+import org.apache.spark.shuffle.events.TaskShuffleWriteInfoEvent;
import org.apache.spark.shuffle.handle.MutableShuffleHandleInfo;
import org.apache.spark.shuffle.handle.StageAttemptShuffleHandleInfo;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
@@ -711,4 +716,54 @@ public class ShuffleManagerGrpcService extends
ShuffleManagerImplBase {
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
+
+ @Override
+ public void reportShuffleWriteMetric(
+ RssProtos.ReportShuffleWriteMetricRequest request,
+ StreamObserver<RssProtos.ReportShuffleWriteMetricResponse>
responseObserver) {
+ TaskShuffleWriteInfoEvent event =
+ new TaskShuffleWriteInfoEvent(
+ request.getStageId(),
+ request.getShuffleId(),
+ request.getTaskId(),
+ request.getMetricsMap().entrySet().stream()
+ .collect(
+ Collectors.toMap(
+ Map.Entry::getKey,
+ x ->
+ new ShuffleWriteMetric(
+ x.getValue().getDurationMillis(),
x.getValue().getByteSize()))));
+ RssSparkShuffleUtils.getActiveSparkContext().listenerBus().post(event);
+ RssProtos.ReportShuffleWriteMetricResponse reply =
+ RssProtos.ReportShuffleWriteMetricResponse.newBuilder()
+ .setStatus(RssProtos.StatusCode.SUCCESS)
+ .build();
+ responseObserver.onNext(reply);
+ responseObserver.onCompleted();
+ }
+
+ @Override
+ public void reportShuffleReadMetric(
+ RssProtos.ReportShuffleReadMetricRequest request,
+ StreamObserver<RssProtos.ReportShuffleReadMetricResponse>
responseObserver) {
+ TaskShuffleReadInfoEvent event =
+ new TaskShuffleReadInfoEvent(
+ request.getStageId(),
+ request.getShuffleId(),
+ request.getTaskId(),
+ request.getMetricsMap().entrySet().stream()
+ .collect(
+ Collectors.toMap(
+ Map.Entry::getKey,
+ x ->
+ new ShuffleReadMetric(
+ x.getValue().getDurationMillis(),
x.getValue().getByteSize()))));
+ RssSparkShuffleUtils.getActiveSparkContext().listenerBus().post(event);
+ RssProtos.ReportShuffleReadMetricResponse reply =
+ RssProtos.ReportShuffleReadMetricResponse.newBuilder()
+ .setStatus(RssProtos.StatusCode.SUCCESS)
+ .build();
+ responseObserver.onNext(reply);
+ responseObserver.onCompleted();
+ }
}
diff --git a/client-spark/spark3/pom.xml b/client-spark/extension/pom.xml
similarity index 61%
copy from client-spark/spark3/pom.xml
copy to client-spark/extension/pom.xml
index 94a124d0b..77c290484 100644
--- a/client-spark/spark3/pom.xml
+++ b/client-spark/extension/pom.xml
@@ -20,6 +20,7 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
+
<parent>
<artifactId>uniffle-parent</artifactId>
<groupId>org.apache.uniffle</groupId>
@@ -27,15 +28,16 @@
<relativePath>../../pom.xml</relativePath>
</parent>
- <artifactId>rss-client-spark3</artifactId>
+ <artifactId>rss-client-spark-ui</artifactId>
+ <version>0.11.0-SNAPSHOT</version>
<packaging>jar</packaging>
- <name>Apache Uniffle Client (Spark 3)</name>
+ <name>Apache Uniffle Client spark ui</name>
<dependencies>
<dependency>
- <groupId>com.google.protobuf</groupId>
- <artifactId>protobuf-java</artifactId>
- <version>${protobuf.version}</version>
+ <groupId>org.apache.uniffle</groupId>
+ <artifactId>rss-common</artifactId>
+ <scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
@@ -50,53 +52,42 @@
</exclusions>
</dependency>
<dependency>
- <groupId>org.apache.uniffle</groupId>
- <artifactId>rss-client</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.uniffle</groupId>
- <artifactId>rss-client-spark-common</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-all</artifactId>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
+ <version>${spark.version}</version>
+ <scope>provided</scope>
</dependency>
<dependency>
- <groupId>org.mockito</groupId>
- <artifactId>mockito-inline</artifactId>
- <scope>test</scope>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql_${scala.binary.version}</artifactId>
+ <version>${spark.version}</version>
+ <scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.uniffle</groupId>
<artifactId>rss-client-spark-common</artifactId>
<version>${project.version}</version>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.uniffle</groupId>
- <artifactId>shuffle-storage</artifactId>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-minicluster</artifactId>
- <scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
- <version>${spark.version}</version>
- <scope>provided</scope>
</dependency>
</dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>net.alchim31.maven</groupId>
+ <artifactId>scala-maven-plugin</artifactId>
+ <version>3.2.2</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>compile</goal>
+ <goal>testCompile</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <scalaVersion>${scala.version}</scalaVersion>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
</project>
diff --git
a/client-spark/extension/src/main/scala/org/apache/spark/UniffleListener.scala
b/client-spark/extension/src/main/scala/org/apache/spark/UniffleListener.scala
new file mode 100644
index 000000000..e674439f1
--- /dev/null
+++
b/client-spark/extension/src/main/scala/org/apache/spark/UniffleListener.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
+import org.apache.spark.shuffle.events.{ShuffleAssignmentInfoEvent,
TaskShuffleReadInfoEvent, TaskShuffleWriteInfoEvent}
+import org.apache.spark.status.ElementTrackingStore
+
+class UniffleListener(conf: SparkConf, kvstore: ElementTrackingStore)
+ extends SparkListener with Logging {
+
+ private def onBuildInfo(event: BuildInfoEvent): Unit = {
+ val uiData = new BuildInfoUIData(event.info.toSeq.sortBy(_._1))
+ kvstore.write(uiData)
+ }
+
+ private def onAssignmentInfo(info: ShuffleAssignmentInfoEvent): Unit = {
+ kvstore.write(
+ new ShuffleAssignmentUIData(
+ info.getShuffleId,
+ info.getAssignedServers
+ )
+ )
+ }
+
+ private def onTaskShuffleWriteInfo(event: TaskShuffleWriteInfoEvent): Unit =
{
+ kvstore.write(
+ new TaskShuffleWriteMetricUIData(
+ event.getStageId,
+ event.getShuffleId,
+ event.getTaskId,
+ event.getMetrics
+ )
+ )
+ }
+
+ private def onTaskShuffleReadInfo(event: TaskShuffleReadInfoEvent): Unit = {
+ kvstore.write(
+ new TaskShuffleReadMetricUIData(
+ event.getStageId,
+ event.getShuffleId,
+ event.getTaskId,
+ event.getMetrics
+ )
+ )
+ }
+
+ override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
+ case e: BuildInfoEvent => onBuildInfo(e)
+ case e: TaskShuffleWriteInfoEvent => onTaskShuffleWriteInfo(e)
+ case e: TaskShuffleReadInfoEvent => onTaskShuffleReadInfo(e)
+ case e: ShuffleAssignmentInfoEvent => onAssignmentInfo(e)
+ case _ => // Ignore
+ }
+}
+
+object UniffleListener {
+ def register(ctx: SparkContext): Unit = {
+ val kvStore = ctx.statusStore.store.asInstanceOf[ElementTrackingStore]
+ val listener = new UniffleListener(ctx.conf, kvStore)
+ ctx.listenerBus.addToStatusQueue(listener)
+ }
+}
\ No newline at end of file
diff --git
a/client-spark/extension/src/main/scala/org/apache/spark/UnifflePlugin.scala
b/client-spark/extension/src/main/scala/org/apache/spark/UnifflePlugin.scala
new file mode 100644
index 000000000..8a59dbf47
--- /dev/null
+++ b/client-spark/extension/src/main/scala/org/apache/spark/UnifflePlugin.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin,
PluginContext, SparkPlugin}
+import org.apache.spark.internal.Logging
+import org.apache.spark.shuffle.events.UniffleEvent
+import org.apache.spark.status.ElementTrackingStore
+import org.apache.spark.ui.ShuffleTab
+import org.apache.uniffle.common.ProjectConstants
+
+import java.util.Collections
+import scala.collection.mutable
+
+class UnifflePlugin extends SparkPlugin {
+ override def driverPlugin(): DriverPlugin = new UniffleDriverPlugin()
+
+ override def executorPlugin(): ExecutorPlugin = null
+}
+
+private class UniffleDriverPlugin extends DriverPlugin with Logging {
+ private var _sc: Option[SparkContext] = None
+
+ override def init(sc: SparkContext, pluginContext: PluginContext):
java.util.Map[String, String] = {
+ _sc = Some(sc)
+ UniffleListener.register(sc)
+ postBuildInfoEvent(sc)
+ attachUI(sc)
+ Collections.emptyMap()
+ }
+
+ private def attachUI(context: SparkContext): Unit = {
+ val kvStore = context.statusStore.store.asInstanceOf[ElementTrackingStore]
+ val statusStore = new UniffleStatusStore(kvStore)
+ context.ui.foreach(new ShuffleTab(statusStore, _))
+ }
+
+ private def postBuildInfoEvent(context: SparkContext): Unit = {
+ val buildInfo = new mutable.LinkedHashMap[String, String]()
+ buildInfo.put("Version", ProjectConstants.VERSION)
+ buildInfo.put("Commit Id", ProjectConstants.getGitCommitId)
+ buildInfo.put("Revision", ProjectConstants.REVISION)
+
+ val event = new BuildInfoEvent(buildInfo.toMap)
+ context.listenerBus.post(event)
+ }
+}
+
+case class BuildInfoEvent(info: Map[String, String]) extends UniffleEvent
\ No newline at end of file
diff --git
a/client-spark/extension/src/main/scala/org/apache/spark/UniffleStatusStore.scala
b/client-spark/extension/src/main/scala/org/apache/spark/UniffleStatusStore.scala
new file mode 100644
index 000000000..fd19ffb02
--- /dev/null
+++
b/client-spark/extension/src/main/scala/org/apache/spark/UniffleStatusStore.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import com.fasterxml.jackson.annotation.JsonIgnore
+import org.apache.spark.shuffle.events.{ShuffleReadMetric, ShuffleWriteMetric}
+import org.apache.spark.status.KVUtils.KVIndexParam
+import org.apache.spark.util.Utils
+import org.apache.spark.util.kvstore.{KVIndex, KVStore, KVStoreView}
+
+import scala.collection.JavaConverters.asScalaIteratorConverter
+
+class UniffleStatusStore(store: KVStore) {
+ private def viewToSeq[T](view: KVStoreView[T]): Seq[T] = {
+ Utils.tryWithResource(view.closeableIterator())(iter =>
iter.asScala.toList)
+ }
+
+ def buildInfo(): BuildInfoUIData = {
+ val kClass = classOf[BuildInfoUIData]
+ store.read(kClass, kClass.getName)
+ }
+
+ def taskShuffleReadMetrics(): Seq[TaskShuffleReadMetricUIData] = {
+ viewToSeq(store.view(classOf[TaskShuffleReadMetricUIData]))
+ }
+
+ def taskShuffleWriteMetrics(): Seq[TaskShuffleWriteMetricUIData] = {
+ viewToSeq(store.view(classOf[TaskShuffleWriteMetricUIData]))
+ }
+
+ def assignmentInfos(): Seq[ShuffleAssignmentUIData] = {
+ viewToSeq(store.view(classOf[ShuffleAssignmentUIData]))
+ }
+}
+
+class BuildInfoUIData(val info: Seq[(String, String)]) {
+ @JsonIgnore
+ @KVIndex
+ def id: String = classOf[BuildInfoUIData].getName()
+}
+
+class TaskShuffleWriteMetricUIData(val stageId: Int,
+ val shuffleId: Int,
+ @KVIndexParam val taskId: Long,
+ val metrics: java.util.Map[String,
ShuffleWriteMetric])
+
+class TaskShuffleReadMetricUIData(val stageId: Int,
+ val shuffleId: Int,
+ @KVIndexParam val taskId: Long,
+ val metrics: java.util.Map[String,
ShuffleReadMetric])
+class ShuffleAssignmentUIData(@KVIndexParam val shuffleId: Int,
+ val shuffleServerIdList: java.util.List[String])
\ No newline at end of file
diff --git
a/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
b/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
new file mode 100644
index 000000000..6b7826a69
--- /dev/null
+++
b/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.shuffle.events.ShuffleMetric
+
+import javax.servlet.http.HttpServletRequest
+import scala.collection.JavaConverters.mapAsScalaMapConverter
+import scala.xml.{Node, NodeSeq}
+
/**
 * Renders the "Uniffle" page of the Spark UI: build info, per-server shuffle
 * throughput percentiles, a combined per-server read/write table, and the
 * shuffle-server assignment summary, all backed by [[UniffleStatusStore]].
 */
class ShufflePage(parent: ShuffleTab) extends WebUIPage("") with Logging {
  private val runtimeStatusStore = parent.store

  private def propertyHeader = Seq("Name", "Value")

  /** Renders one name/value pair as a two-column table row. */
  private def propertyRow(kv: (String, String)) = <tr>
    <td>
      {kv._1}
    </td> <td>
      {kv._2}
    </td>
  </tr>

  /**
   * Renders one shuffle server's combined row:
   * (serverId, writeBytes, writeDuration, writeSpeed, readBytes, readDuration, readSpeed).
   *
   * Fix: renders all seven tuple elements. The original emitted only
   * kv._1..kv._6, dropping the read-speed column and leaving the 7-column
   * header ("Read Speed") misaligned with the data rows.
   */
  private def allServerRow(kv: (String, Long, Long, Long, Long, Long, Long)) = <tr>
    <td>{kv._1}</td>
    <td>{kv._2}</td>
    <td>{kv._3}</td>
    <td>{kv._4}</td>
    <td>{kv._5}</td>
    <td>{kv._6}</td>
    <td>{kv._7}</td>
  </tr>

  /**
   * Aggregates (serverId, metric) pairs by server, computes each server's
   * throughput (total bytes / total duration), and returns the
   * Min/P25/P50/P75/Max speeds together with the owning server ids.
   * Returns a pair of empty sequences when there are no metrics.
   */
  private def shuffleStatisticsCalculate(
      shuffleMetrics: Seq[(String, ShuffleMetric)]): (Seq[Long], Seq[String]) = {
    if (shuffleMetrics.isEmpty) {
      return (Seq.empty[Long], Seq.empty[String])
    }

    val groupedAndSortedMetrics = shuffleMetrics
      .groupBy(_._1)
      .map {
        case (serverId, metrics) =>
          val totalByteSize = metrics.map(_._2.getByteSize).sum
          val totalDuration = metrics.map(_._2.getDurationMillis).sum
          // Fix: guard against ArithmeticException when a server recorded
          // bytes but a zero total duration (sub-millisecond pushes).
          val speed = if (totalDuration <= 0) 0L else totalByteSize / totalDuration
          (serverId, totalByteSize, totalDuration, speed)
      }
      .toSeq
      .sortBy(_._4)

    val minMetric = groupedAndSortedMetrics.head
    val maxMetric = groupedAndSortedMetrics.last
    val p25Metric = groupedAndSortedMetrics((groupedAndSortedMetrics.size * 0.25).toInt)
    val p50Metric = groupedAndSortedMetrics(groupedAndSortedMetrics.size / 2)
    val p75Metric = groupedAndSortedMetrics((groupedAndSortedMetrics.size * 0.75).toInt)

    val percentiles = Seq(minMetric, p25Metric, p50Metric, p75Metric, maxMetric)
    (percentiles.map(_._4), percentiles.map(_._1))
  }

  /**
   * Builds the rows for the throughput-percentile table: up to two rows
   * (speed + server id) each for write and read; rows with no data are omitted.
   */
  private def createShuffleMetricsRows(
      shuffleWriteMetrics: (Seq[Long], Seq[String]),
      shuffleReadMetrics: (Seq[Long], Seq[String])): Seq[scala.xml.Elem] = {
    val (writeSpeeds, writeServerIds) =
      if (shuffleWriteMetrics != null) shuffleWriteMetrics else (Seq.empty, Seq.empty)
    val (readSpeeds, readServerIds) =
      if (shuffleReadMetrics != null) shuffleReadMetrics else (Seq.empty, Seq.empty)

    def createSpeedRow(metricType: String, speeds: Seq[Long]) = <tr>
      <td>
        {metricType}
      </td>{speeds.map(speed => <td>
      {f"$speed%.2f"}
    </td>)}
    </tr>

    def createServerIdRow(metricType: String, serverIds: Seq[String]) = <tr>
      <td>
        {metricType}
      </td>{serverIds.map(serverId => <td>
      {serverId}
    </td>)}
    </tr>

    val writeSpeedRow =
      if (writeSpeeds.nonEmpty) {
        Some(createSpeedRow("Write Speed (bytes/sec)", writeSpeeds))
      } else None
    val writeServerIdRow =
      if (writeServerIds.nonEmpty) {
        Some(createServerIdRow("Shuffle Write Server ID", writeServerIds))
      } else None
    val readSpeedRow =
      if (readSpeeds.nonEmpty) {
        Some(createSpeedRow("Read Speed (bytes/sec)", readSpeeds))
      } else None
    val readServerIdRow =
      if (readServerIds.nonEmpty) {
        Some(createServerIdRow("Shuffle Read Server ID", readServerIds))
      } else None

    Seq(writeSpeedRow, writeServerIdRow, readSpeedRow, readServerIdRow).flatten
  }

  /**
   * Joins write and read aggregates on shuffle server id. Servers present on
   * only one side get zeros for the missing side.
   */
  private def combineReadWriteByServerId(
      writeMetrics: Seq[(String, ShuffleMetric)],
      readMetrics: Seq[(String, ShuffleMetric)])
      : Seq[(String, Long, Long, Long, Long, Long, Long)] = {
    val write = groupByShuffleServer(writeMetrics)
    val read = groupByShuffleServer(readMetrics)
    val allServerIds = write.keySet ++ read.keySet
    allServerIds.toSeq.map { serverId =>
      val writeMetric = write.getOrElse(serverId, (0L, 0L, 0L))
      val readMetric = read.getOrElse(serverId, (0L, 0L, 0L))
      (serverId,
        writeMetric._1, writeMetric._2, writeMetric._3,
        readMetric._1, readMetric._2, readMetric._3)
    }
  }

  /**
   * Sums byte size and duration per shuffle server and derives throughput,
   * returning serverId -> (totalBytes, totalDuration, speed).
   */
  private def groupByShuffleServer(
      shuffleMetrics: Seq[(String, ShuffleMetric)]): Map[String, (Long, Long, Long)] = {
    if (shuffleMetrics.isEmpty) {
      return Map.empty[String, (Long, Long, Long)]
    }
    shuffleMetrics
      .groupBy(_._1)
      .mapValues { metrics =>
        val totalByteSize = metrics.map(_._2.getByteSize).sum
        val totalDuration = metrics.map(_._2.getDurationMillis).sum
        // Fix: same zero-duration guard as shuffleStatisticsCalculate.
        val speed = if (totalDuration <= 0) 0L else totalByteSize / totalDuration
        (totalByteSize, totalDuration, speed)
      }
      .toMap
  }

  override def render(request: HttpServletRequest): Seq[Node] = {
    // Render build info.
    // NOTE(review): store.read throws if the build-info record was never
    // posted — TODO confirm the listener always writes it before UI access.
    val buildInfo = runtimeStatusStore.buildInfo()
    val buildInfoTableUI = UIUtils.listingTable(
      propertyHeader,
      propertyRow,
      buildInfo.info,
      fixedWidth = true
    )

    // Render shuffle-server write+read throughput percentile statistics.
    val shuffleWriteMetrics = shuffleStatisticsCalculate(
      runtimeStatusStore.taskShuffleWriteMetrics().flatMap(x => x.metrics.asScala))
    val shuffleReadMetrics = shuffleStatisticsCalculate(
      runtimeStatusStore.taskShuffleReadMetrics().flatMap(x => x.metrics.asScala))
    val shuffleHeader = Seq("Min", "P25", "P50", "P75", "Max")
    val shuffleMetricsRows = createShuffleMetricsRows(shuffleWriteMetrics, shuffleReadMetrics)
    val shuffleMetricsTableUI =
      <table class="table table-bordered table-condensed table-striped table-head-clickable">
        <thead>
          <tr>
            {("Metric" +: shuffleHeader).map(header => <th>
            {header}
          </th>)}
          </tr>
        </thead>
        <tbody>
          {shuffleMetricsRows}
        </tbody>
      </table>

    // Render the combined per-server read/write table.
    val allServers = combineReadWriteByServerId(
      runtimeStatusStore.taskShuffleWriteMetrics().flatMap(x => x.metrics.asScala),
      runtimeStatusStore.taskShuffleReadMetrics().flatMap(x => x.metrics.asScala)
    )
    val allServersTableUI = UIUtils.listingTable(
      Seq("Shuffle Server ID", "Write Bytes", "Write Duration", "Write Speed",
        "Read Bytes", "Read Duration", "Read Speed"),
      allServerRow,
      allServers,
      fixedWidth = true
    )

    // Render the shuffle -> assigned-server-count summary.
    val assignmentInfos = runtimeStatusStore.assignmentInfos
    val assignmentTableUI = UIUtils.listingTable(
      Seq("Shuffle ID", "Assigned Server Number"),
      propertyRow,
      assignmentInfos.map(x => (x.shuffleId.toString, x.shuffleServerIdList.size().toString)),
      fixedWidth = true
    )

    val summary: NodeSeq =
      <div>
        <div>
          <span class="collapse-sql-properties collapse-table"
                onClick="collapseTable('build-info-table')">
            <h4>
              <span class="collapse-table-arrow arrow-closed"></span>
              <a>Uniffle Build Information</a>
            </h4>
          </span>
          <div class="build-info-table collapsible-table">
            {buildInfoTableUI}
          </div>
        </div>

        <div>
          <span class="collapse-sql-properties collapse-table"
                onClick="collapseTable('statistics-table')">
            <h4>
              <span class="collapse-table-arrow arrow-closed"></span>
              <a>Shuffle Throughput Statistics</a>
            </h4>
            <div class="statistics-table collapsible-table">
              {shuffleMetricsTableUI}
            </div>
          </span>
        </div>

        <div>
          <span class="collapse-table" onClick="collapseTable('all-servers-table')">
            <h4>
              <span class="collapse-table-arrow"></span>
              <a>Shuffle Server</a>
            </h4>
            <div class="all-servers-table collapsed">
              {allServersTableUI}
            </div>
          </span>
        </div>

        <div>
          <span class="collapse-sql-properties collapse-table"
                onClick="collapseTable('assignment-table')">
            <h4>
              <span class="collapse-table-arrow arrow-closed"></span>
              <a>Assignment</a>
            </h4>
          </span>
          <div class="assignment-table collapsible-table">
            {assignmentTableUI}
          </div>
        </div>
      </div>

    UIUtils.headerSparkPage(request, "Uniffle", summary, parent)
  }
}
diff --git
a/client-spark/extension/src/main/scala/org/apache/spark/ui/ShuffleTab.scala
b/client-spark/extension/src/main/scala/org/apache/spark/ui/ShuffleTab.scala
new file mode 100644
index 000000000..99711e9f1
--- /dev/null
+++ b/client-spark/extension/src/main/scala/org/apache/spark/ui/ShuffleTab.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.{SparkConf, UniffleStatusStore}
+
/**
 * Spark UI tab ("Uniffle") exposing shuffle observability data from the
 * given [[UniffleStatusStore]]. Constructing the tab attaches it (and its
 * single page) to the hosting [[SparkUI]].
 */
class ShuffleTab(val store: UniffleStatusStore, sparkUI: SparkUI)
  extends SparkUITab(sparkUI, "uniffle")
  with Logging {

  def conf: SparkConf = sparkUI.conf

  // Title shown in the Spark UI navigation bar.
  override val name = "Uniffle"

  val parent = sparkUI

  // Register the page first, then attach this tab to the hosting UI.
  attachPage(new ShufflePage(this))
  parent.attachTab(this)
}
\ No newline at end of file
diff --git a/client-spark/spark3/pom.xml b/client-spark/spark3/pom.xml
index 94a124d0b..cea3c88ad 100644
--- a/client-spark/spark3/pom.xml
+++ b/client-spark/spark3/pom.xml
@@ -59,6 +59,11 @@
<artifactId>rss-client-spark-common</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.uniffle</groupId>
+ <artifactId>rss-client-spark-ui</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
diff --git
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 12f2fada5..c6569ec91 100644
---
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -17,6 +17,7 @@
package org.apache.spark.shuffle;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -41,6 +42,7 @@ import org.apache.spark.TaskContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.executor.ShuffleReadMetrics;
import org.apache.spark.executor.ShuffleWriteMetrics;
+import org.apache.spark.shuffle.events.ShuffleAssignmentInfoEvent;
import org.apache.spark.shuffle.handle.MutableShuffleHandleInfo;
import org.apache.spark.shuffle.handle.ShuffleHandleInfo;
import org.apache.spark.shuffle.handle.SimpleShuffleHandleInfo;
@@ -205,6 +207,19 @@ public class RssShuffleManager extends
RssShuffleManagerBase {
+ partitionToServers.size()
+ "], shuffleServerForResult: "
+ partitionToServers);
+
+ // Post assignment event
+ RssSparkShuffleUtils.getActiveSparkContext()
+ .listenerBus()
+ .post(
+ new ShuffleAssignmentInfoEvent(
+ shuffleId,
+ new ArrayList<>(
+ partitionToServers.values().stream()
+ .flatMap(x -> x.stream())
+ .map(x -> x.getId())
+ .collect(Collectors.toSet()))));
+
return new RssShuffleHandle<>(
shuffleId, id.get(), dependency.rdd().getNumPartitions(), dependency,
hdlInfoBd);
}
diff --git
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index 1b40a4c07..a4e538e00 100644
---
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -74,8 +74,10 @@ import org.apache.uniffle.client.impl.FailedBlockSendTracker;
import org.apache.uniffle.client.impl.TrackingBlockStatus;
import org.apache.uniffle.client.request.RssReassignOnBlockSendFailureRequest;
import org.apache.uniffle.client.request.RssReportShuffleWriteFailureRequest;
+import org.apache.uniffle.client.request.RssReportShuffleWriteMetricRequest;
import
org.apache.uniffle.client.response.RssReassignOnBlockSendFailureResponse;
import org.apache.uniffle.client.response.RssReportShuffleWriteFailureResponse;
+import org.apache.uniffle.client.response.RssReportShuffleWriteMetricResponse;
import org.apache.uniffle.common.ReceivingFailureServer;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleServerInfo;
@@ -924,6 +926,23 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
throw e;
}
} finally {
+ // report shuffle write metrics to driver
+ if (managerClientSupplier != null) {
+ ShuffleManagerClient shuffleManagerClient =
managerClientSupplier.get();
+ if (shuffleManagerClient != null) {
+ RssReportShuffleWriteMetricResponse response =
+ shuffleManagerClient.reportShuffleWriteMetric(
+ new RssReportShuffleWriteMetricRequest(
+ taskContext.stageId(),
+ shuffleId,
+ taskContext.taskAttemptId(),
+
bufferManager.getShuffleServerPushCostTracker().toMetric()));
+ if (response.getStatusCode() != StatusCode.SUCCESS) {
+ LOG.error("Errors on reporting shuffle write metrics to driver");
+ }
+ }
+ }
+
if (blockFailSentRetryEnabled) {
if (success) {
if
(CollectionUtils.isNotEmpty(shuffleManager.getFailedBlockIds(taskId))) {
diff --git
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleServerPushCostTracker.java
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleServerPushCostTracker.java
index f238c1073..d3d434320 100644
---
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleServerPushCostTracker.java
+++
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleServerPushCostTracker.java
@@ -23,11 +23,14 @@ import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
import org.apache.commons.collections4.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.uniffle.client.request.RssReportShuffleWriteMetricRequest;
+
/** This class is to track the underlying assigned shuffle servers' data
pushing speed. */
public class ShuffleServerPushCostTracker {
private static final Logger LOGGER =
LoggerFactory.getLogger(ShuffleServerPushCostTracker.class);
@@ -88,4 +91,14 @@ public class ShuffleServerPushCostTracker {
int index = (int) Math.ceil(percentile / 100.0 * costs.size()) - 1;
return costs.get(Math.min(Math.max(index, 0), costs.size() - 1));
}
+
  /**
   * Converts the tracked per-server push costs into write metrics keyed by
   * shuffle server id, in the shape expected by
   * {@link RssReportShuffleWriteMetricRequest} when reporting to the driver.
   */
  public Map<String, RssReportShuffleWriteMetricRequest.TaskShuffleWriteMetric> toMetric() {
    return this.tracking.entrySet().stream()
        .collect(
            Collectors.toMap(
                Map.Entry::getKey,
                x ->
                    new RssReportShuffleWriteMetricRequest.TaskShuffleWriteMetric(
                        x.getValue().sentDurationMillis(), x.getValue().sentBytes())));
  }
}
diff --git a/dev/scripts/checkshade.sh b/dev/scripts/checkshade.sh
old mode 100644
new mode 100755
index 0bede9a73..b741720b1
--- a/dev/scripts/checkshade.sh
+++ b/dev/scripts/checkshade.sh
@@ -27,7 +27,7 @@ if [ ! -f "${SHADED_JAR_PATH}" ]; then
exit 1
fi
-UNSHADED=$(unzip -l "${SHADED_JAR_PATH}" | awk 'NR>3 {print $4}' | grep -vE
'uniffle|org/apache/spark/shuffle|META-INF|git.properties|/$|\.html$|\.css$|\.xml$|^javax|^$')
+UNSHADED=$(unzip -l "${SHADED_JAR_PATH}" | awk 'NR>3 {print $4}' | grep -vE
'uniffle|org/apache/spark|META-INF|git.properties|/$|\.html$|\.css$|\.xml$|^javax|^$')
UNSHADED_COUNT=0
if [ -n "$UNSHADED" ]; then
UNSHADED_COUNT=$(wc -l <<< "$UNSHADED")
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleManagerClient.java
b/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleManagerClient.java
index d3a78ce29..381ad706f 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleManagerClient.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleManagerClient.java
@@ -22,14 +22,18 @@ import
org.apache.uniffle.client.request.RssGetShuffleResultRequest;
import org.apache.uniffle.client.request.RssPartitionToShuffleServerRequest;
import org.apache.uniffle.client.request.RssReassignOnBlockSendFailureRequest;
import org.apache.uniffle.client.request.RssReportShuffleFetchFailureRequest;
+import org.apache.uniffle.client.request.RssReportShuffleReadMetricRequest;
import org.apache.uniffle.client.request.RssReportShuffleResultRequest;
import org.apache.uniffle.client.request.RssReportShuffleWriteFailureRequest;
+import org.apache.uniffle.client.request.RssReportShuffleWriteMetricRequest;
import org.apache.uniffle.client.response.RssGetShuffleResultResponse;
import
org.apache.uniffle.client.response.RssReassignOnBlockSendFailureResponse;
import org.apache.uniffle.client.response.RssReassignOnStageRetryResponse;
import org.apache.uniffle.client.response.RssReportShuffleFetchFailureResponse;
+import org.apache.uniffle.client.response.RssReportShuffleReadMetricResponse;
import org.apache.uniffle.client.response.RssReportShuffleResultResponse;
import org.apache.uniffle.client.response.RssReportShuffleWriteFailureResponse;
+import org.apache.uniffle.client.response.RssReportShuffleWriteMetricResponse;
import org.apache.uniffle.common.util.StatefulCloseable;
public interface ShuffleManagerClient extends StatefulCloseable {
@@ -68,4 +72,10 @@ public interface ShuffleManagerClient extends
StatefulCloseable {
RssGetShuffleResultForMultiPartRequest request);
RssReportShuffleResultResponse
reportShuffleResult(RssReportShuffleResultRequest request);
+
  /**
   * Reports a task's per-shuffle-server write metrics to the shuffle manager
   * (driver side), returning a status-only response.
   */
  RssReportShuffleWriteMetricResponse reportShuffleWriteMetric(
      RssReportShuffleWriteMetricRequest request);

  /**
   * Reports a task's per-shuffle-server read metrics to the shuffle manager
   * (driver side), returning a status-only response.
   */
  RssReportShuffleReadMetricResponse reportShuffleReadMetric(
      RssReportShuffleReadMetricRequest request);
}
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleManagerGrpcClient.java
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleManagerGrpcClient.java
index 9c34f3f7d..0040e56ef 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleManagerGrpcClient.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleManagerGrpcClient.java
@@ -28,14 +28,18 @@ import
org.apache.uniffle.client.request.RssGetShuffleResultRequest;
import org.apache.uniffle.client.request.RssPartitionToShuffleServerRequest;
import org.apache.uniffle.client.request.RssReassignOnBlockSendFailureRequest;
import org.apache.uniffle.client.request.RssReportShuffleFetchFailureRequest;
+import org.apache.uniffle.client.request.RssReportShuffleReadMetricRequest;
import org.apache.uniffle.client.request.RssReportShuffleResultRequest;
import org.apache.uniffle.client.request.RssReportShuffleWriteFailureRequest;
+import org.apache.uniffle.client.request.RssReportShuffleWriteMetricRequest;
import org.apache.uniffle.client.response.RssGetShuffleResultResponse;
import
org.apache.uniffle.client.response.RssReassignOnBlockSendFailureResponse;
import org.apache.uniffle.client.response.RssReassignOnStageRetryResponse;
import org.apache.uniffle.client.response.RssReportShuffleFetchFailureResponse;
+import org.apache.uniffle.client.response.RssReportShuffleReadMetricResponse;
import org.apache.uniffle.client.response.RssReportShuffleResultResponse;
import org.apache.uniffle.client.response.RssReportShuffleWriteFailureResponse;
+import org.apache.uniffle.client.response.RssReportShuffleWriteMetricResponse;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.proto.RssProtos;
import org.apache.uniffle.proto.RssProtos.ReportShuffleFetchFailureRequest;
@@ -155,6 +159,22 @@ public class ShuffleManagerGrpcClient extends GrpcClient
implements ShuffleManag
return RssReportShuffleResultResponse.fromProto(response);
}
  /** Converts the request to proto, issues the blocking RPC, and wraps the proto response. */
  @Override
  public RssReportShuffleWriteMetricResponse reportShuffleWriteMetric(
      RssReportShuffleWriteMetricRequest request) {
    RssProtos.ReportShuffleWriteMetricResponse response =
        getBlockingStub().reportShuffleWriteMetric(request.toProto());
    return RssReportShuffleWriteMetricResponse.fromProto(response);
  }

  /** Converts the request to proto, issues the blocking RPC, and wraps the proto response. */
  @Override
  public RssReportShuffleReadMetricResponse reportShuffleReadMetric(
      RssReportShuffleReadMetricRequest request) {
    RssProtos.ReportShuffleReadMetricResponse response =
        getBlockingStub().reportShuffleReadMetric(request.toProto());
    return RssReportShuffleReadMetricResponse.fromProto(response);
  }
+
@Override
public boolean isClosed() {
return channel.isShutdown() || channel.isTerminated();
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleReadMetricRequest.java
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleReadMetricRequest.java
new file mode 100644
index 000000000..b5f4a6624
--- /dev/null
+++
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleReadMetricRequest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.client.request;
+
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.uniffle.proto.RssProtos;
+
+public class RssReportShuffleReadMetricRequest {
+ private int stageId;
+ private int shuffleId;
+ private long taskId;
+ private Map<String, TaskShuffleReadMetric> metrics;
+
+ public RssReportShuffleReadMetricRequest(
+ int stageId, int shuffleId, long taskId, Map<String,
TaskShuffleReadMetric> metrics) {
+ this.stageId = stageId;
+ this.shuffleId = shuffleId;
+ this.taskId = taskId;
+ this.metrics = metrics;
+ }
+
+ public RssProtos.ReportShuffleReadMetricRequest toProto() {
+ RssReportShuffleReadMetricRequest request = this;
+ RssProtos.ReportShuffleReadMetricRequest.Builder builder =
+ RssProtos.ReportShuffleReadMetricRequest.newBuilder();
+ builder
+ .setShuffleId(request.shuffleId)
+ .setStageId(request.stageId)
+ .setTaskId(request.taskId)
+ .putAllMetrics(
+ request.metrics.entrySet().stream()
+ .collect(
+ Collectors.toMap(
+ Map.Entry::getKey,
+ x ->
+ RssProtos.ShuffleReadMetric.newBuilder()
+ .setByteSize(x.getValue().getByteSize())
+
.setDurationMillis(x.getValue().getDurationMillis())
+ .build())));
+ return builder.build();
+ }
+
+ public static class TaskShuffleReadMetric {
+ private long durationMillis;
+ private long byteSize;
+
+ TaskShuffleReadMetric(long durationMillis, long byteSize) {
+ this.durationMillis = durationMillis;
+ this.byteSize = byteSize;
+ }
+
+ public long getDurationMillis() {
+ return durationMillis;
+ }
+
+ public long getByteSize() {
+ return byteSize;
+ }
+ }
+}
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleWriteMetricRequest.java
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleWriteMetricRequest.java
new file mode 100644
index 000000000..1c0f75235
--- /dev/null
+++
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleWriteMetricRequest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.client.request;
+
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.uniffle.proto.RssProtos;
+
+public class RssReportShuffleWriteMetricRequest {
+ private int stageId;
+ private int shuffleId;
+ private long taskId;
+ private Map<String, TaskShuffleWriteMetric> metrics;
+
+ public RssReportShuffleWriteMetricRequest(
+ int stageId, int shuffleId, long taskId, Map<String,
TaskShuffleWriteMetric> metrics) {
+ this.stageId = stageId;
+ this.shuffleId = shuffleId;
+ this.taskId = taskId;
+ this.metrics = metrics;
+ }
+
+ public RssProtos.ReportShuffleWriteMetricRequest toProto() {
+ RssReportShuffleWriteMetricRequest request = this;
+ RssProtos.ReportShuffleWriteMetricRequest.Builder builder =
+ RssProtos.ReportShuffleWriteMetricRequest.newBuilder();
+ builder
+ .setShuffleId(request.shuffleId)
+ .setStageId(request.stageId)
+ .setTaskId(request.taskId)
+ .putAllMetrics(
+ request.metrics.entrySet().stream()
+ .collect(
+ Collectors.toMap(
+ Map.Entry::getKey,
+ x ->
+ RssProtos.ShuffleWriteMetric.newBuilder()
+ .setByteSize(x.getValue().getByteSize())
+
.setDurationMillis(x.getValue().getDurationMillis())
+ .build())));
+ return builder.build();
+ }
+
+ public static class TaskShuffleWriteMetric {
+ private long durationMillis;
+ private long byteSize;
+
+ public TaskShuffleWriteMetric(long durationMillis, long byteSize) {
+ this.durationMillis = durationMillis;
+ this.byteSize = byteSize;
+ }
+
+ public long getDurationMillis() {
+ return durationMillis;
+ }
+
+ public long getByteSize() {
+ return byteSize;
+ }
+ }
+}
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/response/RssReportShuffleReadMetricResponse.java
b/internal-client/src/main/java/org/apache/uniffle/client/response/RssReportShuffleReadMetricResponse.java
new file mode 100644
index 000000000..976f8863f
--- /dev/null
+++
b/internal-client/src/main/java/org/apache/uniffle/client/response/RssReportShuffleReadMetricResponse.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.client.response;
+
+import org.apache.uniffle.common.rpc.StatusCode;
+import org.apache.uniffle.proto.RssProtos;
+
/**
 * Status-only response to a
 * {@link org.apache.uniffle.client.request.RssReportShuffleReadMetricRequest}.
 */
public class RssReportShuffleReadMetricResponse extends ClientResponse {

  public RssReportShuffleReadMetricResponse(StatusCode statusCode) {
    super(statusCode);
  }

  /** Builds a response from the proto message, mapping its status field. */
  public static RssReportShuffleReadMetricResponse fromProto(
      RssProtos.ReportShuffleReadMetricResponse rpcResponse) {
    return new RssReportShuffleReadMetricResponse(StatusCode.fromProto(rpcResponse.getStatus()));
  }
}
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/response/RssReportShuffleWriteMetricResponse.java
b/internal-client/src/main/java/org/apache/uniffle/client/response/RssReportShuffleWriteMetricResponse.java
new file mode 100644
index 000000000..437042b8b
--- /dev/null
+++
b/internal-client/src/main/java/org/apache/uniffle/client/response/RssReportShuffleWriteMetricResponse.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.client.response;
+
+import org.apache.uniffle.common.rpc.StatusCode;
+import org.apache.uniffle.proto.RssProtos;
+
/**
 * Status-only response to a
 * {@link org.apache.uniffle.client.request.RssReportShuffleWriteMetricRequest}.
 */
public class RssReportShuffleWriteMetricResponse extends ClientResponse {

  public RssReportShuffleWriteMetricResponse(StatusCode statusCode) {
    super(statusCode);
  }

  /** Builds a response from the proto message, mapping its status field. */
  public static RssReportShuffleWriteMetricResponse fromProto(
      RssProtos.ReportShuffleWriteMetricResponse rpcResponse) {
    return new RssReportShuffleWriteMetricResponse(StatusCode.fromProto(rpcResponse.getStatus()));
  }
}
diff --git a/pom.xml b/pom.xml
index 9d96c71ab..284a7f23c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -107,6 +107,7 @@
<skipBuildImage>true</skipBuildImage>
<snakeyaml.version>2.2</snakeyaml.version>
<kryo.version>4.0.2</kryo.version>
+ <scala.version>2.12.18</scala.version>
</properties>
<repositories>
@@ -1476,6 +1477,7 @@
<module>client-spark/spark3-shaded</module>
<module>integration-test/spark-common</module>
<module>integration-test/spark3</module>
+ <module>client-spark/extension</module>
</modules>
<dependencyManagement>
<dependencies>
@@ -1519,6 +1521,11 @@
<artifactId>rss-client-spark-common</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.uniffle</groupId>
+ <artifactId>rss-client-spark-ui</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.uniffle</groupId>
<artifactId>rss-client-spark-common</artifactId>
@@ -1557,6 +1564,7 @@
<module>client-spark/spark3-shaded</module>
<module>integration-test/spark-common</module>
<module>integration-test/spark3</module>
+ <module>client-spark/extension</module>
</modules>
<dependencyManagement>
<dependencies>
@@ -1608,6 +1616,11 @@
<artifactId>rss-client-spark-common</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.uniffle</groupId>
+ <artifactId>rss-client-spark-ui</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.uniffle</groupId>
<artifactId>rss-client-spark-common</artifactId>
@@ -1647,6 +1660,7 @@
<module>client-spark/spark3-shaded</module>
<module>integration-test/spark-common</module>
<module>integration-test/spark3</module>
+ <module>client-spark/extension</module>
</modules>
<dependencyManagement>
<dependencies>
@@ -1698,6 +1712,11 @@
<artifactId>rss-client-spark-common</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.uniffle</groupId>
+ <artifactId>rss-client-spark-ui</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.uniffle</groupId>
<artifactId>rss-client-spark-common</artifactId>
@@ -1739,6 +1758,7 @@
<module>client-spark/spark3-shaded</module>
<module>integration-test/spark-common</module>
<module>integration-test/spark3</module>
+ <module>client-spark/extension</module>
</modules>
<dependencyManagement>
<dependencies>
@@ -1790,6 +1810,11 @@
<artifactId>rss-client-spark-common</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.uniffle</groupId>
+ <artifactId>rss-client-spark-ui</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.uniffle</groupId>
<artifactId>rss-client-spark-common</artifactId>
@@ -1844,6 +1869,7 @@
<module>client-spark/spark3-shaded</module>
<module>integration-test/spark-common</module>
<module>integration-test/spark3</module>
+ <module>client-spark/extension</module>
</modules>
<dependencyManagement>
<dependencies>
@@ -1908,7 +1934,11 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
-
+ <dependency>
+ <groupId>org.apache.uniffle</groupId>
+ <artifactId>rss-client-spark-ui</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.uniffle</groupId>
<artifactId>rss-integration-spark-common-test</artifactId>
@@ -1947,6 +1977,7 @@
<module>client-spark/spark3-shaded</module>
<module>integration-test/spark-common</module>
<module>integration-test/spark3</module>
+ <module>client-spark/extension</module>
</modules>
<dependencyManagement>
<dependencies>
@@ -1998,6 +2029,11 @@
<artifactId>rss-client-spark-common</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.uniffle</groupId>
+ <artifactId>rss-client-spark-ui</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.uniffle</groupId>
<artifactId>rss-client-spark-common</artifactId>
@@ -2037,6 +2073,7 @@
<module>client-spark/spark3-shaded</module>
<module>integration-test/spark-common</module>
<module>integration-test/spark3</module>
+ <module>client-spark/extension</module>
</modules>
<dependencyManagement>
<dependencies>
@@ -2070,6 +2107,11 @@
<artifactId>rss-client-spark-common</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.uniffle</groupId>
+ <artifactId>rss-client-spark-ui</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.uniffle</groupId>
<artifactId>rss-client-spark-common</artifactId>
@@ -2128,6 +2170,7 @@
<id>scala2.13</id>
<properties>
<scala.binary.version>2.13</scala.binary.version>
+ <scala.version>2.13.16</scala.version>
</properties>
</profile>
diff --git a/proto/src/main/proto/Rss.proto b/proto/src/main/proto/Rss.proto
index 18213341b..e47c51e13 100644
--- a/proto/src/main/proto/Rss.proto
+++ b/proto/src/main/proto/Rss.proto
@@ -584,6 +584,41 @@ service ShuffleManager {
   rpc reportShuffleResult (ReportShuffleResultRequest) returns (ReportShuffleResultResponse);
   rpc getShuffleResult (GetShuffleResultRequest) returns (GetShuffleResultResponse);
   rpc getShuffleResultForMultiPart (GetShuffleResultForMultiPartRequest) returns (GetShuffleResultForMultiPartResponse);
+ // report task shuffle metrics
+  rpc reportShuffleWriteMetric (ReportShuffleWriteMetricRequest) returns (ReportShuffleWriteMetricResponse);
+  rpc reportShuffleReadMetric (ReportShuffleReadMetricRequest) returns (ReportShuffleReadMetricResponse);
+}
+
+message ReportShuffleWriteMetricRequest {
+ int32 shuffleId = 1;
+ int32 stageId = 2;
+ int64 taskId = 3;
+ map<string, ShuffleWriteMetric> metrics = 4;
+}
+
+message ShuffleWriteMetric {
+ int64 durationMillis = 1;
+ int64 byteSize = 2;
+}
+
+message ShuffleReadMetric {
+ int64 durationMillis = 1;
+ int64 byteSize = 2;
+}
+
+message ReportShuffleWriteMetricResponse {
+ StatusCode status = 1;
+}
+
+message ReportShuffleReadMetricRequest {
+ int32 shuffleId = 1;
+ int32 stageId = 2;
+ int64 taskId = 3;
+ map<string, ShuffleReadMetric> metrics = 4;
+}
+
+message ReportShuffleReadMetricResponse {
+ StatusCode status = 1;
}
message ReportShuffleFetchFailureRequest {