[GitHub] spark pull request #20640: [SPARK-19755][Mesos] Blacklist is always active f...
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/20640#discussion_r169500415 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala --- @@ -571,7 +568,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( cpus + totalCoresAcquired <= maxCores && mem <= offerMem && numExecutors < executorLimit && - slaves.get(slaveId).map(_.taskFailures).getOrElse(0) < MAX_SLAVE_FAILURES && + !scheduler.nodeBlacklist().contains(slaveId) && --- End diff -- In other places it looks like the hostname is used in the blacklist - why does this check against the slaveId instead of the offerHostname? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
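The question above can be illustrated with a minimal sketch. It assumes, as the reviewer suggests, that the blacklist is keyed by hostname; `Offer`, `OfferFilter`, and `canLaunch` are hypothetical names for illustration and are not part of the Mesos backend.

```scala
object OfferFilter {
  // Hypothetical, simplified stand-in for a Mesos resource offer.
  final case class Offer(slaveId: String, hostname: String)

  // Assumption: the blacklist holds hostnames, so the lookup key must be
  // the offer's hostname. A lookup by slaveId would never match.
  def canLaunch(offer: Offer, nodeBlacklist: Set[String]): Boolean =
    !nodeBlacklist.contains(offer.hostname)
}
```

Under that assumption, a check written against `slaveId` would silently accept offers from blacklisted hosts, which is the concern raised in the comment.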
[GitHub] spark pull request #20640: [SPARK-19755][Mesos] Blacklist is always active f...
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/20640#discussion_r169497847 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala --- @@ -648,15 +645,6 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( totalGpusAcquired -= gpus gpusByTaskId -= taskId } -// If it was a failure, mark the slave as failed for blacklisting purposes -if (TaskState.isFailed(state)) { - slave.taskFailures += 1 - - if (slave.taskFailures >= MAX_SLAVE_FAILURES) { -logInfo(s"Blacklisting Mesos slave $slaveId due to too many failures; " + --- End diff -- Is it a concern to lose this error message? (I don't know anything about Mesos but it does seem potentially useful?)
[GitHub] spark issue #17471: [SPARK-3577] Report Spill size on disk for UnsafeExterna...
Github user kayousterhout commented on the issue: https://github.com/apache/spark/pull/17471 I think the power is still out in the CS building at Berkeley because of the earthquake, so I'm guessing Jenkins is down as a result (note that even the vanilla AMP website doesn't work: http://amplab.cs.berkeley.edu/) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request #16877: [WIP] [SPARK-19538] Explicitly tell the DAGSchedu...
Github user kayousterhout closed the pull request at: https://github.com/apache/spark/pull/16877
[GitHub] spark issue #15326: [SPARK-17759] [CORE] Avoid adding duplicate schedulables
Github user kayousterhout commented on the issue: https://github.com/apache/spark/pull/15326 @HyukjinKwon what's the ping here for? It looks like I left some comments that @erenavsarogullari will address when he has time.
[GitHub] spark issue #17543: [SPARK-20230] FetchFailedExceptions should invalidate fi...
Github user kayousterhout commented on the issue: https://github.com/apache/spark/pull/17543 In theory (as you may know), the way this is supposed to work is that, since each reduce task reads the map outputs in random order, we delay re-scheduling the earlier stage, to try to collect as many failures as possible (and so you don't need 1 stage failure for each failed map task). But I agree that in general things don't work well when there are lots of fetch failures, which is what https://issues.apache.org/jira/browse/SPARK-20178 is tracking. I'm not yet convinced that this is the most important fix.
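The batching idea described in the comment above can be sketched as a small pure function. This is only an illustration of the principle, not the DAGScheduler's actual logic: `FetchFailure`, `batches`, and the `delayMs` window are hypothetical names, and the assumption is that the first failure opens a fixed delay window and every failure arriving inside it is handled by the same resubmission.

```scala
object FetchFailureBatching {
  // Hypothetical event: a reduce task could not fetch the output of map
  // task `mapId` at time `timeMs`.
  final case class FetchFailure(mapId: Int, timeMs: Long)

  // Groups failures into resubmission batches. The first failure of a batch
  // opens a window of `delayMs`; failures inside the window join that batch.
  def batches(failures: Seq[FetchFailure], delayMs: Long): Vector[Set[Int]] = {
    val out = Vector.newBuilder[Set[Int]]
    var current = Set.empty[Int]
    var windowEnd = Long.MinValue
    for (f <- failures.sortBy(_.timeMs)) {
      // The window has closed: emit the batch collected so far.
      if (current.nonEmpty && f.timeMs > windowEnd) {
        out += current
        current = Set.empty
      }
      // A failure that starts a new batch also starts a new window.
      if (current.isEmpty) windowEnd = f.timeMs + delayMs
      current += f.mapId
    }
    if (current.nonEmpty) out += current
    out.result()
  }
}
```

With a longer window, more failed map outputs end up in one batch, so fewer stage resubmissions are needed, at the cost of waiting before recovery starts.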
[GitHub] spark pull request #17543: [SPARK-20230] FetchFailedExceptions should invali...
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/17543#discussion_r110014380 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -1281,10 +1281,24 @@ class DAGScheduler( val failedStage = stageIdToStage(task.stageId) val mapStage = shuffleIdToMapStage(shuffleId) +def invalidateLostFilesAndExecutor(): Unit = { + // Mark the map whose fetch failed as broken in the map stage + if (mapId != -1) { +mapStage.removeOutputLoc(mapId, bmAddress) +mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress) + } + + // TODO: mark the executor as failed only if there were lots of fetch failures on it + if (bmAddress != null) { +handleExecutorLost(bmAddress.executorId, filesLost = true, Some(task.epoch)) --- End diff -- Ah sorry I concurrently edited my comments when I looked at this more. I'm still confused though -- why does (7) matter (that the metadata for the executor didn't get cleaned up) given that it didn't have any shuffle outputs?
[GitHub] spark pull request #17543: [SPARK-20230] FetchFailedExceptions should invali...
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/17543#discussion_r110010527 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -1281,10 +1281,24 @@ class DAGScheduler( val failedStage = stageIdToStage(task.stageId) val mapStage = shuffleIdToMapStage(shuffleId) +def invalidateLostFilesAndExecutor(): Unit = { + // Mark the map whose fetch failed as broken in the map stage + if (mapId != -1) { +mapStage.removeOutputLoc(mapId, bmAddress) +mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress) + } + + // TODO: mark the executor as failed only if there were lots of fetch failures on it + if (bmAddress != null) { +handleExecutorLost(bmAddress.executorId, filesLost = true, Some(task.epoch)) + } +} + if (failedStage.latestInfo.attemptId != task.stageAttemptId) { logInfo(s"Ignoring fetch failure from $task as it's from $failedStage attempt" + s" ${task.stageAttemptId} and there is a more recent attempt for that stage " + s"(attempt ID ${failedStage.latestInfo.attemptId}) running") + invalidateLostFilesAndExecutor() --- End diff -- Actually I'm confused about why you need this at all. It looks like the map output location only gets added when the task is successful -- so what map output loc. info needs to be invalidated?
[GitHub] spark pull request #17543: [SPARK-20230] FetchFailedExceptions should invali...
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/17543#discussion_r110009919 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -1281,10 +1281,24 @@ class DAGScheduler( val failedStage = stageIdToStage(task.stageId) val mapStage = shuffleIdToMapStage(shuffleId) +def invalidateLostFilesAndExecutor(): Unit = { + // Mark the map whose fetch failed as broken in the map stage + if (mapId != -1) { +mapStage.removeOutputLoc(mapId, bmAddress) +mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress) + } + + // TODO: mark the executor as failed only if there were lots of fetch failures on it + if (bmAddress != null) { +handleExecutorLost(bmAddress.executorId, filesLost = true, Some(task.epoch)) --- End diff -- Why do you need to invalidate the whole executor here?
[GitHub] spark issue #17533: [SPARK-20219] Schedule tasks based on size of input from...
Github user kayousterhout commented on the issue: https://github.com/apache/spark/pull/17533 I'm hesitant about this and posted some comments on the JIRA. (We should try to keep high-level discussion about whether this change makes sense there, so it's easier to reference in the future and not tangled up in the low-level PR comments.)
[GitHub] spark issue #17445: [SPARK-20115] [CORE] Fix DAGScheduler to recompute all t...
Github user kayousterhout commented on the issue: https://github.com/apache/spark/pull/17445 Jenkins this is ok to test
[GitHub] spark pull request #15326: [SPARK-17759] [CORE] Avoid adding duplicate sched...
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/15326#discussion_r109322968 --- Diff: core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala --- @@ -292,7 +290,100 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext { } } - private def verifyPool(rootPool: Pool, poolName: String, expectedInitMinShare: Int, + test("FIFO Scheduler should not add duplicate TaskSetManager") { +sc = new SparkContext(LOCAL, APP_NAME) +val taskScheduler = new TaskSchedulerImpl(sc) + +val rootPool = new Pool("", SchedulingMode.FIFO, 0, 0) +val schedulableBuilder = new FIFOSchedulableBuilder(rootPool) + +val taskSetManager0 = createTaskSetManager(0, 2, taskScheduler) +val taskSetManager1 = createTaskSetManager(1, 2, taskScheduler) +schedulableBuilder.addTaskSetManager(taskSetManager0, null) +schedulableBuilder.addTaskSetManager(taskSetManager0, null) --- End diff -- When would this happen? (the same TSM getting added twice)
[GitHub] spark pull request #15326: [SPARK-17759] [CORE] Avoid adding duplicate sched...
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/15326#discussion_r109323006 --- Diff: core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala --- @@ -292,7 +290,100 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext { } } - private def verifyPool(rootPool: Pool, poolName: String, expectedInitMinShare: Int, + test("FIFO Scheduler should not add duplicate TaskSetManager") { +sc = new SparkContext(LOCAL, APP_NAME) +val taskScheduler = new TaskSchedulerImpl(sc) + +val rootPool = new Pool("", SchedulingMode.FIFO, 0, 0) +val schedulableBuilder = new FIFOSchedulableBuilder(rootPool) + +val taskSetManager0 = createTaskSetManager(0, 2, taskScheduler) +val taskSetManager1 = createTaskSetManager(1, 2, taskScheduler) +schedulableBuilder.addTaskSetManager(taskSetManager0, null) +schedulableBuilder.addTaskSetManager(taskSetManager0, null) +schedulableBuilder.addTaskSetManager(taskSetManager1, null) + +assert(rootPool.schedulableQueue.size === 2) +assert(rootPool.schedulableNameToSchedulable.size === 2) + +assert(rootPool.getSchedulableByName(taskSetManager0.name) === taskSetManager0) +assert(rootPool.getSchedulableByName(taskSetManager1.name) === taskSetManager1) + } + + test("Fair Scheduler should not create duplicate pool") { +sc = createSparkContext("fairscheduler-duplicate-pools.xml") + +val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0) +val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf) +schedulableBuilder.buildPools() + +assert(rootPool.schedulableQueue.size === 2) +assert(rootPool.schedulableNameToSchedulable.size === 2) + +verifyPool(rootPool, schedulableBuilder.DEFAULT_POOL_NAME, 0, 1, SchedulingMode.FIFO) +verifyPool(rootPool, "duplicate_pool1", 1, 1, SchedulingMode.FAIR) + } + + test("Fair Scheduler should not add duplicate TaskSetManager via default pool") { +sc = new SparkContext(LOCAL, APP_NAME) +val taskScheduler = new TaskSchedulerImpl(sc) + +val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0) +val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf) +schedulableBuilder.buildPools() + +val taskSetManager0 = createTaskSetManager(0, 2, taskScheduler) +val taskSetManager1 = createTaskSetManager(1, 2, taskScheduler) +schedulableBuilder.addTaskSetManager(taskSetManager0, null) +schedulableBuilder.addTaskSetManager(taskSetManager0, null) --- End diff -- Similar to the above -- when could this happen?
[GitHub] spark pull request #15326: [SPARK-17759] [CORE] Avoid adding duplicate sched...
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/15326#discussion_r109322980 --- Diff: core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala --- @@ -292,7 +290,100 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext { } } - private def verifyPool(rootPool: Pool, poolName: String, expectedInitMinShare: Int, + test("FIFO Scheduler should not add duplicate TaskSetManager") { +sc = new SparkContext(LOCAL, APP_NAME) +val taskScheduler = new TaskSchedulerImpl(sc) + +val rootPool = new Pool("", SchedulingMode.FIFO, 0, 0) +val schedulableBuilder = new FIFOSchedulableBuilder(rootPool) + +val taskSetManager0 = createTaskSetManager(0, 2, taskScheduler) +val taskSetManager1 = createTaskSetManager(1, 2, taskScheduler) +schedulableBuilder.addTaskSetManager(taskSetManager0, null) +schedulableBuilder.addTaskSetManager(taskSetManager0, null) +schedulableBuilder.addTaskSetManager(taskSetManager1, null) + +assert(rootPool.schedulableQueue.size === 2) +assert(rootPool.schedulableNameToSchedulable.size === 2) + +assert(rootPool.getSchedulableByName(taskSetManager0.name) === taskSetManager0) +assert(rootPool.getSchedulableByName(taskSetManager1.name) === taskSetManager1) + } + + test("Fair Scheduler should not create duplicate pool") { +sc = createSparkContext("fairscheduler-duplicate-pools.xml") --- End diff -- Can you add a comment about what this is doing? Something like: "Load the scheduler pools from fairscheduler-duplicate-pools, which has 4 entries, but two are duplicates, and make sure that the duplicates are ignored."
[GitHub] spark pull request #15326: [SPARK-17759] [CORE] Avoid adding duplicate sched...
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/15326#discussion_r109322987 --- Diff: core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala --- @@ -292,7 +290,100 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext { } } - private def verifyPool(rootPool: Pool, poolName: String, expectedInitMinShare: Int, + test("FIFO Scheduler should not add duplicate TaskSetManager") { +sc = new SparkContext(LOCAL, APP_NAME) +val taskScheduler = new TaskSchedulerImpl(sc) + +val rootPool = new Pool("", SchedulingMode.FIFO, 0, 0) +val schedulableBuilder = new FIFOSchedulableBuilder(rootPool) + +val taskSetManager0 = createTaskSetManager(0, 2, taskScheduler) +val taskSetManager1 = createTaskSetManager(1, 2, taskScheduler) +schedulableBuilder.addTaskSetManager(taskSetManager0, null) +schedulableBuilder.addTaskSetManager(taskSetManager0, null) +schedulableBuilder.addTaskSetManager(taskSetManager1, null) + +assert(rootPool.schedulableQueue.size === 2) +assert(rootPool.schedulableNameToSchedulable.size === 2) + +assert(rootPool.getSchedulableByName(taskSetManager0.name) === taskSetManager0) +assert(rootPool.getSchedulableByName(taskSetManager1.name) === taskSetManager1) + } + + test("Fair Scheduler should not create duplicate pool") { +sc = createSparkContext("fairscheduler-duplicate-pools.xml") + +val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0) +val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf) +schedulableBuilder.buildPools() + +assert(rootPool.schedulableQueue.size === 2) +assert(rootPool.schedulableNameToSchedulable.size === 2) + +verifyPool(rootPool, schedulableBuilder.DEFAULT_POOL_NAME, 0, 1, SchedulingMode.FIFO) +verifyPool(rootPool, "duplicate_pool1", 1, 1, SchedulingMode.FAIR) --- End diff -- Can you add a comment here saying that this makes sure the 1st pool specified is the one that gets used?
[GitHub] spark issue #15326: [SPARK-17759] [CORE] Avoid adding duplicate schedulables
Github user kayousterhout commented on the issue: https://github.com/apache/spark/pull/15326 Jenkins this is OK to test
[GitHub] spark issue #17445: [SPARK-20115] [CORE] Fix DAGScheduler to recompute all t...
Github user kayousterhout commented on the issue: https://github.com/apache/spark/pull/17445 Have you seen #17088? I just glanced at this quickly, but I think this is a duplicate of that (SPARK-19753).
[GitHub] spark issue #17445: [SPARK-20115] [CORE] Fix DAGScheduler to recompute all t...
Github user kayousterhout commented on the issue: https://github.com/apache/spark/pull/17445 Jenkins this is OK to test
[GitHub] spark issue #17208: [SPARK-19868] conflict TasksetManager lead to spark stop...
Github user kayousterhout commented on the issue: https://github.com/apache/spark/pull/17208 LGTM, merged this to master
[GitHub] spark issue #17297: [SPARK-14649][CORE] DagScheduler should not run duplicat...
Github user kayousterhout commented on the issue: https://github.com/apache/spark/pull/17297 Agreed, sounds good!
[GitHub] spark issue #17448: [SPARK-20119][test-maven]Fix the test case fail in DataS...
Github user kayousterhout commented on the issue: https://github.com/apache/spark/pull/17448 Thanks @gatorsmile!
[GitHub] spark issue #17297: [SPARK-14649][CORE] DagScheduler should not run duplicat...
Github user kayousterhout commented on the issue: https://github.com/apache/spark/pull/17297 @sitalkedia they're in core/target/unit-tests.log. Sometimes it's easier to move the logs to the test output (so they show up in-line), which you can do by changing core/src/test/resources/log4j.properties to log to the console instead of to a file.
[GitHub] spark issue #17297: [SPARK-14649][CORE] DagScheduler should not run duplicat...
Github user kayousterhout commented on the issue: https://github.com/apache/spark/pull/17297 To recap the issue that Imran and I discussed here, I think it can be summarized as follows:
- A fetch failure happens at some time t and indicates that the map output on machine M has been lost.
- Consider some running task that has read x map outputs and still needs to process y map outputs.
- Scenario A (PRO of this PR): If the output from M was in the x outputs that have already been read, we should keep running the task (as this PR does), because the task already successfully fetched the output from the failed machine. We don't do this currently, meaning we throw away work that has already been done.
- Scenario B (CON of this PR): If the output from M was in the y outputs that have not yet been read, then we should cancel the task, because the task won't learn about the new location for the re-generated output of M (IIUC, there's no functionality to do this now), so it is going to fail later on. The current code will re-run the task, which is what we should do. This PR's code will try to re-use the old task, which means the job will take longer to run because the task will fail later on and need to be restarted.
If my description above is correct, then this PR is assuming that scenario A is more likely than scenario B, but it seems to me that these two scenarios are equally likely (in which case this PR provides no net benefit). @sitalkedia what are your thoughts here / did I miss something in my description above?
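The two scenarios in the recap above can be written down as a small decision function. This is a sketch of the reasoning only, not code from the PR or from the DAGScheduler: `RunningTask`, `Decision`, and `onMapOutputLost` are hypothetical names, and it assumes the scheduler could know which map outputs a task has already read, which the comment does not claim is possible today.

```scala
object FetchFailurePolicy {
  sealed trait Decision
  case object KeepRunning extends Decision
  case object Cancel extends Decision

  // Hypothetical view of a running reduce task: the machines whose map output
  // it has already read (the "x" outputs) and those it still needs ("y").
  final case class RunningTask(alreadyRead: Set[String], stillNeeded: Set[String])

  // Scenario A: the lost output was already read, so the task can finish.
  // Scenario B: the lost output is still needed, so the task will fail later
  // and is better cancelled and re-run against the regenerated output.
  def onMapOutputLost(task: RunningTask, lostMachine: String): Decision =
    if (task.stillNeeded.contains(lostMachine)) Cancel else KeepRunning
}
```

A policy that always keeps the task (as the PR is described) wins in scenario A and loses in scenario B; one that always cancels does the opposite. The sketch only makes explicit that picking the better action per task needs per-task read progress.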
[GitHub] spark issue #17325: [SPARK-19803][CORE][TEST] Proactive replication test fai...
Github user kayousterhout commented on the issue: https://github.com/apache/spark/pull/17325 test changes LGTM
[GitHub] spark issue #17325: [SPARK-19803][CORE][TEST] Proactive replication test fai...
Github user kayousterhout commented on the issue: https://github.com/apache/spark/pull/17325 Left an inline comment to eliminate the brittle Thread.sleep that remains in the test (posting here to make sure this doesn't get lost!)
[GitHub] spark pull request #17325: [SPARK-19803][CORE][TEST] Proactive replication t...
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/17325#discussion_r108254598 --- Diff: core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala --- @@ -481,27 +481,39 @@ class BlockManagerProactiveReplicationSuite extends BlockManagerReplicationBehav assert(blockLocations.size === replicationFactor) // remove a random blockManager -val executorsToRemove = blockLocations.take(replicationFactor - 1) +val executorsToRemove = blockLocations.take(replicationFactor - 1).toSet logInfo(s"Removing $executorsToRemove") -executorsToRemove.foreach{exec => - master.removeExecutor(exec.executorId) +initialStores.filter(bm => executorsToRemove.contains(bm.blockManagerId)).foreach { bm => + master.removeExecutor(bm.blockManagerId.executorId) + bm.stop() // giving enough time for replication to happen and new block be reported to master - Thread.sleep(200) + eventually(timeout(5 seconds), interval(100 millis)) { +val newLocations = master.getLocations(blockId).toSet +assert(newLocations.size === replicationFactor) + } } -val newLocations = eventually(timeout(5 seconds), interval(10 millis)) { +val newLocations = eventually(timeout(5 seconds), interval(100 millis)) { val _newLocations = master.getLocations(blockId).toSet assert(_newLocations.size === replicationFactor) _newLocations } logInfo(s"New locations : $newLocations") -// there should only be one common block manager between initial and new locations -assert(newLocations.intersect(blockLocations.toSet).size === 1) -// check if all the read locks have been released +// new locations should not contain stopped block managers +assert(newLocations.forall(bmId => !executorsToRemove.contains(bmId)), + "New locations contain stopped block managers.") + +// this is to ensure the last read lock gets released before we try to +// check for read-locks. The check for read-locks using the method below is not +// idempotent, and therefore can't be used in an `eventually` block. +Thread.sleep(500) --- End diff -- ScalaTest has an `eventually` primitive that you can use for this; I submitted a PR to your branch with what I was thinking here: https://github.com/shubhamchopra/spark/pull/1/commits/0c2eb9360c027e5d8bc8381e7e1bd56d7911527a In general we should avoid sleeping in tests, because it tends to lead to Jenkins failures when Jenkins is slow.
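The difference between a fixed sleep and polling can be shown with a small self-contained helper. This is a hand-rolled stand-in written for illustration, not ScalaTest's `eventually` and not the change in the linked commit; `Poll` and its parameters are hypothetical names.

```scala
import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

object Poll {
  // Re-runs `check` every `intervalMs` until it stops throwing or
  // `timeoutMs` has elapsed; on timeout the last failure is rethrown.
  def eventually[T](timeoutMs: Long, intervalMs: Long)(check: => T): T = {
    val deadline = System.nanoTime() + timeoutMs * 1000000L

    @tailrec
    def loop(): T = Try(check) match {
      case Success(value) => value
      case Failure(e) if System.nanoTime() >= deadline => throw e
      case Failure(_) =>
        Thread.sleep(intervalMs)
        loop()
    }

    loop()
  }
}
```

Unlike `Thread.sleep(500)`, this returns as soon as the condition holds on a fast machine and keeps waiting (up to the timeout) on a slow one. As the diff comment notes, it only suits checks that are safe to repeat, so a non-idempotent check would first need to be made repeatable.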
[GitHub] spark issue #17208: [SPARK-19868] conflict TasksetManager lead to spark stop...
Github user kayousterhout commented on the issue: https://github.com/apache/spark/pull/17208 Yes can you also merge @squito's test case?
[GitHub] spark issue #17166: [SPARK-19820] [core] Allow reason to be specified for ta...
Github user kayousterhout commented on the issue: https://github.com/apache/spark/pull/17166 I merged this to master. I realized that the PR description is still from an old version of the change, so I modified the commit message to add that this also adds the SparkContext.killTaskAttempt method. Thanks for all of the work here @ericl!
[GitHub] spark issue #16867: [SPARK-16929] Improve performance when check speculatabl...
Github user kayousterhout commented on the issue: https://github.com/apache/spark/pull/16867 Merged this to master -- thanks for all of the quick updates here @jinxing64!
[GitHub] spark issue #17297: [SPARK-14649][CORE] DagScheduler should not run duplicat...
Github user kayousterhout commented on the issue: https://github.com/apache/spark/pull/17297 @sitalkedia can you file a JIRA in the future when you see flaky test failures? In this case I updated an existing JIRA (https://issues.apache.org/jira/browse/SPARK-19612) but please do this next time -- otherwise these issues never get fixed.
[GitHub] spark pull request #17325: [SPARK-19803][CORE][TEST] Proactive replication t...
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/17325#discussion_r107821258 --- Diff: core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala --- @@ -481,27 +481,39 @@ class BlockManagerProactiveReplicationSuite extends BlockManagerReplicationBehav assert(blockLocations.size === replicationFactor) // remove a random blockManager -val executorsToRemove = blockLocations.take(replicationFactor - 1) +val executorsToRemove = blockLocations.take(replicationFactor - 1).toSet logInfo(s"Removing $executorsToRemove") -executorsToRemove.foreach{exec => - master.removeExecutor(exec.executorId) +initialStores.filter(bm => executorsToRemove.contains(bm.blockManagerId)).foreach { bm => + master.removeExecutor(bm.blockManagerId.executorId) + bm.stop() // giving enough time for replication to happen and new block be reported to master - Thread.sleep(200) + eventually(timeout(5 seconds), interval(100 millis)) { +val newLocations = master.getLocations(blockId).toSet +assert(newLocations.size === replicationFactor) + } } -val newLocations = eventually(timeout(5 seconds), interval(10 millis)) { +val newLocations = eventually(timeout(5 seconds), interval(100 millis)) { val _newLocations = master.getLocations(blockId).toSet assert(_newLocations.size === replicationFactor) _newLocations } logInfo(s"New locations : $newLocations") -// there should only be one common block manager between initial and new locations -assert(newLocations.intersect(blockLocations.toSet).size === 1) -// check if all the read locks have been released +// new locations should not contain stopped block managers +assert(newLocations.forall(bmId => !executorsToRemove.contains(bmId)), + "New locations contain stopped block managers.") + +// this is to ensure the last read lock gets released before we try to +// check for read-locks. 
The check for read-locks using the method below is not +// idempotent, and therefore can't be used in an `eventually` block. +Thread.sleep(500) --- End diff -- Do you think it's better to just add a private[spark] method to check for read locks? I'm worried this test will still be brittle and it seems relatively easy to just add that method.
[GitHub] spark issue #16905: [SPARK-19567][CORE][SCHEDULER] Support some Schedulable ...
Github user kayousterhout commented on the issue: https://github.com/apache/spark/pull/16905 LGTM merged into master
[GitHub] spark issue #15326: [SPARK-17759] [CORE] Avoid adding duplicate schedulables
Github user kayousterhout commented on the issue: https://github.com/apache/spark/pull/15326 @erenavsarogullari is this ready to be updated now that #16813 has been merged?
[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/16867#discussion_r107812908 --- Diff: core/src/test/scala/org/apache/spark/util/collection/MedianHeapSuite.scala --- @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util.collection + +import java.util.Arrays --- End diff -- super nit: can you combine these into one import (import java.util.{Arrays, NoSuchElementException})
[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/16867#discussion_r107812876 --- Diff: core/src/test/scala/org/apache/spark/util/collection/MedianHeapSuite.scala --- @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.util.collection + +import java.util.Arrays +import java.util.NoSuchElementException + +import scala.collection.mutable.ArrayBuffer +import scala.util.Random + +import org.apache.spark.SparkFunSuite + +class MedianHeapSuite extends SparkFunSuite { + + test("If no numbers in MedianHeap, NoSuchElementException is thrown.") { +val medianHeap = new MedianHeap() +intercept[NoSuchElementException] { + medianHeap.median +} + } + + test("Median should be correct when size of MedianHeap is even") { +val array = Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9) +val medianHeap = new MedianHeap() +array.foreach(medianHeap.insert(_)) +assert(medianHeap.size() === 10) +assert(medianHeap.median === ((array(4) + array(5)) / 2.0)) --- End diff -- instead of indexing into the array, I think it would be clearer here to just hard-code 4.5 (it's easier to see that the median is 4.5 than to reason about the indices in the array)
[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/16867#discussion_r107812807 --- Diff: core/src/test/scala/org/apache/spark/util/collection/MedianHeapSuite.scala --- @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.util.collection + +import java.util.Arrays +import java.util.NoSuchElementException + +import scala.collection.mutable.ArrayBuffer +import scala.util.Random + +import org.apache.spark.SparkFunSuite + +class MedianHeapSuite extends SparkFunSuite { + + test("If no numbers in MedianHeap, NoSuchElementException is thrown.") { +val medianHeap = new MedianHeap() +intercept[NoSuchElementException] { + medianHeap.median +} + } + + test("Median should be correct when size of MedianHeap is even") { +val array = Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9) +val medianHeap = new MedianHeap() +array.foreach(medianHeap.insert(_)) +assert(medianHeap.size() === 10) +assert(medianHeap.median === ((array(4) + array(5)) / 2.0)) + } + + test("Median should be correct when size of MedianHeap is odd") { +val array = Array(0, 1, 2, 3, 4, 5, 6, 7, 8) +val medianHeap = new MedianHeap() +array.foreach(medianHeap.insert(_)) +assert(medianHeap.size() === 9) +assert(medianHeap.median === (array(4))) + } + + test("Median should be correct though there are duplicated numbers inside.") { +val array = Array(0, 0, 1, 1, 2, 2, 3, 3, 4, 4) --- End diff -- Can you change this to something like: Array(0, 0, 1, 1, 2, 3, 4)? Otherwise the median heap could be handling the duplicates wrong (e.g., by not actually inserting duplicates), and the assertion at the bottom would still hold. Then the check at the end can be `medianHeap.median === 1`.
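The reasoning behind the suggested input can be checked with a small two-heap median tracker. This is an illustrative sketch only: `MedianSketch` and `medianOf` are hypothetical names, and the code is not Spark's `MedianHeap`. With the symmetric input from the diff, dropping duplicates leaves the median at 2, so a heap that silently ignored duplicates would still pass; with `Array(0, 0, 1, 1, 2, 3, 4)` the correct median is 1, while the de-duplicated median would be 2.

```scala
import scala.collection.mutable.PriorityQueue

/** Illustrative two-heap median tracker (hypothetical; not Spark's MedianHeap). */
class MedianSketch {
  // Max-heap holding the smaller half of the inserted values.
  private val lower = PriorityQueue.empty[Int]
  // Min-heap holding the larger half of the inserted values.
  private val upper = PriorityQueue.empty[Int](Ordering[Int].reverse)

  def size: Int = lower.size + upper.size

  def insert(x: Int): Unit = {
    if (lower.isEmpty || x <= lower.head) lower.enqueue(x) else upper.enqueue(x)
    // Rebalance so the two halves differ in size by at most one element.
    if (lower.size > upper.size + 1) upper.enqueue(lower.dequeue())
    else if (upper.size > lower.size + 1) lower.enqueue(upper.dequeue())
  }

  def median: Double = {
    if (size == 0) throw new NoSuchElementException("MedianSketch is empty")
    if (lower.size == upper.size) (lower.head + upper.head) / 2.0
    else if (lower.size > upper.size) lower.head.toDouble
    else upper.head.toDouble
  }
}

object MedianSketch {
  /** Convenience helper: inserts every value, then returns the median. */
  def medianOf(values: Seq[Int]): Double = {
    val heap = new MedianSketch
    values.foreach(heap.insert)
    heap.median
  }
}
```

Comparing `MedianSketch.medianOf(values)` with `MedianSketch.medianOf(values.distinct)` for each input shows why only the asymmetric array can expose a heap that drops duplicates.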
[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/16867#discussion_r107812986 --- Diff: core/src/test/scala/org/apache/spark/util/collection/MedianHeapSuite.scala --- @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.util.collection + +import java.util.Arrays +import java.util.NoSuchElementException + +import scala.collection.mutable.ArrayBuffer +import scala.util.Random + +import org.apache.spark.SparkFunSuite + +class MedianHeapSuite extends SparkFunSuite { + + test("If no numbers in MedianHeap, NoSuchElementException is thrown.") { +val medianHeap = new MedianHeap() +intercept[NoSuchElementException] { + medianHeap.median +} + } + + test("Median should be correct when size of MedianHeap is even") { +val array = Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9) +val medianHeap = new MedianHeap() +array.foreach(medianHeap.insert(_)) +assert(medianHeap.size() === 10) +assert(medianHeap.median === ((array(4) + array(5)) / 2.0)) + } + + test("Median should be correct when size of MedianHeap is odd") { +val array = Array(0, 1, 2, 3, 4, 5, 6, 7, 8) +val medianHeap = new MedianHeap() +array.foreach(medianHeap.insert(_)) +assert(medianHeap.size() === 9) +assert(medianHeap.median === (array(4))) + } + + test("Median should be correct though there are duplicated numbers inside.") { +val array = Array(0, 0, 1, 1, 2, 2, 3, 3, 4, 4) +val medianHeap = new MedianHeap() +array.foreach(medianHeap.insert(_)) +assert(medianHeap.size === 10) +assert(medianHeap.median === ((array(4) + array(5)) / 2.0)) + } + + test("Median should be correct when skew situations.") { --- End diff -- "when skew situations" --> "when input data is skewed"
[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/16867#discussion_r107812930 --- Diff: core/src/test/scala/org/apache/spark/util/collection/MedianHeapSuite.scala --- @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.util.collection + +import java.util.Arrays +import java.util.NoSuchElementException + +import scala.collection.mutable.ArrayBuffer +import scala.util.Random + +import org.apache.spark.SparkFunSuite + +class MedianHeapSuite extends SparkFunSuite { + + test("If no numbers in MedianHeap, NoSuchElementException is thrown.") { +val medianHeap = new MedianHeap() +intercept[NoSuchElementException] { + medianHeap.median +} + } + + test("Median should be correct when size of MedianHeap is even") { +val array = Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9) +val medianHeap = new MedianHeap() +array.foreach(medianHeap.insert(_)) +assert(medianHeap.size() === 10) +assert(medianHeap.median === ((array(4) + array(5)) / 2.0)) + } + + test("Median should be correct when size of MedianHeap is odd") { +val array = Array(0, 1, 2, 3, 4, 5, 6, 7, 8) +val medianHeap = new MedianHeap() +array.foreach(medianHeap.insert(_)) +assert(medianHeap.size() === 9) +assert(medianHeap.median === (array(4))) --- End diff -- similarly here -- just `medianHeap.median === 4`
[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/16867#discussion_r107812584 --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala --- @@ -893,6 +893,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg val taskSet = FakeTask.createTaskSet(4) // Set the speculation multiplier to be 0 so speculative tasks are launched immediately sc.conf.set("spark.speculation.multiplier", "0.0") +sc.conf.set("spark.speculation", "true") --- End diff -- Ohhh cool that makes sense
[GitHub] spark issue #17208: [SPARK-19868] conflict TasksetManager lead to spark stop...
Github user kayousterhout commented on the issue: https://github.com/apache/spark/pull/17208 @liujianhuiouc have you had time to fix this up yet?
[GitHub] spark issue #17166: [SPARK-19820] [core] Allow reason to be specified for ta...
Github user kayousterhout commented on the issue: https://github.com/apache/spark/pull/17166 LGTM -- this looks great. Thanks for coming up with a simple way to address @mridulm's feedback Eric!
[GitHub] spark pull request #17166: [SPARK-19820] [core] Allow reason to be specified...
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/17166#discussion_r107778618 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -467,7 +474,7 @@ private[spark] class TaskSchedulerImpl private[scheduler]( taskState: TaskState, reason: TaskFailedReason): Unit = synchronized { taskSetManager.handleFailedTask(tid, taskState, reason) -if (!taskSetManager.isZombie && taskState != TaskState.KILLED) { +if (!taskSetManager.isZombie) { --- End diff -- Oh that sounds great to me @ericl and minimally invasive!
[GitHub] spark pull request #17166: [SPARK-19820] [core] Allow reason to be specified...
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/17166#discussion_r107776242 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -467,7 +474,7 @@ private[spark] class TaskSchedulerImpl private[scheduler]( taskState: TaskState, reason: TaskFailedReason): Unit = synchronized { taskSetManager.handleFailedTask(tid, taskState, reason) -if (!taskSetManager.isZombie && taskState != TaskState.KILLED) { +if (!taskSetManager.isZombie) { --- End diff -- @mridulm It's possible that the makeOffers() call causes a different job's tasks to be executed on the given executor. Fundamentally, the problem is that the killed task needs to be re-scheduled on a different executor, and the only way to guarantee that the task gets offered new/different executors is to do a full reviveOffers() call (which is why the code in question exists in the first place).
[GitHub] spark pull request #17166: [SPARK-19820] [core] Allow reason to be specified...
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/17166#discussion_r107776906 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -467,7 +474,7 @@ private[spark] class TaskSchedulerImpl private[scheduler]( taskState: TaskState, reason: TaskFailedReason): Unit = synchronized { taskSetManager.handleFailedTask(tid, taskState, reason) -if (!taskSetManager.isZombie && taskState != TaskState.KILLED) { +if (!taskSetManager.isZombie) { --- End diff -- What if Eric changes TSM.handleFailedTask to return a boolean value indicating whether the failed task needs to be re-scheduled? Then we could use that to decide whether to call reviveOffers.
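A rough sketch of the idea proposed above, with every class and method simplified and hypothetical (`TaskSetManagerSketch` and `SchedulerSketch` are not Spark's classes, and the real signatures carry more arguments). The assumption, taken from the discussion in this thread, is that a failed or killed attempt does not need rescheduling when the task set is a zombie or when the same task has already succeeded elsewhere.

```scala
import scala.collection.mutable

/** Hypothetical, heavily simplified stand-in for a task set manager. */
class TaskSetManagerSketch {
  var isZombie: Boolean = false
  private val succeededTasks = mutable.Set.empty[Int]

  def markSucceeded(taskIndex: Int): Unit = succeededTasks += taskIndex

  /** Returns true when the failed attempt's task still has to be run again,
    * i.e. the task set is live and no other attempt has already succeeded. */
  def handleFailedTask(taskIndex: Int): Boolean =
    !isZombie && !succeededTasks.contains(taskIndex)
}

/** Hypothetical scheduler that only counts how often offers are revived. */
class SchedulerSketch {
  var reviveCalls: Int = 0

  def reviveOffers(): Unit = reviveCalls += 1

  def handleFailedTask(manager: TaskSetManagerSketch, taskIndex: Int): Unit = {
    // Ask for fresh resource offers only when the task must be rescheduled.
    if (manager.handleFailedTask(taskIndex)) reviveOffers()
  }
}
```

The design choice here is that the scheduler no longer inspects the task state itself; the task set manager, which knows whether the task still needs to run, makes that call.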
[GitHub] spark pull request #17166: [SPARK-19820] [core] Allow reason to be specified...
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/17166#discussion_r107740806 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -467,7 +474,7 @@ private[spark] class TaskSchedulerImpl private[scheduler]( taskState: TaskState, reason: TaskFailedReason): Unit = synchronized { taskSetManager.handleFailedTask(tid, taskState, reason) -if (!taskSetManager.isZombie && taskState != TaskState.KILLED) { +if (!taskSetManager.isZombie) { --- End diff -- @mridulm for (a), what evidence do you have that reviveOffers is costly and the bottleneck in scheduling a large job? I agree that we're adding many reviveOffers calls in the case of large jobs -- but for the other jobs I've benchmarked in the past, I haven't seen this be a bottleneck (and when all of a job's tasks have started running, reviveOffers should be very fast). Re: (b), I did an extensive review of the associated scheduler code and tasks *are* re-attempted (as was verified by Eric's test). As I mentioned above, the only reason they're not re-attempted in the current uses of TaskKilled is because either the stage has been killed (so the task set is marked as a zombie) or because the task attempt has already succeeded elsewhere (as for speculative tasks). Also the Mesos scheduler code uses TaskKilled in the same way as this PR (where Mesos may kill a task that should be re-scheduled elsewhere). If you don't think that killed tasks that haven't succeeded elsewhere will be re-run, can you point to the specific code?
[GitHub] spark issue #17325: [SPARK-19803][CORE][TEST] Proactive replication test fai...
Github user kayousterhout commented on the issue: https://github.com/apache/spark/pull/17325 It looks like this is actually fixing a bug in addition to updating the test. Is it possible to write a unit test for the bug? In any case, can you update the JIRA to describe the bug?
[GitHub] spark issue #17325: [SPARK-19803][CORE][TEST] Proactive replication test fai...
Github user kayousterhout commented on the issue: https://github.com/apache/spark/pull/17325 Jenkins, this is OK to test
[GitHub] spark issue #16905: [SPARK-19567][CORE][SCHEDULER] Support some Schedulable ...
Github user kayousterhout commented on the issue: https://github.com/apache/spark/pull/16905 Jenkins add to whitelist
[GitHub] spark pull request #17166: [SPARK-19820] [core] Allow reason to be specified...
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/17166#discussion_r107561714 --- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala --- @@ -296,12 +298,13 @@ private[spark] class Executor( // If this task has been killed before we deserialized it, let's quit now. Otherwise, // continue executing the task. -if (killed) { +val killReason = reasonIfKilled --- End diff -- Ugh in retrospect I think TaskContext should have just clearly documented that an invariant of reasonIfKilled is that, once set, it won't be un-set, and then we'd avoid all of these corner cases. But not worth changing now.
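The corner cases mentioned above come from reading a field that could, in principle, change between two reads. A minimal sketch of the "set once, never un-set" invariant follows; `TaskSketch` and its methods are hypothetical and do not mirror Spark's `TaskContext` or `Executor` code. Copying the value into a local `val`, as the diff does with `killReason`, means the check and the use see the same snapshot either way.

```scala
/** Hypothetical task holder illustrating a kill reason that is set at most once. */
class TaskSketch {
  @volatile private var killReasonOpt: Option[String] = None

  /** Current kill reason, if any. Under the invariant below it never reverts to None. */
  def reasonIfKilled: Option[String] = killReasonOpt

  /** Records a kill reason; later calls keep the first reason and never clear it. */
  def kill(reason: String): Unit = synchronized {
    if (killReasonOpt.isEmpty) killReasonOpt = Some(reason)
  }

  def describe(): String = {
    // Read the field once so the check and the message use the same value.
    val killReason = reasonIfKilled
    if (killReason.isDefined) s"killed: ${killReason.get}" else "running"
  }
}
```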
[GitHub] spark issue #16905: [SPARK-19567][CORE][SCHEDULER] Support some Schedulable ...
Github user kayousterhout commented on the issue: https://github.com/apache/spark/pull/16905 Jenkins, this is ok to test
[GitHub] spark issue #16905: [SPARK-19567][CORE][SCHEDULER] Support some Schedulable ...
Github user kayousterhout commented on the issue: https://github.com/apache/spark/pull/16905 Jenkins test this please
[GitHub] spark issue #17166: [SPARK-19820] [core] Allow reason to be specified for ta...
Github user kayousterhout commented on the issue: https://github.com/apache/spark/pull/17166 LGTM. I'll merge once tests pass.
[GitHub] spark pull request #17166: [SPARK-19820] [core] Allow reason to be specified...
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/17166#discussion_r107552896 --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala --- @@ -215,7 +215,7 @@ private[spark] class PythonRunner( case e: Exception if context.isInterrupted => logDebug("Exception thrown after task interruption", e) -throw new TaskKilledException +throw new TaskKilledException(context.getKillReason().getOrElse("unknown reason")) --- End diff -- Hm ok if Mridul wants this then fine to leave as-is
[GitHub] spark pull request #17166: [SPARK-19820] [core] Allow reason to be specified...
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/17166#discussion_r107553383 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -239,14 +239,26 @@ private[spark] class TaskSchedulerImpl private[scheduler]( //simply abort the stage. tsm.runningTasksSet.foreach { tid => val execId = taskIdToExecutorId(tid) - backend.killTask(tid, execId, interruptThread) + backend.killTask(tid, execId, interruptThread, reason = "stage cancelled") } tsm.abort("Stage %s cancelled".format(stageId)) logInfo("Stage %d was cancelled".format(stageId)) } } } + override def killTaskAttempt(taskId: Long, interruptThread: Boolean, reason: String): Boolean = { +logInfo(s"Killing task $taskId: $reason") +val execId = taskIdToExecutorId.get(taskId) +if (execId.isDefined) { + backend.killTask(taskId, execId.get, interruptThread, reason) + true +} else { + logInfo(s"Could not kill task $taskId because no task with that ID was found.") --- End diff -- logWarn?
[GitHub] spark pull request #17166: [SPARK-19820] [core] Allow reason to be specified...
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/17166#discussion_r107553047 --- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala --- @@ -296,12 +298,13 @@ private[spark] class Executor( // If this task has been killed before we deserialized it, let's quit now. Otherwise, // continue executing the task. -if (killed) { +val killReason = reasonIfKilled --- End diff -- why re-name the variable here (instead of just using reasonIfKilled below)?
[GitHub] spark pull request #17166: [SPARK-19820] [core] Allow reason to be specified...
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/17166#discussion_r107541348 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -467,7 +474,7 @@ private[spark] class TaskSchedulerImpl private[scheduler]( taskState: TaskState, reason: TaskFailedReason): Unit = synchronized { taskSetManager.handleFailedTask(tid, taskState, reason) -if (!taskSetManager.isZombie && taskState != TaskState.KILLED) { +if (!taskSetManager.isZombie) { --- End diff -- I left my comment before seeing Eric's -- but agree with Eric that we should leave this as-is.
[GitHub] spark pull request #17166: [SPARK-19820] [core] Allow reason to be specified...
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/17166#discussion_r107534312 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -239,14 +239,21 @@ private[spark] class TaskSchedulerImpl private[scheduler]( //simply abort the stage. tsm.runningTasksSet.foreach { tid => val execId = taskIdToExecutorId(tid) - backend.killTask(tid, execId, interruptThread) + backend.killTask(tid, execId, interruptThread, reason = "stage cancelled") } tsm.abort("Stage %s cancelled".format(stageId)) logInfo("Stage %d was cancelled".format(stageId)) } } } + override def killTaskAttempt(taskId: Long, interruptThread: Boolean, reason: String): Unit = { +logInfo(s"Killing task ($reason): $taskId") +val execId = taskIdToExecutorId.getOrElse( + taskId, throw new IllegalArgumentException("Task not found: " + taskId)) --- End diff -- Also it's kind of ugly that this throws an exception (seems like it could be an unhappy surprise to the user that their SparkContext threw an exception / died). How about instead changing the killTaskAttempt calls to return a boolean that's true if the task was successfully killed (and then returning false here)?
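The Boolean-returning shape suggested here can be sketched as below. This is a toy stand-in, not Spark's TaskSchedulerImpl: `ToyScheduler` and `killRequests` are hypothetical names, and the backend call is replaced by recording the request so the example stays self-contained.

```scala
import scala.collection.mutable

// Toy stand-in for the scheduler; names mirror the diff, but this is not Spark code.
class ToyScheduler {
  // Maps a running task ID to the executor it runs on.
  val taskIdToExecutorId = mutable.HashMap[Long, String]()
  // Records (taskId, executorId, reason) for each kill that would be sent to the backend.
  val killRequests = mutable.ArrayBuffer[(Long, String, String)]()

  /** Returns true if a kill request was issued, false if no task with that ID is known. */
  def killTaskAttempt(taskId: Long, interruptThread: Boolean, reason: String): Boolean =
    taskIdToExecutorId.get(taskId) match {
      case Some(execId) =>
        // interruptThread would be forwarded to the backend; it is unused in this toy.
        killRequests += ((taskId, execId, reason))
        true
      case None =>
        // Unknown task: report failure to the caller instead of throwing.
        false
    }
}
```

Returning false for an unknown task ID lets the caller decide how to react, rather than surfacing an IllegalArgumentException from a user-facing call.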
[GitHub] spark pull request #17166: [SPARK-19820] [core] Allow reason to be specified...
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/17166#discussion_r107539290 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -467,7 +474,7 @@ private[spark] class TaskSchedulerImpl private[scheduler]( taskState: TaskState, reason: TaskFailedReason): Unit = synchronized { taskSetManager.handleFailedTask(tid, taskState, reason) -if (!taskSetManager.isZombie && taskState != TaskState.KILLED) { +if (!taskSetManager.isZombie) { --- End diff -- Also, I spent a while making sure that everything is ok in TSM.handleFailedTask @mridulm, and all the code there seems to handle resubmission automatically (it just didn't happen previously, when we used TaskKilled for speculative tasks, because we have a check not to re-run tasks if one copy succeeded already)
[GitHub] spark pull request #17166: [SPARK-19820] [core] Allow reason to be specified...
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/17166#discussion_r107533896 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -239,14 +239,21 @@ private[spark] class TaskSchedulerImpl private[scheduler]( //simply abort the stage. tsm.runningTasksSet.foreach { tid => val execId = taskIdToExecutorId(tid) - backend.killTask(tid, execId, interruptThread) + backend.killTask(tid, execId, interruptThread, reason = "stage cancelled") } tsm.abort("Stage %s cancelled".format(stageId)) logInfo("Stage %d was cancelled".format(stageId)) } } } + override def killTaskAttempt(taskId: Long, interruptThread: Boolean, reason: String): Unit = { +logInfo(s"Killing task ($reason): $taskId") +val execId = taskIdToExecutorId.getOrElse( + taskId, throw new IllegalArgumentException("Task not found: " + taskId)) --- End diff -- similarly how about s"Cannot kill task $taskId because no task with that ID was found."
[GitHub] spark pull request #17166: [SPARK-19820] [core] Allow reason to be specified...
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/17166#discussion_r107539016 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -467,7 +474,7 @@ private[spark] class TaskSchedulerImpl private[scheduler]( taskState: TaskState, reason: TaskFailedReason): Unit = synchronized { taskSetManager.handleFailedTask(tid, taskState, reason) -if (!taskSetManager.isZombie && taskState != TaskState.KILLED) { +if (!taskSetManager.isZombie) { --- End diff -- Without this change, the job could hang: if just one task was left, and that task got killed, I don't think reviveOffers would ever be called. @mridulm I'm not that concerned about the extra calls to reviveOffers. In the worst case, if every task in a job is speculated (which of course can't actually happen), this leads to 2x the number of calls to reviveOffers -- so it still doesn't change the asymptotic time complexity even in the worst case. There are already a bunch of cases where we're pretty conservative with reviveOffers, in the sense that we call it even though we might not need to (e.g., when an executor dies, even if there aren't any tasks that need to be run; or every time there are speculative tasks available to run, even if there aren't any resources to run them on) so this change is in keeping with that pattern.
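The hang described in this comment can be illustrated with a toy model. This is a deliberately simplified sketch, not Spark's scheduler: `ToyTaskSet`, `reviveOnKill`, and `isStuck` are hypothetical names, and the only assumption modeled is that pending tasks get launched solely when `reviveOffers()` runs.

```scala
import scala.collection.mutable

// Toy model, not Spark code: pending tasks are only launched when reviveOffers() runs.
class ToyTaskSet(reviveOnKill: Boolean) {
  private val pending = mutable.Queue[Int]()
  val running = mutable.Set[Int]()
  var reviveCalls = 0

  /** Queues a task and revives offers so it gets launched. */
  def submit(task: Int): Unit = {
    pending.enqueue(task)
    reviveOffers()
  }

  /** Launches every pending task. */
  def reviveOffers(): Unit = {
    reviveCalls += 1
    while (pending.nonEmpty) running += pending.dequeue()
  }

  /** A killed task goes back to pending; it is relaunched only if offers are revived. */
  def taskKilled(task: Int): Unit = {
    running -= task
    pending.enqueue(task)
    if (reviveOnKill) reviveOffers()
  }

  /** True when work remains but nothing is running and nothing will trigger a launch. */
  def isStuck: Boolean = running.isEmpty && pending.nonEmpty
}
```

In this model, killing the last running task with `reviveOnKill = false` leaves the task set stuck, while `reviveOnKill = true` costs one extra `reviveOffers()` call per killed task, which is where the at-most-2x bound in the comment comes from.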
[GitHub] spark pull request #17166: [SPARK-19820] [core] Allow reason to be specified...
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/17166#discussion_r107533603 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -239,14 +239,21 @@ private[spark] class TaskSchedulerImpl private[scheduler]( //simply abort the stage. tsm.runningTasksSet.foreach { tid => val execId = taskIdToExecutorId(tid) - backend.killTask(tid, execId, interruptThread) + backend.killTask(tid, execId, interruptThread, reason = "stage cancelled") } tsm.abort("Stage %s cancelled".format(stageId)) logInfo("Stage %d was cancelled".format(stageId)) } } } + override def killTaskAttempt(taskId: Long, interruptThread: Boolean, reason: String): Unit = { +logInfo(s"Killing task ($reason): $taskId") --- End diff -- super nit but can you make this s"Killing task $taskId ($reason)"? This is somewhat more consistent with task-level logging elsewhere
[GitHub] spark pull request #17166: [SPARK-19820] [core] Allow reason to be specified...
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/17166#discussion_r107531839 --- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala --- @@ -426,15 +427,17 @@ private[spark] class Executor( setTaskFinishedAndClearInterruptStatus() execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason)) -case _: TaskKilledException => - logInfo(s"Executor killed $taskName (TID $taskId)") +case t: TaskKilledException => + logInfo(s"Executor killed $taskName (TID $taskId), reason: ${t.reason}") setTaskFinishedAndClearInterruptStatus() - execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled)) + execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled(t.reason))) case _: InterruptedException if task.killed => - logInfo(s"Executor interrupted and killed $taskName (TID $taskId)") + val killReason = task.maybeKillReason.getOrElse("unknown reason") --- End diff -- Can you change `if task.killed` to `if task.maybeKillReason.isDefined`, and then just do .get here? Then you could get rid of the task.killed variable and avoid the weird dependency between task.killed being set and task.maybeKillReason being defined.
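The guard suggested in this comment can be sketched as below. `ToyTask` and `KillHandling.describe` are hypothetical names used only for illustration; the real Executor code reports status through its backend instead of returning a string.

```scala
// Hypothetical minimal task: a single Option replaces the separate `killed` flag.
final class ToyTask {
  @volatile var reasonIfKilled: Option[String] = None
}

object KillHandling {
  /** Describes how an exception thrown by the task body would be reported. */
  def describe(task: ToyTask, e: Throwable): String = e match {
    case _: InterruptedException if task.reasonIfKilled.isDefined =>
      // Calling .get after the guard is safe only if the reason is never un-set once defined.
      s"killed, reason: ${task.reasonIfKilled.get}"
    case other =>
      s"failed: ${other.getClass.getSimpleName}"
  }
}
```

Because the guard and the `.get` read the same field, there is no second flag that could disagree with it, which is the dependency the comment wants to remove.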
[GitHub] spark pull request #17166: [SPARK-19820] [core] Allow reason to be specified...
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/17166#discussion_r107533292 --- Diff: core/src/main/scala/org/apache/spark/scheduler/Task.scala --- @@ -160,15 +160,20 @@ private[spark] abstract class Task[T]( // A flag to indicate whether the task is killed. This is used in case context is not yet // initialized when kill() is invoked. - @volatile @transient private var _killed = false + @volatile @transient private var _maybeKillReason: String = null --- End diff -- Can you update the comment here?
[GitHub] spark pull request #17166: [SPARK-19820] [core] Allow reason to be specified...
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/17166#discussion_r107528922 --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala --- @@ -215,7 +215,7 @@ private[spark] class PythonRunner( case e: Exception if context.isInterrupted => logDebug("Exception thrown after task interruption", e) -throw new TaskKilledException +throw new TaskKilledException(context.getKillReason().getOrElse("unknown reason")) --- End diff -- why do you need the getOrElse here? (since isInterrupted is true, shouldn't this always be defined?)
[GitHub] spark pull request #17166: [SPARK-19820] [core] Allow reason to be specified...
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/17166#discussion_r107528253 --- Diff: core/src/main/scala/org/apache/spark/TaskContextImpl.scala --- @@ -59,8 +59,8 @@ private[spark] class TaskContextImpl( /** List of callback functions to execute when the task fails. */ @transient private val onFailureCallbacks = new ArrayBuffer[TaskFailureListener] - // Whether the corresponding task has been killed. - @volatile private var interrupted: Boolean = false + // If defined, the corresponding task has been killed for the contained reason. + @volatile private var maybeKillReason: Option[String] = None --- End diff -- How about calling this `reasonIfKilled`, here and elsewhere? (if you strongly prefer the existing name, fine to leave as-is -- I just slightly prefer making it somewhat more obvious that this and the fact that the task has been killed are tightly intertwined). In any case, can you expand the comment a bit to the one you used below: "If specified, this task has been killed and this option contains the reason."
[GitHub] spark pull request #17166: [SPARK-19820] [core] Allow reason to be specified...
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/17166#discussion_r107533097 --- Diff: core/src/main/scala/org/apache/spark/scheduler/Task.scala --- @@ -160,15 +160,20 @@ private[spark] abstract class Task[T]( // A flag to indicate whether the task is killed. This is used in case context is not yet // initialized when kill() is invoked. - @volatile @transient private var _killed = false + @volatile @transient private var _maybeKillReason: String = null protected var _executorDeserializeTime: Long = 0 protected var _executorDeserializeCpuTime: Long = 0 /** * Whether the task has been killed. */ - def killed: Boolean = _killed + def killed: Boolean = _maybeKillReason != null + + /** + * If this task has been killed, contains the reason for the kill. --- End diff -- As above, can you make the comment "If specified, this task has been killed and this option contains the reason." (assuming that you get rid of the killed variable)
[GitHub] spark issue #17338: [SPARK-19990][SQL][test-maven]create a temp file for fil...
Github user kayousterhout commented on the issue: https://github.com/apache/spark/pull/17338 @srowen I think #17344 was intended to be for a different JIRA and was accidentally assigned the same JIRA# as this PR (although they are related issues).
[GitHub] spark issue #17344: [SPARK-19990][TEST] Use the database after Hive's curren...
Github user kayousterhout commented on the issue: https://github.com/apache/spark/pull/17344 Also @gatorsmile don't forget to update the corresponding JIRA when you merge a PR.
[GitHub] spark issue #17344: [SPARK-19990][TEST] Use the database after Hive's curren...
Github user kayousterhout commented on the issue: https://github.com/apache/spark/pull/17344 It looks like this has the wrong JIRA id and should be SPARK-19988? I've marked that jira as resolved by this PR, but let me know if I've misunderstood things here.
[GitHub] spark issue #17166: [SPARK-19820] [core] Allow reason to be specified for ta...
Github user kayousterhout commented on the issue: https://github.com/apache/spark/pull/17166 I realized you could also just let the task reason be the empty string (or some default reason) in the Executor code. That involves changing fewer LOC but doesn't seem like the right long-term decision, because then it's weird that developers can throw a TaskKilledException but can't specify a reason (and it also leaves the current problem where the string reason propagates through the code in a very hard-to-reason-about way).
[GitHub] spark issue #17088: [SPARK-19753][CORE] Un-register all shuffle output on a ...
Github user kayousterhout commented on the issue: https://github.com/apache/spark/pull/17088 Ok that makes sense. I wanted to make sure that there wasn't some bug in SlaveLost (which might lead to a simpler fix than this) but @squito's description makes it clear that there are a bunch of situations that SlaveLost can't handle correctly.
[GitHub] spark issue #17088: [SPARK-19753][CORE] Un-register all shuffle output on a ...
Github user kayousterhout commented on the issue: https://github.com/apache/spark/pull/17088 One meta question here: why aren't we getting a SlaveLost message in this case? I'm asking since there's already code in #14931 to un-register shuffle service files when we get a SlaveLost message, and that seems like a more bulletproof way of handling the case where an entire slave goes down.
[GitHub] spark issue #17095: [SPARK-19763][SQL]qualified external datasource table lo...
Github user kayousterhout commented on the issue: https://github.com/apache/spark/pull/17095 Sounds like this was caused by a different PR (see the comment on the JIRA) and is now being fixed, so never mind here!
[GitHub] spark issue #17095: [SPARK-19763][SQL]qualified external datasource table lo...
Github user kayousterhout commented on the issue: https://github.com/apache/spark/pull/17095 I suspect that this PR is the cause of consistent failures in the maven build, in the HiveCatalogedDDLSuite unit test: https://spark-tests.appspot.com/test-details?suite_name=org.apache.spark.sql.hive.execution.HiveCatalogedDDLSuite&test_name=create+temporary+view+using Based on the error message: https://spark-tests.appspot.com/test-logs/408097945 it looks like the way the path is getting re-written (I *think* by the code in this PR) is causing Hadoop's path code to barf. The create temporary view using unit test is the only one in that suite that reads from a CSV file, which would explain why that's the only one that's failing. @windpiger or @cloud-fan would one of you mind looking into this?
[GitHub] spark issue #17307: [SPARK-13369] Make number of consecutive fetch failures ...
Github user kayousterhout commented on the issue: https://github.com/apache/spark/pull/17307 Jenkins, retest this please
[GitHub] spark issue #17307: [SPARK-13369] Make number of consecutive fetch failures ...
Github user kayousterhout commented on the issue: https://github.com/apache/spark/pull/17307 @squito FYI I filed a JIRA for the 2nd of the two unit tests that failed in that run (looks like you'd already filed a JIRA for the first one)
[GitHub] spark pull request #17166: [SPARK-19820] [core] Allow reason to be specified...
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/17166#discussion_r106555744 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala --- @@ -710,7 +710,11 @@ private[spark] class TaskSetManager( logInfo(s"Killing attempt ${attemptInfo.attemptNumber} for task ${attemptInfo.id} " + s"in stage ${taskSet.id} (TID ${attemptInfo.taskId}) on ${attemptInfo.host} " + s"as the attempt ${info.attemptNumber} succeeded on ${info.host}") - sched.backend.killTask(attemptInfo.taskId, attemptInfo.executorId, true) + sched.backend.killTask( +attemptInfo.taskId, +attemptInfo.executorId, +interruptThread = true, +reason = "another attempt succeeded") --- End diff -- Ok let's leave this as-is -- seems too complicated to have a longer and shorter reason (and unlike the reason above, this one is per-task, so hard to summarize on the stage page)
[GitHub] spark pull request #17166: [SPARK-19820] [core] Allow reason to be specified...
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/17166#discussion_r106555639 --- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala --- @@ -2250,6 +2250,22 @@ class SparkContext(config: SparkConf) extends Logging { } /** + * Kill and reschedule the given task attempt. Task ids can be obtained from the Spark UI + * or through SparkListener.onTaskStart. + * + * @param taskId the task ID to kill. This id uniquely identifies the task attempt. + * @param interruptThread whether to interrupt the thread running the task. + * @param reason the reason for killing the task, which should be a short string. If a task + * is killed multiple times with different reasons, only one reason will be reported. + */ + def killTaskAttempt( + taskId: Long, + interruptThread: Boolean = true, + reason: String = "cancelled"): Unit = { --- End diff -- As discussed, how about "killed via SparkContext.killTaskAttempt" or similar as the default reason?
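The suggestion above can be sketched in isolation. This is a minimal sketch, assuming a hypothetical KillSketch object that stands in for SparkContext and only records its arguments; it is not the PR's implementation. The parameter names and the proposed default string are taken from the diff and the comment.

```scala
// Hypothetical stand-in for SparkContext: it records the last kill request
// instead of talking to a scheduler backend.
object KillSketch {
  var lastKill: Option[(Long, Boolean, String)] = None

  // Same parameter list as the method in the diff, but with the more
  // descriptive default reason proposed in the review comment.
  def killTaskAttempt(
      taskId: Long,
      interruptThread: Boolean = true,
      reason: String = "killed via SparkContext.killTaskAttempt"): Unit = {
    lastKill = Some((taskId, interruptThread, reason))
  }
}
```

A caller that passes only the task id, as in KillSketch.killTaskAttempt(42L), gets a reason that says where the kill came from, while a caller with more context can still pass reason = "..." explicitly.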
[GitHub] spark issue #17166: [SPARK-19820] [core] Allow reason to be specified for ta...
Github user kayousterhout commented on the issue: https://github.com/apache/spark/pull/17166 Thinking about this more, this seems like two separate changes (that should probably be separated): (1) Allowing cancellations to be injected via SparkContext. This seems like it should have its own JIRA, and is relatively few LOC (so should be easy to decouple). Those changes look fine and I think are good to merge as-is if you move them to a new PR. (2) Allowing reasons to be specified. This changes the API and changes many LOC. I'm skeptical of this change: I think this could be helpful if descriptive reasons are allowed (like the few I suggested in the comments), but if you restrict reasons to a few words so that they fit in the stage summary page, they don't seem very useful to a user. E.g., the default message of "cancelled" when sc.killTask is used seems pretty meaningless (and will require someone to read the code to understand -- at which point it seems like they might as well look in the logs instead of getting info from the UI). This doesn't seem useful enough to merit an API change, but maybe I'm missing something important here?
[GitHub] spark pull request #17307: [SPARK-13369] Make number of consecutive fetch fa...
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/17307#discussion_r106337950 --- Diff: docs/configuration.md --- @@ -1506,6 +1506,11 @@ Apart from these, the following properties are also available, and may be useful of this setting is to act as a safety-net to prevent runaway uncancellable tasks from rendering an executor unusable. + spark.stage.maxConsecutiveAttempts + 4 + +Number of consecutive stage retries allowed before a stage is aborted. --- End diff -- Hah sorry for all of the comment changes from the combination of Imran and me!! But I agree that this was an issue before and would be good to update. Thanks for the many updates here @sitalkedia.
[GitHub] spark issue #17307: [SPARK-13369] Make number of consecutive fetch failures ...
Github user kayousterhout commented on the issue: https://github.com/apache/spark/pull/17307 @sitalkedia please do not ignore flaky test failures. I updated the JIRA associated with this one: https://issues.apache.org/jira/browse/SPARK-19803?jql=project%20%3D%20SPARK%20AND%20labels%20%3D%20flaky-test but please be sure to do this yourself before re-testing next time.
[GitHub] spark issue #16905: [SPARK-19567][CORE][SCHEDULER] Support some Schedulable ...
Github user kayousterhout commented on the issue: https://github.com/apache/spark/pull/16905 Jenkins retest this please
[GitHub] spark pull request #17307: [SPARK-13369] Make number of consecutive fetch fa...
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/17307#discussion_r106305175 --- Diff: docs/configuration.md --- @@ -1506,6 +1506,11 @@ Apart from these, the following properties are also available, and may be useful of this setting is to act as a safety-net to prevent runaway uncancellable tasks from rendering an executor unusable. + spark.stage.maxConsecutiveAttempts + 4 + +Number of consecutive stage retries allowed before a stage is aborted (since you can have multiple fetch failures in one stage attempt) --- End diff -- Oops sorry, just noticing that I don't think @squito intended for the part in parens to be in the comment? You could instead make this: "Number of consecutive stage retries allowed before a stage is aborted (stages are retried if shuffle fetch failures occur)." Or you could just eliminate the part in parens. Sorry I didn't notice this earlier!
[GitHub] spark issue #15604: [SPARK-18066] [CORE] [TESTS] Add Pool usage policies tes...
Github user kayousterhout commented on the issue: https://github.com/apache/spark/pull/15604 Thanks for your work on this @erenavsarogullari. I've merged this into master.
[GitHub] spark pull request #17307: [SPARK-13369] Make number of consecutive fetch fa...
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/17307#discussion_r106298407 --- Diff: docs/configuration.md --- @@ -1506,6 +1506,11 @@ Apart from these, the following properties are also available, and may be useful of this setting is to act as a safety-net to prevent runaway uncancellable tasks from rendering an executor unusable. + spark.stage.maxAttempts --- End diff -- @squito maxConsecutiveAttempts? Realized maxAttempts is slightly misleading / might be good to emphasize that they're consecutive
[GitHub] spark pull request #17307: [SPARK-13369] Make number of consecutive fetch fa...
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/17307#discussion_r106298013 --- Diff: core/src/main/scala/org/apache/spark/scheduler/ResultStage.scala --- @@ -34,8 +34,10 @@ private[spark] class ResultStage( val partitions: Array[Int], parents: List[Stage], firstJobId: Int, +maxConsecutiveFetchFailures: Int, --- End diff -- I don't think you need this anymore.
[GitHub] spark pull request #17307: [SPARK-13369] Make number of consecutive fetch fa...
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/17307#discussion_r106298195 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -187,6 +187,11 @@ class DAGScheduler( /** If enabled, FetchFailed will not cause stage retry, in order to surface the problem. */ private val disallowStageRetryForTest = sc.getConf.getBoolean("spark.test.noStageRetry", false) + /** Number of consecutive fetch failures allowed before a stage is aborted */ --- End diff -- Can you change this and the comment on line 1738 to the comment Imran suggested for the docs?
[GitHub] spark pull request #17307: [SPARK-13369] Make number of consecutive fetch fa...
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/17307#discussion_r106298559 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -187,6 +187,11 @@ class DAGScheduler( /** If enabled, FetchFailed will not cause stage retry, in order to surface the problem. */ private val disallowStageRetryForTest = sc.getConf.getBoolean("spark.test.noStageRetry", false) + /** Number of consecutive fetch failures allowed before a stage is aborted */ + private[scheduler] val maxConsecutiveFetchFailuresPerStage = --- End diff -- Ok one more naming nit: can you make this consistent with the config name, so maxConsecutiveStageAttempts (and ditto with the constant)?
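The naming nit above, sketched under assumptions: a plain Map[String, String] stands in for SparkConf so the example is self-contained, and DagSchedulerSketch is a hypothetical name. The config key and the default of 4 come from the docs diff in this thread; the val and constant names follow the reviewer's suggestion.

```scala
// Hypothetical stand-in for the DAGScheduler companion object. A Map plays
// the role of SparkConf here; real code would use sc.getConf.getInt.
object DagSchedulerSketch {
  // Constant named after the config key, as the review comment asks.
  val DEFAULT_MAX_CONSECUTIVE_STAGE_ATTEMPTS = 4

  // Reads spark.stage.maxConsecutiveAttempts, falling back to the default.
  def maxConsecutiveStageAttempts(conf: Map[String, String]): Int =
    conf.get("spark.stage.maxConsecutiveAttempts")
      .map(_.toInt)
      .getOrElse(DEFAULT_MAX_CONSECUTIVE_STAGE_ATTEMPTS)
}
```

With the key, the val, and the constant all sharing the "max consecutive (stage) attempts" wording, a reader who greps for the config name finds every place the limit is used.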
[GitHub] spark pull request #17307: [SPARK-13369] Make number of consecutive fetch fa...
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/17307#discussion_r106297978 --- Diff: core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala --- @@ -41,9 +41,10 @@ private[spark] class ShuffleMapStage( numTasks: Int, parents: List[Stage], firstJobId: Int, +maxConsecutiveFetchFailures: Int, --- End diff -- I don't think you need this anymore
[GitHub] spark pull request #17307: [SPARK-13369] Make number of consecutive fetch fa...
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/17307#discussion_r106297948 --- Diff: core/src/main/scala/org/apache/spark/scheduler/Stage.scala --- @@ -59,6 +59,7 @@ private[scheduler] abstract class Stage( val numTasks: Int, val parents: List[Stage], val firstJobId: Int, +val maxConsecutiveFetchFailures: Int, --- End diff -- I don't think you need this anymore
[GitHub] spark issue #16905: [SPARK-19567][CORE][SCHEDULER] Support some Schedulable ...
Github user kayousterhout commented on the issue: https://github.com/apache/spark/pull/16905 Jenkins test this please
[GitHub] spark pull request #11254: [SPARK-13369] Make number of consecutive fetch fa...
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/11254#discussion_r106293790 --- Diff: core/src/main/scala/org/apache/spark/scheduler/Stage.scala --- @@ -118,7 +119,7 @@ private[scheduler] abstract class Stage( */ private[scheduler] def failedOnFetchAndShouldAbort(stageAttemptId: Int): Boolean = { fetchFailedAttemptIds.add(stageAttemptId) -fetchFailedAttemptIds.size >= Stage.MAX_CONSECUTIVE_FETCH_FAILURES +fetchFailedAttemptIds.size >= maxConsecutiveFetchFailure --- End diff -- What about instead cleaning this code path up a bit (this isn't the fault of your PR, but your PR could help make things better here!) by: (1) eliminating this method; (2) making the fetchFailedAttemptIds variable non-private; (3) in the DAGScheduler on line 1268 (right after the else), adding "stage.fetchFailedAttemptIds.add(task.stageAttemptId)"; (4) changing the call to this method to instead directly check "(stage.fetchFailedAttemptIds.size >= maxConsec..." IMO this function is hurting understandability as-is. This change would also allow you to eliminate maxConsecutiveFetchFailure from the Stage object altogether.
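The four-step cleanup proposed above could look roughly like the following. This is a sketch under stated assumptions, not the code that was merged: StageSketch and SchedulerSketch are hypothetical, stripped-down stand-ins for Stage and DAGScheduler, and handleFetchFailure is an invented name for the spot "right after the else" that the comment refers to.

```scala
import scala.collection.mutable.HashSet

// Hypothetical stand-in for Stage. The set is non-private (step 2) and the
// failedOnFetchAndShouldAbort method is gone (step 1).
class StageSketch(val id: Int) {
  val fetchFailedAttemptIds = new HashSet[Int]
}

// Hypothetical stand-in for the relevant part of DAGScheduler. The limit now
// lives here, so Stage no longer needs a maxConsecutiveFetchFailures field.
class SchedulerSketch(maxConsecutiveFetchFailures: Int) {
  /** Records a fetch failure; returns true if the stage should be aborted. */
  def handleFetchFailure(stage: StageSketch, stageAttemptId: Int): Boolean = {
    // Step 3: record the failed attempt directly from the scheduler.
    stage.fetchFailedAttemptIds.add(stageAttemptId)
    // Step 4: check the size inline instead of calling a method on Stage.
    stage.fetchFailedAttemptIds.size >= maxConsecutiveFetchFailures
  }
}
```

Because the ids are kept in a set, several fetch failures within the same stage attempt count once, which matches the behavior of the method being removed.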
[GitHub] spark pull request #11254: [SPARK-13369] Make number of consecutive fetch fa...
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/11254#discussion_r106292588 --- Diff: core/src/main/scala/org/apache/spark/scheduler/Stage.scala --- @@ -58,6 +58,7 @@ private[scheduler] abstract class Stage( val numTasks: Int, val parents: List[Stage], val firstJobId: Int, +val maxConsecutiveFetchFailure: Int, --- End diff -- nit: this should have an "s" at the end (same elsewhere)
[GitHub] spark pull request #11254: [SPARK-13369] Make number of consecutive fetch fa...
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/11254#discussion_r106292533 --- Diff: core/src/main/scala/org/apache/spark/scheduler/Stage.scala --- @@ -145,6 +146,6 @@ private[scheduler] abstract class Stage( } private[scheduler] object Stage { - // The number of consecutive failures allowed before a stage is aborted - val MAX_CONSECUTIVE_FETCH_FAILURES = 4 + // The number of consecutive fetch failures allowed before a stage is aborted + val DEFAULT_MAX_CONSECUTIVE_FETCH_FAILURES = 4 --- End diff -- Given that this is used in the DAGScheduler, not here, I think it makes more sense to put it in the DAGScheduler object.
[GitHub] spark pull request #11254: [SPARK-13369] Make number of consecutive fetch fa...
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/11254#discussion_r106292014 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -180,6 +180,11 @@ class DAGScheduler( /** If enabled, FetchFailed will not cause stage retry, in order to surface the problem. */ private val disallowStageRetryForTest = sc.getConf.getBoolean("spark.test.noStageRetry", false) + /** Number of consecutive fetch failures allowed before a stage is aborted */ + private[scheduler] val maxConsecutiveFetchFailuresPerStage = +sc.getConf.getInt("spark.max.fetch.failures.per.stage", --- End diff -- I agree with spark.stage.maxAttempts. We could also do spark.stage.maxFailures but somehow that seems more easily confused with the number of task failures per stage.
[GitHub] spark issue #11254: [SPARK-13369] Make number of consecutive fetch failures ...
Github user kayousterhout commented on the issue: https://github.com/apache/spark/pull/11254 Given the discussion with @sitalkedia on other PRs, I agree that it makes sense to add this now.
[GitHub] spark pull request #16905: [SPARK-19567][CORE][SCHEDULER] Support some Sched...
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/16905#discussion_r106244124 --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala --- @@ -73,17 +73,15 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B } } - def setupScheduler(confs: (String, String)*): TaskSchedulerImpl = { + private def setupScheduler(confs: (String, String)*): TaskSchedulerImpl = { --- End diff -- one quick comment here if Imran didn't already merge: can you un-do these changes? It's not useful / necessary to make test classes private (they're already hidden by the build), and this change will make git blames more confusing in the future.
[GitHub] spark issue #17297: [SPARK-14649][CORE] DagScheduler should not run duplicat...
Github user kayousterhout commented on the issue: https://github.com/apache/spark/pull/17297 @sitalkedia I won't have time to review this in detail for at least a few weeks, just so you know (although others may have time to review / merge it). At a very high level, I'm concerned about the amount of complexity that this adds to the scheduler code. We've recently had to deal with a number of subtle bugs with jobs hanging or Spark crashing as a result of trying to handle map output from old tasks. As a result, I'm hesitant to add more complexity -- and the associated risk of bugs that cause job failures + expense of maintaining the code -- to improve performance. At this point I'd lean towards cancelling outstanding map tasks when a fetch failure occurs (there's currently a TODO in the code to do this) to simplify these issues. This would improve performance in some ways, by freeing up slots that could be used for something else, at the expense of wasted work if the tasks have already made significant progress. But it would significantly simplify the scheduler code, which, given the debugging + reviewer time that has gone into fixing subtle issues with this code path, I think is worthwhile. Curious what other folks think here.
[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/16867#discussion_r106235234 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala --- @@ -911,17 +919,16 @@ private[spark] class TaskSetManager( override def checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean = { // Can't speculate if we only have one task, and no need to speculate if the task set is a // zombie. -if (isZombie || numTasks == 1) { +if (isZombie || numTasks == 1 || !speculationEnabled) { --- End diff -- I don't think you need this change -- whether speculation is enabled is checked before calling this function (https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L177)
[GitHub] spark pull request #16867: [SPARK-16929] Improve performance when check spec...
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/16867#discussion_r106237592 --- Diff: core/src/main/scala/org/apache/spark/util/collection/MedianHeap.scala --- @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util.collection + +import scala.collection.mutable --- End diff -- Please import PriorityQueue directly rather than the whole mutable package.
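For context, here is what a median heap with the direct import could look like. This is a minimal sketch, assuming the usual two-heap technique and a hypothetical MedianHeapSketch class holding Long values (such as task durations); the body of the PR's MedianHeap is not shown in this thread, so none of this should be read as its actual implementation.

```scala
// Direct import, as the review comment asks, instead of scala.collection.mutable.
import scala.collection.mutable.PriorityQueue

// Hypothetical two-heap median structure.
class MedianHeapSketch {
  // Max-heap holding the smaller half of the values.
  private val smallerHalf = PriorityQueue.empty[Long]
  // Min-heap holding the larger half (reverse ordering puts the minimum first).
  private val largerHalf = PriorityQueue.empty[Long](Ordering[Long].reverse)

  def isEmpty: Boolean = smallerHalf.isEmpty && largerHalf.isEmpty

  def size: Int = smallerHalf.size + largerHalf.size

  def insert(x: Long): Unit = {
    if (isEmpty || x <= median) smallerHalf.enqueue(x) else largerHalf.enqueue(x)
    rebalance()
  }

  // Keeps the two heaps within one element of each other.
  private def rebalance(): Unit = {
    if (smallerHalf.size > largerHalf.size + 1) {
      largerHalf.enqueue(smallerHalf.dequeue())
    } else if (largerHalf.size > smallerHalf.size + 1) {
      smallerHalf.enqueue(largerHalf.dequeue())
    }
  }

  def median: Double = {
    if (isEmpty) throw new NoSuchElementException("MedianHeapSketch is empty")
    if (smallerHalf.size == largerHalf.size) {
      (smallerHalf.head + largerHalf.head) / 2.0
    } else if (smallerHalf.size > largerHalf.size) {
      smallerHalf.head.toDouble
    } else {
      largerHalf.head.toDouble
    }
  }
}
```

Inserting is logarithmic in the number of elements and reading the median is constant time, which is the kind of saving the PR title ("Improve performance when check spec...") is after compared with re-sorting on every check.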