cloud-fan commented on code in PR #44321:
URL: https://github.com/apache/spark/pull/44321#discussion_r1428631597


##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -846,11 +850,49 @@ private[spark] class TaskSetManager(
     // "result.value()" in "TaskResultGetter.enqueueSuccessfulTask" before 
reaching here.
     // Note: "result.value()" only deserializes the value when it's called at 
the first time, so
     // here "result.value()" just returns the value and won't block other 
threads.
-    sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), 
result.accumUpdates,
-      result.metricPeaks, info)
+
+    emptyTaskInfoAccumulablesAndNotifyDagScheduler(tid, tasks(index), Success, 
result.value(),
+      result.accumUpdates, result.metricPeaks, info)
     maybeFinishTaskSet()
   }
 
+  /**
+   * A wrapper around [[DAGScheduler.taskEnded()]] that empties out the accumulables for the
+   * TaskInfo object, corresponding to the completed task, referenced by this class.
+   *
+   * SPARK-46383: For the completed task, we ship the original TaskInfo to the DAGScheduler and only
+   * retain a cloned TaskInfo in this class. We then set the accumulables to Nil for the TaskInfo
+   * object that corresponds to the completed task.
+   * We do this to release references to `TaskInfo.accumulables()` as the TaskInfo
+   * objects held by this class are long-lived and have a heavy memory footprint on the driver.
+   *
+   * This is safe as the TaskInfo accumulables are not needed once they are shipped to the
+   * DAGScheduler where they are aggregated. Additionally, the original TaskInfo, and not a
+   * clone, must be sent to the DAGScheduler as this TaskInfo object is sent to the
+   * DAGScheduler on multiple events during the task's lifetime. Users can install
+   * SparkListeners that compare the TaskInfo objects across these SparkListener events and
+   * thus the TaskInfo object sent to the DAGScheduler must always reference the same TaskInfo
+   * object.
+   */
+  private def emptyTaskInfoAccumulablesAndNotifyDagScheduler(
+      taskId: Long,
+      task: Task[_],
+      reason: TaskEndReason,
+      result: Any,
+      accumUpdates: Seq[AccumulatorV2[_, _]],
+      metricPeaks: Array[Long],
+      taskInfo: TaskInfo): Unit = {
+    val index = taskInfo.index
+    if (conf.get(DROP_TASK_INFO_ACCUMULABLES_ON_TASK_COMPLETION)) {
+      val clonedTaskInfo = taskInfo.cloneWithEmptyAccumulables()
+      // Update this task's taskInfo while preserving its position in the list
+      taskAttempts(index) =
+        taskAttempts(index).map { i => if (i eq taskInfo) clonedTaskInfo else i }

Review Comment:
   Oh I misunderstood the code before.


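To illustrate the pattern the diff above introduces, here is a minimal, standalone sketch in Scala. All names below (Accumulable, Info, notifyDownstream, DropAccumulablesSketch) are simplified stand-ins invented for illustration, not Spark's actual TaskInfo/TaskSetManager/DAGScheduler APIs:

    // Minimal stand-ins; NOT Spark's real classes.
    case class Accumulable(name: String, value: Long)

    class Info(val index: Int, val accumulables: Seq[Accumulable]) {
      // Lightweight clone that drops the heavyweight accumulables.
      def cloneWithEmptyAccumulables(): Info = new Info(index, Nil)
    }

    object DropAccumulablesSketch {
      // taskAttempts(index) holds every attempt of the task at that index.
      private val taskAttempts: Array[List[Info]] =
        Array(List(new Info(0, Seq(Accumulable("bytesRead", 1024L)))))

      // Stand-in for DAGScheduler.taskEnded: it must receive the ORIGINAL
      // Info, because listeners may compare references across events.
      private def notifyDownstream(info: Info): Unit =
        println(s"task ${info.index} ended; accumulables=${info.accumulables.size}")

      def onTaskFinished(info: Info): Unit = {
        val cloned = info.cloneWithEmptyAccumulables()
        // Swap in the clone, matching by reference (eq) so that other
        // attempts of the same task keep their own Info objects.
        taskAttempts(info.index) =
          taskAttempts(info.index).map(i => if (i eq info) cloned else i)
        // Ship the original downstream with its accumulables intact.
        notifyDownstream(info)
      }

      def main(args: Array[String]): Unit = {
        val info = taskAttempts(0).head
        onTaskFinished(info)
        assert(taskAttempts(0).head.accumulables.isEmpty) // retained copy is light
      }
    }

The sketch mirrors the two design points in the doc comment: the long-lived copy is replaced by reference identity (eq) so other attempts at the same index are untouched, while the original object, accumulables intact, is the one sent downstream so listeners keep seeing a stable reference across events.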

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

