[ https://issues.apache.org/jira/browse/SPARK-30404?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17006640#comment-17006640 ]
Xuesen Liang commented on SPARK-30404:
--------------------------------------

The following is the Spark driver log from the fixed version:
{code:java}
20/01/02 15:09:17 INFO SparkContext: Starting job: main at NativeMethodAccessorImpl.java:0
20/01/02 15:09:17 INFO DAGScheduler: Registering RDD 2 (main at NativeMethodAccessorImpl.java:0)
20/01/02 15:09:17 INFO DAGScheduler: Got job 0 (main at NativeMethodAccessorImpl.java:0) with 2 output partitions
20/01/02 15:09:17 INFO DAGScheduler: Final stage: ResultStage 1 (main at NativeMethodAccessorImpl.java:0)
20/01/02 15:09:17 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 0)
20/01/02 15:09:17 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 0)
20/01/02 15:09:17 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[2] at main at NativeMethodAccessorImpl.java:0), which has no missing parents
20/01/02 15:09:18 INFO DAGScheduler: Submitting 2 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[2] at main at NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0, 1))
20/01/02 15:09:18 INFO YarnClusterScheduler: Adding task set 0.0 with 2 tasks
20/01/02 15:09:18 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, 100.76.30.139, executor 6, partition 0, PROCESS_LOCAL, 7704 bytes)
20/01/02 15:09:18 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, 9.10.22.124, executor 1, partition 1, PROCESS_LOCAL, 7705 bytes)
20/01/02 15:09:50 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 32602 ms on 100.76.30.139 (executor 6) (1/2)
20/01/02 15:09:58 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 40533 ms on 9.10.22.124 (executor 1) (2/2)
20/01/02 15:09:58 INFO YarnClusterScheduler: Removed TaskSet 0.0, whose tasks have all completed, from pool
20/01/02 15:09:58 INFO DAGScheduler: ShuffleMapStage 0 (main at NativeMethodAccessorImpl.java:0) finished in 40.937 s
20/01/02 15:09:58 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[6] at main at NativeMethodAccessorImpl.java:0), which has no missing parents
20/01/02 15:09:58 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 1 (MapPartitionsRDD[6] at main at NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0, 1))
20/01/02 15:09:58 INFO YarnClusterScheduler: Adding task set 1.0 with 2 tasks
20/01/02 15:09:58 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 2, 100.76.30.139, executor 6, partition 0, NODE_LOCAL, 7929 bytes)
20/01/02 15:09:58 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 3, 9.10.22.124, executor 1, partition 1, NODE_LOCAL, 7929 bytes)
20/01/02 15:09:58 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 2) in 115 ms on 100.76.30.139 (executor 6) (1/2)
20/01/02 15:09:59 INFO TaskSetManager: Marking task 1 in stage 1.0 (on 9.10.22.124) as speculatable because it ran more than 230 ms
20/01/02 15:09:59 INFO TaskSetManager: Starting task 1.1 in stage 1.0 (TID 4, 9.10.133.232, executor 2, partition 1, ANY, 7929 bytes)
20/01/02 15:09:59 WARN TaskSetManager: Lost task 1.1 in stage 1.0 (TID 4, 9.10.133.232, executor 2): FetchFailed(BlockManagerId(6, 100.76.30.139, 7337, None), shuffleId=0, mapId=0, reduceId=1, message=Connection reset by peer)
20/01/02 15:09:59 INFO TaskSetManager: Task 1.1 in stage 1.0 (TID 4) failed, but the task will not be re-executed (either because the task failed with a shuffle data fetch failure, so the previous stage needs to be re-run, or because a different copy of the task has already succeeded).
20/01/02 15:09:59 INFO DAGScheduler: Marking ResultStage 1 (main at NativeMethodAccessorImpl.java:0) as failed due to a fetch failure from ShuffleMapStage 0 (main at NativeMethodAccessorImpl.java:0)
20/01/02 15:09:59 INFO DAGScheduler: ResultStage 1 (main at NativeMethodAccessorImpl.java:0) failed in 0.279 s due to org.apache.spark.shuffle.FetchFailedException: Connection reset by peer
20/01/02 15:09:59 INFO DAGScheduler: Resubmitting ShuffleMapStage 0 (main at NativeMethodAccessorImpl.java:0) and ResultStage 1 (main at NativeMethodAccessorImpl.java:0) due to fetch failure
20/01/02 15:09:59 INFO DAGScheduler: Resubmitting failed stages
20/01/02 15:09:59 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[2] at main at NativeMethodAccessorImpl.java:0), which has no missing parents
20/01/02 15:09:59 INFO DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[2] at main at NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0))
20/01/02 15:09:59 INFO YarnClusterScheduler: Adding task set 0.1 with 1 tasks
20/01/02 15:09:59 INFO TaskSetManager: Starting task 0.0 in stage 0.1 (TID 5, 100.76.32.68, executor 4, partition 0, PROCESS_LOCAL, 7704 bytes)
20/01/02 15:10:08 INFO TaskSetManager: Finished task 1.0 in stage 1.0 (TID 3) in 10096 ms on 9.10.22.124 (executor 1) (2/2)
20/01/02 15:10:08 INFO YarnClusterScheduler: Removed TaskSet 1.0, whose tasks have all completed, from pool
20/01/02 15:10:08 INFO DAGScheduler: ResultStage 1 (main at NativeMethodAccessorImpl.java:0) finished in 10.116 s
20/01/02 15:10:08 INFO DAGScheduler: Job 0 finished: main at NativeMethodAccessorImpl.java:0, took 51.113884 s
20/01/02 15:10:31 INFO TaskSetManager: Finished task 0.0 in stage 0.1 (TID 5) in 32581 ms on 100.76.32.68 (executor 4) (1/1)
20/01/02 15:10:31 INFO YarnClusterScheduler: Removed TaskSet 0.1, whose tasks have all completed, from pool
{code}

> Fix wrong log for FetchFailed task's successful speculation
> -----------------------------------------------------------
>
>                 Key: SPARK-30404
>                 URL: https://issues.apache.org/jira/browse/SPARK-30404
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.4.4
>            Reporter: Xuesen Liang
>            Priority: Minor
>
> Steps to reproduce the bug:
> 1. Spark speculation is enabled.
> 2. For a running task {{X.0}}, a speculative task {{X.1}} is launched.
> 3. The speculative task {{X.1}} then fails with a FetchFailedException, and {{successful(index)}} is set to true:
> {code:scala}
> def TaskSetManager#handleFailedTask(tid: Long, state: TaskState, reason: TaskFailedReason) {
>   ...
>   val failureException: Option[Throwable] = reason match {
>     case fetchFailed: FetchFailed =>
>       logWarning(failureReason)
>       if (!successful(index)) {
>         successful(index) = true
>         tasksSuccessful += 1
>       }
>   ...
> {code}
> 4. When the original running task {{X.0}} finishes successfully, it finds {{successful(index) == true}} and therefore logs {{Ignoring task-finished event ....}}:
> {code:scala}
> def TaskSetManager#handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit = {
>   ...
>   if (!successful(index)) {
>     tasksSuccessful += 1
>     logInfo(s"Finished task ${info.id} in stage ${taskSet.id} (TID ${info.taskId}) in" +
>       s" ${info.duration} ms on ${info.host} (executor ${info.executorId})" +
>       s" ($tasksSuccessful/$numTasks)")
>     // Mark successful and stop if all the tasks have succeeded.
>     successful(index) = true
>     if (tasksSuccessful == numTasks) {
>       isZombie = true
>     }
>   } else {
>     logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id +
>       " because task " + index + " has already completed successfully")
>   }
>   ...
> {code}
> Instead, task {{X.0}} should log {{Finished task X.0 in stage ....}}
>
>
> The relevant Spark driver logs are as follows:
> {code:scala}
> 20/01/02 11:21:45 INFO DAGScheduler: Got job 0 (main at NativeMethodAccessorImpl.java:0) with 2 output partitions
> 20/01/02 11:21:45 INFO DAGScheduler: Final stage: ResultStage 1 (main at NativeMethodAccessorImpl.java:0)
> 20/01/02 11:21:45 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 0)
> 20/01/02 11:21:45 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 0)
> 20/01/02 11:21:45 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[2] at main at NativeMethodAccessorImpl.java:0), which has no missing parents
> 20/01/02 11:21:45 INFO DAGScheduler: Submitting 2 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[2] at main at NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0, 1))
> 20/01/02 11:21:45 INFO YarnClusterScheduler: Adding task set 0.0 with 2 tasks
> 20/01/02 11:21:45 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, 9.179.143.4, executor 1, partition 0, PROCESS_LOCAL, 7704 bytes)
> 20/01/02 11:21:45 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, 9.76.13.26, executor 2, partition 1, PROCESS_LOCAL, 7705 bytes)
> 20/01/02 11:22:18 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 32491 ms on 9.179.143.4 (executor 1) (1/2)
> 20/01/02 11:22:26 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 40544 ms on 9.76.13.26 (executor 2) (2/2)
> 20/01/02 11:22:26 INFO DAGScheduler: ShuffleMapStage 0 (main at NativeMethodAccessorImpl.java:0) finished in 40.854 s
> 20/01/02 11:22:26 INFO YarnClusterScheduler: Removed TaskSet 0.0, whose tasks have all completed, from pool
> 20/01/02 11:22:26 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[6] at main at NativeMethodAccessorImpl.java:0), which has no missing parents
> 20/01/02 11:22:26 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 1 (MapPartitionsRDD[6] at main at NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0, 1))
> 20/01/02 11:22:26 INFO YarnClusterScheduler: Adding task set 1.0 with 2 tasks
> 20/01/02 11:22:26 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 2, 9.179.143.4, executor 1, partition 0, NODE_LOCAL, 7929 bytes)
> 20/01/02 11:22:26 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 3, 9.76.13.26, executor 2, partition 1, NODE_LOCAL, 7929 bytes)
> 20/01/02 11:22:26 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 2) in 79 ms on 9.179.143.4 (executor 1) (1/2)
> 20/01/02 11:22:26 INFO TaskSetManager: Marking task 1 in stage 1.0 (on 9.76.13.26) as speculatable because it ran more than 158 ms
> 20/01/02 11:22:26 INFO TaskSetManager: Starting task 1.1 in stage 1.0 (TID 4, 9.179.143.52, executor 3, partition 1, ANY, 7929 bytes)
> 20/01/02 11:22:26 WARN TaskSetManager: Lost task 1.1 in stage 1.0 (TID 4, 9.179.143.52, executor 3): FetchFailed(BlockManagerId(1, 9.179.143.4, 7337, None), shuffleId=0, mapId=0, reduceId=1, message=org.apache.spark.shuffle.FetchFailedException: Connection reset by peer)
> 20/01/02 11:22:26 INFO TaskSetManager: Task 1.1 in stage 1.0 (TID 4) failed, but the task will not be re-executed (either because the task failed with a shuffle data fetch failure, so the previous stage needs to be re-run, or because a different copy of the task has already succeeded).
> 20/01/02 11:22:26 INFO DAGScheduler: Marking ResultStage 1 (main at NativeMethodAccessorImpl.java:0) as failed due to a fetch failure from ShuffleMapStage 0 (main at NativeMethodAccessorImpl.java:0)
> 20/01/02 11:22:26 INFO DAGScheduler: ResultStage 1 (main at NativeMethodAccessorImpl.java:0) failed in 0.261 s due to org.apache.spark.shuffle.FetchFailedException: Connection reset by peer
> 20/01/02 11:22:26 INFO DAGScheduler: Resubmitting ShuffleMapStage 0 (main at NativeMethodAccessorImpl.java:0) and ResultStage 1 (main at NativeMethodAccessorImpl.java:0) due to fetch failure
> 20/01/02 11:22:26 INFO DAGScheduler: Resubmitting failed stages
> 20/01/02 11:22:26 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[2] at main at NativeMethodAccessorImpl.java:0), which has no missing parents
> 20/01/02 11:22:26 INFO DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[2] at main at NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0))
> 20/01/02 11:22:26 INFO YarnClusterScheduler: Adding task set 0.1 with 1 tasks
> 20/01/02 11:22:26 INFO TaskSetManager: Starting task 0.0 in stage 0.1 (TID 5, 9.179.143.4, executor 1, partition 0, PROCESS_LOCAL, 7704 bytes)
> // NOTE: The following line should instead be "INFO TaskSetManager: Finished task 1.0 in stage 1.0 (TID 3) in 10000 ms on 9.76.13.26 (executor 2) (2/2)"
> 20/01/02 11:22:36 INFO TaskSetManager: Ignoring task-finished event for 1.0 in stage 1.0 because task 1 has already completed successfully
> 20/01/02 11:22:36 INFO YarnClusterScheduler: Removed TaskSet 1.0, whose tasks have all completed, from pool
> 20/01/02 11:22:36 INFO DAGScheduler: ResultStage 1 (main at NativeMethodAccessorImpl.java:0) finished in 10.131 s
> 20/01/02 11:22:36 INFO DAGScheduler: Job 0 finished: main at NativeMethodAccessorImpl.java:0, took 51.031212 s
> 20/01/02 11:22:58 INFO TaskSetManager: Finished task 0.0 in stage 0.1 (TID 5) in 32029 ms on 9.179.143.4 (executor 1) (1/1)
> 20/01/02 11:22:58 INFO YarnClusterScheduler: Removed TaskSet 0.1, whose tasks have all completed, from pool
> {code}
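
The bookkeeping described in steps 3 and 4 of the issue can be replayed outside Spark with a toy model. The sketch below is not Spark's {{TaskSetManager}} and not the actual patch for this issue: {{ToyTaskSet}}, {{markedByFetchFailure}}, {{separateFetchFailedFlag}} and {{SpeculationLogDemo}} are hypothetical names introduced only for illustration, and the log strings are shortened. It assumes one possible remedy, namely remembering which partitions were marked successful only because a speculative attempt hit a FetchFailed, so that the original attempt can still log its own completion.

```scala
import scala.collection.mutable.ArrayBuffer

// Toy stand-in for the bookkeeping quoted in the issue; NOT Spark's TaskSetManager.
final class ToyTaskSet(numTasks: Int, separateFetchFailedFlag: Boolean) {
  private val successful = Array.fill(numTasks)(false)
  // Hypothetical extra flag: partitions marked done only because of a FetchFailed.
  private val markedByFetchFailure = Array.fill(numTasks)(false)
  private var tasksSuccessful = 0
  val log: ArrayBuffer[String] = ArrayBuffer.empty[String]

  // Mirrors the FetchFailed branch: the partition is counted as successful.
  def handleFetchFailed(index: Int): Unit = {
    if (!successful(index)) {
      successful(index) = true
      tasksSuccessful += 1
      markedByFetchFailure(index) = true
    }
  }

  def handleSuccessfulTask(index: Int, attempt: Int): Unit = {
    val id = s"$index.$attempt"
    if (!successful(index)) {
      tasksSuccessful += 1
      successful(index) = true
      log += s"Finished task $id ($tasksSuccessful/$numTasks)"
    } else if (separateFetchFailedFlag && markedByFetchFailure(index)) {
      // Already counted by handleFetchFailed, so only the log line differs.
      markedByFetchFailure(index) = false
      log += s"Finished task $id ($tasksSuccessful/$numTasks)"
    } else {
      log += s"Ignoring task-finished event for $id because task $index has already completed successfully"
    }
  }
}

object SpeculationLogDemo {
  // Replays the event order from the driver log: 0.0 succeeds,
  // speculative 1.1 hits FetchFailed, then the original 1.0 succeeds.
  def replay(separateFetchFailedFlag: Boolean): Seq[String] = {
    val ts = new ToyTaskSet(2, separateFetchFailedFlag)
    ts.handleSuccessfulTask(0, 0)
    ts.handleFetchFailed(1)
    ts.handleSuccessfulTask(1, 0)
    ts.log.toSeq
  }

  def main(args: Array[String]): Unit = {
    println("Without the extra flag:")
    replay(separateFetchFailedFlag = false).foreach(println)
    println("With the extra flag:")
    replay(separateFetchFailedFlag = true).foreach(println)
  }
}
```

With the flag disabled, the last line of the replay is the "Ignoring task-finished event" message seen in the 2.4.4 log; with it enabled, the last line is a "Finished task 1.0" message, matching the shape of the fixed-version log. The task counter is incremented only once in both cases.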