This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a9e7cf13 [#2457] feat(spark): Introducing shuffle-server data push 
statistics (#2458)
7a9e7cf13 is described below

commit 7a9e7cf133a38f3a81b47510cf299a954e8126b1
Author: Junfan Zhang <[email protected]>
AuthorDate: Wed Apr 23 10:11:34 2025 +0800

    [#2457] feat(spark): Introducing shuffle-server data push statistics (#2458)
    
    ### What changes were proposed in this pull request?
    
    Introduce shuffle-server data push statistics in the writer.
    
    ### Why are the changes needed?
    
    Sometimes a job is slow because a single underlying shuffle server is 
slow, but with the current codebase it is hard to identify which one. This PR 
tracks each shuffle server's push speed and logs the statistics.
    
    BTW, building on this PR, I will introduce a Uniffle Spark UI for easier 
and better observability.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Internal tests.
---
 .../apache/spark/shuffle/writer/AddBlockEvent.java | 13 +++-
 .../apache/spark/shuffle/writer/DataPusher.java    |  8 ++
 .../spark/shuffle/writer/WriteBufferManager.java   | 15 +++-
 .../spark/shuffle/writer/DataPusherTest.java       |  4 +-
 .../spark/shuffle/writer/RssShuffleWriter.java     |  1 +
 .../uniffle/client/impl/ShuffleServerPushCost.java | 78 +++++++++++++++++++
 .../client/impl/ShuffleServerPushCostTracker.java  | 91 ++++++++++++++++++++++
 .../client/impl/ShuffleWriteClientImpl.java        | 29 +++++--
 .../client/response/SendShuffleDataResult.java     | 14 ++++
 9 files changed, 240 insertions(+), 13 deletions(-)

diff --git 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/AddBlockEvent.java
 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/AddBlockEvent.java
index f989fdb0b..24df3e6d1 100644
--- 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/AddBlockEvent.java
+++ 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/AddBlockEvent.java
@@ -28,17 +28,22 @@ public class AddBlockEvent {
   private int stageAttemptNumber;
   private List<ShuffleBlockInfo> shuffleDataInfoList;
   private List<Runnable> processedCallbackChain;
+  private WriteBufferManager bufferManager;
 
   public AddBlockEvent(String taskId, List<ShuffleBlockInfo> 
shuffleDataInfoList) {
-    this(taskId, 0, shuffleDataInfoList);
+    this(taskId, 0, shuffleDataInfoList, null);
   }
 
   public AddBlockEvent(
-      String taskId, int stageAttemptNumber, List<ShuffleBlockInfo> 
shuffleDataInfoList) {
+      String taskId,
+      int stageAttemptNumber,
+      List<ShuffleBlockInfo> shuffleDataInfoList,
+      WriteBufferManager writeBufferManager) {
     this.taskId = taskId;
     this.stageAttemptNumber = stageAttemptNumber;
     this.shuffleDataInfoList = shuffleDataInfoList;
     this.processedCallbackChain = new ArrayList<>();
+    this.bufferManager = writeBufferManager;
   }
 
   /** @param callback, should not throw any exception and execute fast. */
@@ -62,6 +67,10 @@ public class AddBlockEvent {
     return processedCallbackChain;
   }
 
+  public WriteBufferManager getBufferManager() {
+    return bufferManager;
+  }
+
   @Override
   public String toString() {
     return "AddBlockEvent: TaskId[" + taskId + "], " + shuffleDataInfoList;
diff --git 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/DataPusher.java
 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/DataPusher.java
index c55216d26..f14583654 100644
--- 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/DataPusher.java
+++ 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/DataPusher.java
@@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.client.api.ShuffleWriteClient;
 import org.apache.uniffle.client.impl.FailedBlockSendTracker;
+import org.apache.uniffle.client.impl.ShuffleServerPushCostTracker;
 import org.apache.uniffle.client.response.SendShuffleDataResult;
 import org.apache.uniffle.common.ShuffleBlockInfo;
 import org.apache.uniffle.common.exception.RssException;
@@ -100,6 +101,13 @@ public class DataPusher implements Closeable {
                 putFailedBlockSendTracker(
                     taskToFailedBlockSendTracker, taskId, 
result.getFailedBlockSendTracker());
               } finally {
+                WriteBufferManager bufferManager = event.getBufferManager();
+                if (bufferManager != null) {
+                  ShuffleServerPushCostTracker shuffleServerPushCostTracker =
+                      result.getShuffleServerPushCostTracker();
+                  bufferManager.merge(shuffleServerPushCostTracker);
+                }
+
                 Set<Long> succeedBlockIds = getSucceedBlockIds(result);
                 for (ShuffleBlockInfo block : shuffleBlockInfoList) {
                   
block.executeCompletionCallback(succeedBlockIds.contains(block.getBlockId()));
diff --git 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
index cf4b4bc51..169bb20b1 100644
--- 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
+++ 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
@@ -47,6 +47,7 @@ import org.apache.spark.shuffle.RssSparkConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.uniffle.client.impl.ShuffleServerPushCostTracker;
 import org.apache.uniffle.common.ShuffleBlockInfo;
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.compression.Codec;
@@ -105,6 +106,7 @@ public class WriteBufferManager extends MemoryConsumer {
   private double bufferSpillRatio;
   private Function<Integer, List<ShuffleServerInfo>> 
partitionAssignmentRetrieveFunc;
   private int stageAttemptNumber;
+  private ShuffleServerPushCostTracker shuffleServerPushCostTracker;
 
   public WriteBufferManager(
       int shuffleId,
@@ -200,6 +202,7 @@ public class WriteBufferManager extends MemoryConsumer {
     this.blockIdLayout = BlockIdLayout.from(rssConf);
     this.partitionAssignmentRetrieveFunc = partitionAssignmentRetrieveFunc;
     this.stageAttemptNumber = stageAttemptNumber;
+    this.shuffleServerPushCostTracker = new ShuffleServerPushCostTracker();
   }
 
   public WriteBufferManager(
@@ -528,7 +531,7 @@ public class WriteBufferManager extends MemoryConsumer {
                   + totalSize
                   + " bytes");
         }
-        events.add(new AddBlockEvent(taskId, stageAttemptNumber, 
shuffleBlockInfosPerEvent));
+        events.add(new AddBlockEvent(taskId, stageAttemptNumber, 
shuffleBlockInfosPerEvent, this));
         shuffleBlockInfosPerEvent = Lists.newArrayList();
         totalSize = 0;
       }
@@ -543,7 +546,7 @@ public class WriteBufferManager extends MemoryConsumer {
                 + " bytes");
       }
       // Use final temporary variables for closures
-      events.add(new AddBlockEvent(taskId, stageAttemptNumber, 
shuffleBlockInfosPerEvent));
+      events.add(new AddBlockEvent(taskId, stageAttemptNumber, 
shuffleBlockInfosPerEvent, this));
     }
     return events;
   }
@@ -685,4 +688,12 @@ public class WriteBufferManager extends MemoryConsumer {
       Function<Integer, List<ShuffleServerInfo>> 
partitionAssignmentRetrieveFunc) {
     this.partitionAssignmentRetrieveFunc = partitionAssignmentRetrieveFunc;
   }
+
+  public void merge(ShuffleServerPushCostTracker shuffleServerPushCostTracker) 
{
+    this.shuffleServerPushCostTracker.merge(shuffleServerPushCostTracker);
+  }
+
+  public ShuffleServerPushCostTracker getShuffleServerPushCostTracker() {
+    return shuffleServerPushCostTracker;
+  }
 }
diff --git 
a/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/DataPusherTest.java
 
b/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/DataPusherTest.java
index 080ba1e33..8ac07664f 100644
--- 
a/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/DataPusherTest.java
+++ 
b/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/DataPusherTest.java
@@ -32,6 +32,7 @@ import org.junit.jupiter.api.Test;
 
 import org.apache.uniffle.client.factory.ShuffleClientFactory;
 import org.apache.uniffle.client.impl.FailedBlockSendTracker;
+import org.apache.uniffle.client.impl.ShuffleServerPushCostTracker;
 import org.apache.uniffle.client.impl.ShuffleWriteClientImpl;
 import org.apache.uniffle.client.response.SendShuffleDataResult;
 import org.apache.uniffle.common.ShuffleBlockInfo;
@@ -114,7 +115,8 @@ public class DataPusherTest {
     failedBlockSendTracker.add(
         failedBlock2, new ShuffleServerInfo("host", 39998), 
StatusCode.NO_BUFFER);
     shuffleWriteClient.setFakedShuffleDataResult(
-        new SendShuffleDataResult(Sets.newHashSet(1L, 2L), 
failedBlockSendTracker));
+        new SendShuffleDataResult(
+            Sets.newHashSet(1L, 2L), failedBlockSendTracker, new 
ShuffleServerPushCostTracker()));
     ShuffleBlockInfo shuffleBlockInfo =
         new ShuffleBlockInfo(1, 1, 1, 1, 1, new byte[1], null, 1, 100, 1);
     AddBlockEvent event = new AddBlockEvent("taskId", 
Arrays.asList(shuffleBlockInfo));
diff --git 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index 14c4a68af..1b40a4c07 100644
--- 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++ 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -368,6 +368,7 @@ public class RssShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
     checkSentRecordCount(recordCount);
     checkBlockSendResult(new HashSet<>(blockIds));
     checkSentBlockCount();
+    bufferManager.getShuffleServerPushCostTracker().statistics();
     long commitStartTs = System.currentTimeMillis();
     long checkDuration = commitStartTs - checkStartTs;
     if (!isMemoryShuffleEnabled) {
diff --git 
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleServerPushCost.java
 
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleServerPushCost.java
new file mode 100644
index 000000000..92383a1bf
--- /dev/null
+++ 
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleServerPushCost.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.client.impl;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+public class ShuffleServerPushCost {
+  private final String shuffleServerId;
+  private final AtomicLong sentBytes;
+  private final AtomicLong sentDurationMs;
+
+  public ShuffleServerPushCost(String shuffleServerId) {
+    this.shuffleServerId = shuffleServerId;
+    this.sentBytes = new AtomicLong();
+    this.sentDurationMs = new AtomicLong();
+  }
+
+  public void incSentBytes(long bytes) {
+    this.sentBytes.addAndGet(bytes);
+  }
+
+  public void incDurationMs(long duration) {
+    this.sentDurationMs.addAndGet(duration);
+  }
+
+  public void merge(ShuffleServerPushCost cost) {
+    if (!cost.shuffleServerId.equals(this.shuffleServerId)) {
+      return;
+    }
+
+    this.incSentBytes(cost.sentBytes.get());
+    this.incDurationMs(cost.sentDurationMs.get());
+  }
+
+  public long speed() {
+    if (sentDurationMs.get() == 0) {
+      return 0L;
+    }
+    return sentBytes.get() / sentDurationMs.get();
+  }
+
+  public long sentBytes() {
+    return sentBytes.get();
+  }
+
+  public long sentDurationMillis() {
+    return sentDurationMs.get();
+  }
+
+  @Override
+  public String toString() {
+    return "ShuffleServerPushCost{"
+        + "shuffleServerId='"
+        + shuffleServerId
+        + ", sentBytes="
+        + sentBytes
+        + ", sentDurationMs="
+        + sentDurationMs
+        + ", speed="
+        + speed()
+        + "}";
+  }
+}
diff --git 
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleServerPushCostTracker.java
 
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleServerPushCostTracker.java
new file mode 100644
index 000000000..f238c1073
--- /dev/null
+++ 
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleServerPushCostTracker.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.client.impl;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.collections4.CollectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** This class is to track the underlying assigned shuffle servers' data 
pushing speed. */
+public class ShuffleServerPushCostTracker {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ShuffleServerPushCostTracker.class);
+
+  // shuffleServerId -> ShuffleServerPushCost Object
+  private Map<String, ShuffleServerPushCost> tracking;
+
+  public ShuffleServerPushCostTracker() {
+    this.tracking = new ConcurrentHashMap<>();
+  }
+
+  public void merge(ShuffleServerPushCostTracker tracker) {
+    if (tracker == null) {
+      return;
+    }
+    for (Map.Entry<String, ShuffleServerPushCost> entry : 
tracker.tracking.entrySet()) {
+      String id = entry.getKey();
+      ShuffleServerPushCost cost = entry.getValue();
+      this.tracking.computeIfAbsent(id, key -> new 
ShuffleServerPushCost(key)).merge(cost);
+    }
+  }
+
+  public void record(String id, long sentBytes, long pushDuration) {
+    ShuffleServerPushCost cost =
+        this.tracking.computeIfAbsent(id, key -> new 
ShuffleServerPushCost(key));
+    cost.incDurationMs(pushDuration);
+    cost.incSentBytes(sentBytes);
+  }
+
+  public void statistics() {
+    List<ShuffleServerPushCost> shuffleServerPushCosts = new 
ArrayList<>(this.tracking.values());
+    if (CollectionUtils.isEmpty(shuffleServerPushCosts)) {
+      return;
+    }
+
+    Collections.sort(
+        shuffleServerPushCosts, 
Comparator.comparingLong(ShuffleServerPushCost::speed));
+
+    LOGGER.info(
+        "Statistics of shuffle server push speed: \n"
+            + "-------------------------------------------"
+            + "\nMinimum: {} \nP25: {} \nMedian: {} \nP75: {} \nMaximum: {}\n"
+            + "-------------------------------------------",
+        shuffleServerPushCosts.isEmpty() ? 0 : shuffleServerPushCosts.get(0),
+        getPercentile(shuffleServerPushCosts, 25),
+        getPercentile(shuffleServerPushCosts, 50),
+        getPercentile(shuffleServerPushCosts, 75),
+        shuffleServerPushCosts.isEmpty()
+            ? 0
+            : shuffleServerPushCosts.get(shuffleServerPushCosts.size() - 1));
+  }
+
+  private ShuffleServerPushCost getPercentile(
+      List<ShuffleServerPushCost> costs, double percentile) {
+    if (costs.isEmpty()) {
+      return null;
+    }
+    int index = (int) Math.ceil(percentile / 100.0 * costs.size()) - 1;
+    return costs.get(Math.min(Math.max(index, 0), costs.size() - 1));
+  }
+}
diff --git 
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
 
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
index 62f9fd753..440237ee3 100644
--- 
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
+++ 
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
@@ -172,7 +172,8 @@ public class ShuffleWriteClientImpl implements 
ShuffleWriteClient {
       Map<Long, AtomicInteger> blockIdsSendSuccessTracker,
       FailedBlockSendTracker failedBlockSendTracker,
       boolean allowFastFail,
-      Supplier<Boolean> needCancelRequest) {
+      Supplier<Boolean> needCancelRequest,
+      ShuffleServerPushCostTracker shuffleServerPushCostTracker) {
 
     if (serverToBlockIds == null) {
       return true;
@@ -204,13 +205,11 @@ public class ShuffleWriteClientImpl implements 
ShuffleWriteClient {
                       long s = System.currentTimeMillis();
                       RssSendShuffleDataResponse response =
                           getShuffleServerClient(ssi).sendShuffleData(request);
-
+                      long pushDuration = System.currentTimeMillis() - s;
                       String logMsg =
                           String.format(
                               "ShuffleWriteClientImpl sendShuffleData with %s 
blocks to %s cost: %s(ms)",
-                              serverToBlockIds.get(ssi).size(),
-                              ssi.getId(),
-                              System.currentTimeMillis() - s);
+                              serverToBlockIds.get(ssi).size(), ssi.getId(), 
pushDuration);
 
                       if (response.getStatusCode() == StatusCode.SUCCESS) {
                         // mark a replica of block that has been sent
@@ -237,6 +236,16 @@ public class ShuffleWriteClientImpl implements 
ShuffleWriteClient {
                             "{}, it failed wth statusCode[{}]", logMsg, 
response.getStatusCode());
                         return false;
                       }
+
+                      // record shuffle-server push cost
+                      long sentBytes =
+                          shuffleIdToBlocks.values().stream()
+                              .flatMap(x -> x.values().stream())
+                              .flatMap(x -> x.stream())
+                              .map(x -> x.getLength())
+                              .reduce((a, b) -> a + b)
+                              .orElse(0);
+                      shuffleServerPushCostTracker.record(ssi.getId(), 
sentBytes, pushDuration);
                     } catch (Exception e) {
                       recordFailedBlocks(
                           failedBlockSendTracker, serverToBlocks, ssi, 
StatusCode.INTERNAL_ERROR);
@@ -425,6 +434,7 @@ public class ShuffleWriteClientImpl implements 
ShuffleWriteClient {
                         blockIdsSendSuccessTracker.computeIfAbsent(
                             block, id -> new AtomicInteger(0))));
     FailedBlockSendTracker blockIdsSendFailTracker = new 
FailedBlockSendTracker();
+    ShuffleServerPushCostTracker shuffleServerPushCostTracker = new 
ShuffleServerPushCostTracker();
 
     // sent the primary round of blocks.
     boolean isAllSuccess =
@@ -436,7 +446,8 @@ public class ShuffleWriteClientImpl implements 
ShuffleWriteClient {
             blockIdsSendSuccessTracker,
             blockIdsSendFailTracker,
             secondaryServerToBlocks.isEmpty(),
-            needCancelRequest);
+            needCancelRequest,
+            shuffleServerPushCostTracker);
 
     // The secondary round of blocks is sent only when the primary group 
issues failed sending.
     // This should be infrequent.
@@ -453,7 +464,8 @@ public class ShuffleWriteClientImpl implements 
ShuffleWriteClient {
           blockIdsSendSuccessTracker,
           blockIdsSendFailTracker,
           true,
-          needCancelRequest);
+          needCancelRequest,
+          shuffleServerPushCostTracker);
     }
 
     Set<Long> blockIdsSendSuccessSet = Sets.newHashSet();
@@ -470,7 +482,8 @@ public class ShuffleWriteClientImpl implements 
ShuffleWriteClient {
                 blockIdsSendFailTracker.remove(successBlockId.getKey());
               }
             });
-    return new SendShuffleDataResult(blockIdsSendSuccessSet, 
blockIdsSendFailTracker);
+    return new SendShuffleDataResult(
+        blockIdsSendSuccessSet, blockIdsSendFailTracker, 
shuffleServerPushCostTracker);
   }
 
   /**
diff --git 
a/client/src/main/java/org/apache/uniffle/client/response/SendShuffleDataResult.java
 
b/client/src/main/java/org/apache/uniffle/client/response/SendShuffleDataResult.java
index 595de2931..31bd76be0 100644
--- 
a/client/src/main/java/org/apache/uniffle/client/response/SendShuffleDataResult.java
+++ 
b/client/src/main/java/org/apache/uniffle/client/response/SendShuffleDataResult.java
@@ -20,16 +20,26 @@ package org.apache.uniffle.client.response;
 import java.util.Set;
 
 import org.apache.uniffle.client.impl.FailedBlockSendTracker;
+import org.apache.uniffle.client.impl.ShuffleServerPushCostTracker;
 
 public class SendShuffleDataResult {
 
   private Set<Long> successBlockIds;
   private FailedBlockSendTracker failedBlockSendTracker;
+  private ShuffleServerPushCostTracker shuffleServerPushCostTracker;
 
   public SendShuffleDataResult(
       Set<Long> successBlockIds, FailedBlockSendTracker 
failedBlockSendTracker) {
+    this(successBlockIds, failedBlockSendTracker, new 
ShuffleServerPushCostTracker());
+  }
+
+  public SendShuffleDataResult(
+      Set<Long> successBlockIds,
+      FailedBlockSendTracker failedBlockSendTracker,
+      ShuffleServerPushCostTracker shuffleServerPushCostTracker) {
     this.successBlockIds = successBlockIds;
     this.failedBlockSendTracker = failedBlockSendTracker;
+    this.shuffleServerPushCostTracker = shuffleServerPushCostTracker;
   }
 
   public Set<Long> getSuccessBlockIds() {
@@ -43,4 +53,8 @@ public class SendShuffleDataResult {
   public FailedBlockSendTracker getFailedBlockSendTracker() {
     return failedBlockSendTracker;
   }
+
+  public ShuffleServerPushCostTracker getShuffleServerPushCostTracker() {
+    return shuffleServerPushCostTracker;
+  }
 }

Reply via email to