This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/branch-0.3 by this push:
new d9f535d9e [CELEBORN-1143][BUG] SortBasedPusher pushData should inc
memory spill metrics
d9f535d9e is described below
commit d9f535d9ef4970bdfc107b88b35286b9b7858de5
Author: Angerszhuuuu <[email protected]>
AuthorDate: Tue Nov 28 16:26:00 2023 +0800
[CELEBORN-1143][BUG] SortBasedPusher pushData should inc memory spill
metrics
### What changes were proposed in this pull request?
SortBasedPusher `pushData` should increment the memory spill metrics (`memoryBytesSpilled`) by the number of bytes it frees.
### Why are the changes needed?
Make metrics more accurate.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Closes #2117 from AngersZhuuuu/CELEBORN-1143.
Authored-by: Angerszhuuuu <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
(cherry picked from commit 662330923716d2b5fe70c960f04853063a9d737c)
Signed-off-by: Cheng Pan <[email protected]>
---
.../java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java | 8 +++++++-
.../org/apache/spark/shuffle/celeborn/SortBasedPusherSuiteJ.java | 7 +++++++
.../org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java | 2 ++
.../org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java | 2 ++
4 files changed, 18 insertions(+), 1 deletion(-)
diff --git
a/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java
b/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java
index 305f83bc2..aff8e7ce3 100644
---
a/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java
+++
b/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java
@@ -24,6 +24,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Consumer;
+import org.apache.spark.TaskContext;
import org.apache.spark.memory.MemoryConsumer;
import org.apache.spark.memory.SparkOutOfMemoryError;
import org.apache.spark.memory.TaskMemoryManager;
@@ -57,6 +58,7 @@ public class SortBasedPusher extends MemoryConsumer {
private long pageCursor = -1;
private final ShuffleClient shuffleClient;
+ private final TaskContext taskContext;
private DataPusher dataPusher;
private final int pushBufferMaxSize;
private final long pushSortMemoryThreshold;
@@ -78,6 +80,7 @@ public class SortBasedPusher extends MemoryConsumer {
public SortBasedPusher(
TaskMemoryManager memoryManager,
ShuffleClient shuffleClient,
+ TaskContext taskContext,
int shuffleId,
int mapId,
int attemptNumber,
@@ -97,6 +100,7 @@ public class SortBasedPusher extends MemoryConsumer {
memoryManager.getTungstenMemoryMode());
this.shuffleClient = shuffleClient;
+ this.taskContext = taskContext;
this.shuffleId = shuffleId;
this.mapId = mapId;
@@ -213,6 +217,7 @@ public class SortBasedPusher extends MemoryConsumer {
long freedBytes = freeMemory();
inMemSorter.freeMemory();
+ taskContext.taskMetrics().incMemoryBytesSpilled(freedBytes);
return freedBytes;
}
@@ -434,11 +439,12 @@ public class SortBasedPusher extends MemoryConsumer {
}
public void cleanupResources() {
- freeMemory();
+ long freedBytes = freeMemory();
if (inMemSorter != null) {
inMemSorter.freeMemory();
inMemSorter = null;
}
+ taskContext.taskMetrics().incMemoryBytesSpilled(freedBytes);
}
public void close() throws IOException {
diff --git
a/client-spark/common/src/test/java/org/apache/spark/shuffle/celeborn/SortBasedPusherSuiteJ.java
b/client-spark/common/src/test/java/org/apache/spark/shuffle/celeborn/SortBasedPusherSuiteJ.java
index 5bdd704d0..bc86f9db7 100644
---
a/client-spark/common/src/test/java/org/apache/spark/shuffle/celeborn/SortBasedPusherSuiteJ.java
+++
b/client-spark/common/src/test/java/org/apache/spark/shuffle/celeborn/SortBasedPusherSuiteJ.java
@@ -27,6 +27,8 @@ import java.util.UUID;
import scala.collection.mutable.ListBuffer;
import org.apache.spark.SparkConf;
+import org.apache.spark.TaskContext$;
+import org.apache.spark.TaskContextImpl;
import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.memory.UnifiedMemoryManager;
import org.apache.spark.sql.catalyst.InternalRow;
@@ -58,6 +60,8 @@ public class SortBasedPusherSuiteJ {
private final TaskMemoryManager taskMemoryManager =
new TaskMemoryManager(unifiedMemoryManager, 0);
+ private final TaskContextImpl taskContext = TaskContext$.MODULE$.empty();
+
private final File tempFile = new File(tempDir,
UUID.randomUUID().toString());
private static File tempDir = null;
@@ -82,6 +86,7 @@ public class SortBasedPusherSuiteJ {
new SortBasedPusher(
taskMemoryManager,
/*shuffleClient=*/ client,
+ /*taskContext=*/ taskContext,
/*shuffleId=*/ 0,
/*mapId=*/ 0,
/*attemptNumber=*/ 0,
@@ -125,6 +130,8 @@ public class SortBasedPusherSuiteJ {
row5k.getBaseObject(), row5k.getBaseOffset(),
row5k.getSizeInBytes(), 0, true));
pusher.close();
+
+ assertEquals(taskContext.taskMetrics().memoryBytesSpilled(), 2097152);
}
private static UnsafeRow genUnsafeRow(int size) {
diff --git
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
index 62c05858d..d9f09110e 100644
---
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
+++
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
@@ -133,6 +133,7 @@ public class SortBasedShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
new SortBasedPusher(
taskContext.taskMemoryManager(),
shuffleClient,
+ taskContext,
shuffleId,
mapId,
taskContext.attemptNumber(),
@@ -153,6 +154,7 @@ public class SortBasedShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
new SortBasedPusher(
taskContext.taskMemoryManager(),
shuffleClient,
+ taskContext,
shuffleId,
mapId,
taskContext.attemptNumber(),
diff --git
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
index 98c694306..1867e86c6 100644
---
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
+++
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
@@ -133,6 +133,7 @@ public class SortBasedShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
new SortBasedPusher(
taskContext.taskMemoryManager(),
shuffleClient,
+ taskContext,
shuffleId,
mapId,
taskContext.attemptNumber(),
@@ -153,6 +154,7 @@ public class SortBasedShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
new SortBasedPusher(
taskContext.taskMemoryManager(),
shuffleClient,
+ taskContext,
shuffleId,
mapId,
taskContext.attemptNumber(),