mridulm commented on code in PR #44321:
URL: https://github.com/apache/spark/pull/44321#discussion_r1441419344


##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -846,11 +851,49 @@ private[spark] class TaskSetManager(
     // "result.value()" in "TaskResultGetter.enqueueSuccessfulTask" before reaching here.
     // Note: "result.value()" only deserializes the value when it's called at the first time, so
     // here "result.value()" just returns the value and won't block other threads.
-    sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates,
-      result.metricPeaks, info)
+
+    emptyTaskInfoAccumulablesAndNotifyDagScheduler(tid, tasks(index), Success, result.value(),
+      result.accumUpdates, result.metricPeaks, info)
     maybeFinishTaskSet()
   }
 
+  /**
+   * A wrapper around [[DAGScheduler.taskEnded()]] that empties out the accumulables for the
+   * TaskInfo object, corresponding to the completed task, referenced by this class.
+   *
+   * SPARK-46383: For the completed task, we ship the original TaskInfo to the DAGScheduler and only
+   * retain a cloned TaskInfo in this class. We then set the accumulables to Nil for the TaskInfo
+   * object that corresponds to the completed task.
+   * We do this to release references to `TaskInfo.accumulables()` as the TaskInfo
+   * objects held by this class are long-lived and have a heavy memory footprint on the driver.
+   *
+   * This is safe as the TaskInfo accumulables are not needed once they are shipped to the
+   * DAGScheduler where they are aggregated. Additionally, the original TaskInfo, and not a
+   * clone, must be sent to the DAGScheduler as this TaskInfo object is sent to the
+   * DAGScheduler on multiple events during the task's lifetime. Users can install
+   * SparkListeners that compare the TaskInfo objects across these SparkListener events and
+   * thus the TaskInfo object sent to the DAGScheduler must always reference the same TaskInfo
+   * object.
+   */
+  private def emptyTaskInfoAccumulablesAndNotifyDagScheduler(
+      taskId: Long,
+      task: Task[_],
+      reason: TaskEndReason,
+      result: Any,
+      accumUpdates: Seq[AccumulatorV2[_, _]],
+      metricPeaks: Array[Long],
+      taskInfo: TaskInfo): Unit = {
+    val index = taskInfo.index
+    if (conf.get(DROP_TASK_INFO_ACCUMULABLES_ON_TASK_COMPLETION)) {

Review Comment:
   Pull `conf.get(DROP_TASK_INFO_ACCUMULABLES_ON_TASK_COMPLETION)` out into a private field.
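   For illustration, a rough sketch of what I mean (the field name is just a placeholder, not something from the PR):

   ```scala
   // Read the flag once when the TaskSetManager is constructed instead of calling
   // conf.get(...) on every task completion; the value cannot change for the
   // lifetime of this manager.
   private val dropTaskInfoAccumulablesOnTaskCompletion: Boolean =
     conf.get(DROP_TASK_INFO_ACCUMULABLES_ON_TASK_COMPLETION)

   // ... and then in emptyTaskInfoAccumulablesAndNotifyDagScheduler:
   if (dropTaskInfoAccumulablesOnTaskCompletion) {
     // existing body of the branch
   }
   ```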



##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -787,6 +787,9 @@ private[spark] class TaskSetManager(
     // SPARK-37300: when the task was already finished state, just ignore it,
     // so that there won't cause successful and tasksSuccessful wrong result.
     if(info.finished) {
+      // SPARK-46383: Clear out the accumulables for a completed task to reduce accumulable
+      // lifetime.
+      info.resetAccumulables()

Review Comment:
   Only when `conf.get(DROP_TASK_INFO_ACCUMULABLES_ON_TASK_COMPLETION)` is `true`?
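   i.e. something along these lines, a sketch only (reusing the hoisted flag suggested in the other comment):

   ```scala
   if (info.finished) {
     if (dropTaskInfoAccumulablesOnTaskCompletion) {
       // SPARK-46383: Clear out the accumulables for a completed task to reduce
       // accumulable lifetime.
       info.resetAccumulables()
     }
     // ... rest of the existing already-finished handling stays unchanged ...
   }
   ```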



##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -846,11 +851,49 @@ private[spark] class TaskSetManager(
     // "result.value()" in "TaskResultGetter.enqueueSuccessfulTask" before reaching here.
     // Note: "result.value()" only deserializes the value when it's called at the first time, so
     // here "result.value()" just returns the value and won't block other threads.
-    sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates,
-      result.metricPeaks, info)
+
+    emptyTaskInfoAccumulablesAndNotifyDagScheduler(tid, tasks(index), Success, result.value(),
+      result.accumUpdates, result.metricPeaks, info)
     maybeFinishTaskSet()
   }

+  /**
+   * A wrapper around [[DAGScheduler.taskEnded()]] that empties out the accumulables for the
+   * TaskInfo object, corresponding to the completed task, referenced by this class.
+   *
+   * SPARK-46383: For the completed task, we ship the original TaskInfo to the DAGScheduler and only
+   * retain a cloned TaskInfo in this class. We then set the accumulables to Nil for the TaskInfo
+   * object that corresponds to the completed task.
+   * We do this to release references to `TaskInfo.accumulables()` as the TaskInfo
+   * objects held by this class are long-lived and have a heavy memory footprint on the driver.
+   *
+   * This is safe as the TaskInfo accumulables are not needed once they are shipped to the
+   * DAGScheduler where they are aggregated. Additionally, the original TaskInfo, and not a
+   * clone, must be sent to the DAGScheduler as this TaskInfo object is sent to the
+   * DAGScheduler on multiple events during the task's lifetime. Users can install
+   * SparkListeners that compare the TaskInfo objects across these SparkListener events and
+   * thus the TaskInfo object sent to the DAGScheduler must always reference the same TaskInfo
+   * object.
+   */
+  private def emptyTaskInfoAccumulablesAndNotifyDagScheduler(
+      taskId: Long,
+      task: Task[_],
+      reason: TaskEndReason,
+      result: Any,
+      accumUpdates: Seq[AccumulatorV2[_, _]],
+      metricPeaks: Array[Long],
+      taskInfo: TaskInfo): Unit = {
+    val index = taskInfo.index

Review Comment:
   super nit: pull this variable into the `if` block below
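   Something like the following; the body of the `if` is elided, this just shows where `index` would move:

   ```scala
   if (conf.get(DROP_TASK_INFO_ACCUMULABLES_ON_TASK_COMPLETION)) {
     // `index` is only needed on this branch, so declare it here.
     val index = taskInfo.index
     // ... existing logic that retains a cloned TaskInfo and clears the original's accumulables ...
   }
   ```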



##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -874,6 +917,9 @@ private[spark] class TaskSetManager(
     // SPARK-37300: when the task was already finished state, just ignore it,
     // so that there won't cause copiesRunning wrong result.
     if (info.finished) {
+      // SPARK-46383: Clear out the accumulables for a completed task to reduce accumulable
+      // lifetime.
+      info.resetAccumulables()

Review Comment:
   Same as above, only when `conf.get(DROP_TASK_INFO_ACCUMULABLES_ON_TASK_COMPLETION)` is `true`?



##########
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala:
##########
@@ -229,6 +234,77 @@ class TaskSetManagerSuite
     super.afterEach()
   }
 
+  test("SPARK-46383: Accumulables of TaskInfo objects held by TaskSetManager 
must not be " +
+    "accessed once the task has completed") {
+    val conf = new SparkConf().
+      set(config.DROP_TASK_INFO_ACCUMULABLES_ON_TASK_COMPLETION, true)
+    sc = new SparkContext("local", "test", conf)
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
+    val taskSet = FakeTask.createTaskSet(1)
+    val clock = new ManualClock
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock 
= clock)
+    val accumUpdates = taskSet.tasks.head.metrics.internalAccums
+
+    // Offer a host. This will launch the first task.
+    val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)._1
+    assert(taskOption.isDefined)
+
+    clock.advance(1)
+    // Tell it the first task has finished successfully
+    manager.handleSuccessfulTask(0, createTaskResult(0, accumUpdates))
+    assert(sched.endedTasks(0) === Success)
+
+    val e = intercept[SparkException]{
+      manager.taskInfos.head._2.accumulables
+    }
+    assert(e.getMessage.contains("Accumulables for the TaskInfo have been 
cleared"))
+  }
+
+  test("SPARK-46383: TaskInfo accumulables are cleared upon task completion") {
+    val conf = new SparkConf().
+      set(config.DROP_TASK_INFO_ACCUMULABLES_ON_TASK_COMPLETION, true)
+    sc = new SparkContext("local", "test", conf)
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
+    val taskSet = FakeTask.createTaskSet(2)
+    val clock = new ManualClock
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock 
= clock)
+    val accumUpdates = taskSet.tasks.head.metrics.internalAccums
+
+    // Offer a host. This will launch the first task.
+    val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)._1
+    assert(taskOption.isDefined)
+
+    clock.advance(1)
+    // Tell it the first task has finished successfully
+    manager.handleSuccessfulTask(0, createTaskResult(0, accumUpdates))
+    assert(sched.endedTasks(0) === Success)
+
+    // Only one task was launched and it completed successfully, thus the 
TaskInfo accumulables
+    // should be empty.
+    assert(!manager.taskInfos.exists(l => !l._2.isAccumulablesEmpty))

Review Comment:
   Change from `l` to avoid confusion with `1` (here and elsewhere)
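   For example, with a descriptive name (and `forall` in place of the double negation):

   ```scala
   assert(manager.taskInfos.forall { case (_, taskInfo) => taskInfo.isAccumulablesEmpty })
   ```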



##########
core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala:
##########
@@ -289,6 +290,17 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
     stageInfo.rddInfos.forall(_.numPartitions == 4) should be {true}
   }

+  test("SPARK-46383: Track TaskInfo objects") {
+    val conf = new SparkConf().set(DROP_TASK_INFO_ACCUMULABLES_ON_TASK_COMPLETION, true)
+    sc = new SparkContext("local", "SparkListenerSuite", conf)
+    val listener = new SaveActiveTaskInfos
+    sc.addSparkListener(listener)
+    val rdd1 = sc.parallelize(1 to 100, 4)
+    sc.runJob(rdd1, (items: Iterator[Int]) => items.size, Seq(0, 1))
+    sc.listenerBus.waitUntilEmpty()
+    listener.taskInfos.size should be { 0 }

Review Comment:
   I am not sure I follow this test, what is it trying to do?
   This test will be successful even with `DROP_TASK_INFO_ACCUMULABLES_ON_TASK_COMPLETION` = `true`, right? (Since it is simply checking for instance equality in the fired event?)
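   (To make the concern concrete: I am assuming `SaveActiveTaskInfos` looks roughly like the sketch below, since its definition is not part of this hunk. If so, the final `size == 0` check holds whether the flag is on or off, because the same `TaskInfo` instance is posted on task start and task end either way.)

   ```scala
   // Assumed shape of the listener, illustrative only. TaskInfo does not override
   // equals, so removing from the set only succeeds when onTaskEnd carries the exact
   // instance that onTaskStart added, which is true with or without the new config.
   private class SaveActiveTaskInfos extends SparkListener {
     val taskInfos = scala.collection.mutable.Set[TaskInfo]()
     override def onTaskStart(taskStart: SparkListenerTaskStart): Unit =
       taskInfos += taskStart.taskInfo
     override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit =
       taskInfos -= taskEnd.taskInfo
   }
   ```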



##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -846,11 +851,49 @@ private[spark] class TaskSetManager(
     // "result.value()" in "TaskResultGetter.enqueueSuccessfulTask" before reaching here.
     // Note: "result.value()" only deserializes the value when it's called at the first time, so
     // here "result.value()" just returns the value and won't block other threads.
-    sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates,
-      result.metricPeaks, info)
+
+    emptyTaskInfoAccumulablesAndNotifyDagScheduler(tid, tasks(index), Success, result.value(),
+      result.accumUpdates, result.metricPeaks, info)
     maybeFinishTaskSet()
   }

+  /**
+   * A wrapper around [[DAGScheduler.taskEnded()]] that empties out the accumulables for the
+   * TaskInfo object, corresponding to the completed task, referenced by this class.
+   *
+   * SPARK-46383: For the completed task, we ship the original TaskInfo to the DAGScheduler and only
+   * retain a cloned TaskInfo in this class. We then set the accumulables to Nil for the TaskInfo
+   * object that corresponds to the completed task.
+   * We do this to release references to `TaskInfo.accumulables()` as the TaskInfo
+   * objects held by this class are long-lived and have a heavy memory footprint on the driver.
+   *
+   * This is safe as the TaskInfo accumulables are not needed once they are shipped to the
+   * DAGScheduler where they are aggregated. Additionally, the original TaskInfo, and not a
+   * clone, must be sent to the DAGScheduler as this TaskInfo object is sent to the
+   * DAGScheduler on multiple events during the task's lifetime. Users can install
+   * SparkListeners that compare the TaskInfo objects across these SparkListener events and
+   * thus the TaskInfo object sent to the DAGScheduler must always reference the same TaskInfo
+   * object.
+   */
+  private def emptyTaskInfoAccumulablesAndNotifyDagScheduler(
+      taskId: Long,
+      task: Task[_],
+      reason: TaskEndReason,
+      result: Any,
+      accumUpdates: Seq[AccumulatorV2[_, _]],
+      metricPeaks: Array[Long],
+      taskInfo: TaskInfo): Unit = {

Review Comment:
   As we are passing `taskId` already, can we drop `taskInfo` from here?
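   i.e. look it up from the `taskInfos` map this class already keeps keyed by task id, roughly:

   ```scala
   private def emptyTaskInfoAccumulablesAndNotifyDagScheduler(
       taskId: Long,
       task: Task[_],
       reason: TaskEndReason,
       result: Any,
       accumUpdates: Seq[AccumulatorV2[_, _]],
       metricPeaks: Array[Long]): Unit = {
     // Resolve the TaskInfo from the task id instead of threading it through as a parameter.
     val taskInfo = taskInfos(taskId)
     // ... rest of the method unchanged ...
   }
   ```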



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

