This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 4e3dfbc64 [#2460] feat(spark)(part-1): Introducing spark uniffle ui 
for better observability (#2459)
4e3dfbc64 is described below

commit 4e3dfbc64e0f9ffd26a818aa38305fbff198484b
Author: Junfan Zhang <[email protected]>
AuthorDate: Fri Apr 25 14:18:44 2025 +0800

    [#2460] feat(spark)(part-1): Introducing spark uniffle ui for better 
observability (#2459)
    
    ### What changes were proposed in this pull request?
    
    Introducing spark uniffle ui for better observability.
    
    
![image](https://github.com/user-attachments/assets/f2aff734-075b-4352-937f-9d0918f7193d)
    
    ### Why are the changes needed?
    
    Through the spark uniffle UI, we could find out slow shuffle-servers more easily.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes.
    
    `spark.plugins org.apache.spark.UnifflePlugin`
    
    ### How was this patch tested?
    
    Internal tests
---
 .../shuffle/events/ShuffleAssignmentInfoEvent.java |  38 ++++
 .../apache/spark/shuffle/events/ShuffleMetric.java |  36 ++++
 .../spark/shuffle/events/ShuffleReadMetric.java    |  24 +++
 .../spark/shuffle/events/ShuffleWriteMetric.java   |  24 +++
 .../shuffle/events/TaskShuffleReadInfoEvent.java   |  51 +++++
 .../shuffle/events/TaskShuffleWriteInfoEvent.java  |  51 +++++
 .../apache/spark/shuffle/events/UniffleEvent.java  |  27 +++
 .../shuffle/manager/ShuffleManagerGrpcService.java |  55 +++++
 client-spark/{spark3 => extension}/pom.xml         |  81 ++++---
 .../scala/org/apache/spark/UniffleListener.scala   |  79 +++++++
 .../scala/org/apache/spark/UnifflePlugin.scala     |  64 ++++++
 .../org/apache/spark/UniffleStatusStore.scala      |  67 ++++++
 .../scala/org/apache/spark/ui/ShufflePage.scala    | 240 +++++++++++++++++++++
 .../scala/org/apache/spark/ui/ShuffleTab.scala     |  35 +++
 client-spark/spark3/pom.xml                        |   5 +
 .../apache/spark/shuffle/RssShuffleManager.java    |  15 ++
 .../spark/shuffle/writer/RssShuffleWriter.java     |  19 ++
 .../client/impl/ShuffleServerPushCostTracker.java  |  13 ++
 dev/scripts/checkshade.sh                          |   2 +-
 .../uniffle/client/api/ShuffleManagerClient.java   |  10 +
 .../client/impl/grpc/ShuffleManagerGrpcClient.java |  20 ++
 .../request/RssReportShuffleReadMetricRequest.java |  77 +++++++
 .../RssReportShuffleWriteMetricRequest.java        |  77 +++++++
 .../RssReportShuffleReadMetricResponse.java        |  33 +++
 .../RssReportShuffleWriteMetricResponse.java       |  33 +++
 pom.xml                                            |  45 +++-
 proto/src/main/proto/Rss.proto                     |  35 +++
 27 files changed, 1209 insertions(+), 47 deletions(-)

diff --git 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/events/ShuffleAssignmentInfoEvent.java
 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/events/ShuffleAssignmentInfoEvent.java
new file mode 100644
index 000000000..add7a1440
--- /dev/null
+++ 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/events/ShuffleAssignmentInfoEvent.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.events;
+
+import java.util.List;
+
+public class ShuffleAssignmentInfoEvent extends UniffleEvent {
+  private int shuffleId;
+  private List<String> assignedServers;
+
+  public ShuffleAssignmentInfoEvent(int shuffleId, List<String> 
assignedServers) {
+    this.shuffleId = shuffleId;
+    this.assignedServers = assignedServers;
+  }
+
+  public int getShuffleId() {
+    return shuffleId;
+  }
+
+  public List<String> getAssignedServers() {
+    return assignedServers;
+  }
+}
diff --git 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/events/ShuffleMetric.java
 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/events/ShuffleMetric.java
new file mode 100644
index 000000000..72cd97d38
--- /dev/null
+++ 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/events/ShuffleMetric.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.events;
+
+public class ShuffleMetric {
+  private long durationMillis;
+  private long byteSize;
+
+  public ShuffleMetric(long durationMillis, long byteSize) {
+    this.durationMillis = durationMillis;
+    this.byteSize = byteSize;
+  }
+
+  public long getDurationMillis() {
+    return durationMillis;
+  }
+
+  public long getByteSize() {
+    return byteSize;
+  }
+}
diff --git 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/events/ShuffleReadMetric.java
 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/events/ShuffleReadMetric.java
new file mode 100644
index 000000000..1cb54c69f
--- /dev/null
+++ 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/events/ShuffleReadMetric.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.events;
+
+public class ShuffleReadMetric extends ShuffleMetric {
+  public ShuffleReadMetric(long durationMillis, long byteSize) {
+    super(durationMillis, byteSize);
+  }
+}
diff --git 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/events/ShuffleWriteMetric.java
 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/events/ShuffleWriteMetric.java
new file mode 100644
index 000000000..4b24c1fa8
--- /dev/null
+++ 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/events/ShuffleWriteMetric.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.events;
+
+public class ShuffleWriteMetric extends ShuffleMetric {
+  public ShuffleWriteMetric(long durationMillis, long byteSize) {
+    super(durationMillis, byteSize);
+  }
+}
diff --git 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/events/TaskShuffleReadInfoEvent.java
 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/events/TaskShuffleReadInfoEvent.java
new file mode 100644
index 000000000..bf847a873
--- /dev/null
+++ 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/events/TaskShuffleReadInfoEvent.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.events;
+
+import java.util.Map;
+
+public class TaskShuffleReadInfoEvent extends UniffleEvent {
+  private int stageId;
+  private int shuffleId;
+  private long taskId;
+  private Map<String, ShuffleReadMetric> metrics;
+
+  public TaskShuffleReadInfoEvent(
+      int stageId, int shuffleId, long taskId, Map<String, ShuffleReadMetric> 
metrics) {
+    this.stageId = stageId;
+    this.shuffleId = shuffleId;
+    this.taskId = taskId;
+    this.metrics = metrics;
+  }
+
+  public int getStageId() {
+    return stageId;
+  }
+
+  public int getShuffleId() {
+    return shuffleId;
+  }
+
+  public long getTaskId() {
+    return taskId;
+  }
+
+  public Map<String, ShuffleReadMetric> getMetrics() {
+    return metrics;
+  }
+}
diff --git 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/events/TaskShuffleWriteInfoEvent.java
 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/events/TaskShuffleWriteInfoEvent.java
new file mode 100644
index 000000000..6368566d1
--- /dev/null
+++ 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/events/TaskShuffleWriteInfoEvent.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.events;
+
+import java.util.Map;
+
+public class TaskShuffleWriteInfoEvent extends UniffleEvent {
+  private int stageId;
+  private int shuffleId;
+  private long taskId;
+  private Map<String, ShuffleWriteMetric> metrics;
+
+  public TaskShuffleWriteInfoEvent(
+      int stageId, int shuffleId, long taskId, Map<String, ShuffleWriteMetric> 
metrics) {
+    this.stageId = stageId;
+    this.shuffleId = shuffleId;
+    this.taskId = taskId;
+    this.metrics = metrics;
+  }
+
+  public int getStageId() {
+    return stageId;
+  }
+
+  public int getShuffleId() {
+    return shuffleId;
+  }
+
+  public long getTaskId() {
+    return taskId;
+  }
+
+  public Map<String, ShuffleWriteMetric> getMetrics() {
+    return metrics;
+  }
+}
diff --git 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/events/UniffleEvent.java
 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/events/UniffleEvent.java
new file mode 100644
index 000000000..a73b3bcaf
--- /dev/null
+++ 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/events/UniffleEvent.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.events;
+
+import org.apache.spark.scheduler.SparkListenerEvent;
+
+public abstract class UniffleEvent implements SparkListenerEvent {
+
+  public boolean logEvent() {
+    return true;
+  }
+}
diff --git 
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
 
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
index e01f5f892..2ba39eac1 100644
--- 
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
+++ 
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
@@ -31,6 +31,11 @@ import java.util.stream.Collectors;
 import com.google.protobuf.UnsafeByteOperations;
 import io.grpc.stub.StreamObserver;
 import org.apache.spark.SparkException;
+import org.apache.spark.shuffle.RssSparkShuffleUtils;
+import org.apache.spark.shuffle.events.ShuffleReadMetric;
+import org.apache.spark.shuffle.events.ShuffleWriteMetric;
+import org.apache.spark.shuffle.events.TaskShuffleReadInfoEvent;
+import org.apache.spark.shuffle.events.TaskShuffleWriteInfoEvent;
 import org.apache.spark.shuffle.handle.MutableShuffleHandleInfo;
 import org.apache.spark.shuffle.handle.StageAttemptShuffleHandleInfo;
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
@@ -711,4 +716,54 @@ public class ShuffleManagerGrpcService extends 
ShuffleManagerImplBase {
     responseObserver.onNext(reply);
     responseObserver.onCompleted();
   }
+
+  @Override
+  public void reportShuffleWriteMetric(
+      RssProtos.ReportShuffleWriteMetricRequest request,
+      StreamObserver<RssProtos.ReportShuffleWriteMetricResponse> 
responseObserver) {
+    TaskShuffleWriteInfoEvent event =
+        new TaskShuffleWriteInfoEvent(
+            request.getStageId(),
+            request.getShuffleId(),
+            request.getTaskId(),
+            request.getMetricsMap().entrySet().stream()
+                .collect(
+                    Collectors.toMap(
+                        Map.Entry::getKey,
+                        x ->
+                            new ShuffleWriteMetric(
+                                x.getValue().getDurationMillis(), 
x.getValue().getByteSize()))));
+    RssSparkShuffleUtils.getActiveSparkContext().listenerBus().post(event);
+    RssProtos.ReportShuffleWriteMetricResponse reply =
+        RssProtos.ReportShuffleWriteMetricResponse.newBuilder()
+            .setStatus(RssProtos.StatusCode.SUCCESS)
+            .build();
+    responseObserver.onNext(reply);
+    responseObserver.onCompleted();
+  }
+
+  @Override
+  public void reportShuffleReadMetric(
+      RssProtos.ReportShuffleReadMetricRequest request,
+      StreamObserver<RssProtos.ReportShuffleReadMetricResponse> 
responseObserver) {
+    TaskShuffleReadInfoEvent event =
+        new TaskShuffleReadInfoEvent(
+            request.getStageId(),
+            request.getShuffleId(),
+            request.getTaskId(),
+            request.getMetricsMap().entrySet().stream()
+                .collect(
+                    Collectors.toMap(
+                        Map.Entry::getKey,
+                        x ->
+                            new ShuffleReadMetric(
+                                x.getValue().getDurationMillis(), 
x.getValue().getByteSize()))));
+    RssSparkShuffleUtils.getActiveSparkContext().listenerBus().post(event);
+    RssProtos.ReportShuffleReadMetricResponse reply =
+        RssProtos.ReportShuffleReadMetricResponse.newBuilder()
+            .setStatus(RssProtos.StatusCode.SUCCESS)
+            .build();
+    responseObserver.onNext(reply);
+    responseObserver.onCompleted();
+  }
 }
diff --git a/client-spark/spark3/pom.xml b/client-spark/extension/pom.xml
similarity index 61%
copy from client-spark/spark3/pom.xml
copy to client-spark/extension/pom.xml
index 94a124d0b..77c290484 100644
--- a/client-spark/spark3/pom.xml
+++ b/client-spark/extension/pom.xml
@@ -20,6 +20,7 @@
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
     <modelVersion>4.0.0</modelVersion>
+
     <parent>
         <artifactId>uniffle-parent</artifactId>
         <groupId>org.apache.uniffle</groupId>
@@ -27,15 +28,16 @@
         <relativePath>../../pom.xml</relativePath>
     </parent>
 
-    <artifactId>rss-client-spark3</artifactId>
+    <artifactId>rss-client-spark-ui</artifactId>
+    <version>0.11.0-SNAPSHOT</version>
     <packaging>jar</packaging>
-    <name>Apache Uniffle Client (Spark 3)</name>
+    <name>Apache Uniffle Client spark ui</name>
 
     <dependencies>
         <dependency>
-            <groupId>com.google.protobuf</groupId>
-            <artifactId>protobuf-java</artifactId>
-            <version>${protobuf.version}</version>
+            <groupId>org.apache.uniffle</groupId>
+            <artifactId>rss-common</artifactId>
+            <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.spark</groupId>
@@ -50,53 +52,42 @@
             </exclusions>
         </dependency>
         <dependency>
-            <groupId>org.apache.uniffle</groupId>
-            <artifactId>rss-client</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.uniffle</groupId>
-            <artifactId>rss-client-spark-common</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>io.netty</groupId>
-            <artifactId>netty-all</artifactId>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+            <scope>provided</scope>
         </dependency>
         <dependency>
-            <groupId>org.mockito</groupId>
-            <artifactId>mockito-inline</artifactId>
-            <scope>test</scope>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+            <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.uniffle</groupId>
             <artifactId>rss-client-spark-common</artifactId>
             <version>${project.version}</version>
-            <type>test-jar</type>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.uniffle</groupId>
-            <artifactId>shuffle-storage</artifactId>
-            <type>test-jar</type>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-minicluster</artifactId>
-            <scope>test</scope>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>slf4j-log4j12</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.spark</groupId>
-            <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
-            <version>${spark.version}</version>
-            <scope>provided</scope>
         </dependency>
     </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>net.alchim31.maven</groupId>
+                <artifactId>scala-maven-plugin</artifactId>
+                <version>3.2.2</version>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>compile</goal>
+                            <goal>testCompile</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <scalaVersion>${scala.version}</scalaVersion>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
 </project>
diff --git 
a/client-spark/extension/src/main/scala/org/apache/spark/UniffleListener.scala 
b/client-spark/extension/src/main/scala/org/apache/spark/UniffleListener.scala
new file mode 100644
index 000000000..e674439f1
--- /dev/null
+++ 
b/client-spark/extension/src/main/scala/org/apache/spark/UniffleListener.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
+import org.apache.spark.shuffle.events.{ShuffleAssignmentInfoEvent, 
TaskShuffleReadInfoEvent, TaskShuffleWriteInfoEvent}
+import org.apache.spark.status.ElementTrackingStore
+
+class UniffleListener(conf: SparkConf, kvstore: ElementTrackingStore)
+  extends SparkListener with Logging {
+
+  private def onBuildInfo(event: BuildInfoEvent): Unit = {
+    val uiData = new BuildInfoUIData(event.info.toSeq.sortBy(_._1))
+    kvstore.write(uiData)
+  }
+
+  private def onAssignmentInfo(info: ShuffleAssignmentInfoEvent): Unit = {
+    kvstore.write(
+      new ShuffleAssignmentUIData(
+        info.getShuffleId,
+        info.getAssignedServers
+      )
+    )
+  }
+
+  private def onTaskShuffleWriteInfo(event: TaskShuffleWriteInfoEvent): Unit = 
{
+    kvstore.write(
+      new TaskShuffleWriteMetricUIData(
+        event.getStageId,
+        event.getShuffleId,
+        event.getTaskId,
+        event.getMetrics
+      )
+    )
+  }
+
+  private def onTaskShuffleReadInfo(event: TaskShuffleReadInfoEvent): Unit = {
+    kvstore.write(
+      new TaskShuffleReadMetricUIData(
+        event.getStageId,
+        event.getShuffleId,
+        event.getTaskId,
+        event.getMetrics
+      )
+    )
+  }
+
+  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
+    case e: BuildInfoEvent => onBuildInfo(e)
+    case e: TaskShuffleWriteInfoEvent => onTaskShuffleWriteInfo(e)
+    case e: TaskShuffleReadInfoEvent => onTaskShuffleReadInfo(e)
+    case e: ShuffleAssignmentInfoEvent => onAssignmentInfo(e)
+    case _ => // Ignore
+  }
+}
+
+object UniffleListener {
+  def register(ctx: SparkContext): Unit = {
+    val kvStore = ctx.statusStore.store.asInstanceOf[ElementTrackingStore]
+    val listener = new UniffleListener(ctx.conf, kvStore)
+    ctx.listenerBus.addToStatusQueue(listener)
+  }
+}
\ No newline at end of file
diff --git 
a/client-spark/extension/src/main/scala/org/apache/spark/UnifflePlugin.scala 
b/client-spark/extension/src/main/scala/org/apache/spark/UnifflePlugin.scala
new file mode 100644
index 000000000..8a59dbf47
--- /dev/null
+++ b/client-spark/extension/src/main/scala/org/apache/spark/UnifflePlugin.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, 
PluginContext, SparkPlugin}
+import org.apache.spark.internal.Logging
+import org.apache.spark.shuffle.events.UniffleEvent
+import org.apache.spark.status.ElementTrackingStore
+import org.apache.spark.ui.ShuffleTab
+import org.apache.uniffle.common.ProjectConstants
+
+import java.util.Collections
+import scala.collection.mutable
+
+class UnifflePlugin extends SparkPlugin {
+  override def driverPlugin(): DriverPlugin = new UniffleDriverPlugin()
+
+  override def executorPlugin(): ExecutorPlugin = null
+}
+
+private class UniffleDriverPlugin extends DriverPlugin with Logging {
+  private var _sc: Option[SparkContext] = None
+
+  override def init(sc: SparkContext, pluginContext: PluginContext): 
java.util.Map[String, String] = {
+    _sc = Some(sc)
+    UniffleListener.register(sc)
+    postBuildInfoEvent(sc)
+    attachUI(sc)
+    Collections.emptyMap()
+  }
+
+  private def attachUI(context: SparkContext): Unit = {
+    val kvStore = context.statusStore.store.asInstanceOf[ElementTrackingStore]
+    val statusStore = new UniffleStatusStore(kvStore)
+    context.ui.foreach(new ShuffleTab(statusStore, _))
+  }
+
+  private def postBuildInfoEvent(context: SparkContext): Unit = {
+    val buildInfo = new mutable.LinkedHashMap[String, String]()
+    buildInfo.put("Version", ProjectConstants.VERSION)
+    buildInfo.put("Commit Id", ProjectConstants.getGitCommitId)
+    buildInfo.put("Revision", ProjectConstants.REVISION)
+
+    val event = new BuildInfoEvent(buildInfo.toMap)
+    context.listenerBus.post(event)
+  }
+}
+
+case class BuildInfoEvent(info: Map[String, String]) extends UniffleEvent
\ No newline at end of file
diff --git 
a/client-spark/extension/src/main/scala/org/apache/spark/UniffleStatusStore.scala
 
b/client-spark/extension/src/main/scala/org/apache/spark/UniffleStatusStore.scala
new file mode 100644
index 000000000..fd19ffb02
--- /dev/null
+++ 
b/client-spark/extension/src/main/scala/org/apache/spark/UniffleStatusStore.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import com.fasterxml.jackson.annotation.JsonIgnore
+import org.apache.spark.shuffle.events.{ShuffleReadMetric, ShuffleWriteMetric}
+import org.apache.spark.status.KVUtils.KVIndexParam
+import org.apache.spark.util.Utils
+import org.apache.spark.util.kvstore.{KVIndex, KVStore, KVStoreView}
+
+import scala.collection.JavaConverters.asScalaIteratorConverter
+
+class UniffleStatusStore(store: KVStore) {
+  private def viewToSeq[T](view: KVStoreView[T]): Seq[T] = {
+    Utils.tryWithResource(view.closeableIterator())(iter => 
iter.asScala.toList)
+  }
+
+  def buildInfo(): BuildInfoUIData = {
+    val kClass = classOf[BuildInfoUIData]
+    store.read(kClass, kClass.getName)
+  }
+
+  def taskShuffleReadMetrics(): Seq[TaskShuffleReadMetricUIData] = {
+    viewToSeq(store.view(classOf[TaskShuffleReadMetricUIData]))
+  }
+
+  def taskShuffleWriteMetrics(): Seq[TaskShuffleWriteMetricUIData] = {
+    viewToSeq(store.view(classOf[TaskShuffleWriteMetricUIData]))
+  }
+
+  def assignmentInfos(): Seq[ShuffleAssignmentUIData] = {
+    viewToSeq(store.view(classOf[ShuffleAssignmentUIData]))
+  }
+}
+
+class BuildInfoUIData(val info: Seq[(String, String)]) {
+  @JsonIgnore
+  @KVIndex
+  def id: String = classOf[BuildInfoUIData].getName()
+}
+
+class TaskShuffleWriteMetricUIData(val stageId: Int,
+                                   val shuffleId: Int,
+                                   @KVIndexParam val taskId: Long,
+                                   val metrics: java.util.Map[String, 
ShuffleWriteMetric])
+
+class TaskShuffleReadMetricUIData(val stageId: Int,
+                                  val shuffleId: Int,
+                                  @KVIndexParam val taskId: Long,
+                                  val metrics: java.util.Map[String, 
ShuffleReadMetric])
+class ShuffleAssignmentUIData(@KVIndexParam val shuffleId: Int,
+                              val shuffleServerIdList: java.util.List[String])
\ No newline at end of file
diff --git 
a/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala 
b/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
new file mode 100644
index 000000000..6b7826a69
--- /dev/null
+++ 
b/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.shuffle.events.ShuffleMetric
+
+import javax.servlet.http.HttpServletRequest
+import scala.collection.JavaConverters.mapAsScalaMapConverter
+import scala.xml.{Node, NodeSeq}
+
/**
 * Renders the "Uniffle" web UI page: build information, shuffle throughput
 * percentile statistics, per-shuffle-server totals, and assignment info, all
 * read from the [[org.apache.spark.UniffleStatusStore]] owned by the tab.
 */
class ShufflePage(parent: ShuffleTab) extends WebUIPage("") with Logging {
  private val runtimeStatusStore = parent.store

  private def propertyHeader = Seq("Name", "Value")

  // One (name, value) table row.
  private def propertyRow(kv: (String, String)) = <tr>
    <td>
      {kv._1}
    </td> <td>
      {kv._2}
    </td>
  </tr>

  // One row per shuffle server. The tuple layout is:
  // (serverId, writeBytes, writeDuration, writeSpeed, readBytes, readDuration, readSpeed).
  // All seven elements are rendered so the row lines up with the seven-column
  // header; previously the final read-speed column (kv._7) was silently dropped,
  // shifting nothing but leaving the last header column always empty.
  private def allServerRow(kv: (String, Long, Long, Long, Long, Long, Long)) = <tr>
    <td>{kv._1}</td>
    <td>{kv._2}</td>
    <td>{kv._3}</td>
    <td>{kv._4}</td>
    <td>{kv._5}</td>
    <td>{kv._6}</td>
    <td>{kv._7}</td>
  </tr>

  // Integer throughput guarded against a zero total duration, which previously
  // raised ArithmeticException (e.g. sub-millisecond pushes summing to 0 ms).
  // NOTE(review): this is bytes per *millisecond* (the metric exposes
  // getDurationMillis) while the table labels say "bytes/sec" — confirm the
  // intended unit; scaling by 1000 here would change the displayed values.
  private def safeSpeed(byteSize: Long, durationMillis: Long): Long =
    if (durationMillis == 0L) 0L else byteSize / durationMillis

  /**
   * Aggregates raw (serverId, metric) pairs per server and selects the
   * Min/P25/P50/P75/Max servers ranked by throughput.
   *
   * @return (speeds, serverIds), both ordered Min..Max; both empty when no
   *         metrics have been reported yet.
   */
  private def shuffleStatisticsCalculate(
      shuffleMetrics: Seq[(String, ShuffleMetric)]): (Seq[Long], Seq[String]) = {
    if (shuffleMetrics.isEmpty) {
      return (Seq.empty[Long], Seq.empty[String])
    }

    val groupedAndSortedMetrics = shuffleMetrics
      .groupBy(_._1)
      .map {
        case (serverId, metrics) =>
          val totalByteSize = metrics.map(_._2.getByteSize).sum
          val totalDuration = metrics.map(_._2.getDurationMillis).sum
          (serverId, totalByteSize, totalDuration, safeSpeed(totalByteSize, totalDuration))
      }
      .toSeq
      .sortBy(_._4)

    val size = groupedAndSortedMetrics.size
    // head/last are safe: the empty case returned above.
    val percentiles = Seq(
      groupedAndSortedMetrics.head,
      groupedAndSortedMetrics((size * 0.25).toInt),
      groupedAndSortedMetrics(size / 2),
      groupedAndSortedMetrics((size * 0.75).toInt),
      groupedAndSortedMetrics.last)

    (percentiles.map(_._4), percentiles.map(_._1))
  }

  // Builds the Min/P25/P50/P75/Max table rows for write and read speed plus the
  // corresponding server-id rows; rows for empty metric sets are omitted.
  private def createShuffleMetricsRows(
      shuffleWriteMetrics: (Seq[Long], Seq[String]),
      shuffleReadMetrics: (Seq[Long], Seq[String])): Seq[scala.xml.Elem] = {
    val (writeSpeeds, writeServerIds) =
      if (shuffleWriteMetrics != null) shuffleWriteMetrics else (Seq.empty, Seq.empty)
    val (readSpeeds, readServerIds) =
      if (shuffleReadMetrics != null) shuffleReadMetrics else (Seq.empty, Seq.empty)

    def createSpeedRow(metricType: String, speeds: Seq[Long]) = <tr>
      <td>
        {metricType}
      </td>{speeds.map(speed => <td>
        {f"$speed%.2f"}
      </td>)}
    </tr>

    def createServerIdRow(metricType: String, serverIds: Seq[String]) = <tr>
      <td>
        {metricType}
      </td>{serverIds.map(serverId => <td>
        {serverId}
      </td>)}
    </tr>

    val writeSpeedRow =
      if (writeSpeeds.nonEmpty) Some(createSpeedRow("Write Speed (bytes/sec)", writeSpeeds))
      else None
    val writeServerIdRow =
      if (writeServerIds.nonEmpty)
        Some(createServerIdRow("Shuffle Write Server ID", writeServerIds))
      else None
    val readSpeedRow =
      if (readSpeeds.nonEmpty) Some(createSpeedRow("Read Speed (bytes/sec)", readSpeeds))
      else None
    val readServerIdRow =
      if (readServerIds.nonEmpty)
        Some(createServerIdRow("Shuffle Read Server ID", readServerIds))
      else None

    Seq(writeSpeedRow, writeServerIdRow, readSpeedRow, readServerIdRow).flatten
  }

  // Full outer join of write and read aggregates on server id; a server missing
  // one side gets zeroed (bytes, duration, speed) for that side.
  private def combineReadWriteByServerId(
      writeMetrics: Seq[(String, ShuffleMetric)],
      readMetrics: Seq[(String, ShuffleMetric)])
      : Seq[(String, Long, Long, Long, Long, Long, Long)] = {
    val write = groupByShuffleServer(writeMetrics)
    val read = groupByShuffleServer(readMetrics)
    val allServerIds = write.keySet ++ read.keySet
    allServerIds.toSeq.map { serverId =>
      val writeMetric = write.getOrElse(serverId, (0L, 0L, 0L))
      val readMetric = read.getOrElse(serverId, (0L, 0L, 0L))
      (serverId, writeMetric._1, writeMetric._2, writeMetric._3,
        readMetric._1, readMetric._2, readMetric._3)
    }
  }

  // Aggregates (serverId, metric) pairs into serverId -> (bytes, duration, speed).
  private def groupByShuffleServer(
      shuffleMetrics: Seq[(String, ShuffleMetric)]): Map[String, (Long, Long, Long)] = {
    if (shuffleMetrics.isEmpty) {
      return Map.empty[String, (Long, Long, Long)]
    }
    shuffleMetrics
      .groupBy(_._1)
      .mapValues { metrics =>
        val totalByteSize = metrics.map(_._2.getByteSize).sum
        val totalDuration = metrics.map(_._2.getDurationMillis).sum
        (totalByteSize, totalDuration, safeSpeed(totalByteSize, totalDuration))
      }
      .toMap
  }

  override def render(request: HttpServletRequest): Seq[Node] = {
    // render build info
    val buildInfo = runtimeStatusStore.buildInfo()
    val buildInfoTableUI = UIUtils.listingTable(
      propertyHeader,
      propertyRow,
      buildInfo.info,
      fixedWidth = true
    )

    // render shuffle-servers write+read statistics
    val shuffleWriteMetrics = shuffleStatisticsCalculate(
      runtimeStatusStore.taskShuffleWriteMetrics().flatMap(x => x.metrics.asScala))
    val shuffleReadMetrics = shuffleStatisticsCalculate(
      runtimeStatusStore.taskShuffleReadMetrics().flatMap(x => x.metrics.asScala))
    val shuffleHeader = Seq("Min", "P25", "P50", "P75", "Max")
    val shuffleMetricsRows = createShuffleMetricsRows(shuffleWriteMetrics, shuffleReadMetrics)
    val shuffleMetricsTableUI =
      <table class="table table-bordered table-condensed table-striped table-head-clickable">
        <thead>
          <tr>
            {("Metric" +: shuffleHeader).map(header => <th>
            {header}
          </th>)}
          </tr>
        </thead>
        <tbody>
          {shuffleMetricsRows}
        </tbody>
      </table>

    // render all assigned shuffle-servers
    val allServers = combineReadWriteByServerId(
      runtimeStatusStore.taskShuffleWriteMetrics().flatMap(x => x.metrics.asScala),
      runtimeStatusStore.taskShuffleReadMetrics().flatMap(x => x.metrics.asScala)
    )
    val allServersTableUI = UIUtils.listingTable(
      Seq("Shuffle Server ID", "Write Bytes", "Write Duration", "Write Speed",
        "Read Bytes", "Read Duration", "Read Speed"),
      allServerRow,
      allServers,
      fixedWidth = true
    )

    // render assignment info
    val assignmentInfos = runtimeStatusStore.assignmentInfos
    val assignmentTableUI = UIUtils.listingTable(
      Seq("Shuffle ID", "Assigned Server Number"),
      propertyRow,
      assignmentInfos.map(x => (x.shuffleId.toString, x.shuffleServerIdList.size().toString)),
      fixedWidth = true
    )

    val summary: NodeSeq =
      <div>
        <div>
          <span class="collapse-sql-properties collapse-table"
                onClick="collapseTable('build-info-table')">
            <h4>
              <span class="collapse-table-arrow arrow-closed"></span>
              <a>Uniffle Build Information</a>
            </h4>
          </span>
          <div class="build-info-table collapsible-table">
            {buildInfoTableUI}
          </div>
        </div>

        <div>
          <span class="collapse-sql-properties collapse-table"
                onClick="collapseTable('statistics-table')">
            <h4>
              <span class="collapse-table-arrow arrow-closed"></span>
              <a>Shuffle Throughput Statistics</a>
            </h4>
            <div class="statistics-table collapsible-table">
              {shuffleMetricsTableUI}
            </div>
          </span>
        </div>

        <div>
          <span class="collapse-table" onClick="collapseTable('all-servers-table')">
            <h4>
              <span class="collapse-table-arrow"></span>
              <a>Shuffle Server</a>
            </h4>
            <div class="all-servers-table collapsed">
              {allServersTableUI}
            </div>
          </span>
        </div>

        <div>
          <span class="collapse-sql-properties collapse-table"
                onClick="collapseTable('assignment-table')">
            <h4>
              <span class="collapse-table-arrow arrow-closed"></span>
              <a>Assignment</a>
            </h4>
          </span>
          <div class="assignment-table collapsible-table">
            {assignmentTableUI}
          </div>
        </div>
      </div>

    UIUtils.headerSparkPage(request, "Uniffle", summary, parent)
  }
}
diff --git 
a/client-spark/extension/src/main/scala/org/apache/spark/ui/ShuffleTab.scala 
b/client-spark/extension/src/main/scala/org/apache/spark/ui/ShuffleTab.scala
new file mode 100644
index 000000000..99711e9f1
--- /dev/null
+++ b/client-spark/extension/src/main/scala/org/apache/spark/ui/ShuffleTab.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.{SparkConf, UniffleStatusStore}
+
/**
 * The "Uniffle" tab attached to the Spark web UI; self-registers with the UI
 * and hosts the single [[ShufflePage]].
 */
class ShuffleTab(val store: UniffleStatusStore, sparkUI: SparkUI)
  extends SparkUITab(sparkUI, "uniffle")
    with Logging {

  override val name = "Uniffle"

  val parent = sparkUI

  def conf: SparkConf = sparkUI.conf

  // Register the page and the tab itself on construction.
  attachPage(new ShufflePage(this))
  parent.attachTab(this)
}
\ No newline at end of file
diff --git a/client-spark/spark3/pom.xml b/client-spark/spark3/pom.xml
index 94a124d0b..cea3c88ad 100644
--- a/client-spark/spark3/pom.xml
+++ b/client-spark/spark3/pom.xml
@@ -59,6 +59,11 @@
             <artifactId>rss-client-spark-common</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.uniffle</groupId>
+            <artifactId>rss-client-spark-ui</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         <dependency>
             <groupId>io.netty</groupId>
             <artifactId>netty-all</artifactId>
diff --git 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 12f2fada5..c6569ec91 100644
--- 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -17,6 +17,7 @@
 
 package org.apache.spark.shuffle;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -41,6 +42,7 @@ import org.apache.spark.TaskContext;
 import org.apache.spark.broadcast.Broadcast;
 import org.apache.spark.executor.ShuffleReadMetrics;
 import org.apache.spark.executor.ShuffleWriteMetrics;
+import org.apache.spark.shuffle.events.ShuffleAssignmentInfoEvent;
 import org.apache.spark.shuffle.handle.MutableShuffleHandleInfo;
 import org.apache.spark.shuffle.handle.ShuffleHandleInfo;
 import org.apache.spark.shuffle.handle.SimpleShuffleHandleInfo;
@@ -205,6 +207,19 @@ public class RssShuffleManager extends 
RssShuffleManagerBase {
             + partitionToServers.size()
             + "], shuffleServerForResult: "
             + partitionToServers);
+
+    // Post assignment event
+    RssSparkShuffleUtils.getActiveSparkContext()
+        .listenerBus()
+        .post(
+            new ShuffleAssignmentInfoEvent(
+                shuffleId,
+                new ArrayList<>(
+                    partitionToServers.values().stream()
+                        .flatMap(x -> x.stream())
+                        .map(x -> x.getId())
+                        .collect(Collectors.toSet()))));
+
     return new RssShuffleHandle<>(
         shuffleId, id.get(), dependency.rdd().getNumPartitions(), dependency, 
hdlInfoBd);
   }
diff --git 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index 1b40a4c07..a4e538e00 100644
--- 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++ 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -74,8 +74,10 @@ import org.apache.uniffle.client.impl.FailedBlockSendTracker;
 import org.apache.uniffle.client.impl.TrackingBlockStatus;
 import org.apache.uniffle.client.request.RssReassignOnBlockSendFailureRequest;
 import org.apache.uniffle.client.request.RssReportShuffleWriteFailureRequest;
+import org.apache.uniffle.client.request.RssReportShuffleWriteMetricRequest;
 import 
org.apache.uniffle.client.response.RssReassignOnBlockSendFailureResponse;
 import org.apache.uniffle.client.response.RssReportShuffleWriteFailureResponse;
+import org.apache.uniffle.client.response.RssReportShuffleWriteMetricResponse;
 import org.apache.uniffle.common.ReceivingFailureServer;
 import org.apache.uniffle.common.ShuffleBlockInfo;
 import org.apache.uniffle.common.ShuffleServerInfo;
@@ -924,6 +926,23 @@ public class RssShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
         throw e;
       }
     } finally {
+      // report shuffle write metrics to driver
+      if (managerClientSupplier != null) {
+        ShuffleManagerClient shuffleManagerClient = 
managerClientSupplier.get();
+        if (shuffleManagerClient != null) {
+          RssReportShuffleWriteMetricResponse response =
+              shuffleManagerClient.reportShuffleWriteMetric(
+                  new RssReportShuffleWriteMetricRequest(
+                      taskContext.stageId(),
+                      shuffleId,
+                      taskContext.taskAttemptId(),
+                      
bufferManager.getShuffleServerPushCostTracker().toMetric()));
+          if (response.getStatusCode() != StatusCode.SUCCESS) {
+            LOG.error("Errors on reporting shuffle write metrics to driver");
+          }
+        }
+      }
+
       if (blockFailSentRetryEnabled) {
         if (success) {
           if 
(CollectionUtils.isNotEmpty(shuffleManager.getFailedBlockIds(taskId))) {
diff --git 
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleServerPushCostTracker.java
 
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleServerPushCostTracker.java
index f238c1073..d3d434320 100644
--- 
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleServerPushCostTracker.java
+++ 
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleServerPushCostTracker.java
@@ -23,11 +23,14 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
 
 import org.apache.commons.collections4.CollectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.uniffle.client.request.RssReportShuffleWriteMetricRequest;
+
 /** This class is to track the underlying assigned shuffle servers' data 
pushing speed. */
 public class ShuffleServerPushCostTracker {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(ShuffleServerPushCostTracker.class);
@@ -88,4 +91,14 @@ public class ShuffleServerPushCostTracker {
     int index = (int) Math.ceil(percentile / 100.0 * costs.size()) - 1;
     return costs.get(Math.min(Math.max(index, 0), costs.size() - 1));
   }
+
  /**
   * Snapshots the tracked per-shuffle-server push costs as write metrics keyed by
   * shuffle server id, in the shape expected by
   * {@link RssReportShuffleWriteMetricRequest} when reporting to the shuffle manager.
   *
   * @return a new map; one entry per tracked server with its sent duration (ms) and bytes
   */
  public Map<String, RssReportShuffleWriteMetricRequest.TaskShuffleWriteMetric> toMetric() {
    return this.tracking.entrySet().stream()
        .collect(
            Collectors.toMap(
                Map.Entry::getKey,
                x ->
                    new RssReportShuffleWriteMetricRequest.TaskShuffleWriteMetric(
                        x.getValue().sentDurationMillis(), x.getValue().sentBytes())));
  }
 }
diff --git a/dev/scripts/checkshade.sh b/dev/scripts/checkshade.sh
old mode 100644
new mode 100755
index 0bede9a73..b741720b1
--- a/dev/scripts/checkshade.sh
+++ b/dev/scripts/checkshade.sh
@@ -27,7 +27,7 @@ if [ ! -f "${SHADED_JAR_PATH}" ]; then
     exit 1
 fi
 
-UNSHADED=$(unzip -l "${SHADED_JAR_PATH}" | awk 'NR>3 {print $4}' | grep -vE 
'uniffle|org/apache/spark/shuffle|META-INF|git.properties|/$|\.html$|\.css$|\.xml$|^javax|^$')
+UNSHADED=$(unzip -l "${SHADED_JAR_PATH}" | awk 'NR>3 {print $4}' | grep -vE 
'uniffle|org/apache/spark|META-INF|git.properties|/$|\.html$|\.css$|\.xml$|^javax|^$')
 UNSHADED_COUNT=0
 if [ -n "$UNSHADED" ]; then
     UNSHADED_COUNT=$(wc -l <<< "$UNSHADED")
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleManagerClient.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleManagerClient.java
index d3a78ce29..381ad706f 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleManagerClient.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleManagerClient.java
@@ -22,14 +22,18 @@ import 
org.apache.uniffle.client.request.RssGetShuffleResultRequest;
 import org.apache.uniffle.client.request.RssPartitionToShuffleServerRequest;
 import org.apache.uniffle.client.request.RssReassignOnBlockSendFailureRequest;
 import org.apache.uniffle.client.request.RssReportShuffleFetchFailureRequest;
+import org.apache.uniffle.client.request.RssReportShuffleReadMetricRequest;
 import org.apache.uniffle.client.request.RssReportShuffleResultRequest;
 import org.apache.uniffle.client.request.RssReportShuffleWriteFailureRequest;
+import org.apache.uniffle.client.request.RssReportShuffleWriteMetricRequest;
 import org.apache.uniffle.client.response.RssGetShuffleResultResponse;
 import 
org.apache.uniffle.client.response.RssReassignOnBlockSendFailureResponse;
 import org.apache.uniffle.client.response.RssReassignOnStageRetryResponse;
 import org.apache.uniffle.client.response.RssReportShuffleFetchFailureResponse;
+import org.apache.uniffle.client.response.RssReportShuffleReadMetricResponse;
 import org.apache.uniffle.client.response.RssReportShuffleResultResponse;
 import org.apache.uniffle.client.response.RssReportShuffleWriteFailureResponse;
+import org.apache.uniffle.client.response.RssReportShuffleWriteMetricResponse;
 import org.apache.uniffle.common.util.StatefulCloseable;
 
 public interface ShuffleManagerClient extends StatefulCloseable {
@@ -68,4 +72,10 @@ public interface ShuffleManagerClient extends 
StatefulCloseable {
       RssGetShuffleResultForMultiPartRequest request);
 
   RssReportShuffleResultResponse 
reportShuffleResult(RssReportShuffleResultRequest request);
+
  /**
   * Reports a task's per-shuffle-server write metrics to the shuffle manager
   * running on the driver.
   */
  RssReportShuffleWriteMetricResponse reportShuffleWriteMetric(
      RssReportShuffleWriteMetricRequest request);

  /**
   * Reports a task's per-shuffle-server read metrics to the shuffle manager
   * running on the driver.
   */
  RssReportShuffleReadMetricResponse reportShuffleReadMetric(
      RssReportShuffleReadMetricRequest request);
 }
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleManagerGrpcClient.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleManagerGrpcClient.java
index 9c34f3f7d..0040e56ef 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleManagerGrpcClient.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleManagerGrpcClient.java
@@ -28,14 +28,18 @@ import 
org.apache.uniffle.client.request.RssGetShuffleResultRequest;
 import org.apache.uniffle.client.request.RssPartitionToShuffleServerRequest;
 import org.apache.uniffle.client.request.RssReassignOnBlockSendFailureRequest;
 import org.apache.uniffle.client.request.RssReportShuffleFetchFailureRequest;
+import org.apache.uniffle.client.request.RssReportShuffleReadMetricRequest;
 import org.apache.uniffle.client.request.RssReportShuffleResultRequest;
 import org.apache.uniffle.client.request.RssReportShuffleWriteFailureRequest;
+import org.apache.uniffle.client.request.RssReportShuffleWriteMetricRequest;
 import org.apache.uniffle.client.response.RssGetShuffleResultResponse;
 import 
org.apache.uniffle.client.response.RssReassignOnBlockSendFailureResponse;
 import org.apache.uniffle.client.response.RssReassignOnStageRetryResponse;
 import org.apache.uniffle.client.response.RssReportShuffleFetchFailureResponse;
+import org.apache.uniffle.client.response.RssReportShuffleReadMetricResponse;
 import org.apache.uniffle.client.response.RssReportShuffleResultResponse;
 import org.apache.uniffle.client.response.RssReportShuffleWriteFailureResponse;
+import org.apache.uniffle.client.response.RssReportShuffleWriteMetricResponse;
 import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.proto.RssProtos;
 import org.apache.uniffle.proto.RssProtos.ReportShuffleFetchFailureRequest;
@@ -155,6 +159,22 @@ public class ShuffleManagerGrpcClient extends GrpcClient 
implements ShuffleManag
     return RssReportShuffleResultResponse.fromProto(response);
   }
 
+  @Override
+  public RssReportShuffleWriteMetricResponse reportShuffleWriteMetric(
+      RssReportShuffleWriteMetricRequest request) {
+    RssProtos.ReportShuffleWriteMetricResponse response =
+        getBlockingStub().reportShuffleWriteMetric(request.toProto());
+    return RssReportShuffleWriteMetricResponse.fromProto(response);
+  }
+
+  @Override
+  public RssReportShuffleReadMetricResponse reportShuffleReadMetric(
+      RssReportShuffleReadMetricRequest request) {
+    RssProtos.ReportShuffleReadMetricResponse response =
+        getBlockingStub().reportShuffleReadMetric(request.toProto());
+    return RssReportShuffleReadMetricResponse.fromProto(response);
+  }
+
   @Override
   public boolean isClosed() {
     return channel.isShutdown() || channel.isTerminated();
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleReadMetricRequest.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleReadMetricRequest.java
new file mode 100644
index 000000000..b5f4a6624
--- /dev/null
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleReadMetricRequest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.client.request;
+
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.uniffle.proto.RssProtos;
+
+public class RssReportShuffleReadMetricRequest {
+  private int stageId;
+  private int shuffleId;
+  private long taskId;
+  private Map<String, TaskShuffleReadMetric> metrics;
+
+  public RssReportShuffleReadMetricRequest(
+      int stageId, int shuffleId, long taskId, Map<String, 
TaskShuffleReadMetric> metrics) {
+    this.stageId = stageId;
+    this.shuffleId = shuffleId;
+    this.taskId = taskId;
+    this.metrics = metrics;
+  }
+
+  public RssProtos.ReportShuffleReadMetricRequest toProto() {
+    RssReportShuffleReadMetricRequest request = this;
+    RssProtos.ReportShuffleReadMetricRequest.Builder builder =
+        RssProtos.ReportShuffleReadMetricRequest.newBuilder();
+    builder
+        .setShuffleId(request.shuffleId)
+        .setStageId(request.stageId)
+        .setTaskId(request.taskId)
+        .putAllMetrics(
+            request.metrics.entrySet().stream()
+                .collect(
+                    Collectors.toMap(
+                        Map.Entry::getKey,
+                        x ->
+                            RssProtos.ShuffleReadMetric.newBuilder()
+                                .setByteSize(x.getValue().getByteSize())
+                                
.setDurationMillis(x.getValue().getDurationMillis())
+                                .build())));
+    return builder.build();
+  }
+
+  public static class TaskShuffleReadMetric {
+    private long durationMillis;
+    private long byteSize;
+
+    TaskShuffleReadMetric(long durationMillis, long byteSize) {
+      this.durationMillis = durationMillis;
+      this.byteSize = byteSize;
+    }
+
+    public long getDurationMillis() {
+      return durationMillis;
+    }
+
+    public long getByteSize() {
+      return byteSize;
+    }
+  }
+}
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleWriteMetricRequest.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleWriteMetricRequest.java
new file mode 100644
index 000000000..1c0f75235
--- /dev/null
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleWriteMetricRequest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.client.request;
+
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.uniffle.proto.RssProtos;
+
+public class RssReportShuffleWriteMetricRequest {
+  private int stageId;
+  private int shuffleId;
+  private long taskId;
+  private Map<String, TaskShuffleWriteMetric> metrics;
+
+  public RssReportShuffleWriteMetricRequest(
+      int stageId, int shuffleId, long taskId, Map<String, 
TaskShuffleWriteMetric> metrics) {
+    this.stageId = stageId;
+    this.shuffleId = shuffleId;
+    this.taskId = taskId;
+    this.metrics = metrics;
+  }
+
+  public RssProtos.ReportShuffleWriteMetricRequest toProto() {
+    RssReportShuffleWriteMetricRequest request = this;
+    RssProtos.ReportShuffleWriteMetricRequest.Builder builder =
+        RssProtos.ReportShuffleWriteMetricRequest.newBuilder();
+    builder
+        .setShuffleId(request.shuffleId)
+        .setStageId(request.stageId)
+        .setTaskId(request.taskId)
+        .putAllMetrics(
+            request.metrics.entrySet().stream()
+                .collect(
+                    Collectors.toMap(
+                        Map.Entry::getKey,
+                        x ->
+                            RssProtos.ShuffleWriteMetric.newBuilder()
+                                .setByteSize(x.getValue().getByteSize())
+                                
.setDurationMillis(x.getValue().getDurationMillis())
+                                .build())));
+    return builder.build();
+  }
+
+  public static class TaskShuffleWriteMetric {
+    private long durationMillis;
+    private long byteSize;
+
+    public TaskShuffleWriteMetric(long durationMillis, long byteSize) {
+      this.durationMillis = durationMillis;
+      this.byteSize = byteSize;
+    }
+
+    public long getDurationMillis() {
+      return durationMillis;
+    }
+
+    public long getByteSize() {
+      return byteSize;
+    }
+  }
+}
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/response/RssReportShuffleReadMetricResponse.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/response/RssReportShuffleReadMetricResponse.java
new file mode 100644
index 000000000..976f8863f
--- /dev/null
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/response/RssReportShuffleReadMetricResponse.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.client.response;
+
+import org.apache.uniffle.common.rpc.StatusCode;
+import org.apache.uniffle.proto.RssProtos;
+
+public class RssReportShuffleReadMetricResponse extends ClientResponse {
+
+  public RssReportShuffleReadMetricResponse(StatusCode statusCode) {
+    super(statusCode);
+  }
+
+  public static RssReportShuffleReadMetricResponse fromProto(
+      RssProtos.ReportShuffleReadMetricResponse rpcResponse) {
+    return new 
RssReportShuffleReadMetricResponse(StatusCode.fromProto(rpcResponse.getStatus()));
+  }
+}
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/response/RssReportShuffleWriteMetricResponse.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/response/RssReportShuffleWriteMetricResponse.java
new file mode 100644
index 000000000..437042b8b
--- /dev/null
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/response/RssReportShuffleWriteMetricResponse.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.client.response;
+
+import org.apache.uniffle.common.rpc.StatusCode;
+import org.apache.uniffle.proto.RssProtos;
+
+public class RssReportShuffleWriteMetricResponse extends ClientResponse {
+
+  public RssReportShuffleWriteMetricResponse(StatusCode statusCode) {
+    super(statusCode);
+  }
+
+  public static RssReportShuffleWriteMetricResponse fromProto(
+      RssProtos.ReportShuffleWriteMetricResponse rpcResponse) {
+    return new 
RssReportShuffleWriteMetricResponse(StatusCode.fromProto(rpcResponse.getStatus()));
+  }
+}
diff --git a/pom.xml b/pom.xml
index 9d96c71ab..284a7f23c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -107,6 +107,7 @@
     <skipBuildImage>true</skipBuildImage>
     <snakeyaml.version>2.2</snakeyaml.version>
     <kryo.version>4.0.2</kryo.version>
+    <scala.version>2.12.18</scala.version>
   </properties>
 
   <repositories>
@@ -1476,6 +1477,7 @@
         <module>client-spark/spark3-shaded</module>
         <module>integration-test/spark-common</module>
         <module>integration-test/spark3</module>
+        <module>client-spark/extension</module>
       </modules>
       <dependencyManagement>
         <dependencies>
@@ -1519,6 +1521,11 @@
             <artifactId>rss-client-spark-common</artifactId>
             <version>${project.version}</version>
           </dependency>
+          <dependency>
+            <groupId>org.apache.uniffle</groupId>
+            <artifactId>rss-client-spark-ui</artifactId>
+            <version>${project.version}</version>
+          </dependency>
           <dependency>
             <groupId>org.apache.uniffle</groupId>
             <artifactId>rss-client-spark-common</artifactId>
@@ -1557,6 +1564,7 @@
         <module>client-spark/spark3-shaded</module>
         <module>integration-test/spark-common</module>
         <module>integration-test/spark3</module>
+        <module>client-spark/extension</module>
       </modules>
       <dependencyManagement>
         <dependencies>
@@ -1608,6 +1616,11 @@
             <artifactId>rss-client-spark-common</artifactId>
             <version>${project.version}</version>
           </dependency>
+          <dependency>
+            <groupId>org.apache.uniffle</groupId>
+            <artifactId>rss-client-spark-ui</artifactId>
+            <version>${project.version}</version>
+          </dependency>
           <dependency>
             <groupId>org.apache.uniffle</groupId>
             <artifactId>rss-client-spark-common</artifactId>
@@ -1647,6 +1660,7 @@
         <module>client-spark/spark3-shaded</module>
         <module>integration-test/spark-common</module>
         <module>integration-test/spark3</module>
+        <module>client-spark/extension</module>
       </modules>
       <dependencyManagement>
         <dependencies>
@@ -1698,6 +1712,11 @@
             <artifactId>rss-client-spark-common</artifactId>
             <version>${project.version}</version>
           </dependency>
+          <dependency>
+            <groupId>org.apache.uniffle</groupId>
+            <artifactId>rss-client-spark-ui</artifactId>
+            <version>${project.version}</version>
+          </dependency>
           <dependency>
             <groupId>org.apache.uniffle</groupId>
             <artifactId>rss-client-spark-common</artifactId>
@@ -1739,6 +1758,7 @@
         <module>client-spark/spark3-shaded</module>
         <module>integration-test/spark-common</module>
         <module>integration-test/spark3</module>
+        <module>client-spark/extension</module>
       </modules>
       <dependencyManagement>
         <dependencies>
@@ -1790,6 +1810,11 @@
             <artifactId>rss-client-spark-common</artifactId>
             <version>${project.version}</version>
           </dependency>
+          <dependency>
+            <groupId>org.apache.uniffle</groupId>
+            <artifactId>rss-client-spark-ui</artifactId>
+            <version>${project.version}</version>
+          </dependency>
           <dependency>
             <groupId>org.apache.uniffle</groupId>
             <artifactId>rss-client-spark-common</artifactId>
@@ -1844,6 +1869,7 @@
         <module>client-spark/spark3-shaded</module>
         <module>integration-test/spark-common</module>
         <module>integration-test/spark3</module>
+        <module>client-spark/extension</module>
       </modules>
       <dependencyManagement>
         <dependencies>
@@ -1908,7 +1934,11 @@
             <type>test-jar</type>
             <scope>test</scope>
           </dependency>
-
+          <dependency>
+            <groupId>org.apache.uniffle</groupId>
+            <artifactId>rss-client-spark-ui</artifactId>
+            <version>${project.version}</version>
+          </dependency>
           <dependency>
             <groupId>org.apache.uniffle</groupId>
             <artifactId>rss-integration-spark-common-test</artifactId>
@@ -1947,6 +1977,7 @@
         <module>client-spark/spark3-shaded</module>
         <module>integration-test/spark-common</module>
         <module>integration-test/spark3</module>
+        <module>client-spark/extension</module>
       </modules>
       <dependencyManagement>
         <dependencies>
@@ -1998,6 +2029,11 @@
             <artifactId>rss-client-spark-common</artifactId>
             <version>${project.version}</version>
           </dependency>
+          <dependency>
+            <groupId>org.apache.uniffle</groupId>
+            <artifactId>rss-client-spark-ui</artifactId>
+            <version>${project.version}</version>
+          </dependency>
           <dependency>
             <groupId>org.apache.uniffle</groupId>
             <artifactId>rss-client-spark-common</artifactId>
@@ -2037,6 +2073,7 @@
         <module>client-spark/spark3-shaded</module>
         <module>integration-test/spark-common</module>
         <module>integration-test/spark3</module>
+        <module>client-spark/extension</module>
       </modules>
       <dependencyManagement>
         <dependencies>
@@ -2070,6 +2107,11 @@
             <artifactId>rss-client-spark-common</artifactId>
             <version>${project.version}</version>
           </dependency>
+          <dependency>
+            <groupId>org.apache.uniffle</groupId>
+            <artifactId>rss-client-spark-ui</artifactId>
+            <version>${project.version}</version>
+          </dependency>
           <dependency>
             <groupId>org.apache.uniffle</groupId>
             <artifactId>rss-client-spark-common</artifactId>
@@ -2128,6 +2170,7 @@
       <id>scala2.13</id>
       <properties>
         <scala.binary.version>2.13</scala.binary.version>
+        <scala.version>2.13.16</scala.version>
       </properties>
     </profile>
 
diff --git a/proto/src/main/proto/Rss.proto b/proto/src/main/proto/Rss.proto
index 18213341b..e47c51e13 100644
--- a/proto/src/main/proto/Rss.proto
+++ b/proto/src/main/proto/Rss.proto
@@ -584,6 +584,41 @@ service ShuffleManager {
   rpc reportShuffleResult (ReportShuffleResultRequest) returns 
(ReportShuffleResultResponse);
   rpc getShuffleResult (GetShuffleResultRequest) returns 
(GetShuffleResultResponse);
   rpc getShuffleResultForMultiPart (GetShuffleResultForMultiPartRequest) 
returns (GetShuffleResultForMultiPartResponse);
+  // report task shuffle metrics
+  rpc reportShuffleWriteMetric (ReportShuffleWriteMetricRequest) returns 
(ReportShuffleWriteMetricResponse);
+  rpc reportShuffleReadMetric (ReportShuffleReadMetricRequest) returns 
(ReportShuffleReadMetricResponse);
+}
+
// Reports the shuffle write metrics produced by one task attempt.
message ReportShuffleWriteMetricRequest {
  int32 shuffleId = 1;
  int32 stageId = 2;
  int64 taskId = 3;
  // Per-server write metrics; NOTE(review): key presumably identifies a
  // shuffle server — confirm against the reporting client.
  map<string, ShuffleWriteMetric> metrics = 4;
}
+
// Aggregated write-side measurements for a single metrics-map entry.
message ShuffleWriteMetric {
  int64 durationMillis = 1;  // elapsed write time, in milliseconds
  int64 byteSize = 2;        // number of bytes written
}
+
// Aggregated read-side measurements for a single metrics-map entry.
message ShuffleReadMetric {
  int64 durationMillis = 1;  // elapsed read time, in milliseconds
  int64 byteSize = 2;        // number of bytes read
}
+
// Acknowledgement for reportShuffleWriteMetric; carries only the RPC status.
message ReportShuffleWriteMetricResponse {
  StatusCode status = 1;
}
+
// Reports the shuffle read metrics produced by one task attempt.
message ReportShuffleReadMetricRequest {
  int32 shuffleId = 1;
  int32 stageId = 2;
  int64 taskId = 3;
  // Per-server read metrics; NOTE(review): key presumably identifies a
  // shuffle server — confirm against the reporting client.
  map<string, ShuffleReadMetric> metrics = 4;
}
+
// Acknowledgement for reportShuffleReadMetric; carries only the RPC status.
message ReportShuffleReadMetricResponse {
  StatusCode status = 1;
}
 
 message ReportShuffleFetchFailureRequest {

Reply via email to