[ https://issues.apache.org/jira/browse/SPARK-34779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17610965#comment-17610965 ]
Paul Praet commented on SPARK-34779: ------------------------------------ We are seeing spurious failures on the assert(): {noformat} 22/09/29 09:46:24 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker for task 3063.0 in stage 1997.0 (TID 677249),5,main] java.lang.AssertionError: assertion failed: task count shouldn't below 0 at scala.Predef$.assert(Predef.scala:223) at org.apache.spark.executor.ExecutorMetricsPoller.decrementCount$1(ExecutorMetricsPoller.scala:130) at org.apache.spark.executor.ExecutorMetricsPoller.$anonfun$onTaskCompletion$3(ExecutorMetricsPoller.scala:135) at java.base/java.util.concurrent.ConcurrentHashMap.computeIfPresent(ConcurrentHashMap.java:1822) at org.apache.spark.executor.ExecutorMetricsPoller.onTaskCompletion(ExecutorMetricsPoller.scala:135) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:737) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:829) 22/09/29 09:46:24 INFO MemoryStore: MemoryStore cleared 22/09/29 09:46:24 INFO BlockManager: BlockManager stopped 22/09/29 09:46:24 INFO ShutdownHookManager: Shutdown hook called 22/09/29 09:46:24 INFO ShutdownHookManager: Deleting directory /mnt/yarn/usercache/hadoop/appcache/application_1664443624160_0001/spark-93efc2d4-84de-494b-a3b7-2cb1c3a45426{noformat} Feels like overkill ? Seen on Spark 3.2.0. > ExecutorMetricsPoller should keep stage entry in stageTCMP until a heartbeat > occurs > ----------------------------------------------------------------------------------- > > Key: SPARK-34779 > URL: https://issues.apache.org/jira/browse/SPARK-34779 > Project: Spark > Issue Type: Improvement > Components: Spark Core > Affects Versions: 3.0.0, 3.0.1, 3.0.2, 3.1.0, 3.1.1 > Reporter: Baohe Zhang > Assignee: Baohe Zhang > Priority: Major > Fix For: 3.2.0 > > > The current implementation of ExecutoMetricsPoller uses task count in each > stage to decide whether to keep a stage entry or not. In the case of the > executor only has 1 core, it may have these issues: > # Peak metrics missing (due to stage entry being removed within a heartbeat > interval) > # Unnecessary and frequent hashmap entry removal and insertion. > Assuming an executor with 1 core has 2 tasks (task1 and task2, both belong to > stage (0,0)) to execute in a heartbeat interval, the workflow in current > ExecutorMetricsPoller implementation would be: > 1. task1 start -> stage (0, 0) entry created in stageTCMP, task count > increment to1 > 2. 1st poll() -> update peak metrics of stage (0, 0) > 3. task1 end -> stage (0, 0) task count decrement to 0, stage (0, 0) entry > removed, peak metrics lost. > 4. task2 start -> stage (0, 0) entry created in stageTCMP, task count > increment to1 > 5. 2nd poll() -> update peak metrics of stage (0, 0) > 6. task2 end -> stage (0, 0) task count decrement to 0, stage (0, 0) entry > removed, peak metrics lost > 7. heartbeat() -> empty or inaccurate peak metrics for stage(0,0) reported. > We can fix the issue by keeping entries with task count = 0 in stageTCMP map > until a heartbeat occurs. At the heartbeat, after reporting the peak metrics > for each stage, we scan each stage in stageTCMP and remove entries with task > count = 0. > After the fix, the workflow would be: > 1. task1 start -> stage (0, 0) entry created in stageTCMP, task count > increment to1 > 2. 1st poll() -> update peak metrics of stage (0, 0) > 3. task1 end -> stage (0, 0) task count decrement to 0,but the entry (0,0) > still remain. > 4. task2 start -> task count of stage (0,0) increment to1 > 5. 2nd poll() -> update peak metrics of stage (0, 0) > 6. task2 end -> stage (0, 0) task count decrement to 0,but the entry (0,0) > still remain. > 7. heartbeat() -> accurate peak metrics for stage (0, 0) reported. Remove > entry for stage (0,0) in stageTCMP because its task count is 0. > > How to verify the behavior? > Submit a job with a custom polling interval (e.g., 2s) and > spark.executor.cores=1 and check the debug logs of ExecutoMetricsPoller. -- This message was sent by Atlassian Jira (v8.20.10#820010) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org