This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/branch-0.3 by this push:
     new d9f535d9e [CELEBORN-1143][BUG] SortBasedPusher pushData should inc 
memory spill metrics
d9f535d9e is described below

commit d9f535d9ef4970bdfc107b88b35286b9b7858de5
Author: Angerszhuuuu <[email protected]>
AuthorDate: Tue Nov 28 16:26:00 2023 +0800

    [CELEBORN-1143][BUG] SortBasedPusher pushData should inc memory spill 
metrics
    
    ### What changes were proposed in this pull request?
    SortBasedPusher `pushData` should increment the memory spill metrics.
    
    ### Why are the changes needed?
    Make metrics more accurate.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Added an assertion on `memoryBytesSpilled` to `SortBasedPusherSuiteJ`.
    
    Closes #2117 from AngersZhuuuu/CELEBORN-1143.
    
    Authored-by: Angerszhuuuu <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
    (cherry picked from commit 662330923716d2b5fe70c960f04853063a9d737c)
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java   | 8 +++++++-
 .../org/apache/spark/shuffle/celeborn/SortBasedPusherSuiteJ.java  | 7 +++++++
 .../org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java | 2 ++
 .../org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java | 2 ++
 4 files changed, 18 insertions(+), 1 deletion(-)

diff --git 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java
 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java
index 305f83bc2..aff8e7ce3 100644
--- 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java
+++ 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java
@@ -24,6 +24,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.LongAdder;
 import java.util.function.Consumer;
 
+import org.apache.spark.TaskContext;
 import org.apache.spark.memory.MemoryConsumer;
 import org.apache.spark.memory.SparkOutOfMemoryError;
 import org.apache.spark.memory.TaskMemoryManager;
@@ -57,6 +58,7 @@ public class SortBasedPusher extends MemoryConsumer {
   private long pageCursor = -1;
 
   private final ShuffleClient shuffleClient;
+  private final TaskContext taskContext;
   private DataPusher dataPusher;
   private final int pushBufferMaxSize;
   private final long pushSortMemoryThreshold;
@@ -78,6 +80,7 @@ public class SortBasedPusher extends MemoryConsumer {
   public SortBasedPusher(
       TaskMemoryManager memoryManager,
       ShuffleClient shuffleClient,
+      TaskContext taskContext,
       int shuffleId,
       int mapId,
       int attemptNumber,
@@ -97,6 +100,7 @@ public class SortBasedPusher extends MemoryConsumer {
         memoryManager.getTungstenMemoryMode());
 
     this.shuffleClient = shuffleClient;
+    this.taskContext = taskContext;
 
     this.shuffleId = shuffleId;
     this.mapId = mapId;
@@ -213,6 +217,7 @@ public class SortBasedPusher extends MemoryConsumer {
 
       long freedBytes = freeMemory();
       inMemSorter.freeMemory();
+      taskContext.taskMetrics().incMemoryBytesSpilled(freedBytes);
 
       return freedBytes;
     }
@@ -434,11 +439,12 @@ public class SortBasedPusher extends MemoryConsumer {
   }
 
   public void cleanupResources() {
-    freeMemory();
+    long freedBytes = freeMemory();
     if (inMemSorter != null) {
       inMemSorter.freeMemory();
       inMemSorter = null;
     }
+    taskContext.taskMetrics().incMemoryBytesSpilled(freedBytes);
   }
 
   public void close() throws IOException {
diff --git 
a/client-spark/common/src/test/java/org/apache/spark/shuffle/celeborn/SortBasedPusherSuiteJ.java
 
b/client-spark/common/src/test/java/org/apache/spark/shuffle/celeborn/SortBasedPusherSuiteJ.java
index 5bdd704d0..bc86f9db7 100644
--- 
a/client-spark/common/src/test/java/org/apache/spark/shuffle/celeborn/SortBasedPusherSuiteJ.java
+++ 
b/client-spark/common/src/test/java/org/apache/spark/shuffle/celeborn/SortBasedPusherSuiteJ.java
@@ -27,6 +27,8 @@ import java.util.UUID;
 import scala.collection.mutable.ListBuffer;
 
 import org.apache.spark.SparkConf;
+import org.apache.spark.TaskContext$;
+import org.apache.spark.TaskContextImpl;
 import org.apache.spark.memory.TaskMemoryManager;
 import org.apache.spark.memory.UnifiedMemoryManager;
 import org.apache.spark.sql.catalyst.InternalRow;
@@ -58,6 +60,8 @@ public class SortBasedPusherSuiteJ {
   private final TaskMemoryManager taskMemoryManager =
       new TaskMemoryManager(unifiedMemoryManager, 0);
 
+  private final TaskContextImpl taskContext = TaskContext$.MODULE$.empty();
+
   private final File tempFile = new File(tempDir, 
UUID.randomUUID().toString());
   private static File tempDir = null;
 
@@ -82,6 +86,7 @@ public class SortBasedPusherSuiteJ {
         new SortBasedPusher(
             taskMemoryManager,
             /*shuffleClient=*/ client,
+            /*taskContext=*/ taskContext,
             /*shuffleId=*/ 0,
             /*mapId=*/ 0,
             /*attemptNumber=*/ 0,
@@ -125,6 +130,8 @@ public class SortBasedPusherSuiteJ {
             row5k.getBaseObject(), row5k.getBaseOffset(), 
row5k.getSizeInBytes(), 0, true));
 
     pusher.close();
+
+    assertEquals(taskContext.taskMetrics().memoryBytesSpilled(), 2097152);
   }
 
   private static UnsafeRow genUnsafeRow(int size) {
diff --git 
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
 
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
index 62c05858d..d9f09110e 100644
--- 
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
+++ 
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
@@ -133,6 +133,7 @@ public class SortBasedShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
             new SortBasedPusher(
                 taskContext.taskMemoryManager(),
                 shuffleClient,
+                taskContext,
                 shuffleId,
                 mapId,
                 taskContext.attemptNumber(),
@@ -153,6 +154,7 @@ public class SortBasedShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
           new SortBasedPusher(
               taskContext.taskMemoryManager(),
               shuffleClient,
+              taskContext,
               shuffleId,
               mapId,
               taskContext.attemptNumber(),
diff --git 
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
 
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
index 98c694306..1867e86c6 100644
--- 
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
+++ 
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
@@ -133,6 +133,7 @@ public class SortBasedShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
             new SortBasedPusher(
                 taskContext.taskMemoryManager(),
                 shuffleClient,
+                taskContext,
                 shuffleId,
                 mapId,
                 taskContext.attemptNumber(),
@@ -153,6 +154,7 @@ public class SortBasedShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
           new SortBasedPusher(
               taskContext.taskMemoryManager(),
               shuffleClient,
+              taskContext,
               shuffleId,
               mapId,
               taskContext.attemptNumber(),

Reply via email to