This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 6ad3aa0f3 [#2618] fix(spark): Invalid reassign status show in spark UI
tab (#2620)
6ad3aa0f3 is described below
commit 6ad3aa0f34c7d58ad3bf04697d9c738b0ec4df71
Author: Junfan Zhang <[email protected]>
AuthorDate: Mon Sep 22 16:56:36 2025 +0800
[#2618] fix(spark): Invalid reassign status show in spark UI tab (#2620)
### What changes were proposed in this pull request?
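Fix the invalid reassign status shown in the Spark UI tab. The reassign info event is now posted as soon as each reassign type (partition split, block send failure, stage retry) is first triggered, instead of only in `stop()`. The UI shows the three flags on a single "Reassign Status" line.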
### Why are the changes needed?
Previously, the reassign info event was posted only in the final `stop()` method. Because the post operation runs asynchronously, the listener may not receive the event before the JVM shuts down. The event is therefore effectively dropped.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Tested with an internal Spark job.
---
.../shuffle/manager/RssShuffleManagerBase.java | 31 +++++++++++-----------
.../scala/org/apache/spark/ui/ShufflePage.scala | 16 ++---------
2 files changed, 17 insertions(+), 30 deletions(-)
diff --git
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
index e9994aa35..45615898e 100644
---
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
+++
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
@@ -1024,7 +1024,7 @@ public abstract class RssShuffleManagerBase implements
RssShuffleManagerInterfac
"The stage retry has been triggered successfully for the shuffleId:
{}, attemptNumber: {}",
shuffleId,
stageAttemptNumber);
- this.reassignTriggeredOnStageRetry.set(true);
+ postReassignTriggeredEvent(reassignTriggeredOnStageRetry);
return true;
}
@@ -1148,15 +1148,24 @@ public abstract class RssShuffleManagerBase implements
RssShuffleManagerInterfac
System.currentTimeMillis() - startTime,
partitionSplit,
reassignResult);
- if (partitionSplit) {
- this.reassignTriggeredOnPartitionSplit.set(true);
- } else {
- this.reassignTriggeredOnBlockSendFailure.set(true);
- }
+ postReassignTriggeredEvent(
+ partitionSplit ? reassignTriggeredOnPartitionSplit :
reassignTriggeredOnBlockSendFailure);
return internalHandle;
}
}
+ /** This method will check the historical state to avoid posting duplicate
events */
+ private void postReassignTriggeredEvent(AtomicBoolean isReassign) {
+ if (isReassign.compareAndSet(false, true)) {
+ TaskReassignInfoEvent reassignInfoEvent =
+ new TaskReassignInfoEvent(
+ reassignTriggeredOnPartitionSplit.get(),
+ reassignTriggeredOnBlockSendFailure.get(),
+ reassignTriggeredOnStageRetry.get());
+
RssSparkShuffleUtils.getActiveSparkContext().listenerBus().post(reassignInfoEvent);
+ }
+ }
+
private Set<ShuffleServerInfo> requestReassignServer(
int stageId,
int stageAttemptNumber,
@@ -1195,16 +1204,6 @@ public abstract class RssShuffleManagerBase implements
RssShuffleManagerInterfac
@Override
public void stop() {
- if (this.isDriver && partitionReassignEnabled) {
- // send reassign event into spark event store
- TaskReassignInfoEvent reassignInfoEvent =
- new TaskReassignInfoEvent(
- reassignTriggeredOnPartitionSplit.get(),
- reassignTriggeredOnBlockSendFailure.get(),
- reassignTriggeredOnStageRetry.get());
-
RssSparkShuffleUtils.getActiveSparkContext().listenerBus().post(reassignInfoEvent);
- }
-
if (managerClientSupplier != null
&& managerClientSupplier instanceof ExpiringCloseableSupplier) {
((ExpiringCloseableSupplier<ShuffleManagerClient>)
managerClientSupplier).close();
diff --git
a/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
b/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
index 3fadde1e9..8916916a3 100644
---
a/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
+++
b/client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala
@@ -334,21 +334,9 @@ class ShufflePage(parent: ShuffleTab) extends
WebUIPage("") with Logging {
</li>
<li>
<a>
- <strong>ReassignTriggeredOnPartitionSplit: </strong>
+ <strong>Reassign Status:</strong>
</a>
- {reassignInfo.isReassignTriggeredOnPartitionSplit}
- </li>
- <li>
- <a>
- <strong>ReassignTriggeredOnBlockSendFailure: </strong>
- </a>
- {reassignInfo.isReassignTriggeredOnBlockSendFailure}
- </li>
- <li>
- <a>
- <strong>ReassignTriggeredOnStageRetry: </strong>
- </a>
- {reassignInfo.isReassignTriggeredOnStageRetry}
+
partitionSplit={reassignInfo.isReassignTriggeredOnPartitionSplit},
blockSentFailure={reassignInfo.isReassignTriggeredOnBlockSendFailure},
stageRetry={reassignInfo.isReassignTriggeredOnStageRetry}
</li>
</ul>
</div>