mridulm commented on code in PR #47776: URL: https://github.com/apache/spark/pull/47776#discussion_r1720593691
##########
core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java:
##########
@@ -122,6 +122,30 @@ public class TaskMemoryManager {
    */
   private volatile long acquiredButNotUsed = 0L;
 
+  /**
+   * Current off heap memory usage by this task.
+   */
+  private long currentOffHeapMemory = 0L;
+
+  private final Object offHeapMemoryLock = new Object();
+
+  /*
+   * Current on heap memory usage by this task.
+   */
+  private long currentOnHeapMemory = 0L;

Review Comment:
I would prefer removing the duplication, given that this is already maintained in MemoryManager.

The change would then become something like this.

In acquireExecutionMemory:

```
updatePeakStats(mode);
return got;
```

```
private void updatePeakStats(MemoryMode mode) {
  // This precondition should hold, though we don't necessarily need to assert it.
  assert(Thread.holdsLock(this));
  if (mode == MemoryMode.OFF_HEAP) {
    peakOffHeapMemory = Math.max(peakOffHeapMemory, memoryManager.offHeapExecutionMemoryUsed());
  } else {
    peakOnHeapMemory = Math.max(peakOnHeapMemory, memoryManager.onHeapExecutionMemoryUsed());
  }
}
```

In releaseExecutionMemory:

```
synchronized(this) {
  memoryManager.releaseExecutionMemory(size, taskAttemptId, consumer.getMode());
  updatePeakStats(consumer.getMode());
}
```

Thoughts?
+CC @JoshRosen, @Ngone51.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
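
As a standalone illustration of the pattern suggested in the review comment above, the following is a minimal sketch in which the task-side class stores only the peaks and reads current usage from the component that already maintains it. All names here (`Mode`, `UsageSource`, `PeakTracker`, `PeakTrackingSketch`) are hypothetical stand-ins invented for this example; they are not Spark's `MemoryMode`, `MemoryManager`, or `TaskMemoryManager`, and the sketch assumes every request is granted in full.

```java
import java.util.EnumMap;
import java.util.Map;

/** Hypothetical stand-in for a memory mode enum; not Spark's MemoryMode. */
enum Mode { ON_HEAP, OFF_HEAP }

/** Hypothetical stand-in for a manager that already tracks current usage. */
final class UsageSource {
  private final Map<Mode, Long> used = new EnumMap<>(Mode.class);

  synchronized long acquire(Mode mode, long bytes) {
    used.merge(mode, bytes, Long::sum);
    return bytes; // assumption: this sketch always grants the full request
  }

  synchronized void release(Mode mode, long bytes) {
    used.merge(mode, -bytes, Long::sum);
  }

  synchronized long used(Mode mode) {
    return used.getOrDefault(mode, 0L);
  }
}

/** Keeps only the peaks; current usage is always read from UsageSource. */
final class PeakTracker {
  private final UsageSource source;
  private long peakOnHeap = 0L;
  private long peakOffHeap = 0L;

  PeakTracker(UsageSource source) {
    this.source = source;
  }

  synchronized long acquire(Mode mode, long bytes) {
    long got = source.acquire(mode, bytes);
    updatePeakStats(mode);
    return got;
  }

  synchronized void release(Mode mode, long bytes) {
    source.release(mode, bytes);
    updatePeakStats(mode);
  }

  synchronized long peak(Mode mode) {
    return mode == Mode.OFF_HEAP ? peakOffHeap : peakOnHeap;
  }

  /** Callers must hold this object's monitor; all callers above are synchronized. */
  private void updatePeakStats(Mode mode) {
    if (mode == Mode.OFF_HEAP) {
      peakOffHeap = Math.max(peakOffHeap, source.used(mode));
    } else {
      peakOnHeap = Math.max(peakOnHeap, source.used(mode));
    }
  }
}

final class PeakTrackingSketch {
  public static void main(String[] args) {
    UsageSource source = new UsageSource();
    PeakTracker tracker = new PeakTracker(source);
    tracker.acquire(Mode.ON_HEAP, 100);
    tracker.acquire(Mode.ON_HEAP, 50);
    tracker.release(Mode.ON_HEAP, 120);
    System.out.println(tracker.peak(Mode.ON_HEAP)); // prints 150
    System.out.println(source.used(Mode.ON_HEAP));  // prints 30
  }
}
```

The tracker holds no "current usage" field of its own, so there is no second counter to keep in sync; the peak can only be as specific as the counter it reads from.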