[ https://issues.apache.org/jira/browse/SPARK-5259?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Apache Spark reassigned SPARK-5259:
-----------------------------------

    Assignee:  (was: Apache Spark)

> Fix endless stage retry by adding Task equals() and hashCode(), to avoid
> stage.pendingTasks staying non-empty while the stage's map output is available
> -------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-5259
>                 URL: https://issues.apache.org/jira/browse/SPARK-5259
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.1.1, 1.2.0
>            Reporter: SuYan
>
> 1. While a shuffle map stage is being retried, two task sets for it may be
> running at once; call them taskSet0.0 and taskSet0.1. taskSet0.1 re-runs the
> tasks of taskSet0.0 that have not yet completed.
> If taskSet0.0 then finishes the tasks that taskSet0.1 has not yet completed,
> so that between them all partitions are covered, stage.isAvailable becomes
> true:
> {code}
> def isAvailable: Boolean = {
>   if (!isShuffleMap) {
>     true
>   } else {
>     numAvailableOutputs == numPartitions
>   }
> }
> {code}
> but stage.pendingTasks is still non-empty, which prevents the map statuses
> from being registered in MapOutputTracker. The cause: when a task completes
> successfully, pendingTasks -= task removes the task by reference, because
> Task does not override hashCode() and equals(), whereas numAvailableOutputs
> is counted by partition ID. So a completion reported by the other attempt's
> task instance for the same partition never removes the pending entry.
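To illustrate the root cause outside the scheduler, here is a minimal, self-contained Scala sketch. The class names `TaskByRef` and `TaskByValue` are hypothetical stand-ins for Spark's `Task`, not actual Spark code: the first mirrors the pre-fix class (no `equals()`/`hashCode()`), the second mirrors the proposed fix (identity based on stage and partition, matching how `numAvailableOutputs` is counted).

```scala
import scala.collection.mutable

// Hypothetical stand-in for Spark's Task before the fix: no equals()/
// hashCode() override, so a mutable.HashSet can only remove the exact
// same instance that was inserted.
class TaskByRef(val stageId: Int, val partitionId: Int)

// Hypothetical stand-in for the proposed fix: identity is
// (stageId, partitionId), i.e. the same key numAvailableOutputs uses.
class TaskByValue(val stageId: Int, val partitionId: Int) {
  override def equals(other: Any): Boolean = other match {
    case t: TaskByValue => t.stageId == stageId && t.partitionId == partitionId
    case _              => false
  }
  override def hashCode: Int = 31 * stageId + partitionId
}

// Attempt 0.0 registered a pending task for partition 3; attempt 0.1
// completes a *different instance* for the same partition and tries to
// remove it from the pending set.
val pendingByRef = mutable.HashSet(new TaskByRef(0, 3))
pendingByRef -= new TaskByRef(0, 3)
println(pendingByRef.size)   // prints 1: removal by reference missed it

val pendingByValue = mutable.HashSet(new TaskByValue(0, 3))
pendingByValue -= new TaskByValue(0, 3)
println(pendingByValue.size) // prints 0: removal by partition identity works
```

With reference equality the pending set stays non-empty even though every partition's output is available, which is exactly the `isAvailable == true` / `pendingTasks.nonEmpty` mismatch described above.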
> Here is a test case that demonstrates it:
> {code}
> test("mapStage.pendingTasks is empty " +
>   "when MapStage.isAvailable is true after the stage was retried") {
>   val firstRDD = new MyRDD(sc, 6, Nil)
>   val firstShuffleDep = new ShuffleDependency(firstRDD, null)
>   val firstShuffleId = firstShuffleDep.shuffleId
>   val shuffleMapRdd = new MyRDD(sc, 6, List(firstShuffleDep))
>   val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
>   val shuffleId = shuffleDep.shuffleId
>   val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
>   submit(reduceRdd, Array(0, 1))
>   complete(taskSets(0), Seq(
>     (Success, makeMapStatus("hostB", 1)),
>     (Success, makeMapStatus("hostB", 2)),
>     (Success, makeMapStatus("hostC", 3)),
>     (Success, makeMapStatus("hostB", 4)),
>     (Success, makeMapStatus("hostB", 5)),
>     (Success, makeMapStatus("hostC", 6))
>   ))
>   complete(taskSets(1), Seq(
>     (Success, makeMapStatus("hostA", 1)),
>     (Success, makeMapStatus("hostB", 2)),
>     (Success, makeMapStatus("hostA", 1)),
>     (Success, makeMapStatus("hostB", 2)),
>     (Success, makeMapStatus("hostA", 1))
>   ))
>   runEvent(ExecutorLost("exec-hostA"))
>   runEvent(CompletionEvent(taskSets(1).tasks(0), Resubmitted, null, null,
>     null, null))
>   runEvent(CompletionEvent(taskSets(1).tasks(2), Resubmitted, null, null,
>     null, null))
>   runEvent(CompletionEvent(taskSets(1).tasks(0),
>     FetchFailed(null, firstShuffleId, -1, 0, "Fetch metadata failed"),
>     null, null, null, null))
>   scheduler.resubmitFailedStages()
>   runEvent(CompletionEvent(taskSets(1).tasks(0), Success,
>     makeMapStatus("hostC", 1), null, null, null))
>   runEvent(CompletionEvent(taskSets(1).tasks(2), Success,
>     makeMapStatus("hostC", 1), null, null, null))
>   runEvent(CompletionEvent(taskSets(1).tasks(4), Success,
>     makeMapStatus("hostC", 1), null, null, null))
>   runEvent(CompletionEvent(taskSets(1).tasks(5), Success,
>     makeMapStatus("hostB", 2), null, null, null))
>   val stage = scheduler.stageIdToStage(taskSets(1).stageId)
>   assert(stage.attemptId == 2)
>   assert(stage.isAvailable)
>   assert(stage.pendingTasks.size == 0)
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)