This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 7a9e7cf13 [#2457] feat(spark): Introducing shuffle-server data push
statistics (#2458)
7a9e7cf13 is described below
commit 7a9e7cf133a38f3a81b47510cf299a954e8126b1
Author: Junfan Zhang <[email protected]>
AuthorDate: Wed Apr 23 10:11:34 2025 +0800
[#2457] feat(spark): Introducing shuffle-server data push statistics (#2458)
### What changes were proposed in this pull request?
Introduce shuffle-server data push statistics in the shuffle writer.
### Why are the changes needed?
Sometimes jobs are slow because one of the underlying shuffle servers is
slow, but with the current codebase it is hard to find out which one. This PR
tracks each shuffle server's data-push speed and logs the statistics.
Building on this PR, I will introduce a Uniffle Spark UI for easier and
better observability.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Internal tests.
---
.../apache/spark/shuffle/writer/AddBlockEvent.java | 13 +++-
.../apache/spark/shuffle/writer/DataPusher.java | 8 ++
.../spark/shuffle/writer/WriteBufferManager.java | 15 +++-
.../spark/shuffle/writer/DataPusherTest.java | 4 +-
.../spark/shuffle/writer/RssShuffleWriter.java | 1 +
.../uniffle/client/impl/ShuffleServerPushCost.java | 78 +++++++++++++++++++
.../client/impl/ShuffleServerPushCostTracker.java | 91 ++++++++++++++++++++++
.../client/impl/ShuffleWriteClientImpl.java | 29 +++++--
.../client/response/SendShuffleDataResult.java | 14 ++++
9 files changed, 240 insertions(+), 13 deletions(-)
diff --git
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/AddBlockEvent.java
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/AddBlockEvent.java
index f989fdb0b..24df3e6d1 100644
---
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/AddBlockEvent.java
+++
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/AddBlockEvent.java
@@ -28,17 +28,22 @@ public class AddBlockEvent {
private int stageAttemptNumber;
private List<ShuffleBlockInfo> shuffleDataInfoList;
private List<Runnable> processedCallbackChain;
+ private WriteBufferManager bufferManager;
public AddBlockEvent(String taskId, List<ShuffleBlockInfo>
shuffleDataInfoList) {
- this(taskId, 0, shuffleDataInfoList);
+ this(taskId, 0, shuffleDataInfoList, null);
}
public AddBlockEvent(
- String taskId, int stageAttemptNumber, List<ShuffleBlockInfo>
shuffleDataInfoList) {
+ String taskId,
+ int stageAttemptNumber,
+ List<ShuffleBlockInfo> shuffleDataInfoList,
+ WriteBufferManager writeBufferManager) {
this.taskId = taskId;
this.stageAttemptNumber = stageAttemptNumber;
this.shuffleDataInfoList = shuffleDataInfoList;
this.processedCallbackChain = new ArrayList<>();
+ this.bufferManager = writeBufferManager;
}
/** @param callback, should not throw any exception and execute fast. */
@@ -62,6 +67,10 @@ public class AddBlockEvent {
return processedCallbackChain;
}
+ public WriteBufferManager getBufferManager() {
+ return bufferManager;
+ }
+
@Override
public String toString() {
return "AddBlockEvent: TaskId[" + taskId + "], " + shuffleDataInfoList;
diff --git
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/DataPusher.java
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/DataPusher.java
index c55216d26..f14583654 100644
---
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/DataPusher.java
+++
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/DataPusher.java
@@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory;
import org.apache.uniffle.client.api.ShuffleWriteClient;
import org.apache.uniffle.client.impl.FailedBlockSendTracker;
+import org.apache.uniffle.client.impl.ShuffleServerPushCostTracker;
import org.apache.uniffle.client.response.SendShuffleDataResult;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.exception.RssException;
@@ -100,6 +101,13 @@ public class DataPusher implements Closeable {
putFailedBlockSendTracker(
taskToFailedBlockSendTracker, taskId,
result.getFailedBlockSendTracker());
} finally {
+ WriteBufferManager bufferManager = event.getBufferManager();
+ if (bufferManager != null) {
+ ShuffleServerPushCostTracker shuffleServerPushCostTracker =
+ result.getShuffleServerPushCostTracker();
+ bufferManager.merge(shuffleServerPushCostTracker);
+ }
+
Set<Long> succeedBlockIds = getSucceedBlockIds(result);
for (ShuffleBlockInfo block : shuffleBlockInfoList) {
block.executeCompletionCallback(succeedBlockIds.contains(block.getBlockId()));
diff --git
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
index cf4b4bc51..169bb20b1 100644
---
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
+++
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
@@ -47,6 +47,7 @@ import org.apache.spark.shuffle.RssSparkConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.uniffle.client.impl.ShuffleServerPushCostTracker;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.compression.Codec;
@@ -105,6 +106,7 @@ public class WriteBufferManager extends MemoryConsumer {
private double bufferSpillRatio;
private Function<Integer, List<ShuffleServerInfo>>
partitionAssignmentRetrieveFunc;
private int stageAttemptNumber;
+ private ShuffleServerPushCostTracker shuffleServerPushCostTracker;
public WriteBufferManager(
int shuffleId,
@@ -200,6 +202,7 @@ public class WriteBufferManager extends MemoryConsumer {
this.blockIdLayout = BlockIdLayout.from(rssConf);
this.partitionAssignmentRetrieveFunc = partitionAssignmentRetrieveFunc;
this.stageAttemptNumber = stageAttemptNumber;
+ this.shuffleServerPushCostTracker = new ShuffleServerPushCostTracker();
}
public WriteBufferManager(
@@ -528,7 +531,7 @@ public class WriteBufferManager extends MemoryConsumer {
+ totalSize
+ " bytes");
}
- events.add(new AddBlockEvent(taskId, stageAttemptNumber,
shuffleBlockInfosPerEvent));
+ events.add(new AddBlockEvent(taskId, stageAttemptNumber,
shuffleBlockInfosPerEvent, this));
shuffleBlockInfosPerEvent = Lists.newArrayList();
totalSize = 0;
}
@@ -543,7 +546,7 @@ public class WriteBufferManager extends MemoryConsumer {
+ " bytes");
}
// Use final temporary variables for closures
- events.add(new AddBlockEvent(taskId, stageAttemptNumber,
shuffleBlockInfosPerEvent));
+ events.add(new AddBlockEvent(taskId, stageAttemptNumber,
shuffleBlockInfosPerEvent, this));
}
return events;
}
@@ -685,4 +688,12 @@ public class WriteBufferManager extends MemoryConsumer {
Function<Integer, List<ShuffleServerInfo>>
partitionAssignmentRetrieveFunc) {
this.partitionAssignmentRetrieveFunc = partitionAssignmentRetrieveFunc;
}
+
+ public void merge(ShuffleServerPushCostTracker shuffleServerPushCostTracker)
{
+ this.shuffleServerPushCostTracker.merge(shuffleServerPushCostTracker);
+ }
+
+ public ShuffleServerPushCostTracker getShuffleServerPushCostTracker() {
+ return shuffleServerPushCostTracker;
+ }
}
diff --git
a/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/DataPusherTest.java
b/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/DataPusherTest.java
index 080ba1e33..8ac07664f 100644
---
a/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/DataPusherTest.java
+++
b/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/DataPusherTest.java
@@ -32,6 +32,7 @@ import org.junit.jupiter.api.Test;
import org.apache.uniffle.client.factory.ShuffleClientFactory;
import org.apache.uniffle.client.impl.FailedBlockSendTracker;
+import org.apache.uniffle.client.impl.ShuffleServerPushCostTracker;
import org.apache.uniffle.client.impl.ShuffleWriteClientImpl;
import org.apache.uniffle.client.response.SendShuffleDataResult;
import org.apache.uniffle.common.ShuffleBlockInfo;
@@ -114,7 +115,8 @@ public class DataPusherTest {
failedBlockSendTracker.add(
failedBlock2, new ShuffleServerInfo("host", 39998),
StatusCode.NO_BUFFER);
shuffleWriteClient.setFakedShuffleDataResult(
- new SendShuffleDataResult(Sets.newHashSet(1L, 2L),
failedBlockSendTracker));
+ new SendShuffleDataResult(
+ Sets.newHashSet(1L, 2L), failedBlockSendTracker, new
ShuffleServerPushCostTracker()));
ShuffleBlockInfo shuffleBlockInfo =
new ShuffleBlockInfo(1, 1, 1, 1, 1, new byte[1], null, 1, 100, 1);
AddBlockEvent event = new AddBlockEvent("taskId",
Arrays.asList(shuffleBlockInfo));
diff --git
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index 14c4a68af..1b40a4c07 100644
---
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -368,6 +368,7 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
checkSentRecordCount(recordCount);
checkBlockSendResult(new HashSet<>(blockIds));
checkSentBlockCount();
+ bufferManager.getShuffleServerPushCostTracker().statistics();
long commitStartTs = System.currentTimeMillis();
long checkDuration = commitStartTs - checkStartTs;
if (!isMemoryShuffleEnabled) {
diff --git
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleServerPushCost.java
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleServerPushCost.java
new file mode 100644
index 000000000..92383a1bf
--- /dev/null
+++
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleServerPushCost.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.client.impl;
+
+import java.util.concurrent.atomic.AtomicLong;
+
/**
 * Accumulated data-push cost (bytes sent and time spent pushing) for a single shuffle server.
 *
 * <p>Both counters are {@link AtomicLong}s, so several pushes may accumulate into the same instance
 * concurrently.
 */
public class ShuffleServerPushCost {
  private final String shuffleServerId;
  private final AtomicLong sentBytes;
  private final AtomicLong sentDurationMs;

  public ShuffleServerPushCost(String shuffleServerId) {
    this.shuffleServerId = shuffleServerId;
    this.sentBytes = new AtomicLong();
    this.sentDurationMs = new AtomicLong();
  }

  /** Adds {@code bytes} to the total number of bytes pushed to this shuffle server. */
  public void incSentBytes(long bytes) {
    this.sentBytes.addAndGet(bytes);
  }

  /** Adds {@code duration} milliseconds to the total push time for this shuffle server. */
  public void incDurationMs(long duration) {
    this.sentDurationMs.addAndGet(duration);
  }

  /**
   * Folds another cost record for the same shuffle server into this one.
   *
   * @param cost the cost to add; ignored when {@code null}, when it is this very instance (merging
   *     into itself would double-count), or when it belongs to a different shuffle server
   */
  public void merge(ShuffleServerPushCost cost) {
    if (cost == null || cost == this) {
      return;
    }
    if (!cost.shuffleServerId.equals(this.shuffleServerId)) {
      return;
    }
    this.incSentBytes(cost.sentBytes.get());
    this.incDurationMs(cost.sentDurationMs.get());
  }

  /**
   * Returns the average push speed in bytes per millisecond (integer division), or 0 when no push
   * time has been recorded.
   */
  public long speed() {
    // Snapshot the duration once so the zero check and the division use the same value.
    long durationMs = sentDurationMs.get();
    if (durationMs == 0) {
      return 0L;
    }
    return sentBytes.get() / durationMs;
  }

  /** Returns the total number of bytes pushed to this shuffle server. */
  public long sentBytes() {
    return sentBytes.get();
  }

  /** Returns the total time spent pushing to this shuffle server, in milliseconds. */
  public long sentDurationMillis() {
    return sentDurationMs.get();
  }

  @Override
  public String toString() {
    return "ShuffleServerPushCost{"
        + "shuffleServerId='"
        + shuffleServerId
        + "', sentBytes="
        + sentBytes.get()
        + ", sentDurationMs="
        + sentDurationMs.get()
        + ", speed="
        + speed()
        + "}";
  }
}
diff --git
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleServerPushCostTracker.java
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleServerPushCostTracker.java
new file mode 100644
index 000000000..f238c1073
--- /dev/null
+++
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleServerPushCostTracker.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.client.impl;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.collections4.CollectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** This class is to track the underlying assigned shuffle servers' data
pushing speed. */
+public class ShuffleServerPushCostTracker {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ShuffleServerPushCostTracker.class);
+
+ // shuffleServerId -> ShuffleServerPushCost Object
+ private Map<String, ShuffleServerPushCost> tracking;
+
+ public ShuffleServerPushCostTracker() {
+ this.tracking = new ConcurrentHashMap<>();
+ }
+
+ public void merge(ShuffleServerPushCostTracker tracker) {
+ if (tracker == null) {
+ return;
+ }
+ for (Map.Entry<String, ShuffleServerPushCost> entry :
tracker.tracking.entrySet()) {
+ String id = entry.getKey();
+ ShuffleServerPushCost cost = entry.getValue();
+ this.tracking.computeIfAbsent(id, key -> new
ShuffleServerPushCost(key)).merge(cost);
+ }
+ }
+
+ public void record(String id, long sentBytes, long pushDuration) {
+ ShuffleServerPushCost cost =
+ this.tracking.computeIfAbsent(id, key -> new
ShuffleServerPushCost(key));
+ cost.incDurationMs(pushDuration);
+ cost.incSentBytes(sentBytes);
+ }
+
+ public void statistics() {
+ List<ShuffleServerPushCost> shuffleServerPushCosts = new
ArrayList<>(this.tracking.values());
+ if (CollectionUtils.isEmpty(shuffleServerPushCosts)) {
+ return;
+ }
+
+ Collections.sort(
+ shuffleServerPushCosts,
Comparator.comparingLong(ShuffleServerPushCost::speed));
+
+ LOGGER.info(
+ "Statistics of shuffle server push speed: \n"
+ + "-------------------------------------------"
+ + "\nMinimum: {} \nP25: {} \nMedian: {} \nP75: {} \nMaximum: {}\n"
+ + "-------------------------------------------",
+ shuffleServerPushCosts.isEmpty() ? 0 : shuffleServerPushCosts.get(0),
+ getPercentile(shuffleServerPushCosts, 25),
+ getPercentile(shuffleServerPushCosts, 50),
+ getPercentile(shuffleServerPushCosts, 75),
+ shuffleServerPushCosts.isEmpty()
+ ? 0
+ : shuffleServerPushCosts.get(shuffleServerPushCosts.size() - 1));
+ }
+
+ private ShuffleServerPushCost getPercentile(
+ List<ShuffleServerPushCost> costs, double percentile) {
+ if (costs.isEmpty()) {
+ return null;
+ }
+ int index = (int) Math.ceil(percentile / 100.0 * costs.size()) - 1;
+ return costs.get(Math.min(Math.max(index, 0), costs.size() - 1));
+ }
+}
diff --git
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
index 62f9fd753..440237ee3 100644
---
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
+++
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
@@ -172,7 +172,8 @@ public class ShuffleWriteClientImpl implements
ShuffleWriteClient {
Map<Long, AtomicInteger> blockIdsSendSuccessTracker,
FailedBlockSendTracker failedBlockSendTracker,
boolean allowFastFail,
- Supplier<Boolean> needCancelRequest) {
+ Supplier<Boolean> needCancelRequest,
+ ShuffleServerPushCostTracker shuffleServerPushCostTracker) {
if (serverToBlockIds == null) {
return true;
@@ -204,13 +205,11 @@ public class ShuffleWriteClientImpl implements
ShuffleWriteClient {
long s = System.currentTimeMillis();
RssSendShuffleDataResponse response =
getShuffleServerClient(ssi).sendShuffleData(request);
-
+ long pushDuration = System.currentTimeMillis() - s;
String logMsg =
String.format(
"ShuffleWriteClientImpl sendShuffleData with %s
blocks to %s cost: %s(ms)",
- serverToBlockIds.get(ssi).size(),
- ssi.getId(),
- System.currentTimeMillis() - s);
+ serverToBlockIds.get(ssi).size(), ssi.getId(),
pushDuration);
if (response.getStatusCode() == StatusCode.SUCCESS) {
// mark a replica of block that has been sent
@@ -237,6 +236,16 @@ public class ShuffleWriteClientImpl implements
ShuffleWriteClient {
"{}, it failed wth statusCode[{}]", logMsg,
response.getStatusCode());
return false;
}
+
+ // record shuffle-server push cost
+ long sentBytes =
+ shuffleIdToBlocks.values().stream()
+ .flatMap(x -> x.values().stream())
+ .flatMap(x -> x.stream())
+ .map(x -> x.getLength())
+ .reduce((a, b) -> a + b)
+ .orElse(0);
+ shuffleServerPushCostTracker.record(ssi.getId(),
sentBytes, pushDuration);
} catch (Exception e) {
recordFailedBlocks(
failedBlockSendTracker, serverToBlocks, ssi,
StatusCode.INTERNAL_ERROR);
@@ -425,6 +434,7 @@ public class ShuffleWriteClientImpl implements
ShuffleWriteClient {
blockIdsSendSuccessTracker.computeIfAbsent(
block, id -> new AtomicInteger(0))));
FailedBlockSendTracker blockIdsSendFailTracker = new
FailedBlockSendTracker();
+ ShuffleServerPushCostTracker shuffleServerPushCostTracker = new
ShuffleServerPushCostTracker();
// sent the primary round of blocks.
boolean isAllSuccess =
@@ -436,7 +446,8 @@ public class ShuffleWriteClientImpl implements
ShuffleWriteClient {
blockIdsSendSuccessTracker,
blockIdsSendFailTracker,
secondaryServerToBlocks.isEmpty(),
- needCancelRequest);
+ needCancelRequest,
+ shuffleServerPushCostTracker);
// The secondary round of blocks is sent only when the primary group
issues failed sending.
// This should be infrequent.
@@ -453,7 +464,8 @@ public class ShuffleWriteClientImpl implements
ShuffleWriteClient {
blockIdsSendSuccessTracker,
blockIdsSendFailTracker,
true,
- needCancelRequest);
+ needCancelRequest,
+ shuffleServerPushCostTracker);
}
Set<Long> blockIdsSendSuccessSet = Sets.newHashSet();
@@ -470,7 +482,8 @@ public class ShuffleWriteClientImpl implements
ShuffleWriteClient {
blockIdsSendFailTracker.remove(successBlockId.getKey());
}
});
- return new SendShuffleDataResult(blockIdsSendSuccessSet,
blockIdsSendFailTracker);
+ return new SendShuffleDataResult(
+ blockIdsSendSuccessSet, blockIdsSendFailTracker,
shuffleServerPushCostTracker);
}
/**
diff --git
a/client/src/main/java/org/apache/uniffle/client/response/SendShuffleDataResult.java
b/client/src/main/java/org/apache/uniffle/client/response/SendShuffleDataResult.java
index 595de2931..31bd76be0 100644
---
a/client/src/main/java/org/apache/uniffle/client/response/SendShuffleDataResult.java
+++
b/client/src/main/java/org/apache/uniffle/client/response/SendShuffleDataResult.java
@@ -20,16 +20,26 @@ package org.apache.uniffle.client.response;
import java.util.Set;
import org.apache.uniffle.client.impl.FailedBlockSendTracker;
+import org.apache.uniffle.client.impl.ShuffleServerPushCostTracker;
public class SendShuffleDataResult {
private Set<Long> successBlockIds;
private FailedBlockSendTracker failedBlockSendTracker;
+ private ShuffleServerPushCostTracker shuffleServerPushCostTracker;
public SendShuffleDataResult(
Set<Long> successBlockIds, FailedBlockSendTracker
failedBlockSendTracker) {
+ this(successBlockIds, failedBlockSendTracker, new
ShuffleServerPushCostTracker());
+ }
+
+ public SendShuffleDataResult(
+ Set<Long> successBlockIds,
+ FailedBlockSendTracker failedBlockSendTracker,
+ ShuffleServerPushCostTracker shuffleServerPushCostTracker) {
this.successBlockIds = successBlockIds;
this.failedBlockSendTracker = failedBlockSendTracker;
+ this.shuffleServerPushCostTracker = shuffleServerPushCostTracker;
}
public Set<Long> getSuccessBlockIds() {
@@ -43,4 +53,8 @@ public class SendShuffleDataResult {
public FailedBlockSendTracker getFailedBlockSendTracker() {
return failedBlockSendTracker;
}
+
+ public ShuffleServerPushCostTracker getShuffleServerPushCostTracker() {
+ return shuffleServerPushCostTracker;
+ }
}