This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 0552d3b0a [#1844] fix(spark): Reassign shuffle servers when retrying
stage (#1845)
0552d3b0a is described below
commit 0552d3b0a6bae24b84e30ca17ed0b40ebfd28401
Author: yl09099 <[email protected]>
AuthorDate: Mon Dec 9 15:13:55 2024 +0800
[#1844] fix(spark): Reassign shuffle servers when retrying stage (#1845)
### What changes were proposed in this pull request?
If the shuffle servers are not reassigned after a stage retry is triggered,
data will be lost. Therefore, this change reassigns the shuffle servers after
the retry.
Observed test failure before this fix:
Error: Failures: Error:
RSSStageDynamicServerReWriteTest.testRSSStageResubmit:119-SparkIntegrationTestBase.run:64->SparkIntegrationTestBase.verifyTestResult:149
expected: <1000> but was: <970>.
### Why are the changes needed?
Fix: #1844
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests.
---
.../spark/shuffle/RssStageResubmitManager.java | 23 +-
.../spark/shuffle/writer/WriteBufferManager.java | 41 +++-
.../org/apache/uniffle/shuffle/BlockIdManager.java | 3 +-
.../BlockIdSelfManagedShuffleWriteClient.java | 25 +++
.../shuffle/manager/RssShuffleManagerBase.java | 136 ++++++------
.../manager/RssShuffleManagerInterface.java | 10 +-
.../shuffle/manager/ShuffleManagerGrpcService.java | 245 ++++++++++++++-------
.../shuffle/writer/WriteBufferManagerTest.java | 12 +-
.../shuffle/manager/DummyRssShuffleManager.java | 8 +-
.../apache/spark/shuffle/RssShuffleManager.java | 19 +-
.../spark/shuffle/writer/RssShuffleWriter.java | 51 +++--
.../spark/shuffle/writer/RssShuffleWriterTest.java | 3 +-
.../apache/spark/shuffle/RssShuffleManager.java | 29 ++-
.../spark/shuffle/writer/RssShuffleWriter.java | 58 +++--
.../spark/shuffle/writer/RssShuffleWriterTest.java | 3 +-
.../uniffle/client/api/ShuffleWriteClient.java | 9 +
.../client/impl/ShuffleWriteClientImpl.java | 34 +++
.../test/RemoteMergeShuffleWithRssClientTest.java | 8 +-
.../test/RSSStageDynamicServerReWriteTest.java | 12 +-
.../apache/uniffle/test/RSSStageResubmitTest.java | 4 +-
.../uniffle/test/SparkIntegrationTestBase.java | 19 ++
.../uniffle/client/api/ShuffleManagerClient.java | 4 -
.../client/impl/grpc/ShuffleManagerGrpcClient.java | 10 -
.../RssPartitionToShuffleServerRequest.java | 16 +-
.../client/request/RssReassignServersRequest.java | 45 ----
.../RssReportShuffleWriteFailureRequest.java | 4 +
.../response/RssReassignServersResponse.java | 43 ----
proto/src/main/proto/Rss.proto | 23 +-
.../uniffle/server/ShuffleServerGrpcService.java | 46 +++-
.../org/apache/uniffle/server/ShuffleTaskInfo.java | 2 +-
.../apache/uniffle/server/ShuffleTaskManager.java | 4 +-
.../server/block/DefaultShuffleBlockIdManager.java | 12 +
.../block/PartitionedShuffleBlockIdManager.java | 11 +
.../server/block/ShuffleBlockIdManager.java | 2 +
.../server/MockedShuffleServerGrpcService.java | 13 +-
35 files changed, 596 insertions(+), 391 deletions(-)
diff --git
a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssStageResubmitManager.java
b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssStageResubmitManager.java
index 028622f92..ff901487e 100644
---
a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssStageResubmitManager.java
+++
b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssStageResubmitManager.java
@@ -17,29 +17,21 @@
package org.apache.spark.shuffle;
-import java.util.Map;
import java.util.Set;
import com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.uniffle.common.util.JavaUtils;
-
public class RssStageResubmitManager {
private static final Logger LOG =
LoggerFactory.getLogger(RssStageResubmitManager.class);
+
/** Blacklist of the Shuffle Server when the write fails. */
private Set<String> serverIdBlackList;
- /**
- * Prevent multiple tasks from reporting FetchFailed, resulting in multiple
ShuffleServer
- * assignments, stageID, Attemptnumber Whether to reassign the combination
flag;
- */
- private Map<Integer, RssStageInfo> serverAssignedInfos;
public RssStageResubmitManager() {
this.serverIdBlackList = Sets.newConcurrentHashSet();
- this.serverAssignedInfos = JavaUtils.newConcurrentMap();
}
public Set<String> getServerIdBlackList() {
@@ -53,17 +45,4 @@ public class RssStageResubmitManager {
public void recordFailuresShuffleServer(String shuffleServerId) {
serverIdBlackList.add(shuffleServerId);
}
-
- public RssStageInfo recordAndGetServerAssignedInfo(int shuffleId, String
stageIdAndAttempt) {
-
- return serverAssignedInfos.computeIfAbsent(
- shuffleId, id -> new RssStageInfo(stageIdAndAttempt, false));
- }
-
- public void recordAndGetServerAssignedInfo(
- int shuffleId, String stageIdAndAttempt, boolean isRetried) {
- serverAssignedInfos
- .computeIfAbsent(shuffleId, id -> new RssStageInfo(stageIdAndAttempt,
false))
- .setReassigned(isRetried);
- }
}
diff --git
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
index 4ee09a31c..e91441f1c 100644
---
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
+++
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
@@ -102,6 +102,7 @@ public class WriteBufferManager extends MemoryConsumer {
private BlockIdLayout blockIdLayout;
private double bufferSpillRatio;
private Function<Integer, List<ShuffleServerInfo>>
partitionAssignmentRetrieveFunc;
+ private int stageAttemptNumber;
public WriteBufferManager(
int shuffleId,
@@ -122,7 +123,8 @@ public class WriteBufferManager extends MemoryConsumer {
taskMemoryManager,
shuffleWriteMetrics,
rssConf,
- null);
+ null,
+ 0);
}
public WriteBufferManager(
@@ -136,6 +138,32 @@ public class WriteBufferManager extends MemoryConsumer {
RssConf rssConf,
Function<List<ShuffleBlockInfo>, List<CompletableFuture<Long>>>
spillFunc,
Function<Integer, List<ShuffleServerInfo>>
partitionAssignmentRetrieveFunc) {
+ this(
+ shuffleId,
+ taskId,
+ taskAttemptId,
+ bufferManagerOptions,
+ serializer,
+ taskMemoryManager,
+ shuffleWriteMetrics,
+ rssConf,
+ spillFunc,
+ partitionAssignmentRetrieveFunc,
+ 0);
+ }
+
+ public WriteBufferManager(
+ int shuffleId,
+ String taskId,
+ long taskAttemptId,
+ BufferManagerOptions bufferManagerOptions,
+ Serializer serializer,
+ TaskMemoryManager taskMemoryManager,
+ ShuffleWriteMetrics shuffleWriteMetrics,
+ RssConf rssConf,
+ Function<List<ShuffleBlockInfo>, List<CompletableFuture<Long>>>
spillFunc,
+ Function<Integer, List<ShuffleServerInfo>>
partitionAssignmentRetrieveFunc,
+ int stageAttemptNumber) {
super(taskMemoryManager, taskMemoryManager.pageSizeBytes(),
MemoryMode.ON_HEAP);
this.bufferSize = bufferManagerOptions.getBufferSize();
this.spillSize = bufferManagerOptions.getBufferSpillThreshold();
@@ -169,6 +197,7 @@ public class WriteBufferManager extends MemoryConsumer {
this.bufferSpillRatio = rssConf.get(RssSparkConfig.RSS_MEMORY_SPILL_RATIO);
this.blockIdLayout = BlockIdLayout.from(rssConf);
this.partitionAssignmentRetrieveFunc = partitionAssignmentRetrieveFunc;
+ this.stageAttemptNumber = stageAttemptNumber;
}
public WriteBufferManager(
@@ -181,7 +210,8 @@ public class WriteBufferManager extends MemoryConsumer {
TaskMemoryManager taskMemoryManager,
ShuffleWriteMetrics shuffleWriteMetrics,
RssConf rssConf,
- Function<List<ShuffleBlockInfo>, List<CompletableFuture<Long>>>
spillFunc) {
+ Function<List<ShuffleBlockInfo>, List<CompletableFuture<Long>>>
spillFunc,
+ int stageAttemptNumber) {
this(
shuffleId,
taskId,
@@ -192,7 +222,8 @@ public class WriteBufferManager extends MemoryConsumer {
shuffleWriteMetrics,
rssConf,
spillFunc,
- partitionId -> partitionToServers.get(partitionId));
+ partitionId -> partitionToServers.get(partitionId),
+ stageAttemptNumber);
}
/** add serialized columnar data directly when integrate with gluten */
@@ -492,7 +523,7 @@ public class WriteBufferManager extends MemoryConsumer {
+ totalSize
+ " bytes");
}
- events.add(new AddBlockEvent(taskId, shuffleBlockInfosPerEvent));
+ events.add(new AddBlockEvent(taskId, stageAttemptNumber,
shuffleBlockInfosPerEvent));
shuffleBlockInfosPerEvent = Lists.newArrayList();
totalSize = 0;
}
@@ -507,7 +538,7 @@ public class WriteBufferManager extends MemoryConsumer {
+ " bytes");
}
// Use final temporary variables for closures
- events.add(new AddBlockEvent(taskId, shuffleBlockInfosPerEvent));
+ events.add(new AddBlockEvent(taskId, stageAttemptNumber,
shuffleBlockInfosPerEvent));
}
return events;
}
diff --git
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/BlockIdManager.java
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/BlockIdManager.java
index c75030e95..1d58c2355 100644
---
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/BlockIdManager.java
+++
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/BlockIdManager.java
@@ -40,11 +40,12 @@ public class BlockIdManager {
}
public void add(int shuffleId, int partitionId, List<Long> ids) {
+
if (CollectionUtils.isEmpty(ids)) {
return;
}
Map<Integer, Roaring64NavigableMap> partitionedBlockIds =
- blockIds.computeIfAbsent(shuffleId, (k) ->
JavaUtils.newConcurrentMap());
+ blockIds.computeIfAbsent(shuffleId, k -> JavaUtils.newConcurrentMap());
partitionedBlockIds.compute(
partitionId,
(id, bitmap) -> {
diff --git
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/BlockIdSelfManagedShuffleWriteClient.java
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/BlockIdSelfManagedShuffleWriteClient.java
index 93aa3f0fc..a407442f3 100644
---
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/BlockIdSelfManagedShuffleWriteClient.java
+++
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/BlockIdSelfManagedShuffleWriteClient.java
@@ -77,6 +77,31 @@ public class BlockIdSelfManagedShuffleWriteClient extends
ShuffleWriteClientImpl
managerClientSupplier.get().reportShuffleResult(request);
}
+ @Override
+ public void reportShuffleResult(
+ Map<ShuffleServerInfo, Map<Integer, Set<Long>>>
serverToPartitionToBlockIds,
+ String appId,
+ int shuffleId,
+ long taskAttemptId,
+ int bitmapNum,
+ Set<ShuffleServerInfo> reportFailureServers,
+ boolean enableWriteFailureRetry) {
+ Map<Integer, List<Long>> partitionToBlockIds = new HashMap<>();
+ for (Map<Integer, Set<Long>> k : serverToPartitionToBlockIds.values()) {
+ for (Map.Entry<Integer, Set<Long>> entry : k.entrySet()) {
+ int partitionId = entry.getKey();
+ partitionToBlockIds
+ .computeIfAbsent(partitionId, x -> new ArrayList<>())
+ .addAll(entry.getValue());
+ }
+ }
+
+ RssReportShuffleResultRequest request =
+ new RssReportShuffleResultRequest(
+ appId, shuffleId, taskAttemptId, partitionToBlockIds, bitmapNum);
+ managerClientSupplier.get().reportShuffleResult(request);
+ }
+
@Override
public Roaring64NavigableMap getShuffleResult(
String clientType,
diff --git
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
index c1b697cbd..c1fc5b68e 100644
---
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
+++
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
@@ -34,6 +34,8 @@ import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
+import scala.Tuple2;
+
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -48,7 +50,6 @@ import org.apache.spark.SparkException;
import org.apache.spark.shuffle.RssShuffleHandle;
import org.apache.spark.shuffle.RssSparkConfig;
import org.apache.spark.shuffle.RssSparkShuffleUtils;
-import org.apache.spark.shuffle.RssStageInfo;
import org.apache.spark.shuffle.RssStageResubmitManager;
import org.apache.spark.shuffle.ShuffleHandleInfoManager;
import org.apache.spark.shuffle.ShuffleManager;
@@ -107,6 +108,8 @@ public abstract class RssShuffleManagerBase implements
RssShuffleManagerInterfac
protected String clientType;
protected SparkConf sparkConf;
+ protected Map<Integer, Integer> shuffleIdToPartitionNum;
+ protected Map<Integer, Integer> shuffleIdToNumMapTasks;
protected Supplier<ShuffleManagerClient> managerClientSupplier;
protected boolean rssStageRetryEnabled;
protected boolean rssStageRetryForWriteFailureEnabled;
@@ -118,13 +121,9 @@ public abstract class RssShuffleManagerBase implements
RssShuffleManagerInterfac
protected ShuffleHandleInfoManager shuffleHandleInfoManager;
protected RssStageResubmitManager rssStageResubmitManager;
-
protected int partitionReassignMaxServerNum;
-
protected boolean blockIdSelfManagedEnabled;
-
protected boolean partitionReassignEnabled;
-
protected boolean shuffleManagerRpcServiceEnabled;
public RssShuffleManagerBase() {
@@ -571,14 +570,21 @@ public abstract class RssShuffleManagerBase implements
RssShuffleManagerInterfac
sparkConf.get(RssSparkConfig.RSS_REMOTE_STORAGE_PATH.key(), ""),
confItems);
}
- public ShuffleHandleInfo getShuffleHandleInfo(RssShuffleHandle<?, ?, ?>
rssHandle) {
+ public ShuffleHandleInfo getShuffleHandleInfo(
+ int stageAttemptId,
+ int stageAttemptNumber,
+ RssShuffleHandle<?, ?, ?> rssHandle,
+ boolean isWritePhase) {
int shuffleId = rssHandle.getShuffleId();
- if (shuffleManagerRpcServiceEnabled && rssStageRetryEnabled) {
+ if (shuffleManagerRpcServiceEnabled &&
rssStageRetryForWriteFailureEnabled) {
// In Stage Retry mode, Get the ShuffleServer list from the Driver based
on the shuffleId.
- return getRemoteShuffleHandleInfoWithStageRetry(shuffleId);
+ return getRemoteShuffleHandleInfoWithStageRetry(
+ stageAttemptId, stageAttemptNumber, shuffleId, isWritePhase);
} else if (shuffleManagerRpcServiceEnabled && partitionReassignEnabled) {
- // In Stage Retry mode, Get the ShuffleServer list from the Driver based
on the shuffleId.
- return getRemoteShuffleHandleInfoWithBlockRetry(shuffleId);
+ // In partition block Retry mode, Get the ShuffleServer list from the
Driver based on the
+ // shuffleId.
+ return getRemoteShuffleHandleInfoWithBlockRetry(
+ stageAttemptId, stageAttemptNumber, shuffleId, isWritePhase);
} else {
return new SimpleShuffleHandleInfo(
shuffleId, rssHandle.getPartitionToServers(),
rssHandle.getRemoteStorage());
@@ -592,9 +598,10 @@ public abstract class RssShuffleManagerBase implements
RssShuffleManagerInterfac
* @return ShuffleHandleInfo
*/
protected synchronized StageAttemptShuffleHandleInfo
getRemoteShuffleHandleInfoWithStageRetry(
- int shuffleId) {
+ int stageAttemptId, int stageAttemptNumber, int shuffleId, boolean
isWritePhase) {
RssPartitionToShuffleServerRequest rssPartitionToShuffleServerRequest =
- new RssPartitionToShuffleServerRequest(shuffleId);
+ new RssPartitionToShuffleServerRequest(
+ stageAttemptId, stageAttemptNumber, shuffleId, isWritePhase);
RssReassignOnStageRetryResponse rpcPartitionToShufflerServer =
getOrCreateShuffleManagerClientSupplier()
.get()
@@ -612,9 +619,10 @@ public abstract class RssShuffleManagerBase implements
RssShuffleManagerInterfac
* @return ShuffleHandleInfo
*/
protected synchronized MutableShuffleHandleInfo
getRemoteShuffleHandleInfoWithBlockRetry(
- int shuffleId) {
+ int stageAttemptId, int stageAttemptNumber, int shuffleId, boolean
isWritePhase) {
RssPartitionToShuffleServerRequest rssPartitionToShuffleServerRequest =
- new RssPartitionToShuffleServerRequest(shuffleId);
+ new RssPartitionToShuffleServerRequest(
+ stageAttemptId, stageAttemptNumber, shuffleId, isWritePhase);
RssReassignOnBlockSendFailureResponse rpcPartitionToShufflerServer =
getOrCreateShuffleManagerClientSupplier()
.get()
@@ -668,65 +676,34 @@ public abstract class RssShuffleManagerBase implements
RssShuffleManagerInterfac
* Reassign the ShuffleServer list for ShuffleId
*
* @param shuffleId
- * @param numPartitions
*/
@Override
public boolean reassignOnStageResubmit(
- int stageId, int stageAttemptNumber, int shuffleId, int numPartitions) {
- String stageIdAndAttempt = stageId + "_" + stageAttemptNumber;
- RssStageInfo rssStageInfo =
- rssStageResubmitManager.recordAndGetServerAssignedInfo(shuffleId,
stageIdAndAttempt);
- synchronized (rssStageInfo) {
- Boolean needReassign = rssStageInfo.isReassigned();
- if (!needReassign) {
- int requiredShuffleServerNumber =
- RssSparkShuffleUtils.getRequiredShuffleServerNumber(sparkConf);
- int estimateTaskConcurrency =
RssSparkShuffleUtils.estimateTaskConcurrency(sparkConf);
-
- /**
- * this will clear up the previous stage attempt all data when
registering the same
- * shuffleId at the second time
- */
- Map<Integer, List<ShuffleServerInfo>> partitionToServers =
- requestShuffleAssignment(
- shuffleId,
- numPartitions,
- 1,
- requiredShuffleServerNumber,
- estimateTaskConcurrency,
- rssStageResubmitManager.getServerIdBlackList(),
- stageId,
- stageAttemptNumber,
- false);
- /**
- * we need to clear the metadata of the completed task, otherwise some
of the stage's data
- * will be lost
- */
- try {
- unregisterAllMapOutput(shuffleId);
- } catch (SparkException e) {
- LOG.error("Clear MapoutTracker Meta failed!");
- throw new RssException("Clear MapoutTracker Meta failed!", e);
- }
- MutableShuffleHandleInfo shuffleHandleInfo =
- new MutableShuffleHandleInfo(shuffleId, partitionToServers,
getRemoteStorageInfo());
- StageAttemptShuffleHandleInfo stageAttemptShuffleHandleInfo =
- (StageAttemptShuffleHandleInfo)
shuffleHandleInfoManager.get(shuffleId);
-
stageAttemptShuffleHandleInfo.replaceCurrentShuffleHandleInfo(shuffleHandleInfo);
- rssStageResubmitManager.recordAndGetServerAssignedInfo(shuffleId,
stageIdAndAttempt, true);
- LOG.info(
- "The stage retry has been triggered successfully for the stageId:
{}, attemptNumber: {}",
- stageId,
- stageAttemptNumber);
- return true;
- } else {
- LOG.info(
- "Do nothing that the stage: {} has been reassigned for attempt{}",
- stageId,
- stageAttemptNumber);
- return false;
- }
- }
+ int shuffleId, int stageAttemptId, int stageAttemptNumber) {
+ int requiredShuffleServerNumber =
+ RssSparkShuffleUtils.getRequiredShuffleServerNumber(sparkConf);
+ int estimateTaskConcurrency =
RssSparkShuffleUtils.estimateTaskConcurrency(sparkConf);
+ Map<Integer, List<ShuffleServerInfo>> partitionToServers =
+ requestShuffleAssignment(
+ shuffleId,
+ getPartitionNum(shuffleId),
+ 1,
+ requiredShuffleServerNumber,
+ estimateTaskConcurrency,
+ rssStageResubmitManager.getServerIdBlackList(),
+ stageAttemptId,
+ stageAttemptNumber,
+ false);
+ MutableShuffleHandleInfo shuffleHandleInfo =
+ new MutableShuffleHandleInfo(shuffleId, partitionToServers,
getRemoteStorageInfo());
+ StageAttemptShuffleHandleInfo stageAttemptShuffleHandleInfo =
+ (StageAttemptShuffleHandleInfo)
shuffleHandleInfoManager.get(shuffleId);
+
stageAttemptShuffleHandleInfo.replaceCurrentShuffleHandleInfo(shuffleHandleInfo);
+ LOG.info(
+ "The stage retry has been triggered successfully for the shuffleId:
{}, attemptNumber: {}",
+ shuffleId,
+ stageAttemptNumber);
+ return true;
}
/** this is only valid on driver side that exposed to being invoked by grpc
server */
@@ -1062,7 +1039,7 @@ public abstract class RssShuffleManagerBase implements
RssShuffleManagerInterfac
}
LOG.info("Start to register shuffleId {}", shuffleId);
long start = System.currentTimeMillis();
- Map<String, String> sparkConfMap =
RssSparkConfig.sparkConfToMap(getSparkConf());
+ Map<String, String> sparkConfMap = sparkConfToMap(getSparkConf());
serverToPartitionRanges.entrySet().stream()
.forEach(
entry -> {
@@ -1093,7 +1070,7 @@ public abstract class RssShuffleManagerBase implements
RssShuffleManagerInterfac
}
LOG.info("Start to register shuffleId[{}]", shuffleId);
long start = System.currentTimeMillis();
- Map<String, String> sparkConfMap =
RssSparkConfig.sparkConfToMap(getSparkConf());
+ Map<String, String> sparkConfMap = sparkConfToMap(getSparkConf());
Set<Map.Entry<ShuffleServerInfo, List<PartitionRange>>> entries =
serverToPartitionRanges.entrySet();
entries.stream()
@@ -1139,4 +1116,19 @@ public abstract class RssShuffleManagerBase implements
RssShuffleManagerInterfac
public SparkConf getSparkConf() {
return sparkConf;
}
+
+ public Map<String, String> sparkConfToMap(SparkConf sparkConf) {
+ Map<String, String> map = new HashMap<>();
+
+ for (Tuple2<String, String> tuple : sparkConf.getAll()) {
+ String key = tuple._1;
+ map.put(key, tuple._2);
+ }
+
+ return map;
+ }
+
+ public ShuffleWriteClient getShuffleWriteClient() {
+ return shuffleWriteClient;
+ }
}
diff --git
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerInterface.java
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerInterface.java
index 5abb6b832..82967cc4a 100644
---
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerInterface.java
+++
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerInterface.java
@@ -24,6 +24,7 @@ import org.apache.spark.SparkException;
import org.apache.spark.shuffle.handle.MutableShuffleHandleInfo;
import org.apache.spark.shuffle.handle.ShuffleHandleInfo;
+import org.apache.uniffle.client.api.ShuffleWriteClient;
import org.apache.uniffle.common.ReceivingFailureServer;
import org.apache.uniffle.shuffle.BlockIdManager;
@@ -80,7 +81,7 @@ public interface RssShuffleManagerInterface {
*/
void addFailuresShuffleServerInfos(String shuffleServerId);
- boolean reassignOnStageResubmit(int stageId, int stageAttemptNumber, int
shuffleId, int numMaps);
+ boolean reassignOnStageResubmit(int shuffleId, int stageAttemptId, int
stageAttemptNumber);
MutableShuffleHandleInfo reassignOnBlockSendFailure(
int stageId,
@@ -88,4 +89,11 @@ public interface RssShuffleManagerInterface {
int shuffleId,
Map<Integer, List<ReceivingFailureServer>> partitionToFailureServers,
boolean partitionSplit);
+
+ /**
+ * Driver Obtains ShuffleWriteClient.
+ *
+ * @return
+ */
+ ShuffleWriteClient getShuffleWriteClient();
}
diff --git
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
index cbc66105b..667b6a905 100644
---
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
+++
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
@@ -20,6 +20,7 @@ package org.apache.uniffle.shuffle.manager;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
@@ -29,6 +30,7 @@ import java.util.stream.Collectors;
import com.google.protobuf.UnsafeByteOperations;
import io.grpc.stub.StreamObserver;
+import org.apache.spark.SparkException;
import org.apache.spark.shuffle.handle.MutableShuffleHandleInfo;
import org.apache.spark.shuffle.handle.StageAttemptShuffleHandleInfo;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
@@ -37,6 +39,7 @@ import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.ReceivingFailureServer;
import org.apache.uniffle.common.ShuffleServerInfo;
+import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.util.JavaUtils;
import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.proto.RssProtos;
@@ -47,7 +50,7 @@ public class ShuffleManagerGrpcService extends
ShuffleManagerImplBase {
private static final Logger LOG =
LoggerFactory.getLogger(ShuffleManagerGrpcService.class);
private final Map<Integer, RssShuffleStatus> shuffleStatus =
JavaUtils.newConcurrentMap();
// The shuffleId mapping records the number of ShuffleServer write failures
- private final Map<Integer, ShuffleServerFailureRecord> shuffleWriteStatus =
+ private final Map<Integer, ShuffleServerWriterFailureRecord>
shuffleWriteStatus =
JavaUtils.newConcurrentMap();
private final RssShuffleManagerInterface shuffleManager;
@@ -61,6 +64,7 @@ public class ShuffleManagerGrpcService extends
ShuffleManagerImplBase {
StreamObserver<RssProtos.ReportShuffleWriteFailureResponse>
responseObserver) {
String appId = request.getAppId();
int shuffleId = request.getShuffleId();
+ int stageAttemptId = request.getStageAttemptId();
int stageAttemptNumber = request.getStageAttemptNumber();
List<RssProtos.ShuffleServerId> shuffleServerIdsList =
request.getShuffleServerIdsList();
RssProtos.StatusCode code;
@@ -75,43 +79,66 @@ public class ShuffleManagerGrpcService extends
ShuffleManagerImplBase {
code = RssProtos.StatusCode.INVALID_REQUEST;
reSubmitWholeStage = false;
} else {
- Map<String, AtomicInteger> shuffleServerInfoIntegerMap =
JavaUtils.newConcurrentMap();
+ Map<String, AtomicInteger> initServerFailures =
JavaUtils.newConcurrentMap();
List<ShuffleServerInfo> shuffleServerInfos =
ShuffleServerInfo.fromProto(shuffleServerIdsList);
shuffleServerInfos.forEach(
- shuffleServerInfo -> {
- shuffleServerInfoIntegerMap.put(shuffleServerInfo.getId(), new
AtomicInteger(0));
- });
- ShuffleServerFailureRecord shuffleServerFailureRecord =
+ shuffleServerInfo ->
+ initServerFailures.computeIfAbsent(
+ shuffleServerInfo.getId(), key -> new AtomicInteger(0)));
+ ShuffleServerWriterFailureRecord shuffleServerWriterFailureRecord =
shuffleWriteStatus.computeIfAbsent(
shuffleId,
- key ->
- new ShuffleServerFailureRecord(shuffleServerInfoIntegerMap,
stageAttemptNumber));
+ key -> new ShuffleServerWriterFailureRecord(stageAttemptNumber,
initServerFailures));
boolean resetflag =
-
shuffleServerFailureRecord.resetStageAttemptIfNecessary(stageAttemptNumber);
+
shuffleServerWriterFailureRecord.resetStageAttemptIfNecessary(stageAttemptNumber);
if (resetflag) {
msg =
String.format(
- "got an old stage(%d vs %d) shuffle write failure report,
which should be impossible.",
- shuffleServerFailureRecord.getStageAttempt(),
stageAttemptNumber);
+ "got an old stage(%d_%d) shuffle write failure report, which
should be impossible.",
+ stageAttemptId, stageAttemptNumber);
LOG.warn(msg);
code = RssProtos.StatusCode.INVALID_REQUEST;
reSubmitWholeStage = false;
} else {
- code = RssProtos.StatusCode.SUCCESS;
- // update the stage shuffleServer write failed count
- boolean fetchFailureflag =
- shuffleServerFailureRecord.incPartitionWriteFailure(
- stageAttemptNumber, shuffleServerInfos, shuffleManager);
- if (fetchFailureflag) {
- reSubmitWholeStage = true;
- msg =
- String.format(
- "report shuffle write failure as maximum number(%d) of
shuffle write is occurred",
- shuffleManager.getMaxFetchFailures());
- } else {
- reSubmitWholeStage = false;
- msg = "don't report shuffle write failure";
+ synchronized (shuffleServerWriterFailureRecord) {
+ code = RssProtos.StatusCode.SUCCESS;
+ // update the stage shuffleServer write failed count
+ boolean isFetchFailed =
+ shuffleServerWriterFailureRecord.incWriteFailureForShuffleServer(
+ stageAttemptNumber, shuffleServerInfos, shuffleManager);
+ if (isFetchFailed) {
+ reSubmitWholeStage = true;
+ msg =
+ String.format(
+ "Report shuffle write failure as maximum number(%d) of
shuffle write is occurred.",
+ shuffleManager.getMaxFetchFailures());
+ if (!shuffleServerWriterFailureRecord.isClearedMapTrackerBlock()) {
+ try {
+ // Clear the metadata of the completed task, otherwise some of
the stage's data will
+ // be lost.
+ shuffleManager.unregisterAllMapOutput(shuffleId);
+ // Deregister the shuffleId corresponding to the Shuffle
Server.
+
shuffleManager.getShuffleWriteClient().unregisterShuffle(appId, shuffleId);
+
shuffleServerWriterFailureRecord.setClearedMapTrackerBlock(true);
+ LOG.info(
+ "Clear shuffle result in shuffleId:{}, stageId:{},
stageAttemptNumber:{}.",
+ shuffleId,
+ stageAttemptId,
+ stageAttemptNumber);
+ } catch (SparkException e) {
+ LOG.error(
+ "Clear MapoutTracker Meta failed in shuffleId:{},
stageAttemptId:{}, stageAttemptNumber:{}.",
+ shuffleId,
+ stageAttemptId,
+ stageAttemptNumber);
+ throw new RssException("Clear MapoutTracker Meta failed!", e);
+ }
+ }
+ } else {
+ reSubmitWholeStage = false;
+ msg = "The maximum number of failures was not reached.";
+ }
}
}
}
@@ -194,8 +221,25 @@ public class ShuffleManagerGrpcService extends
ShuffleManagerImplBase {
StreamObserver<RssProtos.ReassignOnStageRetryResponse> responseObserver)
{
RssProtos.ReassignOnStageRetryResponse reply;
RssProtos.StatusCode code;
+ int stageAttemptId = request.getStageAttemptId();
+ int stageAttemptNumber = request.getStageAttemptNumber();
int shuffleId = request.getShuffleId();
- StageAttemptShuffleHandleInfo shuffleHandle =
+ boolean isWritePhase = request.getIsWritePhase();
+ StageAttemptShuffleHandleInfo shuffleHandle;
+ if (isWritePhase) {
+ ShuffleServerWriterFailureRecord shuffleServerWriterFailureRecord =
+ shuffleWriteStatus.get(shuffleId);
+ if (shuffleServerWriterFailureRecord != null) {
+ synchronized (shuffleServerWriterFailureRecord) {
+ if
(shuffleServerWriterFailureRecord.isNeedReassignForLastStageNumber(
+ stageAttemptNumber)) {
+ shuffleManager.reassignOnStageResubmit(shuffleId, stageAttemptId,
stageAttemptNumber);
+
shuffleServerWriterFailureRecord.setShuffleServerAssignmented(true);
+ }
+ }
+ }
+ }
+ shuffleHandle =
(StageAttemptShuffleHandleInfo)
shuffleManager.getShuffleHandleInfoByShuffleId(shuffleId);
if (shuffleHandle != null) {
code = RssProtos.StatusCode.SUCCESS;
@@ -208,6 +252,7 @@ public class ShuffleManagerGrpcService extends
ShuffleManagerImplBase {
code = RssProtos.StatusCode.INVALID_REQUEST;
reply =
RssProtos.ReassignOnStageRetryResponse.newBuilder().setStatus(code).build();
}
+
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
@@ -236,27 +281,6 @@ public class ShuffleManagerGrpcService extends
ShuffleManagerImplBase {
responseObserver.onCompleted();
}
- @Override
- public void reassignOnStageResubmit(
- RssProtos.ReassignServersRequest request,
- StreamObserver<RssProtos.ReassignServersResponse> responseObserver) {
- int stageId = request.getStageId();
- int stageAttemptNumber = request.getStageAttemptNumber();
- int shuffleId = request.getShuffleId();
- int numPartitions = request.getNumPartitions();
- boolean needReassign =
- shuffleManager.reassignOnStageResubmit(
- stageId, stageAttemptNumber, shuffleId, numPartitions);
- RssProtos.StatusCode code = RssProtos.StatusCode.SUCCESS;
- RssProtos.ReassignServersResponse reply =
- RssProtos.ReassignServersResponse.newBuilder()
- .setStatus(code)
- .setNeedReassign(needReassign)
- .build();
- responseObserver.onNext(reply);
- responseObserver.onCompleted();
- }
-
@Override
public void reassignOnBlockSendFailure(
org.apache.uniffle.proto.RssProtos.RssReassignOnBlockSendFailureRequest
request,
@@ -312,17 +336,26 @@ public class ShuffleManagerGrpcService extends
ShuffleManagerImplBase {
shuffleStatus.remove(shuffleId);
}
- private static class ShuffleServerFailureRecord {
+ private static class ShuffleServerWriterFailureRecord {
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
private final ReentrantReadWriteLock.WriteLock writeLock =
lock.writeLock();
- private final Map<String, AtomicInteger> shuffleServerFailureRecordCount;
- private int stageAttemptNumber;
-
- private ShuffleServerFailureRecord(
- Map<String, AtomicInteger> shuffleServerFailureRecordCount, int
stageAttemptNumber) {
- this.shuffleServerFailureRecordCount = shuffleServerFailureRecordCount;
+ private final Map<String, AtomicInteger>
shuffleServerWriteFailureRecordCount;
+ private Integer stageAttemptNumber;
+ // Whether shuffle servers have been reassigned for the current stage attempt.
+ private boolean isShuffleServerAssignmented;
+ // Whether a stage retry is required for the current stage attempt.
+ private boolean isStageNeedRetry;
+ // Whether the shuffle result (map output tracker state) has been cleared for
+ // the current stage attempt.
+ private boolean isClearedMapTrackerBlock;
+
+ private ShuffleServerWriterFailureRecord(
+ Integer stageAttemptNumber, Map<String, AtomicInteger>
initServerFailures) {
this.stageAttemptNumber = stageAttemptNumber;
+ this.shuffleServerWriteFailureRecordCount = initServerFailures;
+ this.isStageNeedRetry = false;
+ this.isShuffleServerAssignmented = false;
+ this.isClearedMapTrackerBlock = false;
}
private <T> T withReadLock(Supplier<T> fn) {
@@ -343,18 +376,17 @@ public class ShuffleManagerGrpcService extends
ShuffleManagerImplBase {
}
}
- public int getStageAttempt() {
- return withReadLock(() -> this.stageAttemptNumber);
- }
-
public boolean resetStageAttemptIfNecessary(int stageAttemptNumber) {
return withWriteLock(
() -> {
if (this.stageAttemptNumber < stageAttemptNumber) {
// a new stage attempt is issued. Record the shuffleServer
status of the Map should be
// clear and reset.
- shuffleServerFailureRecordCount.clear();
+ this.shuffleServerWriteFailureRecordCount.clear();
this.stageAttemptNumber = stageAttemptNumber;
+ this.isStageNeedRetry = false;
+ this.isShuffleServerAssignmented = false;
+ this.isClearedMapTrackerBlock = false;
return false;
} else if (this.stageAttemptNumber > stageAttemptNumber) {
return true;
@@ -363,35 +395,90 @@ public class ShuffleManagerGrpcService extends
ShuffleManagerImplBase {
});
}
- public boolean incPartitionWriteFailure(
+ public boolean incWriteFailureForShuffleServer(
int stageAttemptNumber,
List<ShuffleServerInfo> shuffleServerInfos,
RssShuffleManagerInterface shuffleManager) {
return withWriteLock(
() -> {
if (this.stageAttemptNumber != stageAttemptNumber) {
- // do nothing here
+ // If it is not the latest StageAttemptNumber, skip it.
return false;
- }
- shuffleServerInfos.forEach(
- shuffleServerInfo -> {
- shuffleServerFailureRecordCount
- .computeIfAbsent(shuffleServerInfo.getId(), k -> new
AtomicInteger())
- .incrementAndGet();
- });
- List<Map.Entry<String, AtomicInteger>> list =
- new ArrayList(shuffleServerFailureRecordCount.entrySet());
- if (!list.isEmpty()) {
- Collections.sort(list, (o1, o2) -> (o1.getValue().get() -
o2.getValue().get()));
- Map.Entry<String, AtomicInteger> shuffleServerInfoIntegerEntry =
list.get(0);
- if (shuffleServerInfoIntegerEntry.getValue().get()
- > shuffleManager.getMaxFetchFailures()) {
- shuffleManager.addFailuresShuffleServerInfos(
- shuffleServerInfoIntegerEntry.getKey());
- return true;
+ } else {
+ shuffleServerInfos.forEach(
+ shuffleServerInfo ->
+ shuffleServerWriteFailureRecordCount
+ .computeIfAbsent(shuffleServerInfo.getId(), key ->
new AtomicInteger())
+ .incrementAndGet());
+ List<Map.Entry<String, AtomicInteger>> serverFailuresList =
+ new
ArrayList(shuffleServerWriteFailureRecordCount.entrySet());
+ if (serverFailuresList.isEmpty()) {
+ return false;
+ } else {
+ if (isStageNeedRetry) {
+ // A stage retry has already been decided for this attempt; just throw FetchFailed.
+ return true;
+ } else {
+ int failureCnt = 0;
+ // Sort in descending order of failure count, then select the shuffle
+ // servers whose failure count exceeds the configured maximum.
+ Collections.sort(
+ serverFailuresList, Comparator.comparingInt(o ->
-o.getValue().get()));
+ for (int i = 0; i < serverFailuresList.size(); i++) {
+ Map.Entry<String, AtomicInteger> serverFailure =
serverFailuresList.get(i);
+ if (serverFailure.getValue().get() >
shuffleManager.getMaxFetchFailures()) {
+
shuffleManager.addFailuresShuffleServerInfos(serverFailure.getKey());
+ failureCnt++;
+ } else {
+ break;
+ }
+ }
+ if (failureCnt > 0) {
+ this.isStageNeedRetry = true;
+ return true;
+ } else {
+ return false;
+ }
+ }
}
}
- return false;
+ });
+ }
+
+ public boolean isNeedReassignForLastStageNumber(int
lastStageAttemptNumber) {
+ return withReadLock(
+ () -> {
+ if (isStageNeedRetry
+ && !isShuffleServerAssignmented
+ && stageAttemptNumber == lastStageAttemptNumber - 1) {
+ return true;
+ } else {
+ return false;
+ }
+ });
+ }
+
+ public void setShuffleServerAssignmented(boolean isAssignmented) {
+ withWriteLock(
+ () -> {
+ this.isShuffleServerAssignmented = isAssignmented;
+ return null;
+ });
+ }
+
+ public void setClearedMapTrackerBlock(boolean isCleared) {
+ withWriteLock(
+ () -> {
+ this.isClearedMapTrackerBlock = isCleared;
+ return null;
+ });
+ }
+
+ public boolean isClearedMapTrackerBlock() {
+ return withReadLock(
+ () -> {
+ return isClearedMapTrackerBlock;
});
}
}
diff --git
a/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/WriteBufferManagerTest.java
b/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/WriteBufferManagerTest.java
index 19c9f6d10..97639d3c4 100644
---
a/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/WriteBufferManagerTest.java
+++
b/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/WriteBufferManagerTest.java
@@ -333,7 +333,8 @@ public class WriteBufferManagerTest {
mockTaskMemoryManager,
new ShuffleWriteMetrics(),
RssSparkConfig.toRssConf(conf),
- null);
+ null,
+ 0);
WriteBufferManager spyManager = spy(wbm);
doReturn(512L).when(spyManager).acquireMemory(anyLong());
@@ -366,7 +367,8 @@ public class WriteBufferManagerTest {
mockTaskMemoryManager,
new ShuffleWriteMetrics(),
RssSparkConfig.toRssConf(conf),
- null);
+ null,
+ 0);
Function<List<ShuffleBlockInfo>, List<CompletableFuture<Long>>> spillFunc =
blocks -> {
@@ -476,7 +478,8 @@ public class WriteBufferManagerTest {
mockTaskMemoryManager,
new ShuffleWriteMetrics(),
RssSparkConfig.toRssConf(conf),
- null);
+ null,
+ 0);
Function<List<ShuffleBlockInfo>, List<CompletableFuture<Long>>> spillFunc =
blocks -> {
@@ -565,7 +568,8 @@ public class WriteBufferManagerTest {
fakedTaskMemoryManager,
new ShuffleWriteMetrics(),
RssSparkConfig.toRssConf(conf),
- null);
+ null,
+ 0);
List<ShuffleBlockInfo> blockList = new ArrayList<>();
diff --git
a/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/DummyRssShuffleManager.java
b/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/DummyRssShuffleManager.java
index 8e6a307c5..d82581915 100644
---
a/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/DummyRssShuffleManager.java
+++
b/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/DummyRssShuffleManager.java
@@ -25,6 +25,7 @@ import java.util.Set;
import org.apache.spark.shuffle.handle.MutableShuffleHandleInfo;
import org.apache.spark.shuffle.handle.ShuffleHandleInfo;
+import org.apache.uniffle.client.api.ShuffleWriteClient;
import org.apache.uniffle.common.ReceivingFailureServer;
import org.apache.uniffle.shuffle.BlockIdManager;
@@ -71,7 +72,7 @@ public class DummyRssShuffleManager implements
RssShuffleManagerInterface {
@Override
public boolean reassignOnStageResubmit(
- int stageId, int stageAttemptNumber, int shuffleId, int numMaps) {
+ int shuffleId, int stageAttemptId, int stageAttemptNumber) {
return false;
}
@@ -84,4 +85,9 @@ public class DummyRssShuffleManager implements
RssShuffleManagerInterface {
boolean partitionSplit) {
return null;
}
+
+ @Override
+ public ShuffleWriteClient getShuffleWriteClient() {
+ return null;
+ }
}
diff --git
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index c21cd0b56..d62827243 100644
---
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -103,8 +103,6 @@ public class RssShuffleManager extends
RssShuffleManagerBase {
private final String user;
private final String uuid;
private DataPusher dataPusher;
- private final Map<Integer, Integer> shuffleIdToPartitionNum =
JavaUtils.newConcurrentMap();
- private final Map<Integer, Integer> shuffleIdToNumMapTasks =
JavaUtils.newConcurrentMap();
private GrpcServer shuffleManagerServer;
private ShuffleManagerGrpcService service;
@@ -170,6 +168,8 @@ public class RssShuffleManager extends
RssShuffleManagerBase {
sparkConf.set("spark.shuffle.reduceLocality.enabled", "false");
LOG.info("Disable shuffle data locality in RssShuffleManager.");
+ this.shuffleIdToPartitionNum = JavaUtils.newConcurrentMap();
+ this.shuffleIdToNumMapTasks = JavaUtils.newConcurrentMap();
// stage retry for write/fetch failure
rssStageRetryForFetchFailureEnabled =
rssConf.get(RSS_RESUBMIT_STAGE_WITH_FETCH_FAILURE_ENABLED);
@@ -353,7 +353,7 @@ public class RssShuffleManager extends
RssShuffleManagerBase {
shuffleIdToPartitionNum.putIfAbsent(shuffleId,
dependency.partitioner().numPartitions());
shuffleIdToNumMapTasks.putIfAbsent(shuffleId,
dependency.rdd().partitions().length);
- if (shuffleManagerRpcServiceEnabled && rssStageRetryEnabled) {
+ if (shuffleManagerRpcServiceEnabled &&
rssStageRetryForWriteFailureEnabled) {
ShuffleHandleInfo handleInfo =
new MutableShuffleHandleInfo(shuffleId, partitionToServers,
remoteStorage);
StageAttemptShuffleHandleInfo stageAttemptShuffleHandleInfo =
@@ -375,7 +375,8 @@ public class RssShuffleManager extends
RssShuffleManagerBase {
+ shuffleId
+ "], partitionNum["
+ partitionToServers.size()
- + "]");
+ + "], server:{}",
+ partitionToServers);
return new RssShuffleHandle(shuffleId, appId, numMaps, dependency,
hdlInfoBd);
}
@@ -489,12 +490,16 @@ public class RssShuffleManager extends
RssShuffleManagerBase {
+ "]");
start = System.currentTimeMillis();
ShuffleHandleInfo shuffleHandleInfo;
- if (shuffleManagerRpcServiceEnabled && rssStageRetryEnabled) {
+ if (shuffleManagerRpcServiceEnabled &&
rssStageRetryForWriteFailureEnabled) {
// In Stage Retry mode, Get the ShuffleServer list from the Driver
based on the shuffleId.
- shuffleHandleInfo =
getRemoteShuffleHandleInfoWithStageRetry(shuffleId);
+ shuffleHandleInfo =
+ getRemoteShuffleHandleInfoWithStageRetry(
+ context.stageId(), context.stageAttemptNumber(), shuffleId,
false);
} else if (shuffleManagerRpcServiceEnabled && partitionReassignEnabled) {
// In Block Retry mode, Get the ShuffleServer list from the Driver
based on the shuffleId
- shuffleHandleInfo =
getRemoteShuffleHandleInfoWithBlockRetry(shuffleId);
+ shuffleHandleInfo =
+ getRemoteShuffleHandleInfoWithBlockRetry(
+ context.stageId(), context.stageAttemptNumber(), shuffleId,
false);
} else {
shuffleHandleInfo =
new SimpleShuffleHandleInfo(
diff --git
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index 4474c99c8..aa4ff9f89 100644
---
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -65,9 +65,7 @@ import org.slf4j.LoggerFactory;
import org.apache.uniffle.client.api.ShuffleManagerClient;
import org.apache.uniffle.client.api.ShuffleWriteClient;
import org.apache.uniffle.client.impl.FailedBlockSendTracker;
-import org.apache.uniffle.client.request.RssReassignServersRequest;
import org.apache.uniffle.client.request.RssReportShuffleWriteFailureRequest;
-import org.apache.uniffle.client.response.RssReassignServersResponse;
import org.apache.uniffle.client.response.RssReportShuffleWriteFailureResponse;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleServerInfo;
@@ -111,6 +109,8 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
private TaskContext taskContext;
private SparkConf sparkConf;
private Supplier<ShuffleManagerClient> managerClientSupplier;
+ private boolean enableWriteFailureRetry;
+ private Set<ShuffleServerInfo> recordReportFailedShuffleservers;
public RssShuffleWriter(
String appId,
@@ -181,6 +181,9 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
this.shuffleHandleInfo = shuffleHandleInfo;
this.taskContext = context;
this.sparkConf = sparkConf;
+ this.enableWriteFailureRetry =
+
RssSparkConfig.toRssConf(sparkConf).get(RSS_RESUBMIT_STAGE_WITH_WRITE_FAILURE_ENABLED);
+ this.recordReportFailedShuffleservers = Sets.newConcurrentHashSet();
}
public RssShuffleWriter(
@@ -208,7 +211,8 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
managerClientSupplier,
rssHandle,
taskFailureCallback,
- shuffleManager.getShuffleHandleInfo(rssHandle),
+ shuffleManager.getShuffleHandleInfo(
+ context.stageId(), context.stageAttemptNumber(), rssHandle, true),
context);
BufferManagerOptions bufferOptions = new BufferManagerOptions(sparkConf);
final WriteBufferManager bufferManager =
@@ -222,7 +226,8 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
context.taskMemoryManager(),
shuffleWriteMetrics,
RssSparkConfig.toRssConf(sparkConf),
- this::processShuffleBlockInfos);
+ this::processShuffleBlockInfos,
+ context.stageAttemptNumber());
this.bufferManager = bufferManager;
}
@@ -244,8 +249,8 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
writeImpl(records);
} catch (Exception e) {
taskFailureCallback.apply(taskId);
- if
(RssSparkConfig.toRssConf(sparkConf).get(RSS_RESUBMIT_STAGE_WITH_WRITE_FAILURE_ENABLED))
{
- throwFetchFailedIfNecessary(e);
+ if (enableWriteFailureRetry) {
+ throwFetchFailedIfNecessary(e, Sets.newConcurrentHashSet());
} else {
throw e;
}
@@ -480,7 +485,13 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
createDummyBlockManagerId(appId + "_" + taskId, taskAttemptId);
long start = System.currentTimeMillis();
shuffleWriteClient.reportShuffleResult(
- serverToPartitionToBlockIds, appId, shuffleId, taskAttemptId,
bitmapSplitNum);
+ serverToPartitionToBlockIds,
+ appId,
+ shuffleId,
+ taskAttemptId,
+ bitmapSplitNum,
+ recordReportFailedShuffleservers,
+ enableWriteFailureRetry);
LOG.info(
"Report shuffle result for task[{}] with bitmapNum[{}] cost {} ms",
taskAttemptId,
@@ -491,6 +502,14 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
} else {
return Option.empty();
}
+ } catch (Exception e) {
+ // Treat any exception thrown while reporting the shuffle result as a write
+ // failure and trigger a stage retry.
+ if (enableWriteFailureRetry) {
+ throw throwFetchFailedIfNecessary(e, recordReportFailedShuffleservers);
+ } else {
+ throw e;
+ }
} finally {
// free all memory & metadata, or memory leak happen in executor
if (bufferManager != null) {
@@ -531,35 +550,27 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
return shuffleWriteMetrics;
}
- private void throwFetchFailedIfNecessary(Exception e) {
+ private RssException throwFetchFailedIfNecessary(
+ Exception e, Set<ShuffleServerInfo> reportFailuredServers) {
// The shuffleServer is registered only when a Block fails to be sent
if (e instanceof RssSendFailedException) {
FailedBlockSendTracker blockIdsFailedSendTracker =
shuffleManager.getBlockIdsFailedSendTracker(taskId);
List<ShuffleServerInfo> shuffleServerInfos =
Lists.newArrayList(blockIdsFailedSendTracker.getFaultyShuffleServers());
+ shuffleServerInfos.addAll(reportFailuredServers);
RssReportShuffleWriteFailureRequest req =
new RssReportShuffleWriteFailureRequest(
appId,
shuffleId,
+ taskContext.stageId(),
taskContext.stageAttemptNumber(),
shuffleServerInfos,
e.getMessage());
RssReportShuffleWriteFailureResponse response =
managerClientSupplier.get().reportShuffleWriteFailure(req);
if (response.getReSubmitWholeStage()) {
- // The shuffle server is reassigned.
- RssReassignServersRequest rssReassignServersRequest =
- new RssReassignServersRequest(
- taskContext.stageId(),
- taskContext.stageAttemptNumber(),
- shuffleId,
- partitioner.numPartitions());
- RssReassignServersResponse rssReassignServersResponse =
-
managerClientSupplier.get().reassignOnStageResubmit(rssReassignServersRequest);
- LOG.info(
- "Whether the reassignment is successful: {}",
- rssReassignServersResponse.isNeedReassign());
+ LOG.warn("Multiple task failures trigger Stage retry.");
// since we are going to roll out the whole stage, mapIndex shouldn't
matter, hence -1
// is
// provided.
diff --git
a/client-spark/spark2/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
b/client-spark/spark2/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
index 779f94117..331697b85 100644
---
a/client-spark/spark2/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
+++
b/client-spark/spark2/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
@@ -287,7 +287,8 @@ public class RssShuffleWriterTest {
mockTaskMemoryManager,
shuffleWriteMetrics,
new RssConf(),
- null);
+ null,
+ 0);
WriteBufferManager bufferManagerSpy = spy(bufferManager);
doReturn(1000000L).when(bufferManagerSpy).acquireMemory(anyLong());
TaskContext contextMock = mock(TaskContext.class);
diff --git
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 95c89bd29..37692a05a 100644
---
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -111,9 +111,6 @@ public class RssShuffleManager extends
RssShuffleManagerBase {
private String uuid;
private Set<String> failedTaskIds = Sets.newConcurrentHashSet();
private DataPusher dataPusher;
-
- private final Map<Integer, Integer> shuffleIdToPartitionNum =
JavaUtils.newConcurrentMap();
- private final Map<Integer, Integer> shuffleIdToNumMapTasks =
JavaUtils.newConcurrentMap();
private ShuffleManagerGrpcService service;
private GrpcServer shuffleManagerServer;
@@ -184,8 +181,8 @@ public class RssShuffleManager extends
RssShuffleManagerBase {
LOG.info("Disable shuffle data locality in RssShuffleManager.");
taskToSuccessBlockIds = JavaUtils.newConcurrentMap();
taskToFailedBlockSendTracker = JavaUtils.newConcurrentMap();
-
- this.rssStageRetryEnabled =
rssConf.get(RssSparkConfig.RSS_RESUBMIT_STAGE_ENABLED);
+ this.shuffleIdToPartitionNum = JavaUtils.newConcurrentMap();
+ this.shuffleIdToNumMapTasks = JavaUtils.newConcurrentMap();
this.partitionReassignEnabled =
rssConf.get(RssClientConf.RSS_CLIENT_REASSIGN_ENABLED);
// stage retry for write/fetch failure
@@ -471,7 +468,7 @@ public class RssShuffleManager extends
RssShuffleManagerBase {
startHeartbeat();
shuffleIdToPartitionNum.putIfAbsent(shuffleId,
dependency.partitioner().numPartitions());
shuffleIdToNumMapTasks.putIfAbsent(shuffleId,
dependency.rdd().partitions().length);
- if (shuffleManagerRpcServiceEnabled && rssStageRetryEnabled) {
+ if (shuffleManagerRpcServiceEnabled &&
rssStageRetryForWriteFailureEnabled) {
ShuffleHandleInfo shuffleHandleInfo =
new MutableShuffleHandleInfo(shuffleId, partitionToServers,
remoteStorage);
StageAttemptShuffleHandleInfo handleInfo =
@@ -514,7 +511,6 @@ public class RssShuffleManager extends
RssShuffleManagerBase {
} else {
writeMetrics = context.taskMetrics().shuffleWriteMetrics();
}
-
String taskId = "" + context.taskAttemptId() + "_" +
context.attemptNumber();
return new RssShuffleWriter<>(
rssHandle.getAppId(),
@@ -647,7 +643,24 @@ public class RssShuffleManager extends
RssShuffleManagerBase {
RssShuffleHandle<K, ?, C> rssShuffleHandle = (RssShuffleHandle<K, ?, C>)
handle;
final int partitionNum =
rssShuffleHandle.getDependency().partitioner().numPartitions();
int shuffleId = rssShuffleHandle.getShuffleId();
- ShuffleHandleInfo shuffleHandleInfo =
getShuffleHandleInfo(rssShuffleHandle);
+ ShuffleHandleInfo shuffleHandleInfo;
+ if (shuffleManagerRpcServiceEnabled &&
rssStageRetryForWriteFailureEnabled) {
+ // In Stage Retry mode, Get the ShuffleServer list from the Driver based
on the shuffleId.
+ shuffleHandleInfo =
+ getRemoteShuffleHandleInfoWithStageRetry(
+ context.stageId(), context.stageAttemptNumber(), shuffleId,
false);
+ } else if (shuffleManagerRpcServiceEnabled && partitionReassignEnabled) {
+ // In Block Retry mode, Get the ShuffleServer list from the Driver based
on the shuffleId.
+ shuffleHandleInfo =
+ getRemoteShuffleHandleInfoWithBlockRetry(
+ context.stageId(), context.stageAttemptNumber(), shuffleId,
false);
+ } else {
+ shuffleHandleInfo =
+ new SimpleShuffleHandleInfo(
+ shuffleId,
+ rssShuffleHandle.getPartitionToServers(),
+ rssShuffleHandle.getRemoteStorage());
+ }
Map<ShuffleServerInfo, Set<Integer>> serverToPartitions =
getPartitionDataServers(shuffleHandleInfo, startPartition,
endPartition);
long start = System.currentTimeMillis();
diff --git
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index decefc6d3..48cbf3efa 100644
---
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -73,10 +73,8 @@ import org.apache.uniffle.client.api.ShuffleWriteClient;
import org.apache.uniffle.client.impl.FailedBlockSendTracker;
import org.apache.uniffle.client.impl.TrackingBlockStatus;
import org.apache.uniffle.client.request.RssReassignOnBlockSendFailureRequest;
-import org.apache.uniffle.client.request.RssReassignServersRequest;
import org.apache.uniffle.client.request.RssReportShuffleWriteFailureRequest;
import
org.apache.uniffle.client.response.RssReassignOnBlockSendFailureResponse;
-import org.apache.uniffle.client.response.RssReassignServersResponse;
import org.apache.uniffle.client.response.RssReportShuffleWriteFailureResponse;
import org.apache.uniffle.common.ReceivingFailureServer;
import org.apache.uniffle.common.ShuffleBlockInfo;
@@ -140,6 +138,8 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
Sets.newHashSet(StatusCode.NO_REGISTER);
private final Supplier<ShuffleManagerClient> managerClientSupplier;
+ private boolean enableWriteFailureRetry;
+ private Set<ShuffleServerInfo> recordReportFailedShuffleservers;
// Only for tests
@VisibleForTesting
@@ -226,6 +226,9 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
RssClientConf.RSS_CLIENT_REASSIGN_ENABLED.defaultValue());
this.blockFailSentRetryMaxTimes =
RssSparkConfig.toRssConf(sparkConf).get(RSS_PARTITION_REASSIGN_BLOCK_RETRY_MAX_TIMES);
+ this.enableWriteFailureRetry =
+
RssSparkConfig.toRssConf(sparkConf).get(RSS_RESUBMIT_STAGE_WITH_WRITE_FAILURE_ENABLED);
+ this.recordReportFailedShuffleservers = Sets.newConcurrentHashSet();
}
public RssShuffleWriter(
@@ -253,8 +256,10 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
managerClientSupplier,
rssHandle,
taskFailureCallback,
- shuffleManager.getShuffleHandleInfo(rssHandle),
+ shuffleManager.getShuffleHandleInfo(
+ context.stageId(), context.stageAttemptNumber(), rssHandle, true),
context);
+ this.taskAttemptAssignment = new TaskAttemptAssignment(taskAttemptId,
shuffleHandleInfo);
BufferManagerOptions bufferOptions = new BufferManagerOptions(sparkConf);
final WriteBufferManager bufferManager =
new WriteBufferManager(
@@ -267,9 +272,9 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
shuffleWriteMetrics,
RssSparkConfig.toRssConf(sparkConf),
this::processShuffleBlockInfos,
- this::getPartitionAssignedServers);
+ this::getPartitionAssignedServers,
+ context.stageAttemptNumber());
this.bufferManager = bufferManager;
- this.taskAttemptAssignment = new TaskAttemptAssignment(taskAttemptId,
shuffleHandleInfo);
}
@VisibleForTesting
@@ -287,8 +292,8 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
writeImpl(records);
} catch (Exception e) {
taskFailureCallback.apply(taskId);
- if
(RssSparkConfig.toRssConf(sparkConf).get(RSS_RESUBMIT_STAGE_WITH_WRITE_FAILURE_ENABLED))
{
- throwFetchFailedIfNecessary(e);
+ if (enableWriteFailureRetry) {
+ throwFetchFailedIfNecessary(e, Sets.newConcurrentHashSet());
} else {
throw e;
}
@@ -806,9 +811,16 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
if (success) {
long start = System.currentTimeMillis();
shuffleWriteClient.reportShuffleResult(
- serverToPartitionToBlockIds, appId, shuffleId, taskAttemptId,
bitmapSplitNum);
+ serverToPartitionToBlockIds,
+ appId,
+ shuffleId,
+ taskAttemptId,
+ bitmapSplitNum,
+ recordReportFailedShuffleservers,
+ enableWriteFailureRetry);
LOG.info(
- "Report shuffle result for task[{}] with bitmapNum[{}] cost {} ms",
+ "Report shuffle result for shuffleId[{}] task[{}] with
bitmapNum[{}] cost {} ms",
+ shuffleId,
taskAttemptId,
bitmapSplitNum,
(System.currentTimeMillis() - start));
@@ -825,6 +837,14 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
} else {
return Option.empty();
}
+ } catch (Exception e) {
+ // Treat any exception thrown while reporting the shuffle result as a write
+ // failure and trigger a stage retry.
+ if (enableWriteFailureRetry) {
+ throw throwFetchFailedIfNecessary(e, recordReportFailedShuffleservers);
+ } else {
+ throw e;
+ }
} finally {
if (blockFailSentRetryEnabled) {
if (success) {
@@ -867,37 +887,27 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
return bufferManager;
}
- private void throwFetchFailedIfNecessary(Exception e) {
+ private RssException throwFetchFailedIfNecessary(
+ Exception e, Set<ShuffleServerInfo> reportFailuredServers) {
// The shuffleServer is registered only when a Block fails to be sent
if (e instanceof RssSendFailedException) {
FailedBlockSendTracker blockIdsFailedSendTracker =
shuffleManager.getBlockIdsFailedSendTracker(taskId);
List<ShuffleServerInfo> shuffleServerInfos =
Lists.newArrayList(blockIdsFailedSendTracker.getFaultyShuffleServers());
+ shuffleServerInfos.addAll(reportFailuredServers);
RssReportShuffleWriteFailureRequest req =
new RssReportShuffleWriteFailureRequest(
appId,
shuffleId,
+ taskContext.stageId(),
taskContext.stageAttemptNumber(),
shuffleServerInfos,
e.getMessage());
RssReportShuffleWriteFailureResponse response =
managerClientSupplier.get().reportShuffleWriteFailure(req);
if (response.getReSubmitWholeStage()) {
- RssReassignServersRequest rssReassignServersRequest =
- new RssReassignServersRequest(
- taskContext.stageId(),
- taskContext.stageAttemptNumber(),
- shuffleId,
- partitioner.numPartitions());
- RssReassignServersResponse rssReassignServersResponse =
-
managerClientSupplier.get().reassignOnStageResubmit(rssReassignServersRequest);
- LOG.info(
- "Whether the reassignment is successful: {}",
- rssReassignServersResponse.isNeedReassign());
- // since we are going to roll out the whole stage, mapIndex shouldn't
matter, hence -1
- // is
- // provided.
+ LOG.warn(response.getMessage());
FetchFailedException ffe =
RssSparkShuffleUtils.createFetchFailedException(
shuffleId, -1, taskContext.stageAttemptNumber(), e);
diff --git
a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
index a4317aae8..9e9e68db5 100644
---
a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
+++
b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
@@ -717,7 +717,8 @@ public class RssShuffleWriterTest {
fakedTaskMemoryManager,
new ShuffleWriteMetrics(),
RssSparkConfig.toRssConf(conf),
- null);
+ null,
+ 0);
Serializer kryoSerializer = new KryoSerializer(conf);
Partitioner mockPartitioner = mock(Partitioner.class);
diff --git
a/client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java
b/client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java
index caab46020..33980c92f 100644
--- a/client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java
+++ b/client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java
@@ -155,6 +155,15 @@ public interface ShuffleWriteClient {
long taskAttemptId,
int bitmapNum);
+ default void reportShuffleResult(
+ Map<ShuffleServerInfo, Map<Integer, Set<Long>>>
serverToPartitionToBlockIds,
+ String appId,
+ int shuffleId,
+ long taskAttemptId,
+ int bitmapNum,
+ Set<ShuffleServerInfo> reportFailureServers,
+ boolean enableWriteFailureRetry) {}
+
ShuffleAssignmentsInfo getShuffleAssignments(
String appId,
int shuffleId,
diff --git
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
index b78f06bd3..27c55ac3d 100644
---
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
+++
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
@@ -91,6 +91,7 @@ import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.exception.RssFetchFailedException;
+import org.apache.uniffle.common.exception.RssSendFailedException;
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.util.BlockIdLayout;
import org.apache.uniffle.common.util.JavaUtils;
@@ -718,6 +719,25 @@ public class ShuffleWriteClientImpl implements
ShuffleWriteClient {
int shuffleId,
long taskAttemptId,
int bitmapNum) {
+ reportShuffleResult(
+ serverToPartitionToBlockIds,
+ appId,
+ shuffleId,
+ taskAttemptId,
+ bitmapNum,
+ Sets.newConcurrentHashSet(),
+ false);
+ }
+
+ @Override
+ public void reportShuffleResult(
+ Map<ShuffleServerInfo, Map<Integer, Set<Long>>>
serverToPartitionToBlockIds,
+ String appId,
+ int shuffleId,
+ long taskAttemptId,
+ int bitmapNum,
+ Set<ShuffleServerInfo> reportFailureServers,
+ boolean enableWriteFailureRetry) {
// record blockId count for quora check,but this is not a good realization.
Map<Long, Integer> blockReportTracker =
createBlockReportTracker(serverToPartitionToBlockIds);
for (Map.Entry<ShuffleServerInfo, Map<Integer, Set<Long>>> entry :
@@ -758,6 +778,13 @@ public class ShuffleWriteClientImpl implements
ShuffleWriteClient {
+ "] failed with "
+ response.getStatusCode());
recordFailedBlockIds(blockReportTracker, requestBlockIds);
+ if (enableWriteFailureRetry) {
+ // Record the failed shuffle server and raise an exception only when stage
+ // retry for write failures is enabled.
+ reportFailureServers.add(ssi);
+ throw new RssSendFailedException(
+ "Throw an exception because the report shuffle result status
code is not SUCCESS.");
+ }
}
} catch (Exception e) {
LOG.warn(
@@ -769,6 +796,13 @@ public class ShuffleWriteClientImpl implements
ShuffleWriteClient {
+ shuffleId
+ "]");
recordFailedBlockIds(blockReportTracker, requestBlockIds);
+ if (enableWriteFailureRetry) {
+ // Record the failed shuffle server and raise an exception only when stage
+ // retry for write failures is enabled.
+ reportFailureServers.add(ssi);
+ throw new RssSendFailedException(
+ "Throw an exception because the report shuffle result status
code is not SUCCESS.");
+ }
}
}
if (blockReportTracker.values().stream().anyMatch(cnt -> cnt <
replicaWrite)) {
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/RemoteMergeShuffleWithRssClientTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/RemoteMergeShuffleWithRssClientTest.java
index 90591a3b8..8a892d9a6 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/RemoteMergeShuffleWithRssClientTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/RemoteMergeShuffleWithRssClientTest.java
@@ -184,7 +184,7 @@ public class RemoteMergeShuffleWithRssClientTest extends
ShuffleReadWriteBase {
new RemoteStorageInfo(""),
ShuffleDataDistributionType.NORMAL,
0,
- -1,
+ 0,
RssProtos.MergeContext.newBuilder()
.setKeyClass(keyClass.getName())
.setValueClass(valueClass.getName())
@@ -354,7 +354,7 @@ public class RemoteMergeShuffleWithRssClientTest extends
ShuffleReadWriteBase {
new RemoteStorageInfo(""),
ShuffleDataDistributionType.NORMAL,
0,
- -1,
+ 0,
RssProtos.MergeContext.newBuilder()
.setKeyClass(keyClass.getName())
.setValueClass(valueClass.getName())
@@ -532,7 +532,7 @@ public class RemoteMergeShuffleWithRssClientTest extends
ShuffleReadWriteBase {
new RemoteStorageInfo(""),
ShuffleDataDistributionType.NORMAL,
0,
- -1,
+ 0,
RssProtos.MergeContext.newBuilder()
.setKeyClass(keyClass.getName())
.setValueClass(valueClass.getName())
@@ -745,7 +745,7 @@ public class RemoteMergeShuffleWithRssClientTest extends
ShuffleReadWriteBase {
new RemoteStorageInfo(""),
ShuffleDataDistributionType.NORMAL,
0,
- -1,
+ 0,
RssProtos.MergeContext.newBuilder()
.setKeyClass(keyClass.getName())
.setValueClass(valueClass.getName())
diff --git
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RSSStageDynamicServerReWriteTest.java
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RSSStageDynamicServerReWriteTest.java
index a9ae33a42..444250f56 100644
---
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RSSStageDynamicServerReWriteTest.java
+++
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RSSStageDynamicServerReWriteTest.java
@@ -29,10 +29,7 @@ import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.uniffle.client.util.RssClientConfig;
import org.apache.uniffle.common.rpc.ServerType;
import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.server.MockedGrpcServer;
@@ -40,9 +37,6 @@ import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.storage.util.StorageType;
public class RSSStageDynamicServerReWriteTest extends
SparkTaskFailureIntegrationTestBase {
-
- private static final Logger LOG =
LoggerFactory.getLogger(RSSStageDynamicServerReWriteTest.class);
-
@BeforeAll
public static void setupServers(@TempDir File tmpDir) throws Exception {
CoordinatorConf coordinatorConf = getCoordinatorConf();
@@ -69,11 +63,13 @@ public class RSSStageDynamicServerReWriteTest extends
SparkTaskFailureIntegratio
File dataDir2 = new File(tmpDir, id + "_2");
String basePath = dataDir1.getAbsolutePath() + "," +
dataDir2.getAbsolutePath();
shuffleServerConf.setString("rss.storage.type",
StorageType.MEMORY_LOCALFILE.name());
+ shuffleServerConf.set(ShuffleServerConf.SERVER_RPC_AUDIT_LOG_ENABLED,
false);
shuffleServerConf.setInteger(
"rss.rpc.server.port",
shuffleServerConf.getInteger(ShuffleServerConf.RPC_SERVER_PORT) + id);
shuffleServerConf.setInteger("rss.jetty.http.port", 19081 + id * 100);
shuffleServerConf.setString("rss.storage.basePath", basePath);
+
if (abnormalFlag) {
createMockedShuffleServer(shuffleServerConf);
// Set the sending block data timeout for the first shuffleServer
@@ -111,7 +107,9 @@ public class RSSStageDynamicServerReWriteTest extends
SparkTaskFailureIntegratio
public void updateSparkConfCustomer(SparkConf sparkConf) {
super.updateSparkConfCustomer(sparkConf);
sparkConf.set(
- RssSparkConfig.SPARK_RSS_CONFIG_PREFIX +
RssClientConfig.RSS_RESUBMIT_STAGE, "true");
+ RssSparkConfig.SPARK_RSS_CONFIG_PREFIX
+ +
RssSparkConfig.RSS_RESUBMIT_STAGE_WITH_WRITE_FAILURE_ENABLED.key(),
+ "true");
}
@Test
diff --git
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RSSStageResubmitTest.java
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RSSStageResubmitTest.java
index 774ba572b..0a70e8242 100644
---
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RSSStageResubmitTest.java
+++
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RSSStageResubmitTest.java
@@ -78,7 +78,9 @@ public class RSSStageResubmitTest extends
SparkTaskFailureIntegrationTestBase {
public void updateSparkConfCustomer(SparkConf sparkConf) {
super.updateSparkConfCustomer(sparkConf);
sparkConf.set(
- RssSparkConfig.SPARK_RSS_CONFIG_PREFIX +
RssClientConfig.RSS_RESUBMIT_STAGE, "true");
+ RssSparkConfig.SPARK_RSS_CONFIG_PREFIX
+ + RssSparkConfig.RSS_RESUBMIT_STAGE_WITH_WRITE_FAILURE_ENABLED,
+ "true");
}
@Test
diff --git
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkIntegrationTestBase.java
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkIntegrationTestBase.java
index 71395f281..9c5926370 100644
---
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkIntegrationTestBase.java
+++
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkIntegrationTestBase.java
@@ -64,6 +64,7 @@ public abstract class SparkIntegrationTestBase extends
IntegrationTestBase {
verifyTestResult(resultWithoutRss, resultWithRssGrpc);
updateSparkConfWithRssNetty(sparkConf);
+ updateSparkConfCustomer(sparkConf);
start = System.currentTimeMillis();
Map resultWithRssNetty = runSparkApp(sparkConf, fileName);
final long durationWithRssNetty = System.currentTimeMillis() - start;
@@ -134,6 +135,24 @@ public abstract class SparkIntegrationTestBase extends
IntegrationTestBase {
}
public void updateSparkConfWithRssNetty(SparkConf sparkConf) {
+ sparkConf.set("spark.shuffle.manager",
"org.apache.spark.shuffle.RssShuffleManager");
+ sparkConf.set(
+ "spark.shuffle.sort.io.plugin.class",
"org.apache.spark.shuffle.RssShuffleDataIo");
+ sparkConf.set("spark.serializer",
"org.apache.spark.serializer.KryoSerializer");
+ sparkConf.set(RssSparkConfig.RSS_WRITER_BUFFER_SIZE.key(), "4m");
+ sparkConf.set(RssSparkConfig.RSS_WRITER_BUFFER_SPILL_SIZE.key(), "32m");
+ sparkConf.set(RssSparkConfig.RSS_CLIENT_READ_BUFFER_SIZE.key(), "2m");
+ sparkConf.set(RssSparkConfig.RSS_WRITER_SERIALIZER_BUFFER_SIZE.key(),
"128k");
+ sparkConf.set(RssSparkConfig.RSS_WRITER_BUFFER_SEGMENT_SIZE.key(), "256k");
+ sparkConf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(),
COORDINATOR_QUORUM);
+ sparkConf.set(RssSparkConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS.key(),
"30000");
+ sparkConf.set(RssSparkConfig.RSS_CLIENT_RETRY_MAX.key(), "10");
+ sparkConf.set(RssSparkConfig.RSS_CLIENT_SEND_CHECK_INTERVAL_MS.key(),
"1000");
+ sparkConf.set(RssSparkConfig.RSS_CLIENT_RETRY_INTERVAL_MAX.key(), "1000");
+ sparkConf.set(RssSparkConfig.RSS_INDEX_READ_LIMIT.key(), "100");
+ sparkConf.set(RssSparkConfig.RSS_CLIENT_READ_BUFFER_SIZE.key(), "1m");
+ sparkConf.set(RssSparkConfig.RSS_HEARTBEAT_INTERVAL.key(), "2000");
+ sparkConf.set(RssSparkConfig.RSS_TEST_MODE_ENABLE.key(), "true");
sparkConf.set(RssSparkConfig.RSS_CLIENT_TYPE,
ClientType.GRPC_NETTY.name());
}
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleManagerClient.java
b/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleManagerClient.java
index 6616fe7b1..d3a78ce29 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleManagerClient.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleManagerClient.java
@@ -21,14 +21,12 @@ import
org.apache.uniffle.client.request.RssGetShuffleResultForMultiPartRequest;
import org.apache.uniffle.client.request.RssGetShuffleResultRequest;
import org.apache.uniffle.client.request.RssPartitionToShuffleServerRequest;
import org.apache.uniffle.client.request.RssReassignOnBlockSendFailureRequest;
-import org.apache.uniffle.client.request.RssReassignServersRequest;
import org.apache.uniffle.client.request.RssReportShuffleFetchFailureRequest;
import org.apache.uniffle.client.request.RssReportShuffleResultRequest;
import org.apache.uniffle.client.request.RssReportShuffleWriteFailureRequest;
import org.apache.uniffle.client.response.RssGetShuffleResultResponse;
import
org.apache.uniffle.client.response.RssReassignOnBlockSendFailureResponse;
import org.apache.uniffle.client.response.RssReassignOnStageRetryResponse;
-import org.apache.uniffle.client.response.RssReassignServersResponse;
import org.apache.uniffle.client.response.RssReportShuffleFetchFailureResponse;
import org.apache.uniffle.client.response.RssReportShuffleResultResponse;
import org.apache.uniffle.client.response.RssReportShuffleWriteFailureResponse;
@@ -61,8 +59,6 @@ public interface ShuffleManagerClient extends
StatefulCloseable {
RssReportShuffleWriteFailureResponse reportShuffleWriteFailure(
RssReportShuffleWriteFailureRequest req);
- RssReassignServersResponse reassignOnStageResubmit(RssReassignServersRequest
req);
-
RssReassignOnBlockSendFailureResponse reassignOnBlockSendFailure(
RssReassignOnBlockSendFailureRequest request);
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleManagerGrpcClient.java
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleManagerGrpcClient.java
index 8cad876c2..9c34f3f7d 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleManagerGrpcClient.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleManagerGrpcClient.java
@@ -27,14 +27,12 @@ import
org.apache.uniffle.client.request.RssGetShuffleResultForMultiPartRequest;
import org.apache.uniffle.client.request.RssGetShuffleResultRequest;
import org.apache.uniffle.client.request.RssPartitionToShuffleServerRequest;
import org.apache.uniffle.client.request.RssReassignOnBlockSendFailureRequest;
-import org.apache.uniffle.client.request.RssReassignServersRequest;
import org.apache.uniffle.client.request.RssReportShuffleFetchFailureRequest;
import org.apache.uniffle.client.request.RssReportShuffleResultRequest;
import org.apache.uniffle.client.request.RssReportShuffleWriteFailureRequest;
import org.apache.uniffle.client.response.RssGetShuffleResultResponse;
import
org.apache.uniffle.client.response.RssReassignOnBlockSendFailureResponse;
import org.apache.uniffle.client.response.RssReassignOnStageRetryResponse;
-import org.apache.uniffle.client.response.RssReassignServersResponse;
import org.apache.uniffle.client.response.RssReportShuffleFetchFailureResponse;
import org.apache.uniffle.client.response.RssReportShuffleResultResponse;
import org.apache.uniffle.client.response.RssReportShuffleWriteFailureResponse;
@@ -125,14 +123,6 @@ public class ShuffleManagerGrpcClient extends GrpcClient
implements ShuffleManag
}
}
- @Override
- public RssReassignServersResponse
reassignOnStageResubmit(RssReassignServersRequest req) {
- RssProtos.ReassignServersRequest reassignServersRequest = req.toProto();
- RssProtos.ReassignServersResponse reassignServersResponse =
- getBlockingStub().reassignOnStageResubmit(reassignServersRequest);
- return RssReassignServersResponse.fromProto(reassignServersResponse);
- }
-
@Override
public RssReassignOnBlockSendFailureResponse reassignOnBlockSendFailure(
RssReassignOnBlockSendFailureRequest request) {
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssPartitionToShuffleServerRequest.java
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssPartitionToShuffleServerRequest.java
index 62c3ec8f6..9e42d2c6a 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssPartitionToShuffleServerRequest.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssPartitionToShuffleServerRequest.java
@@ -20,20 +20,34 @@ package org.apache.uniffle.client.request;
import org.apache.uniffle.proto.RssProtos;
public class RssPartitionToShuffleServerRequest {
+ private int stageAttemptId;
+ private int stageAttemptNumber;
private int shuffleId;
+ private boolean isWritePhase;
- public RssPartitionToShuffleServerRequest(int shuffleId) {
+ public RssPartitionToShuffleServerRequest(
+ int stageAttemptId, int stageAttemptNumber, int shuffleId, boolean
isWritePhase) {
+ this.stageAttemptId = stageAttemptId;
+ this.stageAttemptNumber = stageAttemptNumber;
this.shuffleId = shuffleId;
+ this.isWritePhase = isWritePhase;
}
public int getShuffleId() {
return shuffleId;
}
+ public int getStageAttemptNumber() {
+ return stageAttemptNumber;
+ }
+
public RssProtos.PartitionToShuffleServerRequest toProto() {
RssProtos.PartitionToShuffleServerRequest.Builder builder =
RssProtos.PartitionToShuffleServerRequest.newBuilder();
+ builder.setStageAttemptId(stageAttemptId);
+ builder.setStageAttemptNumber(stageAttemptNumber);
builder.setShuffleId(shuffleId);
+ builder.setIsWritePhase(isWritePhase);
return builder.build();
}
}
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssReassignServersRequest.java
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssReassignServersRequest.java
deleted file mode 100644
index 5ce505ab1..000000000
---
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssReassignServersRequest.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.uniffle.client.request;
-
-import org.apache.uniffle.proto.RssProtos;
-
-public class RssReassignServersRequest {
- private int stageId;
- private int stageAttemptNumber;
- private int shuffleId;
- private int numPartitions;
-
- public RssReassignServersRequest(
- int stageId, int stageAttemptNumber, int shuffleId, int numPartitions) {
- this.stageId = stageId;
- this.stageAttemptNumber = stageAttemptNumber;
- this.shuffleId = shuffleId;
- this.numPartitions = numPartitions;
- }
-
- public RssProtos.ReassignServersRequest toProto() {
- RssProtos.ReassignServersRequest.Builder builder =
- RssProtos.ReassignServersRequest.newBuilder()
- .setStageId(stageId)
- .setStageAttemptNumber(stageAttemptNumber)
- .setShuffleId(shuffleId)
- .setNumPartitions(numPartitions);
- return builder.build();
- }
-}
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleWriteFailureRequest.java
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleWriteFailureRequest.java
index c05176769..56da7c6c3 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleWriteFailureRequest.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleWriteFailureRequest.java
@@ -28,6 +28,7 @@ import org.apache.uniffle.proto.RssProtos.ShuffleServerId;
public class RssReportShuffleWriteFailureRequest {
private String appId;
private int shuffleId;
+ private int stageAttemptId;
private int stageAttemptNumber;
private List<ShuffleServerInfo> shuffleServerInfos;
private String exception;
@@ -35,11 +36,13 @@ public class RssReportShuffleWriteFailureRequest {
public RssReportShuffleWriteFailureRequest(
String appId,
int shuffleId,
+ int stageAttemptId,
int stageAttemptNumber,
List<ShuffleServerInfo> shuffleServerInfos,
String exception) {
this.appId = appId;
this.shuffleId = shuffleId;
+ this.stageAttemptId = stageAttemptId;
this.stageAttemptNumber = stageAttemptNumber;
this.shuffleServerInfos = shuffleServerInfos;
this.exception = exception;
@@ -60,6 +63,7 @@ public class RssReportShuffleWriteFailureRequest {
builder
.setAppId(appId)
.setShuffleId(shuffleId)
+ .setStageAttemptId(stageAttemptId)
.setStageAttemptNumber(stageAttemptNumber)
.addAllShuffleServerIds(shuffleServerIds);
if (exception != null) {
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/response/RssReassignServersResponse.java
b/internal-client/src/main/java/org/apache/uniffle/client/response/RssReassignServersResponse.java
deleted file mode 100644
index d85351d98..000000000
---
a/internal-client/src/main/java/org/apache/uniffle/client/response/RssReassignServersResponse.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.uniffle.client.response;
-
-import org.apache.uniffle.common.rpc.StatusCode;
-import org.apache.uniffle.proto.RssProtos;
-
-public class RssReassignServersResponse extends ClientResponse {
-
- private boolean needReassign;
-
- public RssReassignServersResponse(StatusCode statusCode, String message,
boolean needReassign) {
- super(statusCode, message);
- this.needReassign = needReassign;
- }
-
- public boolean isNeedReassign() {
- return needReassign;
- }
-
- public static RssReassignServersResponse
fromProto(RssProtos.ReassignServersResponse response) {
- return new RssReassignServersResponse(
- // todo: [issue#780] add fromProto for StatusCode issue
- StatusCode.valueOf(response.getStatus().name()),
- response.getMsg(),
- response.getNeedReassign());
- }
-}
diff --git a/proto/src/main/proto/Rss.proto b/proto/src/main/proto/Rss.proto
index c68a1a87e..4749a0658 100644
--- a/proto/src/main/proto/Rss.proto
+++ b/proto/src/main/proto/Rss.proto
@@ -579,8 +579,6 @@ service ShuffleManager {
rpc
getPartitionToShufflerServerWithBlockRetry(PartitionToShuffleServerRequest)
returns (ReassignOnBlockSendFailureResponse);
// Report write failures to ShuffleManager
rpc reportShuffleWriteFailure (ReportShuffleWriteFailureRequest) returns
(ReportShuffleWriteFailureResponse);
- // Reassign on stage resubmit
- rpc reassignOnStageResubmit(ReassignServersRequest) returns
(ReassignServersResponse);
// Reassign on block send failure that occurs in writer
rpc reassignOnBlockSendFailure(RssReassignOnBlockSendFailureRequest) returns
(ReassignOnBlockSendFailureResponse);
rpc reportShuffleResult (ReportShuffleResultRequest) returns
(ReportShuffleResultResponse);
@@ -607,7 +605,10 @@ message ReportShuffleFetchFailureResponse {
}
message PartitionToShuffleServerRequest {
- int32 shuffleId = 2;
+ int32 stageAttemptId = 1;
+ int32 stageAttemptNumber = 2;
+ int32 shuffleId = 3;
+ bool isWritePhase = 4;
}
message ReassignOnStageRetryResponse {
@@ -643,7 +644,8 @@ message RemoteStorageInfo{
message ReportShuffleWriteFailureRequest {
string appId = 1;
int32 shuffleId = 2;
- int32 stageAttemptNumber = 3;
+ int32 stageAttemptId = 3;
+ int32 stageAttemptNumber = 4;
repeated ShuffleServerId shuffleServerIds= 5;
string exception = 6;
}
@@ -654,19 +656,6 @@ message ReportShuffleWriteFailureResponse {
string msg = 3;
}
-message ReassignServersRequest{
- int32 stageId = 1;
- int32 stageAttemptNumber = 2;
- int32 shuffleId = 3;
- int32 numPartitions = 4;
-}
-
-message ReassignServersResponse {
- StatusCode status = 1;
- bool needReassign = 2;
- string msg = 3;
-}
-
message RssReassignOnBlockSendFailureRequest {
int32 shuffleId = 1;
map<int32, ReceivingFailureServers> failurePartitionToServerIds = 2;
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
index 79206492a..227e92028 100644
---
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
+++
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
@@ -97,6 +97,7 @@ import
org.apache.uniffle.proto.RssProtos.ShuffleRegisterRequest;
import org.apache.uniffle.proto.RssProtos.ShuffleRegisterResponse;
import org.apache.uniffle.proto.ShuffleServerGrpc.ShuffleServerImplBase;
import org.apache.uniffle.server.audit.ServerRpcAuditContext;
+import org.apache.uniffle.server.block.ShuffleBlockIdManager;
import org.apache.uniffle.server.buffer.PreAllocatedBufferInfo;
import org.apache.uniffle.server.merge.MergeStatus;
import org.apache.uniffle.storage.common.Storage;
@@ -238,9 +239,9 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
ShuffleRegisterResponse reply;
String appId = req.getAppId();
int shuffleId = req.getShuffleId();
+ int stageAttemptNumber = req.getStageAttemptNumber();
String remoteStoragePath = req.getRemoteStorage().getPath();
String user = req.getUser();
- int stageAttemptNumber = req.getStageAttemptNumber();
auditContext.withAppId(appId).withShuffleId(shuffleId);
auditContext.withArgs(
"remoteStoragePath="
@@ -251,27 +252,51 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
+ stageAttemptNumber);
// If the Stage is registered for the first time, you do not need to
consider the Stage retry
// and delete the Block data that has been sent.
- if (stageAttemptNumber > 0) {
- ShuffleTaskInfo taskInfo =
shuffleServer.getShuffleTaskManager().getShuffleTaskInfo(appId);
+ ShuffleTaskInfo taskInfo =
shuffleServer.getShuffleTaskManager().getShuffleTaskInfo(appId);
+ if (taskInfo != null) {
// Prevents AttemptNumber of multiple stages from modifying the latest
AttemptNumber.
synchronized (taskInfo) {
- int attemptNumber = taskInfo.getLatestStageAttemptNumber(shuffleId);
- if (stageAttemptNumber > attemptNumber) {
+ int lastAttemptNumber =
taskInfo.getLatestStageAttemptNumber(shuffleId);
+ if (stageAttemptNumber > 0 && stageAttemptNumber >
lastAttemptNumber) {
taskInfo.refreshLatestStageAttemptNumber(shuffleId,
stageAttemptNumber);
try {
long start = System.currentTimeMillis();
shuffleServer.getShuffleTaskManager().removeShuffleDataSync(appId, shuffleId);
LOG.info(
"Deleted the previous stage attempt data due to stage
recomputing for app: {}, "
- + "shuffleId: {}. It costs {} ms",
+ + "shuffleId: {}, stageAttemptNumber: {}. It costs {}
ms",
appId,
shuffleId,
+ lastAttemptNumber,
System.currentTimeMillis() - start);
+ // Add a check to prevent undeleted metadata.
+ ShuffleBlockIdManager shuffleBlockIdManager =
+ shuffleServer
+ .getShuffleTaskManager()
+ .getShuffleTaskInfo(appId)
+ .getShuffleBlockIdManager();
+ long blockCountByShuffleId =
+ shuffleBlockIdManager.getBlockCountByShuffleId(
+ appId, Lists.newArrayList(shuffleId));
+ if (blockCountByShuffleId != 0) {
+ LOG.error(
+ "Metadata is not deleted on clearing previous stage
attempt data for app: {}, shuffleId: {}, stageAttemptNumber: {}",
+ appId,
+ shuffleId,
+ lastAttemptNumber);
+ StatusCode code = StatusCode.INTERNAL_ERROR;
+ auditContext.withStatusCode(code);
+ reply =
ShuffleRegisterResponse.newBuilder().setStatus(code.toProto()).build();
+ responseObserver.onNext(reply);
+ responseObserver.onCompleted();
+ return;
+ }
} catch (Exception e) {
LOG.error(
- "Errors on clearing previous stage attempt data for app: {},
shuffleId: {}",
+ "Errors on clearing previous stage attempt data for app: {},
shuffleId: {}, stageAttemptNumber: {}",
appId,
shuffleId,
+ lastAttemptNumber,
e);
StatusCode code = StatusCode.INTERNAL_ERROR;
auditContext.withStatusCode(code);
@@ -280,7 +305,9 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
responseObserver.onCompleted();
return;
}
- } else if (stageAttemptNumber < attemptNumber) {
+ } else if (stageAttemptNumber > 0 && stageAttemptNumber <=
lastAttemptNumber) {
+        LOG.info(
+            "Ignore this registration: its stage attempt number is not greater than the recorded latest attempt number.");
// When a Stage retry occurs, the first or last registration of a
Stage may need to be
// ignored and the ignored status quickly returned.
StatusCode code = StatusCode.STAGE_RETRY_IGNORE;
@@ -325,6 +352,7 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
.registerShuffle(
appId,
shuffleId,
+ stageAttemptNumber,
partitionRanges,
new RemoteStorageInfo(remoteStoragePath, remoteStorageConf),
user,
@@ -342,6 +370,7 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
.registerShuffle(
appId + MERGE_APP_SUFFIX,
shuffleId,
+ stageAttemptNumber,
partitionRanges,
new RemoteStorageInfo(remoteStoragePath,
remoteStorageConf),
user,
@@ -849,7 +878,6 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
int bitmapNum = request.getBitmapNum();
Map<Integer, long[]> partitionToBlockIds =
toPartitionBlocksMap(request.getPartitionToBlockIdsList());
-
auditContext.withAppId(appId).withShuffleId(shuffleId);
auditContext.withArgs(
"taskAttemptId="
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
index 6b2a89554..687622bb0 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
@@ -284,7 +284,7 @@ public class ShuffleTaskInfo {
}
public Integer getLatestStageAttemptNumber(int shuffleId) {
- return latestStageAttemptNumbers.computeIfAbsent(shuffleId, key -> 0);
+ return latestStageAttemptNumbers.getOrDefault(shuffleId, 0);
}
public void refreshLatestStageAttemptNumber(int shuffleId, int
stageAttemptNumber) {
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index 3f753eda8..87bf31bac 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -301,6 +301,7 @@ public class ShuffleTaskManager {
return registerShuffle(
appId,
shuffleId,
+ 0,
partitionRanges,
remoteStorageInfo,
user,
@@ -312,6 +313,7 @@ public class ShuffleTaskManager {
public StatusCode registerShuffle(
String appId,
int shuffleId,
+ int stageAttemptNumber,
List<PartitionRange> partitionRanges,
RemoteStorageInfo remoteStorageInfo,
String user,
@@ -333,7 +335,7 @@ public class ShuffleTaskManager {
.dataDistributionType(dataDistType)
.build());
taskInfo.setShuffleBlockIdManagerIfNeeded(shuffleBlockIdManager);
-
+ taskInfo.refreshLatestStageAttemptNumber(shuffleId, stageAttemptNumber);
taskInfo.getShuffleBlockIdManager().registerAppId(appId);
for (PartitionRange partitionRange : partitionRanges) {
shuffleBufferManager.registerBuffer(
diff --git
a/server/src/main/java/org/apache/uniffle/server/block/DefaultShuffleBlockIdManager.java
b/server/src/main/java/org/apache/uniffle/server/block/DefaultShuffleBlockIdManager.java
index 6487fd582..3ddd4e81a 100644
---
a/server/src/main/java/org/apache/uniffle/server/block/DefaultShuffleBlockIdManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/block/DefaultShuffleBlockIdManager.java
@@ -208,6 +208,18 @@ public class DefaultShuffleBlockIdManager implements
ShuffleBlockIdManager {
.sum();
}
+  @Override
+  public long getBlockCountByShuffleId(String appId, List<Integer> shuffleIds) {
+    // Count blocks for this app only; note List.contains(Set) could never match.
+    return partitionsToBlockIds.getOrDefault(appId, java.util.Collections.emptyMap())
+        .entrySet().stream()
+        .filter(entry -> shuffleIds.contains(entry.getKey()))
+        .flatMapToLong(
+            entry ->
+                java.util.Arrays.stream(entry.getValue()).mapToLong(Roaring64NavigableMap::getLongCardinality))
+        .sum();
+  }
+
@Override
public boolean contains(String appId) {
return partitionsToBlockIds.containsKey(appId);
diff --git
a/server/src/main/java/org/apache/uniffle/server/block/PartitionedShuffleBlockIdManager.java
b/server/src/main/java/org/apache/uniffle/server/block/PartitionedShuffleBlockIdManager.java
index 5e56fee83..f31ff8fc1 100644
---
a/server/src/main/java/org/apache/uniffle/server/block/PartitionedShuffleBlockIdManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/block/PartitionedShuffleBlockIdManager.java
@@ -147,6 +147,17 @@ public class PartitionedShuffleBlockIdManager implements
ShuffleBlockIdManager {
.sum();
}
+  @Override
+  public long getBlockCountByShuffleId(String appId, List<Integer> shuffleIds) {
+    // Filter by the given app and shuffle ids; List.contains(Set) could never match.
+    return partitionsToBlockIds.getOrDefault(appId, java.util.Collections.emptyMap())
+        .entrySet().stream()
+        .filter(entry -> shuffleIds.contains(entry.getKey()))
+        .flatMap(entry -> entry.getValue().values().stream())
+        .mapToLong(Roaring64NavigableMap::getLongCardinality)
+        .sum();
+  }
+
@Override
public boolean contains(String appId) {
return partitionsToBlockIds.containsKey(appId);
diff --git
a/server/src/main/java/org/apache/uniffle/server/block/ShuffleBlockIdManager.java
b/server/src/main/java/org/apache/uniffle/server/block/ShuffleBlockIdManager.java
index 635e0c394..130be9afb 100644
---
a/server/src/main/java/org/apache/uniffle/server/block/ShuffleBlockIdManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/block/ShuffleBlockIdManager.java
@@ -50,6 +50,8 @@ public interface ShuffleBlockIdManager {
long getTotalBlockCount();
+ long getBlockCountByShuffleId(String appId, List<Integer> shuffleIds);
+
boolean contains(String testAppId);
long getBitmapNum(String appId, int shuffleId);
diff --git
a/server/src/test/java/org/apache/uniffle/server/MockedShuffleServerGrpcService.java
b/server/src/test/java/org/apache/uniffle/server/MockedShuffleServerGrpcService.java
index a25e1fd45..5a81b3b83 100644
---
a/server/src/test/java/org/apache/uniffle/server/MockedShuffleServerGrpcService.java
+++
b/server/src/test/java/org/apache/uniffle/server/MockedShuffleServerGrpcService.java
@@ -128,8 +128,17 @@ public class MockedShuffleServerGrpcService extends
ShuffleServerGrpcService {
RssProtos.SendShuffleDataRequest request,
StreamObserver<RssProtos.SendShuffleDataResponse> responseObserver) {
if (mockSendDataFailed) {
- LOG.info("Add a mocked sendData failed on sendShuffleData");
- throw new RuntimeException("This write request is failed as mocked
failureļ¼");
+ RssProtos.SendShuffleDataResponse reply;
+      String errorMsg = "This write request is failed as mocked failure!";
+ reply =
+ RssProtos.SendShuffleDataResponse.newBuilder()
+ .setStatus(StatusCode.INTERNAL_ERROR.toProto())
+ .setRetMsg(errorMsg)
+ .build();
+ responseObserver.onNext(reply);
+ responseObserver.onCompleted();
+ return;
}
if (mockSendDataFailedStageNumber == request.getStageAttemptNumber()) {
LOG.info(