This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 22be6287a [#2460] feat(spark3): Add reassign info into UI tab (#2501)
22be6287a is described below

commit 22be6287a9ed25bab46ff4708e392309627bcb93
Author: Junfan Zhang <[email protected]>
AuthorDate: Mon Jun 16 19:38:46 2025 +0800

    [#2460] feat(spark3): Add reassign info into UI tab (#2501)
    
    ### What changes were proposed in this pull request?
    
    Add reassign info into UI tab
    
    ### Why are the changes needed?
    
    I want to enable partition split for Spark jobs on all clusters; this PR
    makes it easier for me to verify that it works as expected.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Not needed.
---
 .../shuffle/events/TaskReassignInfoEvent.java      | 45 ++++++++++++++++++++++
 .../shuffle/manager/RssShuffleManagerBase.java     | 25 +++++++++++-
 .../scala/org/apache/spark/UniffleListener.scala   |  9 ++++-
 .../org/apache/spark/UniffleStatusStore.scala      | 17 +++++++-
 .../scala/org/apache/spark/ui/ShufflePage.scala    | 21 ++++++++++
 5 files changed, 114 insertions(+), 3 deletions(-)

diff --git a/client-spark/common/src/main/java/org/apache/spark/shuffle/events/TaskReassignInfoEvent.java b/client-spark/common/src/main/java/org/apache/spark/shuffle/events/TaskReassignInfoEvent.java
new file mode 100644
index 000000000..2000d6a88
--- /dev/null
+++ b/client-spark/common/src/main/java/org/apache/spark/shuffle/events/TaskReassignInfoEvent.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.events;
+
+public class TaskReassignInfoEvent extends UniffleEvent {
+  private boolean reassignTriggeredOnPartitionSplit;
+  private boolean reassignTriggeredOnBlockSendFailure;
+  private boolean reassignTriggeredOnStageRetry;
+
+  public TaskReassignInfoEvent(
+      boolean reassignTriggeredOnPartitionSplit,
+      boolean reassignTriggeredOnBlockSendFailure,
+      boolean reassignTriggeredOnStageRetry) {
+    this.reassignTriggeredOnPartitionSplit = reassignTriggeredOnPartitionSplit;
+    this.reassignTriggeredOnBlockSendFailure = reassignTriggeredOnBlockSendFailure;
+    this.reassignTriggeredOnStageRetry = reassignTriggeredOnStageRetry;
+  }
+
+  public boolean isReassignTriggeredOnPartitionSplit() {
+    return reassignTriggeredOnPartitionSplit;
+  }
+
+  public boolean isReassignTriggeredOnBlockSendFailure() {
+    return reassignTriggeredOnBlockSendFailure;
+  }
+
+  public boolean isReassignTriggeredOnStageRetry() {
+    return reassignTriggeredOnStageRetry;
+  }
+}
diff --git a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
index 046a8b935..cfa364bef 100644
--- a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
+++ b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
@@ -59,6 +59,7 @@ import org.apache.spark.shuffle.RssStageResubmitManager;
 import org.apache.spark.shuffle.ShuffleHandleInfoManager;
 import org.apache.spark.shuffle.ShuffleManager;
 import org.apache.spark.shuffle.SparkVersionUtils;
+import org.apache.spark.shuffle.events.TaskReassignInfoEvent;
 import org.apache.spark.shuffle.handle.MutableShuffleHandleInfo;
 import org.apache.spark.shuffle.handle.ShuffleHandleInfo;
 import org.apache.spark.shuffle.handle.SimpleShuffleHandleInfo;
@@ -177,10 +178,17 @@ public abstract class RssShuffleManagerBase implements RssShuffleManagerInterfac
   private int partitionSplitLoadBalanceServerNum;
   protected PartitionSplitMode partitionSplitMode;
 
+  private AtomicBoolean reassignTriggeredOnPartitionSplit = new AtomicBoolean(false);
+  private AtomicBoolean reassignTriggeredOnBlockSendFailure = new AtomicBoolean(false);
+  private AtomicBoolean reassignTriggeredOnStageRetry = new AtomicBoolean(false);
+
+  private boolean isDriver = false;
+
   public RssShuffleManagerBase(SparkConf conf, boolean isDriver) {
     LOG.info(
         "Uniffle {} version: {}", this.getClass().getName(), Constants.VERSION_AND_REVISION_SHORT);
     this.sparkConf = conf;
+    this.isDriver = isDriver;
     checkSupported(sparkConf);
     boolean supportsRelocation =
         Optional.ofNullable(SparkEnv.get())
@@ -994,6 +1002,7 @@ public abstract class RssShuffleManagerBase implements RssShuffleManagerInterfac
         "The stage retry has been triggered successfully for the shuffleId: {}, attemptNumber: {}",
         shuffleId,
         stageAttemptNumber);
+    this.reassignTriggeredOnStageRetry.set(true);
     return true;
   }
 
@@ -1117,7 +1126,11 @@ public abstract class RssShuffleManagerBase implements RssShuffleManagerInterfac
           System.currentTimeMillis() - startTime,
           partitionSplit,
           reassignResult);
-
+      if (partitionSplit) {
+        this.reassignTriggeredOnPartitionSplit.set(true);
+      } else {
+        this.reassignTriggeredOnBlockSendFailure.set(true);
+      }
       return internalHandle;
     }
   }
@@ -1160,6 +1173,16 @@ public abstract class RssShuffleManagerBase implements RssShuffleManagerInterfac
 
   @Override
   public void stop() {
+    if (this.isDriver && partitionReassignEnabled) {
+      // send reassign event into spark event store
+      TaskReassignInfoEvent reassignInfoEvent =
+          new TaskReassignInfoEvent(
+              reassignTriggeredOnPartitionSplit.get(),
+              reassignTriggeredOnBlockSendFailure.get(),
+              reassignTriggeredOnStageRetry.get());
+      RssSparkShuffleUtils.getActiveSparkContext().listenerBus().post(reassignInfoEvent);
+    }
+
     if (managerClientSupplier != null
         && managerClientSupplier instanceof ExpiringCloseableSupplier) {
       ((ExpiringCloseableSupplier<ShuffleManagerClient>) managerClientSupplier).close();
diff --git a/client-spark/extension/src/main/scala/org/apache/spark/UniffleListener.scala b/client-spark/extension/src/main/scala/org/apache/spark/UniffleListener.scala
index ae974982b..7175257ae 100644
--- a/client-spark/extension/src/main/scala/org/apache/spark/UniffleListener.scala
+++ b/client-spark/extension/src/main/scala/org/apache/spark/UniffleListener.scala
@@ -20,7 +20,7 @@ package org.apache.spark
 import org.apache.commons.lang3.StringUtils
 import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListenerJobEnd, SparkListenerJobStart, SparkListenerTaskEnd}
-import org.apache.spark.shuffle.events.{ShuffleAssignmentInfoEvent, ShuffleWriteTimes, TaskShuffleReadInfoEvent, TaskShuffleWriteInfoEvent}
+import org.apache.spark.shuffle.events.{ShuffleAssignmentInfoEvent, ShuffleWriteTimes, TaskReassignInfoEvent, TaskShuffleReadInfoEvent, TaskShuffleWriteInfoEvent}
 import org.apache.spark.status.ElementTrackingStore
 
 import java.util.concurrent.ConcurrentHashMap
@@ -137,8 +137,15 @@ class UniffleListener(conf: SparkConf, kvstore: ElementTrackingStore)
     case e: TaskShuffleWriteInfoEvent => onTaskShuffleWriteInfo(e)
     case e: TaskShuffleReadInfoEvent => onTaskShuffleReadInfo(e)
     case e: ShuffleAssignmentInfoEvent => onAssignmentInfo(e)
+    case e: TaskReassignInfoEvent => onReassignInfo(e)
     case _ => // Ignore
   }
+
+  private def onReassignInfo(event: TaskReassignInfoEvent) = {
+    kvstore.write(
+      ReassignInfoUIData(event)
+    )
+  }
 }
 
 object UniffleListener {
diff --git a/client-spark/extension/src/main/scala/org/apache/spark/UniffleStatusStore.scala b/client-spark/extension/src/main/scala/org/apache/spark/UniffleStatusStore.scala
index 5176048d4..2eabbd06a 100644
--- a/client-spark/extension/src/main/scala/org/apache/spark/UniffleStatusStore.scala
+++ b/client-spark/extension/src/main/scala/org/apache/spark/UniffleStatusStore.scala
@@ -18,7 +18,7 @@
 package org.apache.spark
 
 import com.fasterxml.jackson.annotation.JsonIgnore
-import org.apache.spark.shuffle.events.ShuffleWriteTimes
+import org.apache.spark.shuffle.events.{ShuffleWriteTimes, TaskReassignInfoEvent}
 import org.apache.spark.status.KVUtils.KVIndexParam
 import org.apache.spark.util.Utils
 import org.apache.spark.util.kvstore.{KVIndex, KVStore, KVStoreView}
@@ -90,6 +90,15 @@ class UniffleStatusStore(store: KVStore) {
       case _: Exception => AggregatedTaskInfoUIData(0, 0, 0, 0)
     }
   }
+
+  def reassignInfo(): ReassignInfoUIData = {
+    val kClass = classOf[ReassignInfoUIData]
+    try {
+      store.read(kClass, kClass.getName)
+    } catch {
+      case _: Exception => ReassignInfoUIData(new TaskReassignInfoEvent(false, false, false))
+    }
+  }
 }
 
 class UniffleProperties(val info: Seq[(String, String)]) {
@@ -152,4 +161,10 @@ case class AggregatedShuffleWriteTimesUIData(times: ShuffleWriteTimes) {
   @JsonIgnore
   @KVIndex
   def id: String = classOf[AggregatedShuffleWriteTimesUIData].getName()
+}
+
+case class ReassignInfoUIData(event: TaskReassignInfoEvent) {
+  @JsonIgnore
+  @KVIndex
+  def id: String = classOf[ReassignInfoUIData].getName()
 }
\ No newline at end of file
diff --git a/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala b/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
index d53e0e8d0..4c50888a1 100644
--- a/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
+++ b/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
@@ -120,6 +120,9 @@ class ShufflePage(parent: ShuffleTab) extends WebUIPage("") with Logging {
     val uniffleWriteAvgSpeed = calculateSpeed(originWriteMetric.metrics.values().asScala.toSeq)
     val uniffleReadAvgSpeed = calculateSpeed(originReadMetric.metrics.values().asScala.toSeq)
 
+    // reassign info
+    val reassignInfo = runtimeStatusStore.reassignInfo().event
+
     // render build info
     val buildInfo = runtimeStatusStore.buildInfo()
     val buildInfoTableUI = UIUtils.listingTable(
@@ -282,6 +285,24 @@ class ShufflePage(parent: ShuffleTab) extends WebUIPage("") with Logging {
               </a>
               {uniffleWriteAvgSpeed} / {uniffleReadAvgSpeed}
             </li>
+            <li>
+              <a>
+                <strong>ReassignTriggeredOnPartitionSplit: </strong>
+              </a>
+              {reassignInfo.isReassignTriggeredOnPartitionSplit}
+            </li>
+            <li>
+              <a>
+                <strong>ReassignTriggeredOnBlockSendFailure: </strong>
+              </a>
+              {reassignInfo.isReassignTriggeredOnBlockSendFailure}
+            </li>
+            <li>
+              <a>
+                <strong>ReassignTriggeredOnStageRetry: </strong>
+              </a>
+              {reassignInfo.isReassignTriggeredOnStageRetry}
+            </li>
           </ul>
         </div>
 

Reply via email to