[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/16620 I'll spend some time today trying to sort out the relative merits of the fix options; but in the meantime, there's also no good reason for `TaskSchedulerImpl.rootPool` to be a `var` initialized as `null`, nor any good reason for `TaskScheduler.rootPool` to be able to produce `null`. Cleaning that up also makes code in this PR slightly simpler: https://github.com/markhamstra/spark/commit/e11fe2a9817559492daee03c8c025879dc44d346 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
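The null-free shape this comment argues for can be sketched in a few lines. These are simplified stand-in types, not Spark's real `Pool`/`TaskScheduler`: the point is only that constructing `rootPool` eagerly as a `val` means no caller can ever observe `null`.

```scala
// Simplified stand-ins for illustration; Spark's real Pool and TaskScheduler
// carry far more state than this.
class Pool(val poolName: String)

trait TaskScheduler {
  def rootPool: Pool // contract: never null
}

class TaskSchedulerImpl extends TaskScheduler {
  // was: `var rootPool: Pool = null`, assigned later in initialize();
  // as a val, every reader sees a fully constructed pool.
  override val rootPool: Pool = new Pool("root")
}
```

With this shape, downstream null checks on `rootPool` (like the one debated in this PR) become unnecessary by construction.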
[GitHub] spark pull request #16813: [SPARK-19466][CORE][SCHEDULER] Improve Fair Sched...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/16813#discussion_r99686720

--- Diff: core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala ---
```diff
@@ -69,19 +72,29 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
   val DEFAULT_WEIGHT = 1

   override def buildPools() {
-    var is: Option[InputStream] = None
+    var fileData: Option[FileData] = None
     try {
-      is = Option {
-        schedulerAllocFile.map { f =>
-          new FileInputStream(f)
-        }.getOrElse {
-          Utils.getSparkClassLoader.getResourceAsStream(DEFAULT_SCHEDULER_FILE)
+      fileData = schedulerAllocFile.map { f =>
+        Some(FileData(new FileInputStream(f), f))
+      }.getOrElse {
+        val is = Utils.getSparkClassLoader.getResourceAsStream(DEFAULT_SCHEDULER_FILE)
+        if (is != null) Some(FileData(is, DEFAULT_SCHEDULER_FILE))
+        else {
+          logWarning(s"No Fair Scheduler file found.")
+          None
+        }
       }
-      is.foreach { i => buildFairSchedulerPool(i) }
+      fileData.foreach { data =>
+        logInfo(s"Fair Scheduler file: ${data.fileName} is found successfully and will be parsed.")
```
--- End diff ---

s"Creating Fair Scheduler pools from ${data.fileName}"
[GitHub] spark pull request #16813: [SPARK-19466][CORE][SCHEDULER] Improve Fair Sched...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/16813#discussion_r99686323

--- Diff: core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala ---
```diff
@@ -69,19 +72,29 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
   val DEFAULT_WEIGHT = 1

   override def buildPools() {
-    var is: Option[InputStream] = None
+    var fileData: Option[FileData] = None
     try {
-      is = Option {
-        schedulerAllocFile.map { f =>
-          new FileInputStream(f)
-        }.getOrElse {
-          Utils.getSparkClassLoader.getResourceAsStream(DEFAULT_SCHEDULER_FILE)
+      fileData = schedulerAllocFile.map { f =>
+        Some(FileData(new FileInputStream(f), f))
+      }.getOrElse {
+        val is = Utils.getSparkClassLoader.getResourceAsStream(DEFAULT_SCHEDULER_FILE)
+        if (is != null) Some(FileData(is, DEFAULT_SCHEDULER_FILE))
+        else {
+          logWarning(s"No Fair Scheduler file found.")
```
--- End diff ---

"Fair Scheduler configuration file not found."
[GitHub] spark issue #16813: [SPARK-19466][CORE][SCHEDULER] Improve Fair Scheduler Lo...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/16813 Looks reasonable, but I'd prefer slightly different log messages.
[GitHub] spark pull request #16620: [SPARK-19263] DAGScheduler should avoid sending c...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/16620#discussion_r97140817

--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
```diff
@@ -1193,7 +1193,15 @@ class DAGScheduler(
         }
         if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) {
-          markStageAsFinished(shuffleStage)
+          val activeTaskSetManagerExist =
```
--- End diff ---

And since it is being used as `!activeTaskSetManagerExists`, you could reverse the sense, avoid needing the `!`, and call it something like `noActiveTaskSetManager`.
[GitHub] spark pull request #16620: [SPARK-19263] DAGScheduler should avoid sending c...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/16620#discussion_r97139832

--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
```diff
@@ -1193,7 +1193,15 @@ class DAGScheduler(
         }
         if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) {
-          markStageAsFinished(shuffleStage)
+          val activeTaskSetManagerExist =
```
--- End diff ---

nit: should be `activeTaskSetManagerExists`
[GitHub] spark pull request #16620: [SPARK-19263] DAGScheduler should avoid sending c...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/16620#discussion_r97139668

--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
```diff
@@ -1193,7 +1193,15 @@ class DAGScheduler(
         }
         if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) {
-          markStageAsFinished(shuffleStage)
+          val activeTaskSetManagerExist =
+            if (taskScheduler.rootPool != null) {
+              taskScheduler.rootPool.getSortedTaskSetQueue.exists {
+                tsm => tsm.stageId == stageId && !tsm.isZombie
+              }
+            } else false
```
--- End diff ---

The `if...else` is unnecessary:

```scala
val activeTaskSetManagerExist = taskScheduler.rootPool != null &&
  taskScheduler.rootPool.getSortedTaskSetQueue.exists { tsm =>
    tsm.stageId == stageId && !tsm.isZombie
  }
```
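The suggested short-circuit is easy to sanity-check with stand-in types. `TSM` and `Pool` here are toy classes, not the real scheduler types; the point is that `&&` short-circuits, so a null pool never reaches `getSortedTaskSetQueue`.

```scala
object ShortCircuitDemo {
  // Stand-ins for TaskSetManager and Pool, only to exercise the expression.
  final case class TSM(stageId: Int, isZombie: Boolean)
  final class Pool(tsms: Seq[TSM]) { def getSortedTaskSetQueue: Seq[TSM] = tsms }

  def activeTaskSetManagerExists(rootPool: Pool, stageId: Int): Boolean =
    rootPool != null && rootPool.getSortedTaskSetQueue.exists { tsm =>
      tsm.stageId == stageId && !tsm.isZombie
    }

  def main(args: Array[String]): Unit = {
    assert(!activeTaskSetManagerExists(null, 1)) // null pool short-circuits to false
    assert(activeTaskSetManagerExists(new Pool(Seq(TSM(1, isZombie = false))), 1))
    assert(!activeTaskSetManagerExists(new Pool(Seq(TSM(1, isZombie = true))), 1)) // zombies ignored
  }
}
```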
[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should handle stage's pending...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/16620 Jenkins, test this please
[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should handle stage's pending...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/16620 Beyond the lack of new tests, this patch is causing a couple of existing DAGSchedulerSuite tests to fail for me locally: "run trivial shuffle with out-of-band failure and retry" and "map stage submission with executor failure late map task completions"
[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should handle stage's pending...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/16620 ok to test
[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should handle stage's pending...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/16620 Thanks for the work thus far, @jinxing64 , but this really needs updated test coverage before we can consider merging it. @squito
[GitHub] spark issue #16437: [SPARK-19028] [SQL] Fixed non-thread-safe functions used...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/16437 Please update the description in the JIRA ticket. What is there now is simply not adequate, meaning that if anyone has to come back and address this issue some time in the future, what is there does not describe *why* a change was needed sufficiently for that person to avoid needing to look at the details in the code (which may well be quite different by that time).
[GitHub] spark issue #16291: [SPARK-18838][CORE] Use separate executor service for ea...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/16291 > Each listener should in theory could work independent of each other and we should only guarantee ordered processing of the events within a listener. If we were starting from nothing, then yes, it would be valid and advisable to design the Listener infrastructure using only this weaker guarantee. The issue, though, is that we are not starting from nothing, but rather from a system that currently offers a much stronger guarantee on the synchronized behavior of Listeners. If it is the case that no Listeners currently rely on the stronger guarantee and thus could work completely correctly under the weaker guarantee of this PR, then we could make this change without much additional concern. But reaching that level of confidence in current Listeners is a difficult prerequisite -- strictly speaking, it's an impossible task. We could carefully work through all the internal behavior of Spark's Listeners to convince ourselves that they can work correctly under the new, weaker guarantee. At a bare minimum, we need to do that much before we can consider merging this PR -- but that's probably not enough. The problem is that Listeners aren't just internal to Spark. Users have also developed their own custom Listeners that either implement `SparkListenerInterface` or extend `SparkListener` or `SparkFirehoseListener`, and we can't just assume that those custom Listeners don't rely upon the current guarantee to either synchronize behavior with other custom Listeners or even with Spark internal Listeners. Since we can't know that user Listeners don't already rely upon the current, stronger guarantee, the question now becomes whether we even have the freedom to change that guarantee within the lifetime of Spark 2.x, or whether any such change would have to wait for Spark 3.x. 
`SparkListener` is still annotated as `@DeveloperAPI`, so if that were the only piece in play, then we could change its guarantee fairly freely. `SparkListenerInterface` is almost as good, since it includes the admonition in a comment to "[n]ote that this is an internal interface which might change in different Spark releases." The stickier issue is with `SparkFirehoseListener`, which carries no such annotations or comments, but is just a plain public class and API. So, after convincing ourselves that Spark's internal Listeners would be fine with this PR, we'd still have to convince the Spark PMC that changing the public `SparkFirehoseListener` (with prominent warnings in the release notes, of course) before Spark 3.x would be acceptable. And all of the above is still really only arguing about whether we *could* adopt this PR in essentially its present form. There are still questions of whether we *should* do this or maybe instead we should do something a little different or more. I can see some merit in Marcelo's "opt in" suggestion. If there is utility in having groups of Listeners that can rely upon synchronized behavior, then we should probably retain one or more threads running synchronized Listeners. For example, if Listener A relies upon synchronization with Listeners B and C while D needs to synchronize with E, but F, G and H are all independent, then there are a couple of things we could do. First, the independent Listeners (F, G and H) can each run in its own thread, providing the scalable performance that this PR is aiming for. After that, we could either have one synchronized Listener thread for all the other Listeners, or we could have one thread for A, B and C and one thread for D and E. Whether we support only one synchronized Listener group/thread or multiple, we'd still need some mechanism for Listeners to select into a synchronized group or to indicate that they can and should be run independently on their own thread. 
@rxin
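The "opt in" grouping described above (A, B, C synchronized together; F, G, H independent) could be sketched roughly as follows. None of these names exist in Spark; this is a hypothetical design sketch in which listeners declaring the same group share one single-threaded executor, preserving the old synchronized behavior among themselves, while ungrouped listeners each get a private thread.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, Executors, ExecutorService, TimeUnit}
import scala.collection.JavaConverters._
import scala.collection.mutable

// Hypothetical listener that opts into a synchronization group (None => independent).
trait GroupableListener {
  def group: Option[String]
  def onEvent(event: String): Unit
}

final class GroupedBus {
  private val pools = mutable.Map.empty[String, ExecutorService]
  private def poolFor(l: GroupableListener): ExecutorService = synchronized {
    // Grouped listeners share one key (one thread); independent ones get a unique key.
    val key = l.group.getOrElse("independent-" + System.identityHashCode(l))
    pools.getOrElseUpdate(key, Executors.newSingleThreadExecutor())
  }
  def post(listeners: Seq[GroupableListener], event: String): Unit =
    listeners.foreach { l =>
      poolFor(l).submit(new Runnable { def run(): Unit = l.onEvent(event) })
    }
  def shutdown(): Unit = synchronized {
    pools.values.foreach { p => p.shutdown(); p.awaitTermination(5, TimeUnit.SECONDS) }
  }
}

object GroupedBusDemo {
  def main(args: Array[String]): Unit = {
    val log = new ConcurrentLinkedQueue[String]()
    def mk(name: String, g: Option[String]) = new GroupableListener {
      val group = g
      def onEvent(event: String): Unit = log.add(name + ":" + event)
    }
    val bus = new GroupedBus
    val listeners = Seq(mk("A", Some("sync")), mk("B", Some("sync")), mk("F", None))
    (1 to 20).foreach(i => bus.post(listeners, "e" + i))
    bus.shutdown()
    val order = log.asScala.toSeq
    // A and B share a thread and are posted in order, so A sees each event before B.
    (1 to 20).foreach(i => assert(order.indexOf("A:e" + i) < order.indexOf("B:e" + i)))
  }
}
```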
[GitHub] spark issue #12775: [SPARK-14958][Core] Failed task not handled when there's...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/12775 Yeah, that's ok, @kayousterhout
[GitHub] spark issue #16291: [SPARK-18838][CORE] Use separate executor service for ea...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/16291 > You are right, that's why the executor service is single threaded which guarantees ordered processing of the events per listener. But this is still substantially different from the ordering guarantees in the current implementation, and I'm not sure that this PR wouldn't invalidate some assumptions made by current listeners. Events will end up in the same order in each `ListenerEventExecutor` (at least by my understanding, which someone else should double-check), but there is no synchronization between `ListenerEventExecutor`s of the processing of their `eventQueue`s; so it is entirely possible, for example, for one `ListenerEventExecutor` to process a task end event for a particular task before another `ListenerEventExecutor` has worked sufficiently through its `eventQueue` to have even seen the corresponding task start event. That is quite a bit different than the prior ordering guarantee implied by the comment "`onPostEvent` is guaranteed to be called in the same thread for all listeners." Whether this actually constitutes a problem or not, I'm not certain; but serializing event processing within each `ListenerEventExecutor` isn't enough if `Listener`s on different `ListenerEventExecutor` threads expect to share mutable state.
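The distinction drawn here, order preserved within each queue, but no barrier between queues, can be demonstrated with a minimal model. `OneListenerQueue` is an illustrative stand-in for a `ListenerEventExecutor`, not Spark's class:

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, Executors, TimeUnit}
import scala.collection.JavaConverters._

// One single-threaded queue per listener: events come out in posting order for
// that listener, but nothing relates listener a's progress to listener b's.
final class OneListenerQueue {
  private val pool = Executors.newSingleThreadExecutor()
  val processed = new ConcurrentLinkedQueue[String]()
  def post(event: String): Unit =
    pool.submit(new Runnable { def run(): Unit = processed.add(event) })
  def shutdown(): Unit = { pool.shutdown(); pool.awaitTermination(5, TimeUnit.SECONDS) }
}

object OrderingDemo {
  def main(args: Array[String]): Unit = {
    val a = new OneListenerQueue
    val b = new OneListenerQueue
    val events = (1 to 100).map(i => "event-" + i)
    events.foreach { e => a.post(e); b.post(e) }
    a.shutdown(); b.shutdown()
    // Per-listener ordering is guaranteed...
    assert(a.processed.asScala.toSeq == events)
    assert(b.processed.asScala.toSeq == events)
    // ...but while events were in flight, a could have processed a "task end"
    // before b had even dequeued the matching "task start".
  }
}
```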
[GitHub] spark issue #15335: [SPARK-17769][Core][Scheduler]Some FetchFailure refactor...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/15335 It's worth something, but not a lot. I think it's worth merging, but if someone thinks it's not, I'm not going to fight it. Read it and merge if you want to.
[GitHub] spark issue #15335: [SPARK-17769][Core][Scheduler]Some FetchFailure refactor...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/15335 @vanzin It should be committed if you think it adds enough additional clarity that it is worth the penalty of making future backporting or other debugging maintenance a little more difficult. It doesn't change functionality much, so it's mostly just to increase the comprehensibility of the code.
[GitHub] spark pull request #16189: [SPARK-18761][CORE] Introduce "task reaper" to ov...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/16189#discussion_r92005198

--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
```diff
@@ -161,12 +163,7 @@ private[spark] class Executor(
    * @param interruptThread whether to interrupt the task thread
    */
   def killAllTasks(interruptThread: Boolean) : Unit = {
-    // kill all the running tasks
-    for (taskRunner <- runningTasks.values().asScala) {
-      if (taskRunner != null) {
-        taskRunner.kill(interruptThread)
-      }
-    }
+    runningTasks.keys().asScala.foreach(t => killTask(t, interruptThread = interruptThread))
```
--- End diff ---

Yes, it is not at all a likely case. What is the most likely case does concern me, though. The default value for `spark.job.interruptOnCancel` is `false`; and from the little bit of discussion on SPARK-17064, it doesn't seem that it will be very easy to change that with confidence. That means that `interruptThread` will also be `false` except for the hard-coded `true` in `handleSuccessfulTask` (which is itself suspect). The way that I am understanding the default behavior of this PR, then, is that if the reaper functionality is enabled, any kill request of a task (other than from within `handleSuccessfulTask`) that doesn't manage to complete within `killTimeoutMs` of that kill request will result in the Executor's JVM being killed -- i.e. the most likely default behavior looks to boil down to `if(killTask) killJvmAfterKillTimeoutMs`. That is potentially expensive in terms of lost state from that Executor. If the external shuffle service is being used, then we shouldn't need to lose the state of the shuffle files; and if one of the external block storage options is being used, then we shouldn't need to lose state on cached RDDs/tables, broadcast variables and/or accumulators. But none of that externalization and persistence of state across Executor restarts is currently the default -- and that concerns me some.
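The `if(killTask) killJvmAfterKillTimeoutMs` behavior described above can be modeled in a few lines. All names here are illustrative and the "JVM kill" is a flag rather than `System.exit`; the real TaskReaper is considerably more involved:

```scala
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean

// Toy model of the concern: a kill request arms a timer, and if the task has
// not finished within killTimeoutMs, the whole "JVM" is killed.
final class ToyReaper(killTimeoutMs: Long) {
  val jvmKilled = new AtomicBoolean(false)
  private val timer = Executors.newSingleThreadScheduledExecutor()
  def killTask(taskFinished: () => Boolean): Unit =
    timer.schedule(new Runnable {
      def run(): Unit = if (!taskFinished()) jvmKilled.set(true)
    }, killTimeoutMs, TimeUnit.MILLISECONDS)
  def shutdown(): Unit = { timer.shutdown(); timer.awaitTermination(5, TimeUnit.SECONDS) }
}

object ReaperDemo {
  def main(args: Array[String]): Unit = {
    val reaper = new ToyReaper(killTimeoutMs = 50)
    val finished = new AtomicBoolean(false) // task never completes in time
    reaper.killTask(() => finished.get)
    Thread.sleep(200)
    reaper.shutdown()
    assert(reaper.jvmKilled.get) // the default path the comment worries about
  }
}
```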
[GitHub] spark pull request #16189: [SPARK-18761][CORE] Introduce "task reaper" to ov...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/16189#discussion_r91764815

--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
```diff
@@ -84,6 +84,16 @@ private[spark] class Executor(
   // Start worker thread pool
   private val threadPool = ThreadUtils.newDaemonCachedThreadPool("Executor task launch worker")
   private val executorSource = new ExecutorSource(threadPool, executorId)
+  // Pool used for threads that supervise task killing / cancellation
+  private val taskReaperPool = ThreadUtils.newDaemonCachedThreadPool("Task reaper")
+  // For tasks which are in the process of being killed, this map the most recently created
```
--- End diff ---

That sentence no verb.
[GitHub] spark pull request #16189: [SPARK-18761][CORE][WIP] Introduce "task reaper" ...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/16189#discussion_r91364564

--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
```diff
@@ -161,12 +163,7 @@ private[spark] class Executor(
    * @param interruptThread whether to interrupt the task thread
    */
   def killAllTasks(interruptThread: Boolean) : Unit = {
-    // kill all the running tasks
-    for (taskRunner <- runningTasks.values().asScala) {
-      if (taskRunner != null) {
-        taskRunner.kill(interruptThread)
-      }
-    }
+    runningTasks.keys().asScala.foreach(t => killTask(t, interruptThread = interruptThread))
```
--- End diff ---

Or access TaskReapers via something like a guava LoadingCache.
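The cache idea could look roughly like this. To stay dependency-free, the sketch uses `ConcurrentHashMap.computeIfAbsent`, which gives the same "create on first access, return the same instance afterwards" semantics; Guava's `CacheBuilder`/`LoadingCache` would add eviction and checked loading on top. `TaskReaper` and `ReaperRegistry` are made-up names:

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.function.{Function => JFunction}

// Hypothetical per-task reaper handle, keyed by task ID.
final class TaskReaper(val taskId: Long)

final class ReaperRegistry {
  private val reapers = new ConcurrentHashMap[java.lang.Long, TaskReaper]()
  // computeIfAbsent is atomic: concurrent callers for the same task ID all
  // receive the single TaskReaper created on first access.
  def reaperFor(taskId: Long): TaskReaper =
    reapers.computeIfAbsent(taskId, new JFunction[java.lang.Long, TaskReaper] {
      def apply(id: java.lang.Long): TaskReaper = new TaskReaper(id)
    })
}
```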
[GitHub] spark issue #16065: [SPARK-18631][SQL] Changed ExchangeCoordinator re-partit...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/16065 @rxin fixed it
[GitHub] spark pull request #16065: [SPARK-17064][SQL] Changed ExchangeCoordinator re...
GitHub user markhamstra opened a pull request: https://github.com/apache/spark/pull/16065

[SPARK-17064][SQL] Changed ExchangeCoordinator re-partitioning to avoid additional data …

## What changes were proposed in this pull request?

Re-partitioning logic in ExchangeCoordinator changed so that adding another pre-shuffle partition to the post-shuffle partition will not be done if doing so would cause the size of the post-shuffle partition to exceed the target partition size.

## How was this patch tested?

Existing tests updated to reflect new expectations.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/markhamstra/spark SPARK-17064

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/16065.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #16065

commit 561fcf67bd3c1541352b00f33981a44fa58a6ccc
Author: Mark Hamstra <markhams...@gmail.com>
Date: 2016-11-29T20:34:03Z

    Changed ExchangeCoordinator re-partitioning to avoid additional data skew
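The rule in the description reduces to a greedy fold over pre-shuffle partition sizes. This sketch is a simplified model, not the real ExchangeCoordinator (which works on `MapOutputStatistics`): a non-empty post-shuffle partition is never grown past the target size.

```scala
import scala.collection.mutable.ArrayBuffer

object CoalesceDemo {
  // Merge consecutive pre-shuffle partitions into post-shuffle partitions,
  // starting a new partition whenever adding the next size would push a
  // non-empty group past targetSize.
  def coalesce(preShuffleSizes: Seq[Long], targetSize: Long): Seq[Seq[Long]] = {
    val groups = ArrayBuffer(ArrayBuffer.empty[Long])
    preShuffleSizes.foreach { size =>
      val current = groups.last
      if (current.nonEmpty && current.sum + size > targetSize)
        groups += ArrayBuffer(size) // would exceed target: start a new partition
      else
        current += size
    }
    groups.map(_.toSeq).toSeq
  }

  def main(args: Array[String]): Unit = {
    // With a 100-byte target, 60+30 fits but 60+30+50 would not, so 50 opens
    // a new post-shuffle partition instead of skewing the first one.
    assert(coalesce(Seq(60L, 30L, 50L, 40L), 100L) == Seq(Seq(60L, 30L), Seq(50L, 40L)))
  }
}
```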
[GitHub] spark issue #15986: [SPARK-18553][CORE][branch-2.0] Fix leak of TaskSetManag...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/15986 Thanks, @JoshRosen
[GitHub] spark issue #15986: [SPARK-18553][CORE][branch-2.0] Fix leak of TaskSetManag...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/15986 If Kay is happy with the last couple of changes, then I'm fine with this, too. The only tiny nit I've still got is a change from `runningTasksByExecutors()` to `runningTasksByExecutors`. Outside of this PR, there's only a single call site in `SparkStatusTracker`, so the fix is pretty trivial -- but so is this issue itself, so I don't really care much if it stays as is.
[GitHub] spark pull request #15986: [SPARK-18553][CORE][branch-2.0] Fix leak of TaskS...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/15986#discussion_r89423666

--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
```diff
@@ -335,31 +337,31 @@ private[spark] class TaskSchedulerImpl(
     var reason: Option[ExecutorLossReason] = None
     synchronized {
       try {
-        if (state == TaskState.LOST && taskIdToExecutorId.contains(tid)) {
-          // We lost this entire executor, so remember that it's gone
-          val execId = taskIdToExecutorId(tid)
-
-          if (executorIdToTaskCount.contains(execId)) {
+        taskIdToTaskSetManager.get(tid) match {
+          case Some(taskSet) if state == TaskState.LOST =>
+            // TaskState.LOST is only used by the deprecated Mesos fine-grained scheduling mode,
+            // where each executor corresponds to a single task, so mark the executor as failed.
+            val execId = taskIdToExecutorId.getOrElse(tid, throw new IllegalStateException(
+              "taskIdToTaskSetManager.contains(tid) <=> taskIdToExecutorId.contains(tid)"))
             reason = Some(
               SlaveLost(s"Task $tid was lost, so marking the executor as lost as well."))
             removeExecutor(execId, reason.get)
```
--- End diff ---

Yup, good point.
[GitHub] spark issue #15986: [SPARK-18553][CORE][branch-2.0] Fix leak of TaskSetManag...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/15986 The code looks pretty good @JoshRosen , but I still want to spend some time looking at your standalone end-to-end reproduction to get more familiar with the details.
[GitHub] spark pull request #15986: [SPARK-18553][CORE][branch-2.0] Fix leak of TaskS...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/15986#discussion_r89411560 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -88,10 +88,12 @@ private[spark] class TaskSchedulerImpl( // Incrementing task IDs val nextTaskId = new AtomicLong(0) - // Number of tasks running on each executor - private val executorIdToTaskCount = new HashMap[String, Int] + // IDs of the tasks running on each executor + private val executorIdToRunningTaskIds = new HashMap[String, HashSet[Long]] - def runningTasksByExecutors(): Map[String, Int] = executorIdToTaskCount.toMap + def runningTasksByExecutors(): Map[String, Int] = synchronized { --- End diff -- Is there a reason why this shouldn't be `def runningTasksByExecutors: Map[String, Int]` -- it's not a mutator.
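[Editor's note] A minimal sketch (not the actual `TaskSchedulerImpl` code; `SchedulerSketch` and its members are invented for illustration) of the Scala convention the review comment cites: under uniform access, a side-effect-free accessor is conventionally declared without parentheses, while a mutator keeps its parameter list.

```scala
import scala.collection.mutable

object SchedulerSketch {
  // Mirrors the shape of executorIdToRunningTaskIds from the diff above.
  private val executorIdToRunningTaskIds =
    mutable.HashMap.empty[String, mutable.HashSet[Long]]

  // Pure accessor -- no parens, per the convention in the review comment.
  def runningTasksByExecutors: Map[String, Int] = synchronized {
    executorIdToRunningTaskIds.map { case (execId, tids) => execId -> tids.size }.toMap
  }

  // Mutator -- keeps its parameter list.
  def addRunningTask(execId: String, tid: Long): Unit = synchronized {
    executorIdToRunningTaskIds.getOrElseUpdate(execId, mutable.HashSet.empty) += tid
  }
}
```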
[GitHub] spark issue #11888: [SPARK-14069][SQL] Improve SparkStatusTracker to also tr...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/11888 @rxin Do you mean the N/A in "How was this patch tested?" Some guy said that the lack of tests was ok. https://github.com/apache/spark/pull/11888#issuecomment-200162987
[GitHub] spark issue #15463: [SPARK-17894] [CORE] Ensure uniqueness of TaskSetManager...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/15463 Whoa! Jenkins disrespects @kayousterhout -- bad Jenkins! Or did you actually fix something @shivaram ?
[GitHub] spark issue #15531: [SQL][STREAMING][TEST] Follow up to remove Option.contai...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/15531 @rxin Nonsense; it's just as good an idea as are List.contains and Set.contains. The only problem with it is that it doesn't exist in Scala 2.10.
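[Editor's note] A short sketch of the point above: `Option.contains` (added in Scala 2.11) behaves just like `List.contains` and `Set.contains`, and on Scala 2.10 the equivalent spelling is `exists(_ == x)`.

```scala
val maybeName: Option[String] = Some("spark")

// Scala 2.11+: symmetrical with the other collection contains methods.
assert(maybeName.contains("spark"))
assert(!Option.empty[String].contains("spark"))
assert(List("spark").contains("spark"))
assert(Set("spark").contains("spark"))

// Scala 2.10-compatible equivalent.
assert(maybeName.exists(_ == "spark"))
```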
[GitHub] spark pull request #15335: [SPARK-17769][Core][Scheduler]Some FetchFailure r...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/15335#discussion_r83710471 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -1255,27 +1255,46 @@ class DAGScheduler( s"longer running") } - if (disallowStageRetryForTest) { -abortStage(failedStage, "Fetch failure will not retry stage due to testing config", - None) - } else if (failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId)) { -abortStage(failedStage, s"$failedStage (${failedStage.name}) " + - s"has failed the maximum allowable number of " + - s"times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}. " + - s"Most recent failure reason: ${failureMessage}", None) - } else { -if (failedStages.isEmpty) { - // Don't schedule an event to resubmit failed stages if failed isn't empty, because - // in that case the event will already have been scheduled. - // TODO: Cancel running tasks in the stage - logInfo(s"Resubmitting $mapStage (${mapStage.name}) and " + -s"$failedStage (${failedStage.name}) due to fetch failure") - messageScheduler.schedule(new Runnable { -override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages) - }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS) + val shouldAbortStage = +failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId) || +disallowStageRetryForTest + + if (shouldAbortStage) { +val abortMessage = if (disallowStageRetryForTest) { + "Fetch failure will not retry stage due to testing config" +} else { + s"""$failedStage (${failedStage.name}) + |has failed the maximum allowable number of + |times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}. + |Most recent failure reason: $failureMessage""".stripMargin.replaceAll("\n", " ") } +abortStage(failedStage, abortMessage, None) + } else { // update failedStages and make sure a ResubmitFailedStages event is enqueued +// TODO: Cancel running tasks in the failed stage -- cf. 
SPARK-17064 +val noResubmitEnqueued = !failedStages.contains(failedStage) --- End diff -- Ok, but it's really not that complicated or difficult to understand. There is only one way to add stages to `failedStages`: within the `FetchFailed` case. When a `failedStage` is added to `failedStages`, it is always accompanied by the parent `mapStage`. There are only two ways to remove stages from `failedStages`: 1) within the handling of a `ResubmitFailedStages` event, when the entire `failedStages` is cleared; 2) within `cleanupStateForJobAndIndependentStages` when we call `removeStage`. Obviously, 1) can't produce a state where `mapStage` is not in `failedStages` while a corresponding `failedStage` is, so the only logic we need to concern ourselves with is in 2). In order for 2) to produce a state where `mapStage` is absent from `failedStages` while an associated `failedStage` is present, `removeStage` would need to have been called on the `mapStage` while not being called on the `failedStage`. But that can't happen because `removeStage` will not be called on a stage unless no Job needs that stage anymore. If no job needs the `mapStage`, then no job can need a `failedStage` that uses the output of that `mapStage` -- i.e. it is not possible that a `mapStage` will be removed in `cleanupStateForJobAndIndependentStages` unless every associated `failedStage` will also be removed. Conclusion: It is never possible for `mapStage` to be absent from `failedStages` at the same time that an associated `failedStage` is present, so the proposed `|| !failedStages.contains(mapStage)` condition will never be checked -- it would just be unreachable and misleading code. There also isn't really any need for concern over lack of tests.
There is no need to prove correctness of the current code for something that can't happen presently, so the only point of such a test would be to guard against some future mistaken change making it possible to remove a failed `mapStage` while some `failedStage` still needs it. If that happens, then we've got far bigger problems than checking whether we need to issue a new `ResubmitFailedStages` event, and checks for that kind of broken removal of parents while their children are still d
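[Editor's note] A toy model (not Spark code; `FailedStagesModel` and its stage-ID arguments are invented for illustration) of the invariant argued above: stages enter `failedStages` only in pairs, so checking the `failedStage` alone is enough to decide whether a resubmit event is already enqueued.

```scala
import scala.collection.mutable

object FailedStagesModel {
  // Stages are identified by Int here purely for the sketch.
  val failedStages = mutable.HashSet[Int]()

  // Mirrors the FetchFailed case: failedStage and mapStage are always
  // added together, so failedStages can never hold a failedStage
  // without its parent mapStage.
  def recordFetchFailure(mapStage: Int, failedStage: Int): Boolean = {
    val noResubmitEnqueued = !failedStages.contains(failedStage)
    failedStages += failedStage
    failedStages += mapStage
    noResubmitEnqueued // true => caller should schedule ResubmitFailedStages
  }

  // Mirrors ResubmitFailedStages handling: the whole set is cleared at once.
  def resubmit(): Unit = failedStages.clear()
}
```

The first failure for a stage reports that a resubmit should be enqueued; repeats of the same failure do not, until a resubmit clears the set.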
[GitHub] spark issue #14158: [SPARK-13547] [SQL] [WEBUI] Add SQL query in web UI's SQ...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/14158 @nblintao Got it; thanks. There may be distinct queries that will be entirely the same within the first 1000 characters, but that's just the nature of working with these very large queries -- there are lots of things that make them difficult, but it sounds like you've made sure not to make things difficult for more normal queries as a consequence, so that's all good.
[GitHub] spark issue #14158: [SPARK-13547] [SQL] [WEBUI] Add SQL query in web UI's SQ...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/14158 @nblintao I really haven't reviewed these changes closely enough to have a specific complaint or concern in mind, but I'm more concerned about what happens when you ask to see "more" when that "more" could be several tens of thousands of characters, not just the few thousand that many would consider to be a very long SQL query.
[GitHub] spark issue #14158: [SPARK-13547] [SQL] [WEBUI] Add SQL query in web UI's SQ...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/14158 It's great if it is already addressed, but I just didn't see anything explicit in the discussion or examples that showed any query of the magnitude that I am talking about. Machine-generated queries can be staggeringly large (e.g. I've seen queries that were so long and complicated that they took Spark SQL more than 9 minutes just to parse), and this UI enhancement must be prepared to handle those.
[GitHub] spark issue #14158: [SPARK-13547] [SQL] [WEBUI] Add SQL query in web UI's SQ...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/14158 Please be certain that this works well even for very large queries. They are not commonplace, but I know that Spark SQL does sometimes get asked to handle SQL queries that are hundreds of lines long.
[GitHub] spark pull request #15335: [SPARK-17769][Core][Scheduler]Some FetchFailure r...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/15335#discussion_r83289400 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -1255,27 +1255,46 @@ class DAGScheduler( s"longer running") } - if (disallowStageRetryForTest) { -abortStage(failedStage, "Fetch failure will not retry stage due to testing config", - None) - } else if (failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId)) { -abortStage(failedStage, s"$failedStage (${failedStage.name}) " + - s"has failed the maximum allowable number of " + - s"times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}. " + - s"Most recent failure reason: ${failureMessage}", None) - } else { -if (failedStages.isEmpty) { - // Don't schedule an event to resubmit failed stages if failed isn't empty, because - // in that case the event will already have been scheduled. - // TODO: Cancel running tasks in the stage - logInfo(s"Resubmitting $mapStage (${mapStage.name}) and " + -s"$failedStage (${failedStage.name}) due to fetch failure") - messageScheduler.schedule(new Runnable { -override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages) - }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS) + val shouldAbortStage = +failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId) || +disallowStageRetryForTest + + if (shouldAbortStage) { +val abortMessage = if (disallowStageRetryForTest) { + "Fetch failure will not retry stage due to testing config" +} else { + s"""$failedStage (${failedStage.name}) + |has failed the maximum allowable number of + |times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}. + |Most recent failure reason: $failureMessage""".stripMargin.replaceAll("\n", " ") } +abortStage(failedStage, abortMessage, None) + } else { // update failedStages and make sure a ResubmitFailedStages event is enqueued +// TODO: Cancel running tasks in the failed stage -- cf. 
SPARK-17064 +val noResubmitEnqueued = !failedStages.contains(failedStage) failedStages += failedStage failedStages += mapStage +if (noResubmitEnqueued) { + // We expect one executor failure to trigger many FetchFailures in rapid succession, + // but all of those task failures can typically be handled by a single resubmission of + // the failed stage. We avoid flooding the scheduler's event queue with resubmit + // messages by checking whether a resubmit is already in the event queue for the + // failed stage. If there is already a resubmit enqueued for a different failed + // stage, that event would also be sufficient to handle the current failed stage, but + // producing a resubmit for each failed stage makes debugging and logging a little + // simpler while not producing an overwhelming number of scheduler events. + logInfo( +s"Resubmitting $mapStage (${mapStage.name}) and " + +s"$failedStage (${failedStage.name}) due to fetch failure" + ) + messageScheduler.schedule( --- End diff -- Yup, move further discussion there.
[GitHub] spark pull request #15441: [SPARK-4411] [Web UI] Add "kill" link for jobs in...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/15441#discussion_r83105534 --- Diff: core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala --- @@ -35,4 +37,20 @@ private[ui] class JobsTab(parent: SparkUI) extends SparkUITab(parent, "jobs") { attachPage(new AllJobsPage(this)) attachPage(new JobPage(this)) + + def handleKillRequest(request: HttpServletRequest): Unit = { +if (killEnabled && parent.securityManager.checkModifyPermissions(request.getRemoteUser)) { + val killFlag = Option(request.getParameter("terminate")).getOrElse("false").toBoolean + val jobId = Option(request.getParameter("id")).map(_.toInt) + jobId.foreach { id => +if (killFlag && jobProgresslistener.activeJobs.contains(id)) { + sc.foreach(_.cancelJob(id)) + // Do a quick pause here to give Spark time to kill the job so it shows up as + // killed after the refresh. Note that this will block the serving thread so the + // time should be limited in duration. + Thread.sleep(100) --- End diff -- You're doing the sleep here even if `sc` is `None`, but I wouldn't expect that that is even possible or that it is handled completely correctly elsewhere in the Web UI if it actually is possible, so this is likely fine in practice.
[GitHub] spark pull request #15335: [SPARK-17769][Core][Scheduler]Some FetchFailure r...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/15335#discussion_r83073697 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -1255,27 +1255,46 @@ class DAGScheduler( s"longer running") } - if (disallowStageRetryForTest) { -abortStage(failedStage, "Fetch failure will not retry stage due to testing config", - None) - } else if (failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId)) { -abortStage(failedStage, s"$failedStage (${failedStage.name}) " + - s"has failed the maximum allowable number of " + - s"times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}. " + - s"Most recent failure reason: ${failureMessage}", None) - } else { -if (failedStages.isEmpty) { - // Don't schedule an event to resubmit failed stages if failed isn't empty, because - // in that case the event will already have been scheduled. - // TODO: Cancel running tasks in the stage - logInfo(s"Resubmitting $mapStage (${mapStage.name}) and " + -s"$failedStage (${failedStage.name}) due to fetch failure") - messageScheduler.schedule(new Runnable { -override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages) - }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS) + val shouldAbortStage = +failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId) || +disallowStageRetryForTest + + if (shouldAbortStage) { +val abortMessage = if (disallowStageRetryForTest) { + "Fetch failure will not retry stage due to testing config" +} else { + s"""$failedStage (${failedStage.name}) + |has failed the maximum allowable number of + |times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}. + |Most recent failure reason: $failureMessage""".stripMargin.replaceAll("\n", " ") } +abortStage(failedStage, abortMessage, None) + } else { // update failedStages and make sure a ResubmitFailedStages event is enqueued +// TODO: Cancel running tasks in the failed stage -- cf. 
SPARK-17064 +val noResubmitEnqueued = !failedStages.contains(failedStage) failedStages += failedStage failedStages += mapStage +if (noResubmitEnqueued) { + // We expect one executor failure to trigger many FetchFailures in rapid succession, + // but all of those task failures can typically be handled by a single resubmission of + // the failed stage. We avoid flooding the scheduler's event queue with resubmit + // messages by checking whether a resubmit is already in the event queue for the + // failed stage. If there is already a resubmit enqueued for a different failed + // stage, that event would also be sufficient to handle the current failed stage, but + // producing a resubmit for each failed stage makes debugging and logging a little + // simpler while not producing an overwhelming number of scheduler events. + logInfo( +s"Resubmitting $mapStage (${mapStage.name}) and " + +s"$failedStage (${failedStage.name}) due to fetch failure" + ) + messageScheduler.schedule( --- End diff -- Ok, I can get started on that. I believe that leaves this PR ready to merge.
[GitHub] spark pull request #15335: [SPARK-17769][Core][Scheduler]Some FetchFailure r...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/15335#discussion_r82944965 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -1255,27 +1255,46 @@ class DAGScheduler( s"longer running") } - if (disallowStageRetryForTest) { -abortStage(failedStage, "Fetch failure will not retry stage due to testing config", - None) - } else if (failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId)) { -abortStage(failedStage, s"$failedStage (${failedStage.name}) " + - s"has failed the maximum allowable number of " + - s"times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}. " + - s"Most recent failure reason: ${failureMessage}", None) - } else { -if (failedStages.isEmpty) { - // Don't schedule an event to resubmit failed stages if failed isn't empty, because - // in that case the event will already have been scheduled. - // TODO: Cancel running tasks in the stage - logInfo(s"Resubmitting $mapStage (${mapStage.name}) and " + -s"$failedStage (${failedStage.name}) due to fetch failure") - messageScheduler.schedule(new Runnable { -override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages) - }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS) + val shouldAbortStage = +failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId) || +disallowStageRetryForTest + + if (shouldAbortStage) { +val abortMessage = if (disallowStageRetryForTest) { + "Fetch failure will not retry stage due to testing config" +} else { + s"""$failedStage (${failedStage.name}) + |has failed the maximum allowable number of + |times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}. + |Most recent failure reason: $failureMessage""".stripMargin.replaceAll("\n", " ") } +abortStage(failedStage, abortMessage, None) + } else { // update failedStages and make sure a ResubmitFailedStages event is enqueued +// TODO: Cancel running tasks in the failed stage -- cf. 
SPARK-17064 +val noResubmitEnqueued = !failedStages.contains(failedStage) failedStages += failedStage failedStages += mapStage +if (noResubmitEnqueued) { + // We expect one executor failure to trigger many FetchFailures in rapid succession, + // but all of those task failures can typically be handled by a single resubmission of + // the failed stage. We avoid flooding the scheduler's event queue with resubmit + // messages by checking whether a resubmit is already in the event queue for the + // failed stage. If there is already a resubmit enqueued for a different failed + // stage, that event would also be sufficient to handle the current failed stage, but + // producing a resubmit for each failed stage makes debugging and logging a little + // simpler while not producing an overwhelming number of scheduler events. + logInfo( +s"Resubmitting $mapStage (${mapStage.name}) and " + +s"$failedStage (${failedStage.name}) due to fetch failure" + ) + messageScheduler.schedule( --- End diff -- Ah, sorry for ascribing the prior comment to your preferences. That comment actually did make sense a long time ago when the resubmitting of stages really was done periodically by an Akka scheduled event that fired every something seconds. I'm pretty sure the RESUBMIT_TIMEOUT stuff is also legacy code that doesn't make sense and isn't necessary any more. So, do you want to do the follow-up PR to get rid of it, or shall I?
[GitHub] spark pull request #15441: [SPARK-4411] [Web UI] Add "kill" link for jobs in...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/15441#discussion_r82925875 --- Diff: core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala --- @@ -35,4 +37,18 @@ private[ui] class JobsTab(parent: SparkUI) extends SparkUITab(parent, "jobs") { attachPage(new AllJobsPage(this)) attachPage(new JobPage(this)) + + def handleKillRequest(request: HttpServletRequest): Unit = { +if (killEnabled && (parent.securityManager.checkModifyPermissions(request.getRemoteUser))) { + val killFlag = Option(request.getParameter("terminate")).getOrElse("false").toBoolean + val jobId = Option(request.getParameter("id")).getOrElse("-1").toInt + if (jobId >= 0 && killFlag && jobProgresslistener.activeJobs.contains(jobId)) { +sc.get.cancelJob(jobId) + } --- End diff -- And if the Job isn't actually going to be canceled, then there is no need to delay the page refresh (which I'm not entirely happy with, but I'm not going to try to resolve that issue right now.) So...

```scala
sc.foreach { sparkContext =>
  sparkContext.cancelJob(id)
  Thread.sleep(100)
}
```

And we had better make `jobId` an `Option[Int]` instead of an `Option[String]`, so...

```scala
val jobId = Option(request.getParameter("id")).map(_.toInt)
```
[GitHub] spark pull request #15441: [SPARK-4411] [Web UI] Add "kill" link for jobs in...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/15441#discussion_r82924909 --- Diff: core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala --- @@ -35,4 +37,18 @@ private[ui] class JobsTab(parent: SparkUI) extends SparkUITab(parent, "jobs") { attachPage(new AllJobsPage(this)) attachPage(new JobPage(this)) + + def handleKillRequest(request: HttpServletRequest): Unit = { +if (killEnabled && (parent.securityManager.checkModifyPermissions(request.getRemoteUser))) { + val killFlag = Option(request.getParameter("terminate")).getOrElse("false").toBoolean + val jobId = Option(request.getParameter("id")).getOrElse("-1").toInt + if (jobId >= 0 && killFlag && jobProgresslistener.activeJobs.contains(jobId)) { +sc.get.cancelJob(jobId) + } --- End diff -- Similarly, there is no need for `sc.get`. In fact, that's a bug if `parent.sc` really should be an `Option` and thus could be `None`.

```scala
sc.foreach(_.cancelJob(id))
```
[GitHub] spark pull request #15441: [SPARK-4411] [Web UI] Add "kill" link for jobs in...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/15441#discussion_r82923313 --- Diff: core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala --- @@ -35,4 +37,18 @@ private[ui] class JobsTab(parent: SparkUI) extends SparkUITab(parent, "jobs") { attachPage(new AllJobsPage(this)) attachPage(new JobPage(this)) + + def handleKillRequest(request: HttpServletRequest): Unit = { +if (killEnabled && (parent.securityManager.checkModifyPermissions(request.getRemoteUser))) { + val killFlag = Option(request.getParameter("terminate")).getOrElse("false").toBoolean + val jobId = Option(request.getParameter("id")).getOrElse("-1").toInt + if (jobId >= 0 && killFlag && jobProgresslistener.activeJobs.contains(jobId)) { +sc.get.cancelJob(jobId) + } --- End diff -- Creating an `Option` only to immediately `get` the value out of it is poor style, and unnecessary.

```scala
val jobId = Option(request.getParameter("id"))
jobId.foreach { id =>
  if (killFlag && jobProgresslistener.activeJobs.contains(id)) {
    sc.get.cancelJob(id)
  }
}
```
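[Editor's note] A self-contained sketch of the null-safe pattern these review comments converge on (servlet types omitted; `parseJobId` is an invented helper, not Spark code): `Option(...)` lifts a possibly-null servlet parameter into an `Option`, and `foreach`/`map` replace the unsafe `Option#get`.

```scala
// Servlet getParameter returns null when the parameter is absent;
// Option(...) turns that null into None instead of an NPE waiting to happen.
def parseJobId(rawParam: String /* may be null */): Option[Int] =
  Option(rawParam).map(_.toInt)

// Acting on the value without ever calling .get:
def cancelIfRequested(rawParam: String, cancel: Int => Unit): Unit =
  parseJobId(rawParam).foreach(cancel)
```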
[GitHub] spark pull request #15249: [SPARK-17675] [CORE] Expand Blacklist for TaskSet...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/15249#discussion_r82891721 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala --- @@ -98,6 +84,14 @@ private[spark] class TaskSetManager( var totalResultSize = 0L var calculatedTasks = 0 + val taskSetBlacklistOpt: Option[TaskSetBlacklist] = { --- End diff -- I too go back and forth on naming `Option`s, and haven't yet hit upon a convention that is entirely satisfactory. Another `Option` option (uh yeah) is `maybeFoo`, which we've also used within the Spark codebase.
[GitHub] spark pull request #15335: [SPARK-17769][Core][Scheduler]Some FetchFailure r...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/15335#discussion_r82869819 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -1255,27 +1255,46 @@ class DAGScheduler( s"longer running") } - if (disallowStageRetryForTest) { -abortStage(failedStage, "Fetch failure will not retry stage due to testing config", - None) - } else if (failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId)) { -abortStage(failedStage, s"$failedStage (${failedStage.name}) " + - s"has failed the maximum allowable number of " + - s"times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}. " + - s"Most recent failure reason: ${failureMessage}", None) - } else { -if (failedStages.isEmpty) { - // Don't schedule an event to resubmit failed stages if failed isn't empty, because - // in that case the event will already have been scheduled. - // TODO: Cancel running tasks in the stage - logInfo(s"Resubmitting $mapStage (${mapStage.name}) and " + -s"$failedStage (${failedStage.name}) due to fetch failure") - messageScheduler.schedule(new Runnable { -override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages) - }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS) + val shouldAbortStage = +failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId) || +disallowStageRetryForTest + + if (shouldAbortStage) { +val abortMessage = if (disallowStageRetryForTest) { + "Fetch failure will not retry stage due to testing config" +} else { + s"""$failedStage (${failedStage.name}) + |has failed the maximum allowable number of + |times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}. + |Most recent failure reason: $failureMessage""".stripMargin.replaceAll("\n", " ") } +abortStage(failedStage, abortMessage, None) + } else { // update failedStages and make sure a ResubmitFailedStages event is enqueued +// TODO: Cancel running tasks in the failed stage -- cf. 
SPARK-17064 +val noResubmitEnqueued = !failedStages.contains(failedStage) failedStages += failedStage failedStages += mapStage +if (noResubmitEnqueued) { + // We expect one executor failure to trigger many FetchFailures in rapid succession, + // but all of those task failures can typically be handled by a single resubmission of + // the failed stage. We avoid flooding the scheduler's event queue with resubmit + // messages by checking whether a resubmit is already in the event queue for the + // failed stage. If there is already a resubmit enqueued for a different failed + // stage, that event would also be sufficient to handle the current failed stage, but + // producing a resubmit for each failed stage makes debugging and logging a little + // simpler while not producing an overwhelming number of scheduler events. + logInfo( +s"Resubmitting $mapStage (${mapStage.name}) and " + +s"$failedStage (${failedStage.name}) due to fetch failure" + ) + messageScheduler.schedule( --- End diff -- 1. I don't like "Periodically" in your suggested comment, since this is a one-shot action after a delay of RESUBMIT_TIMEOUT milliseconds. 2. I agree that this delay-before-resubmit logic is suspect. If we are both thinking correctly that a 200 ms delay on top of the time to re-run the `mapStage` is all but inconsequential, then removing it in this PR would be fine. If there are unanticipated consequences, though, I'd prefer to have that change in a separate PR. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
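The one-shot-versus-periodic distinction made in point 1 above can be sketched with a plain `ScheduledExecutorService` (the real code posts a `ResubmitFailedStages` event; the task body here is a hypothetical stand-in):

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

val scheduler = Executors.newScheduledThreadPool(1)
val fired = new CountDownLatch(1)

// schedule(...) is a one-shot action: the task runs exactly once after the
// delay and never repeats. (scheduleAtFixedRate would be the periodic variant.)
scheduler.schedule(new Runnable {
  override def run(): Unit = fired.countDown()
}, 50, TimeUnit.MILLISECONDS)

// Wait (with a generous timeout) for the single delayed execution.
val ranOnce = fired.await(2, TimeUnit.SECONDS)
scheduler.shutdown()
```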
[GitHub] spark pull request #15335: [SPARK-17769][Core][Scheduler]Some FetchFailure r...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/15335#discussion_r82864060 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -1255,27 +1255,46 @@ class DAGScheduler( s"longer running") } - if (disallowStageRetryForTest) { -abortStage(failedStage, "Fetch failure will not retry stage due to testing config", - None) - } else if (failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId)) { -abortStage(failedStage, s"$failedStage (${failedStage.name}) " + - s"has failed the maximum allowable number of " + - s"times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}. " + - s"Most recent failure reason: ${failureMessage}", None) - } else { -if (failedStages.isEmpty) { - // Don't schedule an event to resubmit failed stages if failed isn't empty, because - // in that case the event will already have been scheduled. - // TODO: Cancel running tasks in the stage - logInfo(s"Resubmitting $mapStage (${mapStage.name}) and " + -s"$failedStage (${failedStage.name}) due to fetch failure") - messageScheduler.schedule(new Runnable { -override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages) - }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS) + val shouldAbortStage = +failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId) || +disallowStageRetryForTest + + if (shouldAbortStage) { +val abortMessage = if (disallowStageRetryForTest) { + "Fetch failure will not retry stage due to testing config" +} else { + s"""$failedStage (${failedStage.name}) + |has failed the maximum allowable number of + |times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}. + |Most recent failure reason: $failureMessage""".stripMargin.replaceAll("\n", " ") } +abortStage(failedStage, abortMessage, None) + } else { // update failedStages and make sure a ResubmitFailedStages event is enqueued +// TODO: Cancel running tasks in the failed stage -- cf. 
SPARK-17064 +val noResubmitEnqueued = !failedStages.contains(failedStage) --- End diff -- Right here is the only place we put anything into `failedStages`, so `failedStage` and `mapStage` always go in as pairs. The only places where we remove things from `failedStages` are `resubmitFailedStages` and `DAGScheduler#cleanupStateForJobAndIndependentStages#removeStage`. We clear `failedStages` in `resubmitFailedStages`, so the only place where `failedStage` and `mapStage` could get unpaired in `failedStages` is in `cleanupStateForJobAndIndependentStages#removeStage`. That would happen if the number of Jobs that use `failedStage` and `mapStage` is unequal. If I'm thinking correctly, that could only happen if the `mapStage` is used by more Jobs than is the `failedStage`. In that case, cleaning up the last Job that uses `failedStage` would remove `failedStage` from `failedStages` while `mapStage` would remain. To fall into your proposed `|| !failedStages.contains(mapStage)` branch, another `failedStage` needing `mapStage`, this time coming from one of the remaining Jobs using `mapStage`, would need to fail. If that is the case, then we still want to log the failure of the new `failedStage`, so I don't think we want `|| !failedStages.contains(mapStage)` -- without it, we'll get a duplicate of `mapStage` added to `failedStages`, but that's no big deal. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org

[GitHub] spark pull request #12775: [SPARK-14958][Core] Failed task not handled when ...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/12775#discussion_r82079358 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala --- @@ -135,8 +135,9 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul logError( "Could not deserialize TaskEndReason: ClassNotFound with classloader " + loader) case ex: Exception => // No-op + } finally { --- End diff -- makes sense
[GitHub] spark issue #15350: [SPARK-17778][Tests]Mock SparkContext to reduce memory u...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/15350 @zsxwing `build/mvn -U -Pyarn -Phadoop-2.7 -Pkinesis-asl -Phive -Phive-thriftserver -Dpyspark -Dsparkr test` completely succeeded on one of my machines where it was previously failing.
[GitHub] spark issue #15326: [SPARK-17759] [CORE] FairSchedulableBuilder should avoid...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/15326 @erenavsarogullari Your concern was entirely legitimate, and is also why I called in @kayousterhout to double-check my claim that other duplicate Schedulables would also be a problem.
[GitHub] spark issue #15326: [SPARK-17759] [CORE] FairSchedulableBuilder should avoid...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/15326 ...and it would be 2.0.2 at this point. :)
[GitHub] spark issue #15326: [SPARK-17759] [CORE] FairSchedulableBuilder should avoid...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/15326 @kayousterhout @rxin Ok, but if we're going to change the behavior, then we need to be sure that change at least makes it into the release notes.
[GitHub] spark issue #15335: Some FetchFailure refactoring
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/15335 @squito the promised follow-up PR
[GitHub] spark pull request #15335: Some FetchFailure refactoring
GitHub user markhamstra opened a pull request: https://github.com/apache/spark/pull/15335 Some FetchFailure refactoring ## What changes were proposed in this pull request? Readability rewrites. Changed the order of evaluation of `failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId)` and `disallowStageRetryForTest`. Changed the stage-resubmission guard condition from `failedStages.isEmpty` to `!failedStages.contains(failedStage)`. Log all resubmissions of stages. ## How was this patch tested? Existing tests. You can merge this pull request into a Git repository by running: $ git pull https://github.com/markhamstra/spark SPARK-17769 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/15335.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #15335
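The guard-condition change described above can be sketched on a mutable set (the `Stage` case class here is a hypothetical stand-in for DAGScheduler state; only identity matters):

```scala
import scala.collection.mutable

// Hypothetical stand-in for DAGScheduler's Stage objects.
case class Stage(id: Int)

val failedStages = mutable.HashSet[Stage]()
val failedStage = Stage(1)

failedStages += Stage(2)  // a *different* stage is already awaiting resubmission

// Old guard: only enqueue a resubmit event when no failed stages are pending
// at all -- here it would skip scheduling a resubmit for the new failure.
val oldGuard = failedStages.isEmpty

// New guard: enqueue a resubmit unless *this* stage already has one pending.
val newGuard = !failedStages.contains(failedStage)
```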
[GitHub] spark pull request #15301: [SPARK-17717][SQL] Add exist/find methods to Cata...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/15301#discussion_r81270990 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala --- @@ -102,6 +102,83 @@ abstract class Catalog { def listColumns(dbName: String, tableName: String): Dataset[Column] /** + * Find the database with the specified name. This returns [[None]] when no [[Database]] can be + * found. + * + * @since 2.1.0 + */ + def findDatabase(dbName: String): Option[Database] --- End diff -- Sorry, I'm a bit late to this party. In terms of Scala vs. Java API, "find*" doesn't really strike me as the best naming, especially not after these "find*" methods no longer produce an Option. In Scala, I expect a `find` to take a predicate and to produce an Option that may contain the first element to satisfy the predicate. Something that throws an exception when the one thing asked for doesn't exist feels more like a `get` than a `find` to me. Can we reconsider before we get to the point where we find that this API is set in stone? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
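The Scala convention invoked in this comment can be illustrated minimally (the collections and names below are hypothetical, not the Catalog API):

```scala
// In Scala, find takes a predicate and yields an Option with the first match.
val names = Seq("default", "analytics", "staging")
val hit: Option[String] = names.find(_.startsWith("ana"))
val miss: Option[String] = names.find(_.startsWith("zzz"))

// A get-style lookup, by contrast, throws when the requested item is absent.
val byName = Map("default" -> 0, "analytics" -> 1)
val got = byName("analytics")
val threw =
  try { byName("nonexistent"); false }
  catch { case _: NoSuchElementException => true }
```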
[GitHub] spark pull request #15249: [SPARK-17675] [CORE] Expand Blacklist for TaskSet...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/15249#discussion_r81036771 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala --- @@ -592,34 +587,54 @@ private[spark] class TaskSetManager( * failures (this is because the method picks on unscheduled task, and then iterates through each * executor until it finds one that the task hasn't failed on already). */ - private[scheduler] def abortIfCompletelyBlacklisted(executors: Iterable[String]): Unit = { - -val pendingTask: Option[Int] = { - // usually this will just take the last pending task, but because of the lazy removal - // from each list, we may need to go deeper in the list. We poll from the end because - // failed tasks are put back at the end of allPendingTasks, so we're more likely to find - // an unschedulable task this way. - val indexOffset = allPendingTasks.lastIndexWhere { indexInTaskSet => -copiesRunning(indexInTaskSet) == 0 && !successful(indexInTaskSet) - } - if (indexOffset == -1) { -None - } else { -Some(allPendingTasks(indexOffset)) - } -} + private[scheduler] def abortIfCompletelyBlacklisted( + hostToExecutors: HashMap[String, HashSet[String]]): Unit = { +taskSetBlacklistOpt.foreach { taskSetBlacklist => +// If no executors have registered yet, don't abort the stage, just wait. We probably +// got here because a task set was added before the executors registered. + if (hostToExecutors.nonEmpty) { +// take any task that needs to be scheduled, and see if we can find some executor it *could* +// run on +val pendingTask: Option[Int] = { + // usually this will just take the last pending task, but because of the lazy removal + // from each list, we may need to go deeper in the list. We poll from the end because + // failed tasks are put back at the end of allPendingTasks, so we're more likely to find + // an unschedulable task this way. 
+ val indexOffset = allPendingTasks.lastIndexWhere { indexInTaskSet => +copiesRunning(indexInTaskSet) == 0 && !successful(indexInTaskSet) + } + if (indexOffset == -1) { +None + } else { +Some(allPendingTasks(indexOffset)) + } +} -// If no executors have registered yet, don't abort the stage, just wait. We probably -// got here because a task set was added before the executors registered. -if (executors.nonEmpty) { - // take any task that needs to be scheduled, and see if we can find some executor it *could* - // run on - pendingTask.foreach { taskId => -if (executors.forall(executorIsBlacklisted(_, taskId))) { - val execs = executors.toIndexedSeq.sorted.mkString("(", ",", ")") - val partition = tasks(taskId).partitionId - abort(s"Aborting ${taskSet} because task $taskId (partition $partition)" + -s" has already failed on executors $execs, and no other executors are available.") +pendingTask.foreach { indexInTaskSet => + // try to find some executor this task can run on. Its possible that some *other* + // task isn't schedulable anywhere, but we will discover that in some later call, + // when that unschedulable task is the last task remaining. + val blacklistedEverywhere = hostToExecutors.forall { case (host, execs) => +// Check if the task can run on the node +val nodeBlacklisted = + taskSetBlacklist.isNodeBlacklistedForTaskSet(host) || + taskSetBlacklist.isNodeBlacklistedForTask(host, indexInTaskSet) +if (nodeBlacklisted) { + true +} else { + // Check if the task can run on any of the executors + execs.forall { exec => + taskSetBlacklist.isExecutorBlacklistedForTaskSet(exec) || + taskSetBlacklist.isExecutorBlacklistedForTask(exec, indexInTaskSet) + } +} + } + if (blacklistedEverywhere) { +val partition = tasks(indexInTaskSet).partitionId +abort(s"Aborting ${taskSet} because task $indexInTaskSet (partition $partition) " + --- End diff -- nit: `$taskSet` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
[GitHub] spark pull request #15249: [SPARK-17675] [CORE] Expand Blacklist for TaskSet...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/15249#discussion_r81035029 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala --- @@ -478,8 +473,8 @@ private[spark] class TaskSetManager( // a good proxy to task serialization time. // val timeTaken = clock.getTime() - startTime val taskName = s"task ${info.id} in stage ${taskSet.id}" - logInfo(s"Starting $taskName (TID $taskId, $host, partition ${task.partitionId}," + -s" $taskLocality, ${serializedTask.limit} bytes)") + logInfo(s"Starting $taskName (TID $taskId, $host, executor ${info.executorId}, " + +s"partition ${task.partitionId}, $taskLocality, ${serializedTask.limit} bytes)") sched.dagScheduler.taskStarted(task, info) return Some(new TaskDescription(taskId = taskId, attemptNumber = attemptNum, execId, --- End diff -- Not part of your code, but I'm not seeing a good reason for the non-local return here. The last lines of `resourceOffer` can just as easily be... ```scala sched.dagScheduler.taskStarted(task, info) Some(new TaskDescription(taskId = taskId, attemptNumber = attemptNum, execId, taskName, index, serializedTask)) case _ => None } } else None } ``` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15249: [SPARK-17675] [CORE] Expand Blacklist for TaskSet...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/15249#discussion_r81024744 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala --- @@ -421,7 +412,11 @@ private[spark] class TaskSetManager( maxLocality: TaskLocality.TaskLocality) : Option[TaskDescription] = { -if (!isZombie) { +val offerBlacklisted = taskSetBlacklistOpt.map { blacklist => + blacklist.isNodeBlacklistedForTaskSet(host) || +blacklist.isExecutorBlacklistedForTaskSet(execId) +}.getOrElse(false) --- End diff -- Another place to use `.exists { p }` instead of `.map { p }.getOrElse(false)`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
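The equivalence behind this suggestion, sketched on a plain `Option` (the blacklist values and predicate are hypothetical):

```scala
val blacklistOpt: Option[Set[String]] = Some(Set("exec1", "exec2"))
val noneOpt: Option[Set[String]] = None

// Verbose form: map the predicate over the Option, then default to false.
val verbose = blacklistOpt.map(_.contains("exec1")).getOrElse(false)

// Idiomatic form: exists applies the predicate only when the Option is
// defined and is false on None -- the same .map(p).getOrElse(false) behavior.
val idiomatic = blacklistOpt.exists(_.contains("exec1"))
val onNone = noneOpt.exists(_.contains("exec1"))
```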
[GitHub] spark pull request #15249: [SPARK-17675] [CORE] Expand Blacklist for TaskSet...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/15249#discussion_r81022074 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala --- @@ -266,19 +263,11 @@ private[spark] class TaskSetManager( taskAttempts(taskIndex).exists(_.host == host) } - /** - * Is this re-execution of a failed task on an executor it already failed in before - * EXECUTOR_TASK_BLACKLIST_TIMEOUT has elapsed ? - */ - private[scheduler] def executorIsBlacklisted(execId: String, taskId: Int): Boolean = { -if (failedExecutors.contains(taskId)) { - val failed = failedExecutors.get(taskId).get - - return failed.contains(execId) && -clock.getTimeMillis() - failed.get(execId).get < EXECUTOR_TASK_BLACKLIST_TIMEOUT -} - -false + private def isTaskBlacklistedOnExecOrNode(index: Int, execId: String, host: String): Boolean = { +taskSetBlacklistOpt.map { blacklist => + blacklist.isNodeBlacklistedForTask(host, index) || +blacklist.isExecutorBlacklistedForTask(execId, index) +}.getOrElse(false) --- End diff -- ```scala taskSetBlacklistOpt.exists { blacklist => blacklist.isNodeBlacklistedForTask(host, index) || blacklist.isExecutorBlacklistedForTask(execId, index) } ``` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15249: [SPARK-17675] [CORE] Expand Blacklist for TaskSet...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/15249#discussion_r81020674 --- Diff: core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala --- @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import org.apache.spark.SparkConf +import org.apache.spark.internal.config +import org.apache.spark.internal.Logging +import org.apache.spark.util.Utils + +private[scheduler] object BlacklistTracker extends Logging { + + private val DEFAULT_TIMEOUT = "1h" + + /** + * Returns true if the blacklist is enabled, based on checking the configuration in the following + * order: + * 1. Is it specifically enabled or disabled? + * 2. Is it enabled via the legacy timeout conf? + * 3. 
Use the default for the spark-master: + * - off for local mode + * - on for distributed modes (including local-cluster) + */ + def isBlacklistEnabled(conf: SparkConf): Boolean = { +conf.get(config.BLACKLIST_ENABLED) match { + case Some(isEnabled) => +isEnabled + case None => +// if they've got a non-zero setting for the legacy conf, always enable the blacklist, +// otherwise, use the default based on the cluster-mode (off for local-mode, on otherwise). +val legacyKey = config.BLACKLIST_LEGACY_TIMEOUT_CONF.key +conf.get(config.BLACKLIST_LEGACY_TIMEOUT_CONF) match { + case Some(legacyTimeout) => +if (legacyTimeout == 0) { + logWarning(s"Turning off blacklisting due to legacy configuaration:" + --- End diff -- ...and the line break isn't necessary. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15249: [SPARK-17675] [CORE] Expand Blacklist for TaskSet...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/15249#discussion_r81020284 --- Diff: core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala --- @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import org.apache.spark.SparkConf +import org.apache.spark.internal.config +import org.apache.spark.internal.Logging +import org.apache.spark.util.Utils + +private[scheduler] object BlacklistTracker extends Logging { + + private val DEFAULT_TIMEOUT = "1h" + + /** + * Returns true if the blacklist is enabled, based on checking the configuration in the following + * order: + * 1. Is it specifically enabled or disabled? + * 2. Is it enabled via the legacy timeout conf? + * 3. 
Use the default for the spark-master: + * - off for local mode + * - on for distributed modes (including local-cluster) + */ + def isBlacklistEnabled(conf: SparkConf): Boolean = { +conf.get(config.BLACKLIST_ENABLED) match { + case Some(isEnabled) => +isEnabled + case None => +// if they've got a non-zero setting for the legacy conf, always enable the blacklist, +// otherwise, use the default based on the cluster-mode (off for local-mode, on otherwise). +val legacyKey = config.BLACKLIST_LEGACY_TIMEOUT_CONF.key +conf.get(config.BLACKLIST_LEGACY_TIMEOUT_CONF) match { + case Some(legacyTimeout) => +if (legacyTimeout == 0) { + logWarning(s"Turning off blacklisting due to legacy configuaration:" + --- End diff -- typo: configuration --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15249: [SPARK-17675] [CORE] Expand Blacklist for TaskSet...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/15249#discussion_r81018983 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala --- @@ -50,22 +48,12 @@ import org.apache.spark.util.{AccumulatorV2, Clock, SystemClock, Utils} *task set will be aborted */ private[spark] class TaskSetManager( -sched: TaskSchedulerImpl, +val sched: TaskSchedulerImpl, val taskSet: TaskSet, val maxTaskFailures: Int, -clock: Clock = new SystemClock()) - extends Schedulable with Logging { +val clock: Clock = new SystemClock()) extends Schedulable with Logging { --- End diff -- Why are `sched` and `clock` being made fields? It doesn't seem necessary to me. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
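The distinction at issue in this review comment, in a minimal sketch (class names hypothetical):

```scala
// Without val, a constructor parameter is visible only inside the class body;
// with val, it is promoted to a public field readable by callers.
class Plain(x: Int) { def doubled: Int = x * 2 }
class Exposed(val x: Int)

val p = new Plain(3)
val e = new Exposed(3)
val read = e.x  // legal only because x is a val field
// p.x would not compile: x is not a member of Plain
```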
[GitHub] spark issue #15213: [SPARK-17644] [CORE] Do not add failedStages when abortS...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/15213 > Honestly, I think just getting the fix in is important enough that I'm fine w/ putting in the minimally invasive thing now. That's fine, @squito -- go ahead and merge when you're happy with your requested changes, and then I'll follow up in short order with a separate refactoring PR.
[GitHub] spark issue #15221: [SPARK-17648][CORE] TaskScheduler really needs offers to...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/15221 I'm glad Jenkins finally got it together. LGTM.
[GitHub] spark issue #15213: [SPARK-17644] [CORE] Do not add failedStages when abortS...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/15213 @scwf I understand that you were trying to make the least invasive fix possible to deal with the problem. That's usually a good thing to do, but even when that kind of fix is getting to the root of the problem it can still result in layers of patches that are hard to make sense of. That's not really the fault of any one patch; rather, the blame lies more with those of us who often didn't produce clear, maintainable code in the first place. When it's possible to see re-organizing principles that will make the code clearer, reduce duplication, make future maintenance less error prone, etc., then it's usually a good idea to do a little larger refactoring instead of just a minimally invasive fix. I think this is a small example of where that kind of refactoring makes sense, so that's why I made my code suggestion. If you can see ways to make things even clearer, then feel free to suggest them. I'm sure that Kay, Imran and others who also have been trying to make these kinds of clarifying changes in the DAGScheduler will also chime in if they have further suggestions.
[GitHub] spark issue #15213: [SPARK-17644] [CORE] Do not add failedStages when abortS...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/15213 The fix is logically correct; however, the prior code is needlessly complex and not as easy to understand as it should be, and the proposed fix doesn't improve on that. I'd like to take the opportunity to make the code easier to understand and maintain. Something like this:

```scala
// It is likely that we receive multiple FetchFailed for a single stage (because we have
// multiple tasks running concurrently on different executors). In that case, it is
// possible the fetch failure has already been handled by the scheduler.
if (runningStages.contains(failedStage)) {
  logInfo(s"Marking $failedStage (${failedStage.name}) as failed " +
    s"due to a fetch failure from $mapStage (${mapStage.name})")
  markStageAsFinished(failedStage, Some(failureMessage))
} else {
  logDebug(s"Received fetch failure from $task, but it's from $failedStage which is no " +
    s"longer running")
}

val shouldAbortStage =
  failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId) ||
    disallowStageRetryForTest

if (shouldAbortStage) {
  val abortMessage = if (disallowStageRetryForTest) {
    "Fetch failure will not retry stage due to testing config"
  } else {
    s"$failedStage (${failedStage.name}) " +
      s"has failed the maximum allowable number of " +
      s"times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}. " +
      s"Most recent failure reason: $failureMessage"
  }
  abortStage(failedStage, abortMessage, None)
} else {
  // update failedStages and make sure a ResubmitFailedStages event is enqueued
  // TODO: Cancel running tasks in the failed stage -- cf. SPARK-17064
  val noResubmitEnqueued = failedStages.isEmpty
  failedStages += failedStage
  failedStages += mapStage
  if (noResubmitEnqueued) {
    // If failedStages is not empty, then a previous FetchFailed already went through
    // this block of code and queued up a ResubmitFailedStages event that has not yet
    // run. We, therefore, only need to queue up a new ResubmitFailedStages event when
    // failedStages.isEmpty.
    logInfo(
      s"Resubmitting $mapStage (${mapStage.name}) and " +
        s"$failedStage (${failedStage.name}) due to fetch failure"
    )
    messageScheduler.schedule(
      new Runnable {
        override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
      },
      DAGScheduler.RESUBMIT_TIMEOUT,
      TimeUnit.MILLISECONDS
    )
  }
}
// Mark the map whose fetch failed as broken in the map stage
```

This should be equivalent to what you have, @scwf, with the exception that `fetchFailedAttemptIds.add(stageAttemptId)` is done even when `disallowStageRetryForTest` is `true` -- which seems like a better idea to me. Also available here: https://github.com/markhamstra/spark/commit/368f82d9789ec04565af835e7cb80d1cdb0ccf0c @squito
[GitHub] spark issue #15213: [SPARK-17644] [CORE] Fix the race condition when DAGSche...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/15213 Right, but `abortStage` occurs elsewhere. "When abort stage" seems to imply that this fix is necessary for all usages of `abortStage` when the actual problem is not in `abortStage` but rather in improper additions to `failedStages`. I've got to go now, but I'll come back to this soon(ish).
[GitHub] spark issue #15213: [SPARK-17644] [CORE] Fix the race condition when DAGSche...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/15213 @scwf That description would actually be at least as bad since there are multiple routes to `abortStage` and this issue of adding to `failedStages` only applies to these two. I'll take another look soon and see if I can come up with a clean refactoring and a better description for the commit message.
[GitHub] spark issue #15213: [SPARK-17644] [CORE] Fix the race condition when DAGSche...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/15213 Ok, that makes better sense. The `disallowStageRetryForTest` case doesn't worry me too much since it is only used in tests. If we can fix this case, great; else if it remains possible to create failing tests that can never happen outside of the tests, then that is not all that important (but should at least be noted in comments in the test suite). Yes, not adding to `failedStages` after going down either of those two paths to `abortStage` is a correct fix even if the description of the problem wasn't really accurate. I'll take another look over the weekend to see if the logic can be expressed a bit more clearly.
[GitHub] spark issue #15213: [SPARK-17644] [CORE] Fix the race condition when DAGSche...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/15213 This doesn't make sense to me. The DAGSchedulerEventProcessLoop runs on a single thread and processes a single event from its queue at a time. When the first CompletionEvent is run as a result of a fetch failure, failedStages is added to and a ResubmitFailedStages event is queued. After handleTaskCompletion is done, the next event from the queue will be processed.

As events are sequentially dequeued and handled, either the ResubmitFailedStages event will be handled before the CompletionEvent for the second fetch failure, or the CompletionEvent will be handled before the ResubmitFailedStages event. If the ResubmitFailedStages is handled first, then failedStages will be cleared in resubmitFailedStages, and there will be nothing preventing the subsequent CompletionEvent from queueing another ResubmitFailedStages event to handle additional fetch failures.

In the alternative that the second CompletionEvent is queued and handled before the ResubmitFailedStages event, the additional stages are added to the non-empty failedStages, but there is no need to schedule another ResubmitFailedStages event because the one from the first CompletionEvent is still on the queue, and the handling of that queued event will also handle the newly added failedStages from the second CompletionEvent.

In either ordering, all the failedStages are handled and there is no race condition.
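The two-ordering argument above can be sketched with a toy single-threaded event loop. This is a simplified model with made-up names, not the actual DAGScheduler API; it mirrors only the failedStages / ResubmitFailedStages bookkeeping described in the comment:

```scala
import scala.collection.mutable

// Toy model of a single-threaded event loop: one queue, events handled in order.
sealed trait Event
final case class FetchFailed(stage: Int) extends Event
case object ResubmitFailedStages extends Event

final class ToyLoop {
  val queue = mutable.Queue.empty[Event]
  val failedStages = mutable.Set.empty[Int]
  val resubmitted = mutable.ListBuffer.empty[Int]

  def handle(e: Event): Unit = e match {
    case FetchFailed(s) =>
      // Only enqueue a ResubmitFailedStages when none is already pending,
      // i.e. when failedStages was empty before this failure arrived.
      val noResubmitEnqueued = failedStages.isEmpty
      failedStages += s
      if (noResubmitEnqueued) queue.enqueue(ResubmitFailedStages)
    case ResubmitFailedStages =>
      resubmitted ++= failedStages.toList.sorted
      failedStages.clear()
  }

  def drain(): Unit = while (queue.nonEmpty) handle(queue.dequeue())
}

// Ordering 1: the second fetch failure arrives before the resubmit event runs.
val a = new ToyLoop
a.queue.enqueue(FetchFailed(1))
a.queue.enqueue(FetchFailed(2))
a.drain()

// Ordering 2: the resubmit event runs between the two fetch failures.
val b = new ToyLoop
b.queue.enqueue(FetchFailed(1))
b.drain()
b.queue.enqueue(FetchFailed(2))
b.drain()

// Either way, every failed stage is resubmitted exactly once.
assert(a.resubmitted.toList == List(1, 2))
assert(b.resubmitted.toList == List(1, 2))
```

In ordering 1 the second FetchFailed finds failedStages non-empty and enqueues nothing extra; in ordering 2 the cleared set lets the second failure enqueue a fresh resubmit. Both orderings end with the same resubmissions.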
[GitHub] spark issue #15084: [SPARK-17529][core] Implement BitSet.clearUntil and use ...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/15084 @rxin @jegonzal looking for a third-party review
[GitHub] spark issue #15084: [SPARK-17529][core] Implement BitSet.clearUntil and use ...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/15084 test this please
[GitHub] spark pull request #14952: [SPARK-17110] Fix StreamCorruptionException in Bl...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/14952#discussion_r77697794 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---

```diff
@@ -520,10 +520,11 @@ private[spark] class BlockManager(
    *
    * This does not acquire a lock on this block in this JVM.
    */
-  private def getRemoteValues(blockId: BlockId): Option[BlockResult] = {
+  private def getRemoteValues[T: ClassTag](blockId: BlockId): Option[BlockResult] = {
+    val ct = implicitly[ClassTag[T]]
     getRemoteBytes(blockId).map { data =>
       val values =
-        serializerManager.dataDeserializeStream(blockId, data.toInputStream(dispose = true))
+        serializerManager.dataDeserializeStream(blockId, data.toInputStream(dispose = true))(ct)
```

--- End diff --

How do you forget to pass a correct ClassTag when the compiler is enforcing its presence via the context bound?
[GitHub] spark issue #14958: [SPARK-17378] [BUILD] Upgrade snappy-java to 1.1.2.6
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/14958 @srowen The bug does manifest in Spark. I've been running this upgrade in production for a few weeks with no issues. This should be merged IMO.
[GitHub] spark pull request #14952: [SPARK-17110] Fix StreamCorruptionException in Bl...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/14952#discussion_r77459444 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---

```diff
@@ -520,10 +520,11 @@ private[spark] class BlockManager(
    *
    * This does not acquire a lock on this block in this JVM.
    */
-  private def getRemoteValues(blockId: BlockId): Option[BlockResult] = {
+  private def getRemoteValues[T: ClassTag](blockId: BlockId): Option[BlockResult] = {
+    val ct = implicitly[ClassTag[T]]
     getRemoteBytes(blockId).map { data =>
       val values =
-        serializerManager.dataDeserializeStream(blockId, data.toInputStream(dispose = true))
+        serializerManager.dataDeserializeStream(blockId, data.toInputStream(dispose = true))(ct)
```

--- End diff --

I'm not saying this should definitely be done one way or the other, but I'm curious why you have a preference for the extra code and more verbose API that come with making the classTag an explicit parameter.
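The context-bound mechanics being debated can be sketched in isolation. These are toy method names, not the BlockManager API; the sketch only shows why a caller can't "forget" the tag under a context bound, and what the explicit-parameter alternative costs:

```scala
import scala.reflect.ClassTag

// With a context bound, the compiler materializes a ClassTag[T] at every
// call site automatically; a caller cannot forget to supply it.
def remoteValueTag[T: ClassTag](bytes: Array[Byte]): ClassTag[T] =
  implicitly[ClassTag[T]]

// The explicit-parameter alternative expresses the same requirement more
// verbosely; callers must now pass the tag by hand at every call site.
def remoteValueTagExplicit[T](bytes: Array[Byte], ct: ClassTag[T]): ClassTag[T] = ct

val implicitTag = remoteValueTag[String](Array.emptyByteArray)
val explicitTag = remoteValueTagExplicit(Array.emptyByteArray, implicitly[ClassTag[String]])

assert(implicitTag.runtimeClass == classOf[String])
assert(explicitTag.runtimeClass == classOf[String])
```

Either way the compiler rejects a call site that cannot produce a `ClassTag[T]`; the difference is purely how noisy the API is.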
[GitHub] spark pull request #14737: [SPARK-17171][WEB UI] DAG will list all partition...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/14737#discussion_r75602737 --- Diff: core/src/main/scala/org/apache/spark/ui/SparkUI.scala ---

```diff
@@ -141,6 +141,7 @@ private[spark] object SparkUI {
   val DEFAULT_POOL_NAME = "default"
   val DEFAULT_RETAINED_STAGES = 1000
   val DEFAULT_RETAINED_JOBS = 1000
+  val DEFAULT_RETAINED_NODES = 2
```

--- End diff --

`NODES`, both here and in `spark.ui.retainedNodes`, is far too ambiguous and non-specific for this configuration value -- "node" is already overloaded too many times in the existing Spark code and documentation; we don't need or want to add another overload. Additionally, the default behavior should be the same as the current behavior, since the change in behavior would be unexpected and it is far from clear to me that the overwhelming majority of users would prefer the proposed new behavior.
[GitHub] spark pull request #14737: [SPARK-17171][WEB UI] DAG will list all partition...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/14737#discussion_r75602575 --- Diff: core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala ---

```diff
@@ -119,18 +119,47 @@
       { if (stage.attemptId == 0) "" else s" (attempt ${stage.attemptId})" }
     val rootCluster = new RDDOperationCluster(stageClusterId, stageClusterName)
+    var rootNodeCount = 0
+    val addRDDIds = new mutable.HashSet[Int]()
+    val dropRDDIds = new mutable.HashSet[Int]()
+
+    def isAllowed(ids: mutable.HashSet[Int], rdd: RDDInfo): Boolean = {
+      val parentIds = rdd.parentIds
+      if (parentIds.size == 0) {
+        rootNodeCount < retainedNodes
+      } else {
+        if (ids.size > 0) {
+          parentIds.exists(id => ids.contains(id) || !dropRDDIds.contains(id))
+        } else {
+          true
+        }
+      }
+    }
+
     // Find nodes, edges, and operation scopes that belong to this stage
-    stage.rddInfos.foreach { rdd =>
-      edges ++= rdd.parentIds.map { parentId => RDDOperationEdge(parentId, rdd.id) }
+    stage.rddInfos.sortBy(_.id).foreach { rdd =>
+      val keepNode: Boolean = isAllowed(addRDDIds, rdd)
+      if (keepNode) {
+        addRDDIds.add(rdd.id)
+        edges ++= rdd.parentIds.filter(id => !dropRDDIds.contains(id))
+          .map { parentId => RDDOperationEdge(parentId, rdd.id) }
```

--- End diff --

This isn't just a question of whether `{ }` is necessary, but also whether using them to delimit closures with `map`, `filter`, etc. has become the de facto and accepted style in Spark code. It has -- for just one of many, many examples, see the code that this diff is replacing. Delimiting closures in this way even when not strictly necessary improves consistency and readability by not requiring more parsing of parenthesis levels. I wouldn't have recommended that this diff be changed.
[GitHub] spark issue #14557: [SPARK-16709][CORE] Kill the running task if stage faile...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/14557 There are multiple issues with this PR. Some are at a more stylistic level, but some include deeper issues -- e.g. see SPARK-17064. Most fundamentally, this PR is the wrong solution at least in the sense that it does not implement a minimal fix without other side effects. The problem is that TaskCommitDenied is not being handled properly when a duplicate Task tries to commit a result that has already been successfully committed by another attempt of this Task. The proper fix needs to be at that point of committing duplicate results, not by making the larger, unnecessary change in how we handle cancellation/interruption of other Tasks in a TaskSet when one of them produces a FetchFailed.
[GitHub] spark issue #12436: [SPARK-14649][CORE] DagScheduler should not run duplicat...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/12436 See https://issues.apache.org/jira/browse/SPARK-17064
[GitHub] spark pull request #14534: [SPARK-16941]Add SynchronizedMap trait with Map i...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/14534#discussion_r73908742 --- Diff: sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala ---

```diff
@@ -39,8 +39,10 @@ private[thriftserver] class SparkSQLOperationManager()
   val handleToOperation = ReflectionUtils
     .getSuperField[JMap[OperationHandle, Operation]](this, "handleToOperation")
-  val sessionToActivePool = Map[SessionHandle, String]()
-  val sessionToContexts = Map[SessionHandle, SQLContext]()
+  val sessionToActivePool = new mutable.HashMap[SessionHandle, String]()
```

--- End diff --

Correct; SynchronizedMap has been deprecated since Scala 2.11.0 with this comment in the API docs: "Synchronization via traits is deprecated as it is inherently unreliable. Consider java.util.concurrent.ConcurrentHashMap as an alternative." The title of this PR must be updated to match what is actually being done after the switch to use ConcurrentHashMap, since we don't want the misleading "Add SynchronizedMap trait" to persist in the commit history.
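A minimal sketch of the replacement the deprecation notice recommends; toy string keys and values stand in for the thrift-server's SessionHandle and pool types:

```scala
import java.util.concurrent.ConcurrentHashMap

// ConcurrentHashMap gives per-operation thread safety without the
// deprecated SynchronizedMap mixin.
val sessionToActivePool = new ConcurrentHashMap[String, String]()

sessionToActivePool.put("session-1", "fairPoolA")

// putIfAbsent makes the check-then-act update a single atomic operation,
// which a synchronized wrapper around a plain map cannot guarantee across
// two separate calls (contains, then put).
sessionToActivePool.putIfAbsent("session-1", "fairPoolB")

assert(sessionToActivePool.get("session-1") == "fairPoolA")
assert(sessionToActivePool.size == 1)
```

The atomic compound operations (`putIfAbsent`, `computeIfAbsent`, etc.) are the substantive reason the Scala docs point to ConcurrentHashMap rather than mere lock wrapping.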
[GitHub] spark issue #14533: [SPARK-16606] [CORE] isleading warning for SparkContext....
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/14533 PR title typo? Intended "misleading"?
[GitHub] spark issue #14332: [SPARK-16694] [CORE] Use for/foreach rather than map for...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/14332 The Scala API docs clearly specify that the most idiomatic usage of Option is to treat it as a collection or monad and use map, flatMap, filter, or foreach. Those docs also clearly specify that using Option values via pattern matching is less idiomatic.

There is nothing fundamentally wrong with `map` applying a function that produces results of type `Unit`, nor is such an operation some kind of special not-a-transformation that must always be treated differently from operations that have been blessed as legitimate transformations. Sure, when you eventually come to the boundaries of purely functional code and it is time to deal with side effects, then the distinctness of `Unit` comes into play. But it is a basic FP principle or best practice not to treat types as specific instead of abstract until you logically must deal with the specifics -- and that is just not the general case with `map` functions that produce `Unit`, nor with folding an Option.

Usage of `Unit` consistent with usage of other types in things like `map` and `fold` can be confusing to the many Spark contributors who aren't completely at home with idiomatic FP, and that is why, e.g., we chose not to allow folding of Option within the Spark code. That's not going to change based on any argument in this PR's comments. But neither should we shy away from any and all usages of `Unit`-producing functions within `map` -- especially not when that usage is the preferred, idiomatic usage of Option.

The issue that SPARK-16694 should be dealing with is not a fundamental problem with `map` and `Unit`, but rather the problems caused by the not completely sound implementation of `view` in Scala. There are known problems and ugly corner cases associated with `view`, and that's why I think Scala programmers would be well advised to avoid it when they can, or to use it with great caution when they must. Alternatively, you can more safely use something like Josh Suereth's viewducers work: https://github.com/jsuereth/viewducers
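The competing styles in this thread, side by side, with toy values; which of these reads best is exactly the dispute:

```scala
val params: Option[Int] = Some(42)
val log = scala.collection.mutable.ListBuffer.empty[String]

// Treated as a collection -- the usage the Scala API docs call most idiomatic.
params.foreach { p => log += s"ran with $p" }

// Pattern matching -- called out by the same docs as less idiomatic,
// though arguably more explicit about the None branch.
params match {
  case Some(p) => log += s"matched $p"
  case None    => log += "no params"
}

// fold -- zero-value for the None case, function for the Some case;
// disallowed by Spark convention, but consistent with fold on any
// other collection.
val folded = params.fold("no params")(p => s"folded $p")

assert(log.toList == List("ran with 42", "matched 42"))
assert(folded == "folded 42")
```

All three are equivalent in intent and effect here; the disagreement is purely over which form signals side effects and idiomatic Scala best.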
[GitHub] spark pull request #14332: [SPARK-16694] [CORE] Use for/foreach rather than ...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/14332#discussion_r72014373 --- Diff: examples/src/main/scala/org/apache/spark/examples/ml/DataFrameExample.scala ---

```diff
@@ -54,14 +54,13 @@ object DataFrameExample {
       }
     }
-    parser.parse(args, defaultParams).map { params =>
-      run(params)
-    }.getOrElse {
-      sys.exit(1)
+    parser.parse(args, defaultParams) match {
+      case Some(params) => run(params)
+      case _ => sys.exit(1)
     }
```

--- End diff --

It appears that you are reading what you want to see instead of what is really there. All of the examples in the Scala API doc involve side effects. The three examples are essentially equivalent in their intent and effects, and using pattern matching with `Option` is clearly called out as the least idiomatic. `fold` isn't in question, since it has already been ruled out by other Spark committers as beyond the ken of those unfamiliar with idiomatic functional programming. I see no reason, however, to do anything different in Spark's code than what the Scala API declares to be the "most idiomatic way".
[GitHub] spark pull request #14332: [SPARK-16694] [CORE] Use for/foreach rather than ...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/14332#discussion_r7120 --- Diff: examples/src/main/scala/org/apache/spark/examples/ml/DataFrameExample.scala ---

```diff
@@ -54,14 +54,13 @@ object DataFrameExample {
       }
     }
-    parser.parse(args, defaultParams).map { params =>
-      run(params)
-    }.getOrElse {
-      sys.exit(1)
+    parser.parse(args, defaultParams) match {
+      case Some(params) => run(params)
+      case _ => sys.exit(1)
     }
```

--- End diff --

The argument from consistency says to treat `Option` as a collection or monad, not as something special, and to treat `Unit` as just another type, not as something special. Treated consistently, the return of the zero-value in a fold over a None is no more surprising than folding over an empty List producing the zero-value. To a functional programmer aware of the semantics of fold, producing zeroes from folding over empty collections is every bit as explicit as an if-else or pattern match. The argument about using `fold` with `Option` in Spark isn't going anywhere at this point, but you should look at the Scala API docs, which include the comment that "[a] less-idiomatic way to use scala.Option values is via pattern matching." Also see other commentary, such as http://blog.originate.com/blog/2014/06/15/idiomatic-scala-your-options-do-not-match/
[GitHub] spark pull request #14332: [SPARK-16694] [CORE] Use for/foreach rather than ...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/14332#discussion_r71993558 --- Diff: examples/src/main/scala/org/apache/spark/examples/ml/DataFrameExample.scala ---

```diff
@@ -54,14 +54,13 @@ object DataFrameExample {
       }
     }
-    parser.parse(args, defaultParams).map { params =>
-      run(params)
-    }.getOrElse {
-      sys.exit(1)
+    parser.parse(args, defaultParams) match {
+      case Some(params) => run(params)
+      case _ => sys.exit(1)
     }
```

--- End diff --

Oh, and the "less obvious" argument is why my opinion on `fold` with `Option` was rejected -- even though it is a perfectly logical and obvious thing to do for a functional programmer.
[GitHub] spark pull request #14332: [SPARK-16694] [CORE] Use for/foreach rather than ...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/14332#discussion_r71993490 --- Diff: examples/src/main/scala/org/apache/spark/examples/ml/DataFrameExample.scala ---

```diff
@@ -54,14 +54,13 @@ object DataFrameExample {
       }
     }
-    parser.parse(args, defaultParams).map { params =>
-      run(params)
-    }.getOrElse {
-      sys.exit(1)
+    parser.parse(args, defaultParams) match {
+      case Some(params) => run(params)
+      case _ => sys.exit(1)
     }
```

--- End diff --

There is nothing incorrect about generating an expression of type Unit; what I am talking about is only pattern matching on Options, not the `view` issue; IJ has lots of opinions, not all of them correct. What theoretical problem are you talking about with `fold` over `Option`? The `view` issue does not come into play, since the pattern-matching-with-Option construct is not even valid if you are dealing with `anOption.view`.
[GitHub] spark pull request #14332: [SPARK-16694] [CORE] Use for/foreach rather than ...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/14332#discussion_r71993163 --- Diff: examples/src/main/scala/org/apache/spark/examples/ml/DataFrameExample.scala ---

```diff
@@ -54,14 +54,13 @@ object DataFrameExample {
       }
     }
-    parser.parse(args, defaultParams).map { params =>
-      run(params)
-    }.getOrElse {
-      sys.exit(1)
+    parser.parse(args, defaultParams) match {
+      case Some(params) => run(params)
+      case _ => sys.exit(1)
     }
```

--- End diff --

I wouldn't change these kinds of constructs. It's almost purely a stylistic issue (for which I don't believe we have an officially declared preference), and 1) pattern matching over Options is poor form, IMO; 2) the pattern-matching alternative here doesn't signal the use of side effects any more unambiguously. If I had my way, we'd `fold` Options, but that's an argument I lost long ago:

```scala
val anOption: Option[T] = ...

anOption.fold {
  handleNone
} { t =>
  ...
}
```
[GitHub] spark issue #14330: [SPARK-16693][SPARKR] Remove methods deprecated in 2.0.0...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/14330 See JIRA comment.
[GitHub] spark issue #14039: [SPARK-15896][SQL] Clean up shuffle files just after job...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/14039 I haven't got anything more concrete to offer at this time than the descriptions in the relevant JIRAs, but I do have this running in production with 1.6, and it does work. Essentially, you build a cache in your application whose keys are a canonicalization of query fragments and whose values are the RDDs associated with that fragment of the logical plan (the RDDs that produce the shuffle files). For as long as you hold the references to those RDDs in your cache, Spark won't remove the shuffle files. For as long as you have sufficient memory available to the OS, those shuffle files will be accessed via the OS buffer cache, which is actually pretty quick and doesn't require any Java heap management or garbage collection. That was the original motivation behind using shuffle files in this way, before off-heap caching and unified memory management were available. It's less necessary now (at least once I figure out how to do the mapping between logical plan fragments and tables cached off-heap), but it is still a valid alternative caching mechanism.
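The application-level cache described above can be sketched roughly as follows. All names here are hypothetical (there is no such class in Spark); `CachedRdd` stands in for a real Spark RDD, and a real `canonicalize` would normalize a logical-plan fragment rather than a string:

```scala
import scala.collection.mutable

// Stand-in for an RDD whose shuffle files we want to keep alive.
final case class CachedRdd(planFragment: String)

object FragmentCache {
  // Keys are a canonical form of a query fragment; holding the value
  // (a live RDD reference) keeps Spark from cleaning up the shuffle
  // files that the fragment produced.
  private val cache = mutable.Map.empty[String, CachedRdd]

  // Hypothetical canonicalization; a real version would canonicalize
  // the logical-plan fragment, not just trim/lower-case a string.
  def canonicalize(fragment: String): String = fragment.trim.toLowerCase

  // Return the cached RDD for an equivalent fragment, or build and
  // retain a new one.
  def getOrBuild(fragment: String)(build: => CachedRdd): CachedRdd =
    cache.getOrElseUpdate(canonicalize(fragment), build)
}
```

Repeated queries whose fragments canonicalize to the same key then reuse the retained RDD, and with it the existing shuffle output, instead of recomputing.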
[GitHub] spark issue #14039: [SPARK-15896][SQL] Clean up shuffle files just after job...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/14039 Actually, they can be reused -- not in Spark as distributed, but it is an open question whether reusing shuffle files within Spark SQL is something that we should be doing and want to support. It can be an effective alternative means of caching. https://issues.apache.org/jira/browse/SPARK-13756 Until that issue is definitively decided, we should not pre-empt the possibility with this PR.
[GitHub] spark issue #13685: [SPARK-15963][CORE] Catch `TaskKilledException` correctl...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/13685 LGTM. Good work!
[GitHub] spark issue #13677: [SPARK 15926] Improve readability of DAGScheduler stage ...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/13677 LGTM, and runs without flakiness for me when rebased onto master with the https://github.com/apache/spark/pull/13688 HOTFIX.
[GitHub] spark issue #13688: [HOTFIX] [CORE] fix flaky BasicSchedulerIntegrationTest
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/13688 As a HOTFIX, LGTM; but I agree that there is room for a follow-up.
[GitHub] spark issue #13677: [SPARK 15926] Improve readability of DAGScheduler stage ...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/13677 > One goal of this change is to make it clearer which functions may create new stages (as opposed to looking up stages that already exist). Something that I have been looking at of late, and I know that @squito has looked at some, too. In short, I'm pretty confident that we're doing some silliness around creating new stages instead of reusing already existing stages, then recognizing that all the tasks for the "new" stages are already completed (at least we're smart enough to reuse the map outputs), so the "new" stages just become "skipped". I'll take a closer look at this tomorrow, and may have a follow-on PR in the not too distant future.
[GitHub] spark issue #13646: [SPARK-15927] Eliminate redundant DAGScheduler code.
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/13646 LGTM, but I agree with Imran's renaming suggestion, and his new test looks good.
[GitHub] spark issue #13591: [Minor] Replace all occurrences of None: Option[X] with ...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/13591 Yeah, I wouldn't bother. The way this PR does it is arguably better because it can be done consistently across all collections:

```scala
val l0 = List.empty[String]
val s0 = Set.empty[Int]
val o0 = Option.empty[Byte]
```

...but it's pretty trivial, and I don't buy the readability argument.
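For context, the two spellings the PR trades between are fully equivalent; this small sketch (not from the PR) shows both produce the same value with the same static type:

```scala
object EmptyStyles {
  def main(args: Array[String]): Unit = {
    val typedNone: Option[Byte] = None           // the style being replaced
    val emptyOpt = Option.empty[Byte]            // the style this PR uses

    // Same value, same static type...
    assert(typedNone == emptyOpt)

    // ...and the `X.empty[T]` form is uniform across collections,
    // unlike the ad hoc `Nil: List[String]` / `None: Option[T]` spellings.
    assert(List.empty[String] == (Nil: List[String]))
    println("equivalent")
  }
}
```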
[GitHub] spark pull request: [SPARK-9876][SQL]: Update Parquet to 1.8.1.
Github user markhamstra commented on the pull request: https://github.com/apache/spark/pull/13280 Oh, wait... sorry, I just realized that @liancheng said he also merged to branch-2.0. +1 on reverting that.
[GitHub] spark pull request: [SPARK-9876][SQL]: Update Parquet to 1.8.1.
Github user markhamstra commented on the pull request: https://github.com/apache/spark/pull/13280 @rxin Huh? The merge was to master, not branch-2.0. Doesn't that put it on the 2.1 track and not into 2.0.0? I think that is all that Yin was saying, that @rdblue was mistaken about this change going into 2.0.
[GitHub] spark pull request: [SPARK-15176][Core] Add maxShares setting to P...
Github user markhamstra commented on the pull request: https://github.com/apache/spark/pull/12951#issuecomment-30643 Added my comments to the JIRA. In short, I think there is a legitimate use case for this, and there is a significant gap in our current fair-scheduling pool API. Implementing a maxShare property is actually something that has been on my todo list for a while.
[GitHub] spark pull request: [SPARK-10372] [CORE] basic test framework for ...
Github user markhamstra commented on the pull request: https://github.com/apache/spark/pull/8559#issuecomment-221414334 @squito Yeah, that's fine. I haven't gone through the new tests closely to make sure that they are doing what they say they are doing, but the changes to both non-test code and previous tests look safe.
[GitHub] spark pull request: [SPARK-10372] [CORE] basic test framework for ...
Github user markhamstra commented on the pull request: https://github.com/apache/spark/pull/8559#issuecomment-221001489 @squito I don't feel really strongly about it, either. My only concern was for others adding tests needing a mock ExternalClusterManager in the future and not knowing which one to use, why they are different, whether any new mocking needs to be added to both, etc. If someone does feel strongly about maintaining the separation, then we can put things back the way you had them.
[GitHub] spark pull request: [SPARK-10372] [CORE] basic test framework for ...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/8559#discussion_r64170026 --- Diff: core/src/test/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager --- @@ -1 +1,2 @@ -org.apache.spark.scheduler.DummyExternalClusterManager \ No newline at end of file +org.apache.spark.scheduler.DummyExternalClusterManager +org.apache.spark.scheduler.MockExternalClusterManager --- End diff -- Did you ever look at combining DummyExternalClusterManager and MockExternalClusterManager? They are just two variations on a fake ExternalClusterManager for use in tests. I realize that the focus of the tests for Dummy... and Mock... are different, so the two variations may not be easy or clean to combine, but if we could have just one fake ExternalClusterManager that still had a relatively clean implementation, I think that would be better than maintaining two. OTOH, if combining them gets messy, then just go with what you've already got.