This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 22be6287a [#2460] feat(spark3): Add reassign info into UI tab (#2501)
22be6287a is described below
commit 22be6287a9ed25bab46ff4708e392309627bcb93
Author: Junfan Zhang <[email protected]>
AuthorDate: Mon Jun 16 19:38:46 2025 +0800
[#2460] feat(spark3): Add reassign info into UI tab (#2501)
### What changes were proposed in this pull request?
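Add reassign info to the Uniffle UI tab, showing whether a reassign was triggered by partition split, block send failure, or stage retry.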
### Why are the changes needed?
I want to enable partition split for the Spark jobs on all clusters; this PR
makes it easier for me to inspect whether it is working as expected.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Not needed.
---
.../shuffle/events/TaskReassignInfoEvent.java | 45 ++++++++++++++++++++++
.../shuffle/manager/RssShuffleManagerBase.java | 25 +++++++++++-
.../scala/org/apache/spark/UniffleListener.scala | 9 ++++-
.../org/apache/spark/UniffleStatusStore.scala | 17 +++++++-
.../scala/org/apache/spark/ui/ShufflePage.scala | 21 ++++++++++
5 files changed, 114 insertions(+), 3 deletions(-)
diff --git
a/client-spark/common/src/main/java/org/apache/spark/shuffle/events/TaskReassignInfoEvent.java
b/client-spark/common/src/main/java/org/apache/spark/shuffle/events/TaskReassignInfoEvent.java
new file mode 100644
index 000000000..2000d6a88
--- /dev/null
+++
b/client-spark/common/src/main/java/org/apache/spark/shuffle/events/TaskReassignInfoEvent.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.events;
+
+public class TaskReassignInfoEvent extends UniffleEvent {
+ private boolean reassignTriggeredOnPartitionSplit;
+ private boolean reassignTriggeredOnBlockSendFailure;
+ private boolean reassignTriggeredOnStageRetry;
+
+ public TaskReassignInfoEvent(
+ boolean reassignTriggeredOnPartitionSplit,
+ boolean reassignTriggeredOnBlockSendFailure,
+ boolean reassignTriggeredOnStageRetry) {
+ this.reassignTriggeredOnPartitionSplit = reassignTriggeredOnPartitionSplit;
+ this.reassignTriggeredOnBlockSendFailure =
reassignTriggeredOnBlockSendFailure;
+ this.reassignTriggeredOnStageRetry = reassignTriggeredOnStageRetry;
+ }
+
+ public boolean isReassignTriggeredOnPartitionSplit() {
+ return reassignTriggeredOnPartitionSplit;
+ }
+
+ public boolean isReassignTriggeredOnBlockSendFailure() {
+ return reassignTriggeredOnBlockSendFailure;
+ }
+
+ public boolean isReassignTriggeredOnStageRetry() {
+ return reassignTriggeredOnStageRetry;
+ }
+}
diff --git
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
index 046a8b935..cfa364bef 100644
---
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
+++
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
@@ -59,6 +59,7 @@ import org.apache.spark.shuffle.RssStageResubmitManager;
import org.apache.spark.shuffle.ShuffleHandleInfoManager;
import org.apache.spark.shuffle.ShuffleManager;
import org.apache.spark.shuffle.SparkVersionUtils;
+import org.apache.spark.shuffle.events.TaskReassignInfoEvent;
import org.apache.spark.shuffle.handle.MutableShuffleHandleInfo;
import org.apache.spark.shuffle.handle.ShuffleHandleInfo;
import org.apache.spark.shuffle.handle.SimpleShuffleHandleInfo;
@@ -177,10 +178,17 @@ public abstract class RssShuffleManagerBase implements
RssShuffleManagerInterfac
private int partitionSplitLoadBalanceServerNum;
protected PartitionSplitMode partitionSplitMode;
+ private AtomicBoolean reassignTriggeredOnPartitionSplit = new
AtomicBoolean(false);
+ private AtomicBoolean reassignTriggeredOnBlockSendFailure = new
AtomicBoolean(false);
+ private AtomicBoolean reassignTriggeredOnStageRetry = new
AtomicBoolean(false);
+
+ private boolean isDriver = false;
+
public RssShuffleManagerBase(SparkConf conf, boolean isDriver) {
LOG.info(
"Uniffle {} version: {}", this.getClass().getName(),
Constants.VERSION_AND_REVISION_SHORT);
this.sparkConf = conf;
+ this.isDriver = isDriver;
checkSupported(sparkConf);
boolean supportsRelocation =
Optional.ofNullable(SparkEnv.get())
@@ -994,6 +1002,7 @@ public abstract class RssShuffleManagerBase implements
RssShuffleManagerInterfac
"The stage retry has been triggered successfully for the shuffleId:
{}, attemptNumber: {}",
shuffleId,
stageAttemptNumber);
+ this.reassignTriggeredOnStageRetry.set(true);
return true;
}
@@ -1117,7 +1126,11 @@ public abstract class RssShuffleManagerBase implements
RssShuffleManagerInterfac
System.currentTimeMillis() - startTime,
partitionSplit,
reassignResult);
-
+ if (partitionSplit) {
+ this.reassignTriggeredOnPartitionSplit.set(true);
+ } else {
+ this.reassignTriggeredOnBlockSendFailure.set(true);
+ }
return internalHandle;
}
}
@@ -1160,6 +1173,16 @@ public abstract class RssShuffleManagerBase implements
RssShuffleManagerInterfac
@Override
public void stop() {
+ if (this.isDriver && partitionReassignEnabled) {
+ // send reassign event into spark event store
+ TaskReassignInfoEvent reassignInfoEvent =
+ new TaskReassignInfoEvent(
+ reassignTriggeredOnPartitionSplit.get(),
+ reassignTriggeredOnBlockSendFailure.get(),
+ reassignTriggeredOnStageRetry.get());
+
RssSparkShuffleUtils.getActiveSparkContext().listenerBus().post(reassignInfoEvent);
+ }
+
if (managerClientSupplier != null
&& managerClientSupplier instanceof ExpiringCloseableSupplier) {
((ExpiringCloseableSupplier<ShuffleManagerClient>)
managerClientSupplier).close();
diff --git
a/client-spark/extension/src/main/scala/org/apache/spark/UniffleListener.scala
b/client-spark/extension/src/main/scala/org/apache/spark/UniffleListener.scala
index ae974982b..7175257ae 100644
---
a/client-spark/extension/src/main/scala/org/apache/spark/UniffleListener.scala
+++
b/client-spark/extension/src/main/scala/org/apache/spark/UniffleListener.scala
@@ -20,7 +20,7 @@ package org.apache.spark
import org.apache.commons.lang3.StringUtils
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent,
SparkListenerJobEnd, SparkListenerJobStart, SparkListenerTaskEnd}
-import org.apache.spark.shuffle.events.{ShuffleAssignmentInfoEvent,
ShuffleWriteTimes, TaskShuffleReadInfoEvent, TaskShuffleWriteInfoEvent}
+import org.apache.spark.shuffle.events.{ShuffleAssignmentInfoEvent,
ShuffleWriteTimes, TaskReassignInfoEvent, TaskShuffleReadInfoEvent,
TaskShuffleWriteInfoEvent}
import org.apache.spark.status.ElementTrackingStore
import java.util.concurrent.ConcurrentHashMap
@@ -137,8 +137,15 @@ class UniffleListener(conf: SparkConf, kvstore:
ElementTrackingStore)
case e: TaskShuffleWriteInfoEvent => onTaskShuffleWriteInfo(e)
case e: TaskShuffleReadInfoEvent => onTaskShuffleReadInfo(e)
case e: ShuffleAssignmentInfoEvent => onAssignmentInfo(e)
+ case e: TaskReassignInfoEvent => onReassignInfo(e)
case _ => // Ignore
}
+
+ private def onReassignInfo(event: TaskReassignInfoEvent) = {
+ kvstore.write(
+ ReassignInfoUIData(event)
+ )
+ }
}
object UniffleListener {
diff --git
a/client-spark/extension/src/main/scala/org/apache/spark/UniffleStatusStore.scala
b/client-spark/extension/src/main/scala/org/apache/spark/UniffleStatusStore.scala
index 5176048d4..2eabbd06a 100644
---
a/client-spark/extension/src/main/scala/org/apache/spark/UniffleStatusStore.scala
+++
b/client-spark/extension/src/main/scala/org/apache/spark/UniffleStatusStore.scala
@@ -18,7 +18,7 @@
package org.apache.spark
import com.fasterxml.jackson.annotation.JsonIgnore
-import org.apache.spark.shuffle.events.ShuffleWriteTimes
+import org.apache.spark.shuffle.events.{ShuffleWriteTimes,
TaskReassignInfoEvent}
import org.apache.spark.status.KVUtils.KVIndexParam
import org.apache.spark.util.Utils
import org.apache.spark.util.kvstore.{KVIndex, KVStore, KVStoreView}
@@ -90,6 +90,15 @@ class UniffleStatusStore(store: KVStore) {
case _: Exception => AggregatedTaskInfoUIData(0, 0, 0, 0)
}
}
+
+ def reassignInfo(): ReassignInfoUIData = {
+ val kClass = classOf[ReassignInfoUIData]
+ try {
+ store.read(kClass, kClass.getName)
+ } catch {
+ case _: Exception => ReassignInfoUIData(new TaskReassignInfoEvent(false,
false, false))
+ }
+ }
}
class UniffleProperties(val info: Seq[(String, String)]) {
@@ -152,4 +161,10 @@ case class AggregatedShuffleWriteTimesUIData(times:
ShuffleWriteTimes) {
@JsonIgnore
@KVIndex
def id: String = classOf[AggregatedShuffleWriteTimesUIData].getName()
+}
+
+case class ReassignInfoUIData(event: TaskReassignInfoEvent) {
+ @JsonIgnore
+ @KVIndex
+ def id: String = classOf[ReassignInfoUIData].getName()
}
\ No newline at end of file
diff --git
a/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
b/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
index d53e0e8d0..4c50888a1 100644
---
a/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
+++
b/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
@@ -120,6 +120,9 @@ class ShufflePage(parent: ShuffleTab) extends WebUIPage("")
with Logging {
val uniffleWriteAvgSpeed =
calculateSpeed(originWriteMetric.metrics.values().asScala.toSeq)
val uniffleReadAvgSpeed =
calculateSpeed(originReadMetric.metrics.values().asScala.toSeq)
+ // reassign info
+ val reassignInfo = runtimeStatusStore.reassignInfo().event
+
// render build info
val buildInfo = runtimeStatusStore.buildInfo()
val buildInfoTableUI = UIUtils.listingTable(
@@ -282,6 +285,24 @@ class ShufflePage(parent: ShuffleTab) extends
WebUIPage("") with Logging {
</a>
{uniffleWriteAvgSpeed} / {uniffleReadAvgSpeed}
</li>
+ <li>
+ <a>
+ <strong>ReassignTriggeredOnPartitionSplit: </strong>
+ </a>
+ {reassignInfo.isReassignTriggeredOnPartitionSplit}
+ </li>
+ <li>
+ <a>
+ <strong>ReassignTriggeredOnBlockSendFailure: </strong>
+ </a>
+ {reassignInfo.isReassignTriggeredOnBlockSendFailure}
+ </li>
+ <li>
+ <a>
+ <strong>ReassignTriggeredOnStageRetry: </strong>
+ </a>
+ {reassignInfo.isReassignTriggeredOnStageRetry}
+ </li>
</ul>
</div>