mridulm commented on code in PR #44321:
URL: https://github.com/apache/spark/pull/44321#discussion_r1448069167


##########
core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala:
##########
@@ -643,6 +657,29 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
     }
   }
 
+  /**
+   * A simple listener that tracks task infos for all active tasks.
+   */
+  private class SaveActiveTaskInfos extends SparkListener {
+    // Use a set based on IdentityHashMap instead of a HashSet to track unique references of
+    // TaskInfo objects.
+    val taskInfos = Collections.newSetFromMap[TaskInfo](new IdentityHashMap)
+
+    override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
+      val info = taskStart.taskInfo
+      if (info != null) {
+        taskInfos.add(info)
+      }
+    }
+
+    override def onTaskEnd(task: SparkListenerTaskEnd): Unit = {
+      val info = task.taskInfo
+      if (info != null && taskInfos.contains(info)) {
+        taskInfos.remove(info)
+      }

Review Comment:
   nit: the `contains` pre-check is redundant, since `Set.remove` is simply a no-op (returning `false`) when the element is absent:
   
   ```suggestion
         if (info != null) {
           taskInfos.remove(info)
         }
   ```
   
   or even simply 
   ```suggestion
         taskInfos.remove(info)
   ```
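
To make the nit concrete, here is a small, self-contained sketch (not part of the PR; `Info` is a made-up stand-in for `TaskInfo`) showing that the identity-backed set deduplicates by reference and that `remove` already tolerates absent elements:

```scala
import java.util.{Collections, IdentityHashMap}

object IdentitySetDemo {
  def main(args: Array[String]): Unit = {
    case class Info(id: Int)
    // Same construction as in the PR: a Set view over an IdentityHashMap.
    val identitySet = Collections.newSetFromMap[Info](new IdentityHashMap)

    val a = Info(1)
    val b = Info(1) // equal to `a` by equals(), but a distinct reference

    identitySet.add(a)
    identitySet.add(b)
    println(identitySet.size)            // 2: membership is by reference

    // remove() on an absent element just returns false; no guard needed.
    println(identitySet.remove(Info(1))) // false: a third, different reference
    println(identitySet.remove(a))       // true
    println(identitySet.size)            // 1
  }
}
```

An identity set also makes the intent explicit: membership is about object references, regardless of how `equals` is (or is not) overridden on the tracked class.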



##########
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala:
##########
@@ -61,6 +61,11 @@ class FakeDAGScheduler(sc: SparkContext, taskScheduler: FakeTaskScheduler)
       accumUpdates: Seq[AccumulatorV2[_, _]],
       metricPeaks: Array[Long],
       taskInfo: TaskInfo): Unit = {
+    accumUpdates.foreach(acc =>
+      taskInfo.setAccumulables(
+        acc.toInfo(Some(acc.value), Some(acc.value)) +: taskInfo.accumulables)
+    )
+    taskScheduler.endedTasks(taskInfo.index) = reason

Review Comment:
   Duplicate? (The empty suggestion below deletes the line.)
   
   ```suggestion
   ```



##########
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala:
##########
@@ -61,6 +61,11 @@ class FakeDAGScheduler(sc: SparkContext, taskScheduler: FakeTaskScheduler)
       accumUpdates: Seq[AccumulatorV2[_, _]],
       metricPeaks: Array[Long],
       taskInfo: TaskInfo): Unit = {
+    accumUpdates.foreach(acc =>
+      taskInfo.setAccumulables(
+        acc.toInfo(Some(acc.value), Some(acc.value)) +: taskInfo.accumulables)
+    )

Review Comment:
   Add a comment on why we need this?
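
For context on what such a comment might explain: the added lines appear to mimic the driver-side behavior of materializing each task's accumulator updates into `TaskInfo.accumulables` before listeners observe the task end. A toy, self-contained model of that prepend pattern (all classes below are simplified stand-ins, not Spark's real `AccumulatorV2`/`TaskInfo`):

```scala
// Simplified stand-ins for AccumulableInfo, AccumulatorV2 and TaskInfo.
case class AccumulableInfo(id: Long, update: Option[Any], value: Option[Any])

class Accumulator(val id: Long, var value: Long) {
  def toInfo(update: Option[Any], value: Option[Any]): AccumulableInfo =
    AccumulableInfo(id, update, value)
}

class TaskInfo {
  private var _accumulables: Seq[AccumulableInfo] = Nil
  def accumulables: Seq[AccumulableInfo] = _accumulables
  def setAccumulables(newAccumulables: Seq[AccumulableInfo]): Unit =
    _accumulables = newAccumulables
}

object AccumulablesDemo extends App {
  val taskInfo = new TaskInfo
  val accumUpdates = Seq(new Accumulator(1, 10), new Accumulator(2, 20))
  // Same prepend pattern as in the PR: each update goes to the front of
  // the task's accumulables list.
  accumUpdates.foreach(acc =>
    taskInfo.setAccumulables(
      acc.toInfo(Some(acc.value), Some(acc.value)) +: taskInfo.accumulables))
  assert(taskInfo.accumulables.map(_.id) == Seq(2L, 1L))
}
```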



##########
core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala:
##########
@@ -289,6 +290,17 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
     stageInfo.rddInfos.forall(_.numPartitions == 4) should be {true}
   }
 
+  test("SPARK-46383: Track TaskInfo objects") {
+    val conf = new SparkConf().set(DROP_TASK_INFO_ACCUMULABLES_ON_TASK_COMPLETION, true)
+    sc = new SparkContext("local", "SparkListenerSuite", conf)
+    val listener = new SaveActiveTaskInfos
+    sc.addSparkListener(listener)
+    val rdd1 = sc.parallelize(1 to 100, 4)
+    sc.runJob(rdd1, (items: Iterator[Int]) => items.size, Seq(0, 1))
+    sc.listenerBus.waitUntilEmpty()
+    listener.taskInfos.size should be { 0 }

Review Comment:
   Functionally, the check that the right task info is in the event should already be covered, for example by the existing use of `SaveStageAndTaskInfo`. Do let me know if that is not the case.
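
For readers who have not seen it, a listener in the spirit of the `SaveStageAndTaskInfo` helper the reviewer mentions might look like this (a sketch only; the actual helper in `SparkListenerSuite` may differ in fields and bookkeeping):

```scala
import scala.collection.mutable
import org.apache.spark.scheduler._

// Records which TaskInfo values arrive per stage, so content-level
// assertions ("the right task info is in the event") can be made later.
class SaveStageAndTaskInfoSketch extends SparkListener {
  val stageInfos = mutable.Map[StageInfo, Seq[TaskInfo]]()
  private var taskInfos = Seq.empty[TaskInfo]

  override def onTaskEnd(task: SparkListenerTaskEnd): Unit = {
    taskInfos :+= task.taskInfo
  }

  override def onStageCompleted(stage: SparkListenerStageCompleted): Unit = {
    stageInfos(stage.stageInfo) = taskInfos
    taskInfos = Seq.empty
  }
}
```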
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

