[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

2017-02-07 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/16620
  
I'll spend some time today trying to sort out the relative merits of the 
fix options; but in the meantime, there's also no good reason for 
`TaskSchedulerImpl.rootPool` to be a `var` initialized as `null`, nor any good 
reason for `TaskScheduler.rootPool` to be able to produce `null`.  Cleaning 
that up also makes code in this PR slightly simpler: 
https://github.com/markhamstra/spark/commit/e11fe2a9817559492daee03c8c025879dc44d346
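    Roughly the shape I have in mind (simplified stand-ins, not the actual 
classes -- see the linked commit for the real change):
```scala
// Sketch only: Pool here stands in for org.apache.spark.scheduler.Pool.
class Pool(val name: String)

trait TaskScheduler {
  // An accessor that can never produce null.
  def rootPool: Pool
}

class TaskSchedulerImpl extends TaskScheduler {
  // Eagerly initialized val instead of `var rootPool: Pool = null`, so callers
  // such as the DAGScheduler need no null checks.
  override val rootPool: Pool = new Pool("root")
}
```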





[GitHub] spark pull request #16813: [SPARK-19466][CORE][SCHEDULER] Improve Fair Sched...

2017-02-06 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/16813#discussion_r99686720
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala ---
@@ -69,19 +72,29 @@ private[spark] class FairSchedulableBuilder(val 
rootPool: Pool, conf: SparkConf)
   val DEFAULT_WEIGHT = 1
 
   override def buildPools() {
-var is: Option[InputStream] = None
+var fileData: Option[FileData] = None
 try {
-  is = Option {
-schedulerAllocFile.map { f =>
-  new FileInputStream(f)
-}.getOrElse {
-  
Utils.getSparkClassLoader.getResourceAsStream(DEFAULT_SCHEDULER_FILE)
+  fileData = schedulerAllocFile.map { f =>
+Some(FileData(new FileInputStream(f), f))
+  }.getOrElse {
+val is = 
Utils.getSparkClassLoader.getResourceAsStream(DEFAULT_SCHEDULER_FILE)
+if(is != null) Some(FileData(is, DEFAULT_SCHEDULER_FILE))
+else {
+  logWarning(s"No Fair Scheduler file found.")
+  None
 }
   }
 
-  is.foreach { i => buildFairSchedulerPool(i) }
+  fileData.foreach { data =>
+logInfo(s"Fair Scheduler file: ${data.fileName} is found 
successfully and will be parsed.")
--- End diff --

s"Creating Fair Scheduler pools from ${data.fileName}"





[GitHub] spark pull request #16813: [SPARK-19466][CORE][SCHEDULER] Improve Fair Sched...

2017-02-06 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/16813#discussion_r99686323
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala ---
@@ -69,19 +72,29 @@ private[spark] class FairSchedulableBuilder(val 
rootPool: Pool, conf: SparkConf)
   val DEFAULT_WEIGHT = 1
 
   override def buildPools() {
-var is: Option[InputStream] = None
+var fileData: Option[FileData] = None
 try {
-  is = Option {
-schedulerAllocFile.map { f =>
-  new FileInputStream(f)
-}.getOrElse {
-  
Utils.getSparkClassLoader.getResourceAsStream(DEFAULT_SCHEDULER_FILE)
+  fileData = schedulerAllocFile.map { f =>
+Some(FileData(new FileInputStream(f), f))
+  }.getOrElse {
+val is = 
Utils.getSparkClassLoader.getResourceAsStream(DEFAULT_SCHEDULER_FILE)
+if(is != null) Some(FileData(is, DEFAULT_SCHEDULER_FILE))
+else {
+  logWarning(s"No Fair Scheduler file found.")
--- End diff --

"Fair Scheduler configuration file not found."





[GitHub] spark issue #16813: [SPARK-19466][CORE][SCHEDULER] Improve Fair Scheduler Lo...

2017-02-06 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/16813
  
Looks reasonable, but I'd prefer slightly different log messages.





[GitHub] spark pull request #16620: [SPARK-19263] DAGScheduler should avoid sending c...

2017-01-20 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/16620#discussion_r97140817
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -1193,7 +1193,15 @@ class DAGScheduler(
 }
 
 if (runningStages.contains(shuffleStage) && 
shuffleStage.pendingPartitions.isEmpty) {
-  markStageAsFinished(shuffleStage)
+  val activeTaskSetManagerExist =
--- End diff --

And since it is being used as `!activeTaskSetManagerExists`, you could 
reverse the sense, avoid needing the `!`, and call it something like 
`noActiveTaskSetManager`.
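    
    i.e., roughly (a sketch, folding in the `if...else` simplification from my 
other comment), which can then be used directly wherever 
`!activeTaskSetManagerExists` appears now:
```scala
val noActiveTaskSetManager =
  taskScheduler.rootPool == null ||
  !taskScheduler.rootPool.getSortedTaskSetQueue.exists { tsm =>
    tsm.stageId == stageId && !tsm.isZombie
  }
```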





[GitHub] spark pull request #16620: [SPARK-19263] DAGScheduler should avoid sending c...

2017-01-20 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/16620#discussion_r97139832
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -1193,7 +1193,15 @@ class DAGScheduler(
 }
 
 if (runningStages.contains(shuffleStage) && 
shuffleStage.pendingPartitions.isEmpty) {
-  markStageAsFinished(shuffleStage)
+  val activeTaskSetManagerExist =
--- End diff --

nit: should be `activeTaskSetManagerExists`





[GitHub] spark pull request #16620: [SPARK-19263] DAGScheduler should avoid sending c...

2017-01-20 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/16620#discussion_r97139668
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -1193,7 +1193,15 @@ class DAGScheduler(
 }
 
 if (runningStages.contains(shuffleStage) && 
shuffleStage.pendingPartitions.isEmpty) {
-  markStageAsFinished(shuffleStage)
+  val activeTaskSetManagerExist =
+if (taskScheduler.rootPool != null) {
+  taskScheduler.rootPool.getSortedTaskSetQueue.exists {
+tsm => tsm.stageId == stageId && !tsm.isZombie
+  }
+} else false
--- End diff --

The `if...else` is unnecessary:
```scala
val activeTaskSetManagerExist =
  taskScheduler.rootPool != null &&
  taskScheduler.rootPool.getSortedTaskSetQueue.exists { tsm =>
    tsm.stageId == stageId && !tsm.isZombie
  }
```





[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should handle stage's pending...

2017-01-17 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/16620
  
Jenkins, test this please





[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should handle stage's pending...

2017-01-17 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/16620
  
Beyond the lack of new tests, this patch is causing a couple of existing 
DAGSchedulerSuite tests to fail for me locally: "run trivial shuffle with 
out-of-band failure and retry" and "map stage submission with executor failure 
late map task completions"





[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should handle stage's pending...

2017-01-17 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/16620
  
ok to test






[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should handle stage's pending...

2017-01-17 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/16620
  
Thanks for the work thus far, @jinxing64 , but this really needs updated 
test coverage before we can consider merging it.

@squito 





[GitHub] spark issue #16437: [SPARK-19028] [SQL] Fixed non-thread-safe functions used...

2016-12-31 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/16437
  
Please update the description in the JIRA ticket.  What is there now is 
simply not adequate: if anyone has to come back and address this issue some 
time in the future, the current description does not explain *why* a change 
was needed well enough for that person to avoid digging into the details of 
the code (which may well be quite different by then).





[GitHub] spark issue #16291: [SPARK-18838][CORE] Use separate executor service for ea...

2016-12-22 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/16291
  
> Each listener should in theory could work independent of each other and 
we should only guarantee ordered processing of the events within a listener.

If we were starting from nothing, then yes, it would be valid and advisable 
to design the Listener infrastructure using only this weaker guarantee.  The 
issue, though, is that we are not starting from nothing, but rather from a 
system that currently offers a much stronger guarantee on the synchronized 
behavior of Listeners.  If it is the case that no Listeners currently rely on 
the stronger guarantee and thus could work completely correctly under the 
weaker guarantee of this PR, then we could make this change without much 
additional concern.  But reaching that level of confidence in current Listeners 
is a difficult prerequisite -- strictly speaking, it's an impossible task.

We could carefully work through all the internal behavior of Spark's 
Listeners to convince ourselves that they can work correctly under the new, 
weaker guarantee.  At a bare minimum, we need to do that much before we can 
consider merging this PR -- but that's probably not enough.  The problem is 
that Listeners aren't just internal to Spark.  Users have also developed their 
own custom Listeners that either implement `SparkListenerInterface` or extend 
`SparkListener` or `SparkFirehoseListener`, and we can't just assume that those 
custom Listeners don't rely upon the current guarantee to either synchronize 
behavior with other custom Listeners or even with Spark internal Listeners.  
Since we can't know that user Listeners don't already rely upon the current, 
stronger guarantee, the question now becomes whether we even have the freedom 
to change that guarantee within the lifetime of Spark 2.x, or whether any such 
change would have to wait for Spark 3.x.

`SparkListener` is still annotated as `@DeveloperAPI`, so if that were the 
only piece in play, then we could change its guarantee fairly freely.  
`SparkListenerInterface` is almost as good, since it includes the admonition in 
a comment to "[n]ote that this is an internal interface which might change in 
different Spark releases."  The stickier issue is with `SparkFirehoseListener`, 
which carries no such annotations or comments, but is just a plain public class 
and API.  So, after convincing ourselves that Spark's internal Listeners would 
be fine with this PR, we'd still have to convince the Spark PMC that changing 
the public `SparkFirehoseListener` (with prominent warnings in the release 
notes, of course) before Spark 3.x would be acceptable.

And all of the above is still really only arguing about whether we *could* 
adopt this PR in essentially its present form.  There are still questions of 
whether we *should* do this or maybe instead we should do something a little 
different or more.  I can see some merit in Marcelo's "opt in" suggestion.  If 
there is utility in having groups of Listeners that can rely upon synchronized 
behavior, then we should probably retain one or more threads running 
synchronized Listeners.  For example, if Listener A relies upon synchronization 
with Listeners B and C while D needs to synchronize with E, but F, G and H are 
all independent, then there are a couple of things we could do.  First, the 
independent Listeners (F, G and H) can each run in its own thread, providing 
the scalable performance that this PR is aiming for.  After that, we could 
either have one synchronized Listener thread for all the other Listeners, or we 
could have one thread for A, B and C and one thread for D and E.  Whether 
we support only one synchronized Listener group/thread or multiple, we'd 
still need some mechanism for Listeners to select into a synchronized group or 
to indicate that they can and should be run independently on their own thread.
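    
    To make the "opt in" idea concrete, one hypothetical shape it could take 
(names like `listenerGroup` are invented for illustration; nothing like this 
exists in Spark today):
```scala
// Hypothetical opt-in grouping mechanism, sketch only.
trait GroupedSparkListener {
  // Listeners returning the same group name would share one event-processing
  // thread and keep today's synchronized semantics; None would mean "run me
  // independently on my own thread".
  def listenerGroup: Option[String] = None
}

class ListenerA extends GroupedSparkListener {
  override def listenerGroup: Option[String] = Some("abc")  // synchronized with B and C
}

class ListenerF extends GroupedSparkListener  // default None: independent, scalable, unordered
```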

@rxin   





[GitHub] spark issue #12775: [SPARK-14958][Core] Failed task not handled when there's...

2016-12-19 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/12775
  
Yeah, that's ok, @kayousterhout 





[GitHub] spark issue #16291: [SPARK-18838][CORE] Use separate executor service for ea...

2016-12-19 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/16291
  
> You are right, that's why the executor service is single threaded which 
guarantees ordered processing of the events per listener.

But this is still substantially different from the ordering guarantees in 
the current implementation, and I'm not sure that this PR wouldn't invalidate 
some assumptions made by current listeners.   Events will end up in the same 
order in each `ListenerEventExecutor` (at least by my understanding, which 
someone else should double-check), but there is no synchronization between 
`ListenerEventExecutor`s of the processing of their `eventQueue`s; so it is 
entirely possible, for example, for one `ListenerEventExecutor` to process a 
task end event for a particular task before another `ListenerEventExecutor` has 
worked sufficiently through its `eventQueue` to have even seen the 
corresponding task start event.  That is quite a bit different than the prior 
ordering guarantee implied by the comment "`onPostEvent` is guaranteed to be 
called in the same thread for all listeners."

Whether this actually constitutes a problem or not, I'm not certain; but 
serializing event processing within each `ListenerEventExecutor` isn't enough 
if `Listener`s on different `ListenerEventExecutor` threads expect to share 
mutable state. 
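    
    A contrived illustration of the interleaving I'm describing (hypothetical 
listeners, not Spark code):
```scala
// Two listeners sharing mutable state, each driven by its own single-threaded
// ListenerEventExecutor: per-listener ordering holds, but nothing orders the
// two threads relative to each other.
import scala.collection.mutable

object SharedState {
  val runningTasks = mutable.Set[Long]()  // shared across listener threads, unsynchronized
}

class StartTracker {                       // processed on executor/thread 1
  def onTaskStart(taskId: Long): Unit = SharedState.runningTasks += taskId
}

class EndTracker {                         // processed on executor/thread 2
  def onTaskEnd(taskId: Long): Unit = {
    // Thread 2 may reach the task-end event before thread 1 has processed the
    // corresponding task-start event, so this removal can silently miss.
    if (!SharedState.runningTasks.remove(taskId)) {
      println(s"saw end of task $taskId before its start")
    }
  }
}
```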





[GitHub] spark issue #15335: [SPARK-17769][Core][Scheduler]Some FetchFailure refactor...

2016-12-15 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/15335
  
It's worth something, but not a lot.  I think it's worth merging, but if 
someone thinks it's not, I'm not going to fight it.  Read it and merge if you 
want to.





[GitHub] spark issue #15335: [SPARK-17769][Core][Scheduler]Some FetchFailure refactor...

2016-12-15 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/15335
  
@vanzin It should be committed if you think it adds enough additional 
clarity that it is worth the penalty of making future backporting or other 
debugging maintenance a little more difficult.  It doesn't change functionality 
much, so it's mostly just to increase the comprehensibility of the code.





[GitHub] spark pull request #16189: [SPARK-18761][CORE] Introduce "task reaper" to ov...

2016-12-12 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/16189#discussion_r92005198
  
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -161,12 +163,7 @@ private[spark] class Executor(
* @param interruptThread whether to interrupt the task thread
*/
   def killAllTasks(interruptThread: Boolean) : Unit = {
-// kill all the running tasks
-for (taskRunner <- runningTasks.values().asScala) {
-  if (taskRunner != null) {
-taskRunner.kill(interruptThread)
-  }
-}
+runningTasks.keys().asScala.foreach(t => killTask(t, interruptThread = 
interruptThread))
--- End diff --

Yes, it is not at all a likely case.  The most likely case, though, does 
concern me.

The default value for `spark.job.interruptOnCancel` is `false`; and from 
the little bit of discussion on SPARK-17064, it doesn't seem that it will be 
very easy to change that with confidence.  That means that `interruptThread` 
will also be `false` except for the hard-coded `true` in `handleSuccessfulTask` 
(which is itself suspect).  The way that I am understanding the default 
behavior of this PR, then, is that if the reaper functionality is enabled, any 
kill request of a task (other than from within `handleSuccessfulTask`) that 
doesn't manage to complete within `killTimeoutMs` of that kill request will 
result in the Executor's JVM being killed -- i.e. the most likely default 
behavior looks to boil down to `if(killTask) killJvmAfterKillTimeoutMs`.  That 
is potentially expensive in terms of lost state from that Executor.  If the 
external shuffle service is being used, then we shouldn't need to lose the 
state of the shuffle files; and if one of the external block storage options is 
 being used, then we shouldn't need to lose state on cached RDDs/tables, 
broadcast variables and/or accumulators.  But none of that externalization and 
persistence of state across Executor restarts is currently the default -- and 
that concerns me some.
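    
    Condensing that concern into a sketch (all names invented for 
illustration, not the PR's actual code):
```scala
// Hypothetical condensation of the default path described above.
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean

object ReaperSketch {
  private val reaperPool = Executors.newSingleThreadScheduledExecutor()

  def killTask(taskFinished: AtomicBoolean, killTimeoutMs: Long): Unit = {
    // With spark.job.interruptOnCancel defaulting to false, the task thread is
    // only asked to stop; long-running user code may never notice the request.
    reaperPool.schedule(new Runnable {
      override def run(): Unit = {
        if (!taskFinished.get()) {
          // Timeout expired with the task still alive: give up on the task and
          // kill the whole executor JVM, losing any non-externalized shuffle
          // files, cached blocks, broadcasts, and accumulators with it.
          sys.exit(1)
        }
      }
    }, killTimeoutMs, TimeUnit.MILLISECONDS)
  }
}
```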





[GitHub] spark pull request #16189: [SPARK-18761][CORE] Introduce "task reaper" to ov...

2016-12-09 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/16189#discussion_r91764815
  
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -84,6 +84,16 @@ private[spark] class Executor(
   // Start worker thread pool
   private val threadPool = ThreadUtils.newDaemonCachedThreadPool("Executor 
task launch worker")
   private val executorSource = new ExecutorSource(threadPool, executorId)
+  // Pool used for threads that supervise task killing / cancellation
+  private val taskReaperPool = ThreadUtils.newDaemonCachedThreadPool("Task 
reaper")
+  // For tasks which are in the process of being killed, this map the most 
recently created
--- End diff --

That sentence no verb.





[GitHub] spark pull request #16189: [SPARK-18761][CORE][WIP] Introduce "task reaper" ...

2016-12-07 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/16189#discussion_r91364564
  
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -161,12 +163,7 @@ private[spark] class Executor(
* @param interruptThread whether to interrupt the task thread
*/
   def killAllTasks(interruptThread: Boolean) : Unit = {
-// kill all the running tasks
-for (taskRunner <- runningTasks.values().asScala) {
-  if (taskRunner != null) {
-taskRunner.kill(interruptThread)
-  }
-}
+runningTasks.keys().asScala.foreach(t => killTask(t, interruptThread = 
interruptThread))
--- End diff --

Or access TaskReapers via something like a guava LoadingCache.
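    
    Something along these lines (sketch; `TaskReaper` here is a stand-in for 
the PR's class):
```scala
import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}

class TaskReaper(val taskId: Long) {
  def kill(interruptThread: Boolean): Unit = { /* supervise the kill attempt */ }
}

object ReaperCacheSketch {
  val taskReapers: LoadingCache[java.lang.Long, TaskReaper] =
    CacheBuilder.newBuilder().build(new CacheLoader[java.lang.Long, TaskReaper] {
      // A reaper is created on first access, so killTask and killAllTasks can
      // both just call taskReapers.get(tid) without racing to construct one.
      override def load(tid: java.lang.Long): TaskReaper = new TaskReaper(tid)
    })
}
```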





[GitHub] spark issue #16065: [SPARK-18631][SQL] Changed ExchangeCoordinator re-partit...

2016-11-29 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/16065
  
@rxin fixed it





[GitHub] spark pull request #16065: [SPARK-17064][SQL] Changed ExchangeCoordinator re...

2016-11-29 Thread markhamstra
GitHub user markhamstra opened a pull request:

https://github.com/apache/spark/pull/16065

[SPARK-17064][SQL] Changed ExchangeCoordinator re-partitioning to avoid 
additional data …

## What changes were proposed in this pull request?

The re-partitioning logic in ExchangeCoordinator is changed so that another 
pre-shuffle partition will not be added to the current post-shuffle partition 
if doing so would cause the post-shuffle partition's size to exceed the 
target partition size.
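
A simplified sketch of the rule (illustration only, not the actual 
ExchangeCoordinator code):
```scala
// Greedily pack pre-shuffle partitions into post-shuffle partitions, starting a
// new group *before* adding a partition that would push the group past the target.
def coalesce(preShuffleSizes: Seq[Long], targetSize: Long): Seq[Seq[Int]] = {
  val groups = scala.collection.mutable.ArrayBuffer(scala.collection.mutable.ArrayBuffer.empty[Int])
  var currentSize = 0L
  preShuffleSizes.zipWithIndex.foreach { case (size, i) =>
    if (groups.last.nonEmpty && currentSize + size > targetSize) {
      groups += scala.collection.mutable.ArrayBuffer.empty[Int]
      currentSize = 0L
    }
    groups.last += i
    currentSize += size
  }
  groups.map(_.toSeq).toSeq
}

// e.g. coalesce(Seq(40L, 40L, 40L, 40L), targetSize = 100L)
//      => Seq(Seq(0, 1), Seq(2, 3)) rather than a skewed Seq(Seq(0, 1, 2), Seq(3))
```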

## How was this patch tested?

Existing tests updated to reflect new expectations.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/markhamstra/spark SPARK-17064

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/16065.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #16065


commit 561fcf67bd3c1541352b00f33981a44fa58a6ccc
Author: Mark Hamstra <markhams...@gmail.com>
Date:   2016-11-29T20:34:03Z

Changed ExchangeCoordinator re-partitioning to avoid additional data skew







[GitHub] spark issue #15986: [SPARK-18553][CORE][branch-2.0] Fix leak of TaskSetManag...

2016-11-28 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/15986
  
Thanks, @JoshRosen 





[GitHub] spark issue #15986: [SPARK-18553][CORE][branch-2.0] Fix leak of TaskSetManag...

2016-11-28 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/15986
  
If Kay is happy with the last couple of changes, then I'm fine with this, 
too.  The only tiny nit I've still got is a change from 
`runningTasksByExecutors()` to `runningTasksByExecutors`.  Outside of this PR, 
there's only a single call site in `SparkStatusTracker`, so the fix is pretty 
trivial -- but so is this issue itself, so I don't really care much if it stays 
as is. 





[GitHub] spark pull request #15986: [SPARK-18553][CORE][branch-2.0] Fix leak of TaskS...

2016-11-23 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/15986#discussion_r89423666
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -335,31 +337,31 @@ private[spark] class TaskSchedulerImpl(
 var reason: Option[ExecutorLossReason] = None
 synchronized {
   try {
-if (state == TaskState.LOST && taskIdToExecutorId.contains(tid)) {
-  // We lost this entire executor, so remember that it's gone
-  val execId = taskIdToExecutorId(tid)
-
-  if (executorIdToTaskCount.contains(execId)) {
+taskIdToTaskSetManager.get(tid) match {
+  case Some(taskSet) if state == TaskState.LOST =>
+// TaskState.LOST is only used by the deprecated Mesos 
fine-grained scheduling mode,
+// where each executor corresponds to a single task, so mark 
the executor as failed.
+val execId = taskIdToExecutorId.getOrElse(tid, throw new 
IllegalStateException(
+  "taskIdToTaskSetManager.contains(tid) <=> 
taskIdToExecutorId.contains(tid)"))
 reason = Some(
   SlaveLost(s"Task $tid was lost, so marking the executor as 
lost as well."))
 removeExecutor(execId, reason.get)
--- End diff --

Yup, good point.





[GitHub] spark issue #15986: [SPARK-18553][CORE][branch-2.0] Fix leak of TaskSetManag...

2016-11-23 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/15986
  
The code looks pretty good @JoshRosen , but I still want to spend some time 
looking at your standalone end-to-end reproduction to get more familiar with 
the details.





[GitHub] spark pull request #15986: [SPARK-18553][CORE][branch-2.0] Fix leak of TaskS...

2016-11-23 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/15986#discussion_r89411560
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -88,10 +88,12 @@ private[spark] class TaskSchedulerImpl(
   // Incrementing task IDs
   val nextTaskId = new AtomicLong(0)
 
-  // Number of tasks running on each executor
-  private val executorIdToTaskCount = new HashMap[String, Int]
+  // IDs of the tasks running on each executor
+  private val executorIdToRunningTaskIds = new HashMap[String, 
HashSet[Long]]
 
-  def runningTasksByExecutors(): Map[String, Int] = 
executorIdToTaskCount.toMap
+  def runningTasksByExecutors(): Map[String, Int] = synchronized {
--- End diff --

Is there a reason why this shouldn't be `def runningTasksByExecutors: 
Map[String, Int]` -- it's not a mutator.
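    
    For instance (sketch; the body is my guess based on the new 
`executorIdToRunningTaskIds` map above):
```scala
// Parameterless accessor -- no parens, since it doesn't mutate anything.
def runningTasksByExecutors: Map[String, Int] = synchronized {
  executorIdToRunningTaskIds.toMap.mapValues(_.size)
}
```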





[GitHub] spark issue #11888: [SPARK-14069][SQL] Improve SparkStatusTracker to also tr...

2016-11-23 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/11888
  
@rxin Do you mean the N/A in "How was this patch tested?"  Some guy said 
that the lack of tests was ok.  
https://github.com/apache/spark/pull/11888#issuecomment-200162987





[GitHub] spark issue #15463: [SPARK-17894] [CORE] Ensure uniqueness of TaskSetManager...

2016-10-21 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/15463
  
Whoa!  Jenkins disrespects @kayousterhout -- bad Jenkins!  Or did you 
actually fix something @shivaram ?





[GitHub] spark issue #15531: [SQL][STREAMING][TEST] Follow up to remove Option.contai...

2016-10-18 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/15531
  
@rxin Nonsense; it's just as good an idea as are List.contains and 
Set.contains.  The only problem with it is that it doesn't exist in Scala 2.10.
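    
    For reference, the spellings in question:
```scala
val opt: Option[Int] = Some(3)

opt.contains(3)     // Scala 2.11+, analogous to List(3).contains(3) and Set(3).contains(3)
opt == Some(3)      // 2.10-compatible equivalent
opt.exists(_ == 3)  // another 2.10-compatible spelling
```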





[GitHub] spark pull request #15335: [SPARK-17769][Core][Scheduler]Some FetchFailure r...

2016-10-17 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/15335#discussion_r83710471
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -1255,27 +1255,46 @@ class DAGScheduler(
   s"longer running")
   }
 
-  if (disallowStageRetryForTest) {
-abortStage(failedStage, "Fetch failure will not retry stage 
due to testing config",
-  None)
-  } else if 
(failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId)) {
-abortStage(failedStage, s"$failedStage (${failedStage.name}) " 
+
-  s"has failed the maximum allowable number of " +
-  s"times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}. " +
-  s"Most recent failure reason: ${failureMessage}", None)
-  } else {
-if (failedStages.isEmpty) {
-  // Don't schedule an event to resubmit failed stages if 
failed isn't empty, because
-  // in that case the event will already have been scheduled.
-  // TODO: Cancel running tasks in the stage
-  logInfo(s"Resubmitting $mapStage (${mapStage.name}) and " +
-s"$failedStage (${failedStage.name}) due to fetch failure")
-  messageScheduler.schedule(new Runnable {
-override def run(): Unit = 
eventProcessLoop.post(ResubmitFailedStages)
-  }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS)
+  val shouldAbortStage =
+failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId) ||
+disallowStageRetryForTest
+
+  if (shouldAbortStage) {
+val abortMessage = if (disallowStageRetryForTest) {
+  "Fetch failure will not retry stage due to testing config"
+} else {
+  s"""$failedStage (${failedStage.name})
+ |has failed the maximum allowable number of
+ |times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}.
+ |Most recent failure reason: 
$failureMessage""".stripMargin.replaceAll("\n", " ")
 }
+abortStage(failedStage, abortMessage, None)
+  } else { // update failedStages and make sure a 
ResubmitFailedStages event is enqueued
+// TODO: Cancel running tasks in the failed stage -- cf. 
SPARK-17064
+val noResubmitEnqueued = !failedStages.contains(failedStage)
--- End diff --

Ok, but it's really not that complicated or difficult to understand.

There is only one way to add stages to `failedStages`: within the 
`FetchFailed` case.  When a `failedStage` is added to `failedStages`, it is 
always accompanied by the parent `mapStage`.

There are only two ways to remove stages from `failedStages`: 1) within the 
handling of a `ResubmitFailedStages` event, when the entire `failedStages` is 
cleared; 2) within `cleanupStateForJobAndIndependentStages` when we call 
`removeStage`.  Obviously, 1) can't produce a state where `mapStage` is not in 
`failedStage` while a corresponding `failedStage` is, so the only logic we need 
to concern ourselves with is in 2).

In order for 2) to produce a state where `mapStage` is absent from 
`failedStages` while an associated `failedStage` is present, `removeStage` 
would need to have been called on the `mapStage` while not being called on the 
`failedStage`.  But that can't happen because `removeStage` will not be called 
on a stage unless no Job needs that stage anymore.  If no job needs the 
`mapStage`, then no job can need a `failedStage` that uses the output of that 
`mapStage` -- i.e. it is not possible that a `mapStage` will be removed in 
`cleanupStateForJobAndIndependentStages` unless every associated `failedStage` 
will also be removed.

Conclusion: It is never possible for `mapStage` to be absent from 
`failedStages` at the same time that a corresponding `failedStage` is present, so the proposed 
`|| !failedStages.contains(mapStage)` condition will never be checked -- it 
would just be unreachable and misleading code.

There also isn't really any need for concern over lack of tests.  There is 
no need to prove correctness of the current code for something that can't 
happen presently, so the only point of such a test would be to guard against 
some future mistaken change making it possible to remove a failed `mapStage` 
while some `failedStage` still needs it.  If that happens, then we've got far 
bigger problems than checking whether we need to issue a new 
`ResubmitFailedStages` event, and checks for that kind of broken removal of 
parents while their children are still d

[GitHub] spark issue #14158: [SPARK-13547] [SQL] [WEBUI] Add SQL query in web UI's SQ...

2016-10-13 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/14158
  
@nblintao Got it; thanks.  There may be distinct queries that will be 
entirely the same within the first 1000 characters, but that's just the nature 
of working with these very large queries -- there are lots of things that make 
them difficult, but it sounds like you've made sure not to make things 
difficult for more normal queries as a consequence, so that's all good.





[GitHub] spark issue #14158: [SPARK-13547] [SQL] [WEBUI] Add SQL query in web UI's SQ...

2016-10-13 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/14158
  
@nblintao I really haven't reviewed these changes closely enough to have a 
specific complaint or concern in mind, but I'm more concerned about what 
happens when you ask to see "more" when that "more" could be several tens of 
thousands of characters, not just the few thousand that many would consider to 
be a very long SQL query.





[GitHub] spark issue #14158: [SPARK-13547] [SQL] [WEBUI] Add SQL query in web UI's SQ...

2016-10-13 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/14158
  
It's great if it is already addressed, but I just didn't see anything 
explicit in the discussion or examples that showed any query of the magnitude 
that I am talking about.  Machine-generated queries can be staggeringly large 
(e.g. I've seen queries that were so long and complicated that they took Spark 
SQL more than 9 minutes just to parse), and this UI enhancement must be 
prepared to handle those.





[GitHub] spark issue #14158: [SPARK-13547] [SQL] [WEBUI] Add SQL query in web UI's SQ...

2016-10-13 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/14158
  
Please be certain that this works well even for very large queries.  They 
are not commonplace, but I know that Spark SQL does sometimes get asked to 
handle SQL queries that are hundreds of lines long.





[GitHub] spark pull request #15335: [SPARK-17769][Core][Scheduler]Some FetchFailure r...

2016-10-13 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/15335#discussion_r83289400
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -1255,27 +1255,46 @@ class DAGScheduler(
   s"longer running")
   }
 
-  if (disallowStageRetryForTest) {
-abortStage(failedStage, "Fetch failure will not retry stage 
due to testing config",
-  None)
-  } else if 
(failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId)) {
-abortStage(failedStage, s"$failedStage (${failedStage.name}) " 
+
-  s"has failed the maximum allowable number of " +
-  s"times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}. " +
-  s"Most recent failure reason: ${failureMessage}", None)
-  } else {
-if (failedStages.isEmpty) {
-  // Don't schedule an event to resubmit failed stages if 
failed isn't empty, because
-  // in that case the event will already have been scheduled.
-  // TODO: Cancel running tasks in the stage
-  logInfo(s"Resubmitting $mapStage (${mapStage.name}) and " +
-s"$failedStage (${failedStage.name}) due to fetch failure")
-  messageScheduler.schedule(new Runnable {
-override def run(): Unit = 
eventProcessLoop.post(ResubmitFailedStages)
-  }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS)
+  val shouldAbortStage =
+failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId) ||
+disallowStageRetryForTest
+
+  if (shouldAbortStage) {
+val abortMessage = if (disallowStageRetryForTest) {
+  "Fetch failure will not retry stage due to testing config"
+} else {
+  s"""$failedStage (${failedStage.name})
+ |has failed the maximum allowable number of
+ |times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}.
+ |Most recent failure reason: 
$failureMessage""".stripMargin.replaceAll("\n", " ")
 }
+abortStage(failedStage, abortMessage, None)
+  } else { // update failedStages and make sure a 
ResubmitFailedStages event is enqueued
+// TODO: Cancel running tasks in the failed stage -- cf. 
SPARK-17064
+val noResubmitEnqueued = !failedStages.contains(failedStage)
 failedStages += failedStage
 failedStages += mapStage
+if (noResubmitEnqueued) {
+  // We expect one executor failure to trigger many 
FetchFailures in rapid succession,
+  // but all of those task failures can typically be handled 
by a single resubmission of
+  // the failed stage.  We avoid flooding the scheduler's 
event queue with resubmit
+  // messages by checking whether a resubmit is already in the 
event queue for the
+  // failed stage.  If there is already a resubmit enqueued 
for a different failed
+  // stage, that event would also be sufficient to handle the 
current failed stage, but
+  // producing a resubmit for each failed stage makes 
debugging and logging a little
+  // simpler while not producing an overwhelming number of 
scheduler events.
+  logInfo(
+s"Resubmitting $mapStage (${mapStage.name}) and " +
+s"$failedStage (${failedStage.name}) due to fetch failure"
+  )
+  messageScheduler.schedule(
--- End diff --

Yup, move further discussion there.





[GitHub] spark pull request #15441: [SPARK-4411] [Web UI] Add "kill" link for jobs in...

2016-10-12 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/15441#discussion_r83105534
  
--- Diff: core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala ---
@@ -35,4 +37,20 @@ private[ui] class JobsTab(parent: SparkUI) extends 
SparkUITab(parent, "jobs") {
 
   attachPage(new AllJobsPage(this))
   attachPage(new JobPage(this))
+
+  def handleKillRequest(request: HttpServletRequest): Unit = {
+if (killEnabled && 
parent.securityManager.checkModifyPermissions(request.getRemoteUser)) {
+  val killFlag = 
Option(request.getParameter("terminate")).getOrElse("false").toBoolean
+  val jobId = Option(request.getParameter("id")).map(_.toInt)
+  jobId.foreach { id =>
+if (killFlag && jobProgresslistener.activeJobs.contains(id)) {
+  sc.foreach(_.cancelJob(id))
+  // Do a quick pause here to give Spark time to kill the job so 
it shows up as
+  // killed after the refresh. Note that this will block the 
serving thread so the
+  // time should be limited in duration.
+  Thread.sleep(100)
--- End diff --

You're doing the sleep here even if `sc` is `None`; but I wouldn't expect 
that case to even be possible, or to be handled completely correctly elsewhere 
in the Web UI if it actually is possible, so this is likely fine in practice.





[GitHub] spark pull request #15335: [SPARK-17769][Core][Scheduler]Some FetchFailure r...

2016-10-12 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/15335#discussion_r83073697
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -1255,27 +1255,46 @@ class DAGScheduler(
   s"longer running")
   }
 
-  if (disallowStageRetryForTest) {
-abortStage(failedStage, "Fetch failure will not retry stage 
due to testing config",
-  None)
-  } else if 
(failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId)) {
-abortStage(failedStage, s"$failedStage (${failedStage.name}) " 
+
-  s"has failed the maximum allowable number of " +
-  s"times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}. " +
-  s"Most recent failure reason: ${failureMessage}", None)
-  } else {
-if (failedStages.isEmpty) {
-  // Don't schedule an event to resubmit failed stages if 
failed isn't empty, because
-  // in that case the event will already have been scheduled.
-  // TODO: Cancel running tasks in the stage
-  logInfo(s"Resubmitting $mapStage (${mapStage.name}) and " +
-s"$failedStage (${failedStage.name}) due to fetch failure")
-  messageScheduler.schedule(new Runnable {
-override def run(): Unit = 
eventProcessLoop.post(ResubmitFailedStages)
-  }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS)
+  val shouldAbortStage =
+failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId) ||
+disallowStageRetryForTest
+
+  if (shouldAbortStage) {
+val abortMessage = if (disallowStageRetryForTest) {
+  "Fetch failure will not retry stage due to testing config"
+} else {
+  s"""$failedStage (${failedStage.name})
+ |has failed the maximum allowable number of
+ |times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}.
+ |Most recent failure reason: 
$failureMessage""".stripMargin.replaceAll("\n", " ")
 }
+abortStage(failedStage, abortMessage, None)
+  } else { // update failedStages and make sure a 
ResubmitFailedStages event is enqueued
+// TODO: Cancel running tasks in the failed stage -- cf. 
SPARK-17064
+val noResubmitEnqueued = !failedStages.contains(failedStage)
 failedStages += failedStage
 failedStages += mapStage
+if (noResubmitEnqueued) {
+  // We expect one executor failure to trigger many 
FetchFailures in rapid succession,
+  // but all of those task failures can typically be handled 
by a single resubmission of
+  // the failed stage.  We avoid flooding the scheduler's 
event queue with resubmit
+  // messages by checking whether a resubmit is already in the 
event queue for the
+  // failed stage.  If there is already a resubmit enqueued 
for a different failed
+  // stage, that event would also be sufficient to handle the 
current failed stage, but
+  // producing a resubmit for each failed stage makes 
debugging and logging a little
+  // simpler while not producing an overwhelming number of 
scheduler events.
+  logInfo(
+s"Resubmitting $mapStage (${mapStage.name}) and " +
+s"$failedStage (${failedStage.name}) due to fetch failure"
+  )
+  messageScheduler.schedule(
--- End diff --

Ok, I can get started on that.  I believe that leaves this PR ready to 
merge.





[GitHub] spark pull request #15335: [SPARK-17769][Core][Scheduler]Some FetchFailure r...

2016-10-12 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/15335#discussion_r82944965
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -1255,27 +1255,46 @@ class DAGScheduler(
   s"longer running")
   }
 
-  if (disallowStageRetryForTest) {
-abortStage(failedStage, "Fetch failure will not retry stage due to testing config",
-  None)
-  } else if (failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId)) {
-abortStage(failedStage, s"$failedStage (${failedStage.name}) " +
-  s"has failed the maximum allowable number of " +
-  s"times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}. " +
-  s"Most recent failure reason: ${failureMessage}", None)
-  } else {
-if (failedStages.isEmpty) {
-  // Don't schedule an event to resubmit failed stages if failed isn't empty, because
-  // in that case the event will already have been scheduled.
-  // TODO: Cancel running tasks in the stage
-  logInfo(s"Resubmitting $mapStage (${mapStage.name}) and " +
-s"$failedStage (${failedStage.name}) due to fetch failure")
-  messageScheduler.schedule(new Runnable {
-override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
-  }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS)
+  val shouldAbortStage =
+failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId) ||
+disallowStageRetryForTest
+
+  if (shouldAbortStage) {
+val abortMessage = if (disallowStageRetryForTest) {
+  "Fetch failure will not retry stage due to testing config"
+} else {
+  s"""$failedStage (${failedStage.name})
+ |has failed the maximum allowable number of
+ |times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}.
+ |Most recent failure reason: $failureMessage""".stripMargin.replaceAll("\n", " ")
 }
+abortStage(failedStage, abortMessage, None)
+  } else { // update failedStages and make sure a ResubmitFailedStages event is enqueued
+// TODO: Cancel running tasks in the failed stage -- cf. SPARK-17064
+val noResubmitEnqueued = !failedStages.contains(failedStage)
 failedStages += failedStage
 failedStages += mapStage
+if (noResubmitEnqueued) {
+  // We expect one executor failure to trigger many FetchFailures in rapid succession,
+  // but all of those task failures can typically be handled by a single resubmission of
+  // the failed stage.  We avoid flooding the scheduler's event queue with resubmit
+  // messages by checking whether a resubmit is already in the event queue for the
+  // failed stage.  If there is already a resubmit enqueued for a different failed
+  // stage, that event would also be sufficient to handle the current failed stage, but
+  // producing a resubmit for each failed stage makes debugging and logging a little
+  // simpler while not producing an overwhelming number of scheduler events.
+  logInfo(
+s"Resubmitting $mapStage (${mapStage.name}) and " +
+s"$failedStage (${failedStage.name}) due to fetch failure"
+  )
+  messageScheduler.schedule(
--- End diff --

Ah, sorry for ascribing the prior comment to your preferences.  That comment actually did make sense a long time ago, when the resubmitting of stages really was done periodically by an Akka scheduled event that fired every so many seconds.  I'm pretty sure the RESUBMIT_TIMEOUT stuff is also legacy code that doesn't make sense and isn't necessary any more.

So, do you want to do the follow-up PR to get rid of it, or shall I? 
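For readers following along, here is roughly what that follow-up amounts to -- sketched against the names that appear in the diff above (`messageScheduler`, `eventProcessLoop`, `ResubmitFailedStages`), so treat it as an illustration of the idea rather than the actual patch:
```scala
// Today: a one-shot post of ResubmitFailedStages, delayed by RESUBMIT_TIMEOUT ms.
messageScheduler.schedule(new Runnable {
  override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
}, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS)

// If the delay really is vestigial legacy code, the follow-up could post the event directly:
eventProcessLoop.post(ResubmitFailedStages)
```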





[GitHub] spark pull request #15441: [SPARK-4411] [Web UI] Add "kill" link for jobs in...

2016-10-11 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/15441#discussion_r82925875
  
--- Diff: core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala ---
@@ -35,4 +37,18 @@ private[ui] class JobsTab(parent: SparkUI) extends SparkUITab(parent, "jobs") {
 
   attachPage(new AllJobsPage(this))
   attachPage(new JobPage(this))
+
+  def handleKillRequest(request: HttpServletRequest): Unit = {
+if (killEnabled && (parent.securityManager.checkModifyPermissions(request.getRemoteUser))) {
+  val killFlag = Option(request.getParameter("terminate")).getOrElse("false").toBoolean
+  val jobId = Option(request.getParameter("id")).getOrElse("-1").toInt
+  if (jobId >= 0 && killFlag && jobProgresslistener.activeJobs.contains(jobId)) {
+sc.get.cancelJob(jobId)
+  }
--- End diff --

And if the Job isn't actually going to be canceled, then there is no need 
to delay the page refresh (which I'm not entirely happy with, but I'm not going 
to try to resolve that issue right now.)  So...
```scala
sc.foreach { sparkContext =>
  sparkContext.cancelJob(id)
  Thread.sleep(100)
}
```

And we'd better make `jobId` an `Option[Int]` instead of an `Option[String]`, so...
```scala
val jobId = Option(request.getParameter("id")).map(_.toInt)
```





[GitHub] spark pull request #15441: [SPARK-4411] [Web UI] Add "kill" link for jobs in...

2016-10-11 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/15441#discussion_r82924909
  
--- Diff: core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala ---
@@ -35,4 +37,18 @@ private[ui] class JobsTab(parent: SparkUI) extends SparkUITab(parent, "jobs") {
 
   attachPage(new AllJobsPage(this))
   attachPage(new JobPage(this))
+
+  def handleKillRequest(request: HttpServletRequest): Unit = {
+if (killEnabled && (parent.securityManager.checkModifyPermissions(request.getRemoteUser))) {
+  val killFlag = Option(request.getParameter("terminate")).getOrElse("false").toBoolean
+  val jobId = Option(request.getParameter("id")).getOrElse("-1").toInt
+  if (jobId >= 0 && killFlag && jobProgresslistener.activeJobs.contains(jobId)) {
+sc.get.cancelJob(jobId)
+  }
--- End diff --

Similarly, there is no need for `sc.get`.  In fact, that's a bug if 
`parent.sc` really should be an `Option` and thus could be `None`.
```scala
sc.foreach(_.cancelJob(id))
```





[GitHub] spark pull request #15441: [SPARK-4411] [Web UI] Add "kill" link for jobs in...

2016-10-11 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/15441#discussion_r82923313
  
--- Diff: core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala ---
@@ -35,4 +37,18 @@ private[ui] class JobsTab(parent: SparkUI) extends SparkUITab(parent, "jobs") {
 
   attachPage(new AllJobsPage(this))
   attachPage(new JobPage(this))
+
+  def handleKillRequest(request: HttpServletRequest): Unit = {
+if (killEnabled && (parent.securityManager.checkModifyPermissions(request.getRemoteUser))) {
+  val killFlag = Option(request.getParameter("terminate")).getOrElse("false").toBoolean
+  val jobId = Option(request.getParameter("id")).getOrElse("-1").toInt
+  if (jobId >= 0 && killFlag && jobProgresslistener.activeJobs.contains(jobId)) {
+sc.get.cancelJob(jobId)
+  }
--- End diff --

Creating an `Option` only to immediately `get` the value out of it is poor 
style, and unnecessary.
```scala
val jobId = Option(request.getParameter("id"))
jobId.foreach { id =>
  if (killFlag && jobProgresslistener.activeJobs.contains(id)) {
sc.get.cancelJob(id)
  }
}
```
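Folding this together with the two comments above (use `sc.foreach`, parse the id as an `Option[Int]`), a rough sketch of where the handler could end up -- hypothetical code only, relying on the `JobsTab` members shown in the diff (`parent`, `sc`, `killEnabled`, `jobProgresslistener`):
```scala
def handleKillRequest(request: HttpServletRequest): Unit = {
  if (killEnabled && parent.securityManager.checkModifyPermissions(request.getRemoteUser)) {
    val killFlag = Option(request.getParameter("terminate")).exists(_.toBoolean)
    val jobId = Option(request.getParameter("id")).map(_.toInt)
    jobId.foreach { id =>
      if (killFlag && jobProgresslistener.activeJobs.contains(id)) {
        // No Option#get anywhere: if there is no SparkContext, this is simply a no-op.
        sc.foreach(_.cancelJob(id))
      }
    }
  }
}
```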





[GitHub] spark pull request #15249: [SPARK-17675] [CORE] Expand Blacklist for TaskSet...

2016-10-11 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/15249#discussion_r82891721
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -98,6 +84,14 @@ private[spark] class TaskSetManager(
   var totalResultSize = 0L
   var calculatedTasks = 0
 
+  val taskSetBlacklistOpt: Option[TaskSetBlacklist] = {
--- End diff --

I too go back and forth on naming `Option`s, and haven't yet hit upon a 
convention that is entirely satisfactory.  Another `Option` option (uh 
yeah) is `maybeFoo`, which we've also used within the Spark codebase.





[GitHub] spark pull request #15335: [SPARK-17769][Core][Scheduler]Some FetchFailure r...

2016-10-11 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/15335#discussion_r82869819
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -1255,27 +1255,46 @@ class DAGScheduler(
   s"longer running")
   }
 
-  if (disallowStageRetryForTest) {
-abortStage(failedStage, "Fetch failure will not retry stage due to testing config",
-  None)
-  } else if (failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId)) {
-abortStage(failedStage, s"$failedStage (${failedStage.name}) " +
-  s"has failed the maximum allowable number of " +
-  s"times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}. " +
-  s"Most recent failure reason: ${failureMessage}", None)
-  } else {
-if (failedStages.isEmpty) {
-  // Don't schedule an event to resubmit failed stages if failed isn't empty, because
-  // in that case the event will already have been scheduled.
-  // TODO: Cancel running tasks in the stage
-  logInfo(s"Resubmitting $mapStage (${mapStage.name}) and " +
-s"$failedStage (${failedStage.name}) due to fetch failure")
-  messageScheduler.schedule(new Runnable {
-override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
-  }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS)
+  val shouldAbortStage =
+failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId) ||
+disallowStageRetryForTest
+
+  if (shouldAbortStage) {
+val abortMessage = if (disallowStageRetryForTest) {
+  "Fetch failure will not retry stage due to testing config"
+} else {
+  s"""$failedStage (${failedStage.name})
+ |has failed the maximum allowable number of
+ |times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}.
+ |Most recent failure reason: $failureMessage""".stripMargin.replaceAll("\n", " ")
 }
+abortStage(failedStage, abortMessage, None)
+  } else { // update failedStages and make sure a ResubmitFailedStages event is enqueued
+// TODO: Cancel running tasks in the failed stage -- cf. SPARK-17064
+val noResubmitEnqueued = !failedStages.contains(failedStage)
 failedStages += failedStage
 failedStages += mapStage
+if (noResubmitEnqueued) {
+  // We expect one executor failure to trigger many FetchFailures in rapid succession,
+  // but all of those task failures can typically be handled by a single resubmission of
+  // the failed stage.  We avoid flooding the scheduler's event queue with resubmit
+  // messages by checking whether a resubmit is already in the event queue for the
+  // failed stage.  If there is already a resubmit enqueued for a different failed
+  // stage, that event would also be sufficient to handle the current failed stage, but
+  // producing a resubmit for each failed stage makes debugging and logging a little
+  // simpler while not producing an overwhelming number of scheduler events.
+  logInfo(
+s"Resubmitting $mapStage (${mapStage.name}) and " +
+s"$failedStage (${failedStage.name}) due to fetch failure"
+  )
+  messageScheduler.schedule(
--- End diff --

1. I don't like "Periodically" in your suggested comment, since this is a 
one-shot action after a delay of RESUBMIT_TIMEOUT milliseconds.

2. I agree that this delay-before-resubmit logic is suspect.  If we are both thinking correctly that a 200 ms delay on top of the time to re-run the `mapStage` is essentially inconsequential, then removing it in this PR would be fine.  If there are unanticipated consequences, though, I'd prefer to have that change in a separate PR.
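To make point 1 concrete: assuming `messageScheduler` behaves like a `ScheduledExecutorService` (as the call shape in the diff suggests), `schedule` is a one-shot delayed action; a periodic timer would be a different call entirely. A small, self-contained toy illustration -- not the scheduler code itself:
```scala
import java.util.concurrent.{Executors, TimeUnit}

val pool = Executors.newScheduledThreadPool(1)

// One-shot: runs exactly once after the delay, which is what RESUBMIT_TIMEOUT gates.
pool.schedule(new Runnable { def run(): Unit = println("resubmit once") },
  200, TimeUnit.MILLISECONDS)

// A genuinely periodic action would use scheduleAtFixedRate instead:
// pool.scheduleAtFixedRate(new Runnable { def run(): Unit = println("tick") },
//   0, 200, TimeUnit.MILLISECONDS)

Thread.sleep(500)
pool.shutdown()
```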





[GitHub] spark pull request #15335: [SPARK-17769][Core][Scheduler]Some FetchFailure r...

2016-10-11 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/15335#discussion_r82864060
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -1255,27 +1255,46 @@ class DAGScheduler(
   s"longer running")
   }
 
-  if (disallowStageRetryForTest) {
-abortStage(failedStage, "Fetch failure will not retry stage due to testing config",
-  None)
-  } else if (failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId)) {
-abortStage(failedStage, s"$failedStage (${failedStage.name}) " +
-  s"has failed the maximum allowable number of " +
-  s"times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}. " +
-  s"Most recent failure reason: ${failureMessage}", None)
-  } else {
-if (failedStages.isEmpty) {
-  // Don't schedule an event to resubmit failed stages if failed isn't empty, because
-  // in that case the event will already have been scheduled.
-  // TODO: Cancel running tasks in the stage
-  logInfo(s"Resubmitting $mapStage (${mapStage.name}) and " +
-s"$failedStage (${failedStage.name}) due to fetch failure")
-  messageScheduler.schedule(new Runnable {
-override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
-  }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS)
+  val shouldAbortStage =
+failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId) ||
+disallowStageRetryForTest
+
+  if (shouldAbortStage) {
+val abortMessage = if (disallowStageRetryForTest) {
+  "Fetch failure will not retry stage due to testing config"
+} else {
+  s"""$failedStage (${failedStage.name})
+ |has failed the maximum allowable number of
+ |times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}.
+ |Most recent failure reason: $failureMessage""".stripMargin.replaceAll("\n", " ")
 }
+abortStage(failedStage, abortMessage, None)
+  } else { // update failedStages and make sure a ResubmitFailedStages event is enqueued
+// TODO: Cancel running tasks in the failed stage -- cf. SPARK-17064
+val noResubmitEnqueued = !failedStages.contains(failedStage)
--- End diff --

Right here is the only place we put anything into `failedStages`, so 
`failedStage` and `mapStage` always go in as pairs.  The only places where we 
remove things from `failedStages` are `resubmitFailedStages` and  
`DAGScheduler#cleanupStateForJobAndIndependentStages#removeStage`.  We clear 
`failedStages` in `resubmitFailedStages`, so the only place where `failedStage` 
and `mapStage` could get unpaired in `failedStages` is in 
`cleanupStateForJobAndIndependentStages#removeStage`.  That would happen if the 
number of Jobs that use `failedStage` and `mapStage` is unequal.  If I'm 
thinking correctly, that could only happen if the `mapStage` is used by more 
Jobs than is the `failedStage`.  In that case, cleaning up the last Job that 
uses `failedStage` would remove `failedStage` from `failedStages` while 
`mapStage` would remain.

To fall into your proposed `|| !failedStages.contains(mapStage)` branch, 
another `failedStage` needing `mapStage`, this time coming from one of the 
remaining Jobs using `mapStage`, would need to fail.  If that is the case, then 
we still want to log the failure of the new `failedStage`, so I don't think we 
want `|| !failedStages.contains(mapStage)` -- without it, we'll get a duplicate 
of `mapStage` added to `failedStages`, but that's no big deal. 
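To make the last point concrete: assuming `failedStages` is a mutable `HashSet[Stage]`, as in the current DAGScheduler, "adding a duplicate" is just a no-op, so no extra guard is needed. A toy illustration with a stand-in `Stage`, not the real class:
```scala
import scala.collection.mutable

case class Stage(id: Int) // stand-in for illustration only

val failedStages = mutable.HashSet[Stage]()
val failedStage = Stage(2)
val mapStage = Stage(1)

failedStages += failedStage
failedStages += mapStage
failedStages += mapStage // a later fetch failure re-adds the same map stage

println(failedStages.size) // 2 -- the repeated add changes nothing
```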





[GitHub] spark pull request #12775: [SPARK-14958][Core] Failed task not handled when ...

2016-10-05 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/12775#discussion_r82079358
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala ---
@@ -135,8 +135,9 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
   logError(
 "Could not deserialize TaskEndReason: ClassNotFound with classloader " + loader)
 case ex: Exception => // No-op
+  } finally {
--- End diff --

makes sense





[GitHub] spark issue #15350: [SPARK-17778][Tests]Mock SparkContext to reduce memory u...

2016-10-05 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/15350
  
@zsxwing `build/mvn -U -Pyarn -Phadoop-2.7 -Pkinesis-asl -Phive 
-Phive-thriftserver -Dpyspark -Dsparkr test` completely succeeded on one of my 
machines where it was previously failing.





[GitHub] spark issue #15326: [SPARK-17759] [CORE] FairSchedulableBuilder should avoid...

2016-10-03 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/15326
  
@erenavsarogullari Your concern was entirely legitimate, and is also why I 
called in @kayousterhout to double-check my claim that other duplicate 
Schedulables would also be a problem.





[GitHub] spark issue #15326: [SPARK-17759] [CORE] FairSchedulableBuilder should avoid...

2016-10-03 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/15326
  
...and it would be 2.0.2 at this point. :)





[GitHub] spark issue #15326: [SPARK-17759] [CORE] FairSchedulableBuilder should avoid...

2016-10-03 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/15326
  
@kayousterhout @rxin Ok, but if we're going to change the behavior, then we 
need to be sure that change at least makes it into the release notes.





[GitHub] spark issue #15335: Some FetchFailure refactoring

2016-10-03 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/15335
  
@squito the promised follow-up PR





[GitHub] spark pull request #15335: Some FetchFailure refactoring

2016-10-03 Thread markhamstra
GitHub user markhamstra opened a pull request:

https://github.com/apache/spark/pull/15335

Some FetchFailure refactoring

## What changes were proposed in this pull request?

- Readability rewrites.
- Changed order of `failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId)` and `disallowStageRetryForTest` evaluation.
- Stage resubmission guard condition changed from `failedStages.isEmpty` to `!failedStages.contains(failedStage)`.
- Log all resubmissions of stages.

## How was this patch tested?

existing tests





You can merge this pull request into a Git repository by running:

$ git pull https://github.com/markhamstra/spark SPARK-17769

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/15335.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #15335









[GitHub] spark pull request #15301: [SPARK-17717][SQL] Add exist/find methods to Cata...

2016-09-29 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/15301#discussion_r81270990
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala ---
@@ -102,6 +102,83 @@ abstract class Catalog {
   def listColumns(dbName: String, tableName: String): Dataset[Column]
 
   /**
+   * Find the database with the specified name. This returns [[None]] when 
no [[Database]] can be
+   * found.
+   *
+   * @since 2.1.0
+   */
+  def findDatabase(dbName: String): Option[Database]
--- End diff --

Sorry, I'm a bit late to this party.  In terms of Scala vs. Java API, 
"find*" doesn't really strike me as the best naming, especially not after these 
"find*" methods no longer produce an Option.  In Scala, I expect a `find` to 
take a predicate and to produce an Option that may contain the first element to 
satisfy the predicate.  Something that throws an exception when the one thing 
asked for doesn't exist feels more like a `get` than a `find` to me.

Can we reconsider before we get to the point where we find that this API is 
set in stone? 





[GitHub] spark pull request #15249: [SPARK-17675] [CORE] Expand Blacklist for TaskSet...

2016-09-28 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/15249#discussion_r81036771
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -592,34 +587,54 @@ private[spark] class TaskSetManager(
* failures (this is because the method picks on unscheduled task, and then iterates through each
* executor until it finds one that the task hasn't failed on already).
*/
-  private[scheduler] def abortIfCompletelyBlacklisted(executors: Iterable[String]): Unit = {
-
-val pendingTask: Option[Int] = {
-  // usually this will just take the last pending task, but because of the lazy removal
-  // from each list, we may need to go deeper in the list.  We poll from the end because
-  // failed tasks are put back at the end of allPendingTasks, so we're more likely to find
-  // an unschedulable task this way.
-  val indexOffset = allPendingTasks.lastIndexWhere { indexInTaskSet =>
-copiesRunning(indexInTaskSet) == 0 && !successful(indexInTaskSet)
-  }
-  if (indexOffset == -1) {
-None
-  } else {
-Some(allPendingTasks(indexOffset))
-  }
-}
+  private[scheduler] def abortIfCompletelyBlacklisted(
+  hostToExecutors: HashMap[String, HashSet[String]]): Unit = {
+taskSetBlacklistOpt.foreach { taskSetBlacklist =>
+// If no executors have registered yet, don't abort the stage, just wait.  We probably
+// got here because a task set was added before the executors registered.
+  if (hostToExecutors.nonEmpty) {
+// take any task that needs to be scheduled, and see if we can find some executor it *could*
+// run on
+val pendingTask: Option[Int] = {
+  // usually this will just take the last pending task, but because of the lazy removal
+  // from each list, we may need to go deeper in the list.  We poll from the end because
+  // failed tasks are put back at the end of allPendingTasks, so we're more likely to find
+  // an unschedulable task this way.
+  val indexOffset = allPendingTasks.lastIndexWhere { indexInTaskSet =>
+copiesRunning(indexInTaskSet) == 0 && !successful(indexInTaskSet)
+  }
+  if (indexOffset == -1) {
+None
+  } else {
+Some(allPendingTasks(indexOffset))
+  }
+}
 
-// If no executors have registered yet, don't abort the stage, just wait.  We probably
-// got here because a task set was added before the executors registered.
-if (executors.nonEmpty) {
-  // take any task that needs to be scheduled, and see if we can find some executor it *could*
-  // run on
-  pendingTask.foreach { taskId =>
-if (executors.forall(executorIsBlacklisted(_, taskId))) {
-  val execs = executors.toIndexedSeq.sorted.mkString("(", ",", ")")
-  val partition = tasks(taskId).partitionId
-  abort(s"Aborting ${taskSet} because task $taskId (partition $partition)" +
-s" has already failed on executors $execs, and no other executors are available.")
+pendingTask.foreach { indexInTaskSet =>
+  // try to find some executor this task can run on.  Its possible that some *other*
+  // task isn't schedulable anywhere, but we will discover that in some later call,
+  // when that unschedulable task is the last task remaining.
+  val blacklistedEverywhere = hostToExecutors.forall { case (host, execs) =>
+// Check if the task can run on the node
+val nodeBlacklisted =
+  taskSetBlacklist.isNodeBlacklistedForTaskSet(host) ||
+  taskSetBlacklist.isNodeBlacklistedForTask(host, indexInTaskSet)
+if (nodeBlacklisted) {
+  true
+} else {
+  // Check if the task can run on any of the executors
+  execs.forall { exec =>
+  taskSetBlacklist.isExecutorBlacklistedForTaskSet(exec) ||
+  taskSetBlacklist.isExecutorBlacklistedForTask(exec, indexInTaskSet)
+  }
+}
+  }
+  if (blacklistedEverywhere) {
+val partition = tasks(indexInTaskSet).partitionId
+abort(s"Aborting ${taskSet} because task $indexInTaskSet (partition $partition) " +
--- End diff --

nit: `$taskSet`



[GitHub] spark pull request #15249: [SPARK-17675] [CORE] Expand Blacklist for TaskSet...

2016-09-28 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/15249#discussion_r81035029
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -478,8 +473,8 @@ private[spark] class TaskSetManager(
   // a good proxy to task serialization time.
   // val timeTaken = clock.getTime() - startTime
   val taskName = s"task ${info.id} in stage ${taskSet.id}"
-  logInfo(s"Starting $taskName (TID $taskId, $host, partition 
${task.partitionId}," +
-s" $taskLocality, ${serializedTask.limit} bytes)")
+  logInfo(s"Starting $taskName (TID $taskId, $host, executor 
${info.executorId}, " +
+s"partition ${task.partitionId}, $taskLocality, 
${serializedTask.limit} bytes)")
 
   sched.dagScheduler.taskStarted(task, info)
   return Some(new TaskDescription(taskId = taskId, attemptNumber = 
attemptNum, execId,
--- End diff --

Not part of your code, but I'm not seeing a good reason for the non-local 
return here.  The last lines of `resourceOffer` can just as easily be...
```scala
  sched.dagScheduler.taskStarted(task, info)
  Some(new TaskDescription(taskId = taskId, attemptNumber = attemptNum, execId,
taskName, index, serializedTask))
case _ => None
  }
} else None
  }
```
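Background on why `return` is usually avoided in method bodies like this: in Scala 2, a `return` that ends up inside a function literal is compiled to a thrown `scala.runtime.NonLocalReturnControl`, so an over-broad `catch` in between can swallow it, and even outside closures an early return makes the expression-oriented form harder to follow. A toy Scala 2 example of the mechanism (unrelated to the Spark code):
```scala
def findFirstEven(xs: Seq[Int]): Option[Int] = {
  xs.foreach { x =>
    // This return is non-local: it exits findFirstEven from inside the closure,
    // which the compiler implements by throwing NonLocalReturnControl.
    if (x % 2 == 0) return Some(x)
  }
  None
}

println(findFirstEven(Seq(1, 3, 4, 5))) // Some(4)
```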





[GitHub] spark pull request #15249: [SPARK-17675] [CORE] Expand Blacklist for TaskSet...

2016-09-28 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/15249#discussion_r81024744
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -421,7 +412,11 @@ private[spark] class TaskSetManager(
   maxLocality: TaskLocality.TaskLocality)
 : Option[TaskDescription] =
   {
-if (!isZombie) {
+val offerBlacklisted = taskSetBlacklistOpt.map { blacklist =>
+  blacklist.isNodeBlacklistedForTaskSet(host) ||
+blacklist.isExecutorBlacklistedForTaskSet(execId)
+}.getOrElse(false)
--- End diff --

Another place to use `.exists { p }` instead of `.map { p 
}.getOrElse(false)`.
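The equivalence, spelled out on a toy `Option`:
```scala
val opt: Option[Int] = Some(3)

val viaMapGetOrElse = opt.map(_ > 2).getOrElse(false)
val viaExists       = opt.exists(_ > 2)

println(viaMapGetOrElse == viaExists) // true, and both are false for None
```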





[GitHub] spark pull request #15249: [SPARK-17675] [CORE] Expand Blacklist for TaskSet...

2016-09-28 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/15249#discussion_r81022074
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -266,19 +263,11 @@ private[spark] class TaskSetManager(
 taskAttempts(taskIndex).exists(_.host == host)
   }
 
-  /**
-   * Is this re-execution of a failed task on an executor it already failed in before
-   * EXECUTOR_TASK_BLACKLIST_TIMEOUT has elapsed ?
-   */
-  private[scheduler] def executorIsBlacklisted(execId: String, taskId: Int): Boolean = {
-if (failedExecutors.contains(taskId)) {
-  val failed = failedExecutors.get(taskId).get
-
-  return failed.contains(execId) &&
-clock.getTimeMillis() - failed.get(execId).get < EXECUTOR_TASK_BLACKLIST_TIMEOUT
-}
-
-false
+  private def isTaskBlacklistedOnExecOrNode(index: Int, execId: String, host: String): Boolean = {
+taskSetBlacklistOpt.map { blacklist =>
+  blacklist.isNodeBlacklistedForTask(host, index) ||
+blacklist.isExecutorBlacklistedForTask(execId, index)
+}.getOrElse(false)
--- End diff --

```scala
taskSetBlacklistOpt.exists { blacklist =>
  blacklist.isNodeBlacklistedForTask(host, index) ||
  blacklist.isExecutorBlacklistedForTask(execId, index)
}
```





[GitHub] spark pull request #15249: [SPARK-17675] [CORE] Expand Blacklist for TaskSet...

2016-09-28 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/15249#discussion_r81020674
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala ---
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.config
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+private[scheduler] object BlacklistTracker extends Logging {
+
+  private val DEFAULT_TIMEOUT = "1h"
+
+  /**
+   * Returns true if the blacklist is enabled, based on checking the configuration in the following
+   * order:
+   * 1. Is it specifically enabled or disabled?
+   * 2. Is it enabled via the legacy timeout conf?
+   * 3. Use the default for the spark-master:
+   *   - off for local mode
+   *   - on for distributed modes (including local-cluster)
+   */
+  def isBlacklistEnabled(conf: SparkConf): Boolean = {
+conf.get(config.BLACKLIST_ENABLED) match {
+  case Some(isEnabled) =>
+isEnabled
+  case None =>
+// if they've got a non-zero setting for the legacy conf, always enable the blacklist,
+// otherwise, use the default based on the cluster-mode (off for local-mode, on otherwise).
+val legacyKey = config.BLACKLIST_LEGACY_TIMEOUT_CONF.key
+conf.get(config.BLACKLIST_LEGACY_TIMEOUT_CONF) match {
+  case Some(legacyTimeout) =>
+if (legacyTimeout == 0) {
+  logWarning(s"Turning off blacklisting due to legacy 
configuaration:" +
--- End diff --

...and the line break isn't necessary.





[GitHub] spark pull request #15249: [SPARK-17675] [CORE] Expand Blacklist for TaskSet...

2016-09-28 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/15249#discussion_r81020284
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala ---
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.config
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+private[scheduler] object BlacklistTracker extends Logging {
+
+  private val DEFAULT_TIMEOUT = "1h"
+
+  /**
+   * Returns true if the blacklist is enabled, based on checking the configuration in the following
+   * order:
+   * 1. Is it specifically enabled or disabled?
+   * 2. Is it enabled via the legacy timeout conf?
+   * 3. Use the default for the spark-master:
+   *   - off for local mode
+   *   - on for distributed modes (including local-cluster)
+   */
+  def isBlacklistEnabled(conf: SparkConf): Boolean = {
+conf.get(config.BLACKLIST_ENABLED) match {
+  case Some(isEnabled) =>
+isEnabled
+  case None =>
+// if they've got a non-zero setting for the legacy conf, always enable the blacklist,
+// otherwise, use the default based on the cluster-mode (off for local-mode, on otherwise).
+val legacyKey = config.BLACKLIST_LEGACY_TIMEOUT_CONF.key
+conf.get(config.BLACKLIST_LEGACY_TIMEOUT_CONF) match {
+  case Some(legacyTimeout) =>
+if (legacyTimeout == 0) {
+  logWarning(s"Turning off blacklisting due to legacy 
configuaration:" +
--- End diff --

typo: configuration





[GitHub] spark pull request #15249: [SPARK-17675] [CORE] Expand Blacklist for TaskSet...

2016-09-28 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/15249#discussion_r81018983
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -50,22 +48,12 @@ import org.apache.spark.util.{AccumulatorV2, Clock, SystemClock, Utils}
  *task set will be aborted
  */
 private[spark] class TaskSetManager(
-sched: TaskSchedulerImpl,
+val sched: TaskSchedulerImpl,
 val taskSet: TaskSet,
 val maxTaskFailures: Int,
-clock: Clock = new SystemClock())
-  extends Schedulable with Logging {
+val clock: Clock = new SystemClock()) extends Schedulable with Logging {
--- End diff --

Why are `sched` and `clock` being made fields?  It doesn't seem necessary 
to me.
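For context, the difference the question is getting at, with throwaway names rather than the real classes: a plain constructor parameter is only visible inside the class, while `val` turns it into a public member and therefore part of the API surface.
```scala
class Clock

// `sched` stays a plain constructor parameter; `clock` is promoted to a public field.
class TaskSetLike(sched: String, val clock: Clock = new Clock)

val t = new TaskSetLike("impl")
println(t.clock)    // fine: `clock` is a public val
// println(t.sched) // does not compile: `sched` is not a member of TaskSetLike
```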





[GitHub] spark issue #15213: [SPARK-17644] [CORE] Do not add failedStages when abortS...

2016-09-27 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/15213
  
> Honestly, I think just getting the fix in is important enough that I'm 
fine w/ putting in the minimally invasive thing now.

That's fine, @squito -- go ahead and merge when you're happy with your 
requested changes, and then I'll follow up in short order with a separate 
refactoring PR.





[GitHub] spark issue #15221: [SPARK-17648][CORE] TaskScheduler really needs offers to...

2016-09-27 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/15221
  
I'm glad Jenkins finally got it together. LGTM.





[GitHub] spark issue #15213: [SPARK-17644] [CORE] Do not add failedStages when abortS...

2016-09-26 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/15213
  
@scwf I understand that you were trying to make the least invasive fix 
possible to deal with the problem.  That's usually a good thing to do, but even 
when that kind of fix is getting to the root of the problem it can still result 
in layers of patches that are hard to make sense of.  That's not really the 
fault of any one patch; rather, the blame lies more with those of us who often 
didn't produce clear, maintainable code in the first place.  When it's possible 
to see re-organizing principles that will make the code clearer, reduce 
duplication, make future maintenance less error prone, etc., then it's usually 
a good idea to do a little larger refactoring instead of just a minimally 
invasive fix.

I think this is a small example of where that kind of refactoring makes 
sense, so that's why I made my code suggestion.  If you can see ways to make 
things even clearer, then feel free to suggest them.  I'm sure that Kay, Imran 
and others who also have been trying to make these kinds of clarifying changes 
in the DAGScheduler will also chime in if they have further suggestions. 





[GitHub] spark issue #15213: [SPARK-17644] [CORE] Do not add failedStages when abortS...

2016-09-26 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/15213
  
The fix is logically correct; however, the prior code is needlessly complex 
and not as easy to understand as it should be, and the proposed fix doesn't 
improve on that.  I'd like to take the opportunity to make the code easier to 
understand and maintain.  Something like this:
```scala
  // It is likely that we receive multiple FetchFailed for a single stage (because we have
  // multiple tasks running concurrently on different executors). In that case, it is
  // possible the fetch failure has already been handled by the scheduler.
  if (runningStages.contains(failedStage)) {
    logInfo(s"Marking $failedStage (${failedStage.name}) as failed " +
      s"due to a fetch failure from $mapStage (${mapStage.name})")
    markStageAsFinished(failedStage, Some(failureMessage))
  } else {
    logDebug(s"Received fetch failure from $task, but its from $failedStage which is no " +
      s"longer running")
  }

  val shouldAbortStage =
    failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId) ||
    disallowStageRetryForTest

  if (shouldAbortStage) {
    val abortMessage = if (disallowStageRetryForTest) {
      "Fetch failure will not retry stage due to testing config"
    } else {
      s"$failedStage (${failedStage.name}) " +
      s"has failed the maximum allowable number of " +
      s"times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}. " +
      s"Most recent failure reason: $failureMessage"
    }
    abortStage(failedStage, abortMessage, None)
  } else { // update failedStages and make sure a ResubmitFailedStages event is enqueued
    // TODO: Cancel running tasks in the failed stage -- cf. SPARK-17064
    val noResubmitEnqueued = failedStages.isEmpty
    failedStages += failedStage
    failedStages += mapStage
    if (noResubmitEnqueued) {
      // If failedStages is not empty, then a previous FetchFailed already went through
      // this block of code and queued up a ResubmitFailedStages event that has not yet
      // run.  We, therefore, only need to queue up a new ResubmitFailedStages event when
      // failedStages.isEmpty.
      logInfo(
        s"Resubmitting $mapStage (${mapStage.name}) and " +
        s"$failedStage (${failedStage.name}) due to fetch failure"
      )
      messageScheduler.schedule(
        new Runnable {
          override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
        },
        DAGScheduler.RESUBMIT_TIMEOUT,
        TimeUnit.MILLISECONDS
      )
    }
  }

  // Mark the map whose fetch failed as broken in the map stage
```

This should be equivalent to what you have, @scwf, with the exception that 
`fetchFailedAttemptIds.add(stageAttemptId)` is done even when 
`disallowStageRetryForTest` is `true` -- which seems like a better idea to me.

Also available here:

https://github.com/markhamstra/spark/commit/368f82d9789ec04565af835e7cb80d1cdb0ccf0c

@squito 





[GitHub] spark issue #15213: [SPARK-17644] [CORE] Fix the race condition when DAGSche...

2016-09-23 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/15213
  
Right, but `abortStage` occurs elsewhere.  "When abort stage" seems to 
imply that this fix is necessary for all usages of `abortStage` when the actual 
problem is not in `abortStage` but rather in improper additions to 
`failedStages`.  I've got to go now, but I'll come back to this soon(ish).





[GitHub] spark issue #15213: [SPARK-17644] [CORE] Fix the race condition when DAGSche...

2016-09-23 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/15213
  
@scwf That description would actually be at least as bad since there are 
multiple routes to `abortStage` and this issue of adding to `failedStages` only 
applies to these two.  I'll take another look soon and see if I can come up 
with a clean refactoring and a better description for the commit message.





[GitHub] spark issue #15213: [SPARK-17644] [CORE] Fix the race condition when DAGSche...

2016-09-23 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/15213
  
Ok, that makes better sense.

The `disallowStageRetryForTest` case doesn't worry me too much since it is 
only used in tests.  If we can fix this case, great; else if it remains 
possible to create failing tests that can never happen outside of the tests, 
then that is not all that important (but should at least be noted in comments 
in the test suite.)

Yes, not adding to `failedStages` after going down either of those two 
paths to `abortStage` is a correct fix even if the description of the problem 
wasn't really accurate.  I'll take another look over the weekend to see if the 
logic can be expressed a bit more clearly. 





[GitHub] spark issue #15213: [SPARK-17644] [CORE] Fix the race condition when DAGSche...

2016-09-23 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/15213
  
This doesn't make sense to me.  The DAGSchedulerEventProcessLoop runs on a 
single thread and processes a single event from its queue at a time.

When the first CompletionEvent is run as a result of a fetch failure, failedStages is added to and a ResubmitFailedStages event is queued.  After handleTaskCompletion is done, the next event from the queue will be processed.  As events are sequentially dequeued and handled, either the ResubmitFailedStages event will be handled before the CompletionEvent for the second fetch failure, or the CompletionEvent will be handled before the ResubmitFailedStages event.

If the ResubmitFailedStages is handled first, then failedStages will be cleared in resubmitFailedStages, and there will be nothing preventing the subsequent CompletionEvent from queueing another ResubmitFailedStages event to handle additional fetch failures.  In the alternative that the second CompletionEvent is queued and handled before the ResubmitFailedStages event, then the additional stages are added to the non-empty failedStages, but there is no need to schedule another ResubmitFailedStages event because the one from the first CompletionEvent is still on the queue and the handling of that queued event will also handle the newly added failedStages from the second CompletionEvent.

In either ordering, all the failedStages are handled and there is no race condition.
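A self-contained toy model of that argument -- this is not the DAGScheduler, just a single-threaded loop draining a FIFO queue with the same `failedStages.isEmpty` guard -- showing that two back-to-back fetch failures are still covered by a single resubmit event:
```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable

sealed trait Event
case class CompletionEvent(stage: Int) extends Event
case object ResubmitFailedStages extends Event

val queue = new LinkedBlockingQueue[Event]()
val failedStages = mutable.HashSet[Int]()

def handle(e: Event): Unit = e match {
  case CompletionEvent(stage) =>
    val noResubmitEnqueued = failedStages.isEmpty // the guard discussed above
    failedStages += stage
    if (noResubmitEnqueued) queue.put(ResubmitFailedStages)
  case ResubmitFailedStages =>
    println(s"resubmitting stages ${failedStages.toList.sorted}")
    failedStages.clear()
}

// Two fetch failures arrive before the resubmit runs: only one
// ResubmitFailedStages is enqueued, and it handles both stages.
queue.put(CompletionEvent(1))
queue.put(CompletionEvent(2))
while (!queue.isEmpty) handle(queue.take())
// prints: resubmitting stages List(1, 2)
```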





[GitHub] spark issue #15084: [SPARK-17529][core] Implement BitSet.clearUntil and use ...

2016-09-13 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/15084
  
@rxin @jegonzal looking for a third-party review





[GitHub] spark issue #15084: [SPARK-17529][core] Implement BitSet.clearUntil and use ...

2016-09-13 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/15084
  
test this please





[GitHub] spark pull request #14952: [SPARK-17110] Fix StreamCorruptionException in Bl...

2016-09-06 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/14952#discussion_r77697794
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -520,10 +520,11 @@ private[spark] class BlockManager(
*
* This does not acquire a lock on this block in this JVM.
*/
-  private def getRemoteValues(blockId: BlockId): Option[BlockResult] = {
+  private def getRemoteValues[T: ClassTag](blockId: BlockId): Option[BlockResult] = {
+val ct = implicitly[ClassTag[T]]
 getRemoteBytes(blockId).map { data =>
   val values =
-serializerManager.dataDeserializeStream(blockId, data.toInputStream(dispose = true))
+serializerManager.dataDeserializeStream(blockId, data.toInputStream(dispose = true))(ct)
--- End diff --

How do you forget to pass a correct ClassTag when the compiler is enforcing 
its presence via the context bound?
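In miniature, what the context bound buys you -- the caller physically cannot "forget" the ClassTag, because the compiler supplies or demands it:
```scala
import scala.reflect.ClassTag

def firstOfType[T: ClassTag](xs: Seq[Any]): Option[T] =
  xs.collectFirst { case t: T => t } // the ClassTag makes this runtime type test real

println(firstOfType[String](Seq(1, "two", 3.0))) // Some(two)
// Calling firstOfType without a ClassTag[T] available is a compile-time error,
// not something that can silently go wrong at runtime.
```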





[GitHub] spark issue #14958: [SPARK-17378] [BUILD] Upgrade snappy-java to 1.1.2.6

2016-09-06 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/14958
  
@srowen The bug does manifest in Spark.  I've been running this upgrade in 
production for a few weeks with no issues.  This should be merged IMO.





[GitHub] spark pull request #14952: [SPARK-17110] Fix StreamCorruptionException in Bl...

2016-09-04 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/14952#discussion_r77459444
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -520,10 +520,11 @@ private[spark] class BlockManager(
*
* This does not acquire a lock on this block in this JVM.
*/
-  private def getRemoteValues(blockId: BlockId): Option[BlockResult] = {
+  private def getRemoteValues[T: ClassTag](blockId: BlockId): 
Option[BlockResult] = {
+val ct = implicitly[ClassTag[T]]
 getRemoteBytes(blockId).map { data =>
   val values =
-serializerManager.dataDeserializeStream(blockId, 
data.toInputStream(dispose = true))
+serializerManager.dataDeserializeStream(blockId, 
data.toInputStream(dispose = true))(ct)
--- End diff --

I'm not saying this should definitely be done one way or the other, but I'm 
curious why you have a preference for the extra code and more verbose API that 
come with making the classTag an explicit parameter.





[GitHub] spark pull request #14737: [SPARK-17171][WEB UI] DAG will list all partition...

2016-08-21 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/14737#discussion_r75602737
  
--- Diff: core/src/main/scala/org/apache/spark/ui/SparkUI.scala ---
@@ -141,6 +141,7 @@ private[spark] object SparkUI {
   val DEFAULT_POOL_NAME = "default"
   val DEFAULT_RETAINED_STAGES = 1000
   val DEFAULT_RETAINED_JOBS = 1000
+  val DEFAULT_RETAINED_NODES = 2
--- End diff --

`NODES`, both here and in `spark.ui.retainedNodes`, is far too ambiguous and 
non-specific for this configuration value -- "node" is already overloaded too 
many times in the existing Spark code and documentation; we don't need or want 
to add another overload.

Additionally, the default behavior should be the same as current behavior, 
since the change in behavior would be unexpected and it is far from clear to me 
that the overwhelming majority of users would prefer the proposed new behavior. 





[GitHub] spark pull request #14737: [SPARK-17171][WEB UI] DAG will list all partition...

2016-08-21 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/14737#discussion_r75602575
  
--- Diff: 
core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala ---
@@ -119,18 +119,47 @@ private[ui] object RDDOperationGraph extends Logging {
   { if (stage.attemptId == 0) "" else s" (attempt ${stage.attemptId})" 
}
 val rootCluster = new RDDOperationCluster(stageClusterId, 
stageClusterName)
 
+var rootNodeCount = 0
+val addRDDIds = new mutable.HashSet[Int]()
+val dropRDDIds = new mutable.HashSet[Int]()
+
+def isAllowed(ids: mutable.HashSet[Int], rdd: RDDInfo): Boolean = {
+  val parentIds = rdd.parentIds
+  if (parentIds.size == 0) {
+rootNodeCount < retainedNodes
+  } else {
+if (ids.size > 0) {
+parentIds.exists(id => ids.contains(id) || 
!dropRDDIds.contains(id))
+} else {
+true
+}
+  }
+}
+
 // Find nodes, edges, and operation scopes that belong to this stage
-stage.rddInfos.foreach { rdd =>
-  edges ++= rdd.parentIds.map { parentId => RDDOperationEdge(parentId, 
rdd.id) }
+stage.rddInfos.sortBy(_.id).foreach { rdd =>
+  val keepNode: Boolean = isAllowed(addRDDIds, rdd)
+  if (keepNode) {
+addRDDIds.add(rdd.id)
+edges ++= rdd.parentIds.filter(id => !dropRDDIds.contains(id))
+  .map { parentId => RDDOperationEdge(parentId, rdd.id) }
--- End diff --

This isn't just a question of whether `{ }` is necessary, but also whether 
using them to delimit closures with `map`, `filter`, etc. has become the 
de facto and accepted style in Spark code.  It has -- for just one of many, many 
examples, see the code that this diff is replacing.  Delimiting closures in 
this way even when not strictly necessary improves consistency and readability 
by not requiring more parsing of parentheses levels.  I wouldn't have 
recommended that this diff be changed. 
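
For anyone unsure what is being compared, here is a made-up illustration of the two styles (the names are illustrative, not from the Spark codebase):

```scala
// Made-up example; the names are illustrative, not from Spark.
val parentIds = Seq(1, 2, 3, 4)
val dropRDDIds = Set(2)

// Parentheses only where strictly required:
val edgesTerse = parentIds.filter(id => !dropRDDIds.contains(id)).map(id => (id, 42))

// Braces delimiting each closure -- the prevailing Spark style, which keeps
// closure boundaries readable without counting parenthesis levels:
val edgesSpark = parentIds
  .filter { id => !dropRDDIds.contains(id) }
  .map { id => (id, 42) }
```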





[GitHub] spark issue #14557: [SPARK-16709][CORE] Kill the running task if stage faile...

2016-08-15 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/14557
  
There are multiple issues with this PR.  Some are at a more stylistic 
level, but some include deeper issues -- e.g. see SPARK-17064.  Most 
fundamentally, this PR is the wrong solution at least in the sense that it does 
not implement a minimal fix without other side effects.  The problem is that 
TaskCommitDenied is not being handled properly when a duplicate Task tries to 
commit a result that has already been successfully committed by another attempt 
of this Task.  The proper fix needs to be at that point of committing duplicate 
results, not by making the larger, unnecessary change in how we handle 
cancellation/interruption of other Tasks in a TaskSet when one of them produces 
a FetchFailed.





[GitHub] spark issue #12436: [SPARK-14649][CORE] DagScheduler should not run duplicat...

2016-08-15 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/12436
  
See https://issues.apache.org/jira/browse/SPARK-17064





[GitHub] spark pull request #14534: [SPARK-16941]Add SynchronizedMap trait with Map i...

2016-08-08 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/14534#discussion_r73908742
  
--- Diff: 
sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
 ---
@@ -39,8 +39,10 @@ private[thriftserver] class SparkSQLOperationManager()
   val handleToOperation = ReflectionUtils
 .getSuperField[JMap[OperationHandle, Operation]](this, 
"handleToOperation")
 
-  val sessionToActivePool = Map[SessionHandle, String]()
-  val sessionToContexts = Map[SessionHandle, SQLContext]()
+  val sessionToActivePool = new mutable.HashMap[SessionHandle, String]()
--- End diff --

Correct; SynchronizedMap has been deprecated since Scala 2.11.0 with this 
comment in the API docs: "Synchronization via traits is deprecated as it is 
inherently unreliable. Consider java.util.concurrent.ConcurrentHashMap as an 
alternative."

The title of this PR must be updated to match what is actually being done 
after the switch to use ConcurrentHashMap since we don't want the misleading 
"Add SynchronizedMap trait" to persist in the commit history.





[GitHub] spark issue #14533: [SPARK-16606] [CORE] isleading warning for SparkContext....

2016-08-08 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/14533
  
PR title typo?  Intended "misleading"?





[GitHub] spark issue #14332: [SPARK-16694] [CORE] Use for/foreach rather than map for...

2016-07-25 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/14332
  
The Scala API docs clearly specify that the most idiomatic usage of Option 
is to treat it as a collection or monad and use map, flatMap, filter, or 
foreach.  Those docs also clearly specify that using Option values via pattern 
matching is less idiomatic.  There is nothing fundamentally wrong with `map` 
applying a function that produces results of type `Unit`, nor is such an 
operation some kind of special not-a-transformation that must always be treated 
differently from operations that have been blessed as legitimate 
transformations.  Sure, when you eventually come to the boundaries of purely 
functional code and it is time to deal with side effects, then the distinctness 
of `Unit` comes into play.  But it is a basic FP principle or best practice to 
not treat types as specific instead of abstract until you logically must deal 
with the specifics -- and that is just not the general case with `map` 
functions that produce `Unit`, nor with folding an Option.
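
To make that concrete, here is roughly the shape of usage the Scala docs describe (a toy example, not Spark code):

```scala
// Toy example, not Spark code: Option treated as a collection/monad, with the
// Unit-producing side effect pushed to the boundary rather than special-cased.
val name: Option[String] = Some("  spark  ")

name
  .map(_.trim)
  .filter(_.nonEmpty)
  .foreach { trimmed => println(s"hello, $trimmed") }
```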

Usage of `Unit` consistent with usage of other types in things like `map` 
and `fold` can be confusing to the many Spark contributors who aren't 
completely at home with idiomatic FP, and that is why, e.g., we chose not to 
allow folding of Option within the Spark code.  That's not going to change 
based on any argument in this PR's comments.

But neither should we shy away from any and all usages of `Unit` producing 
functions within `map` -- especially not when that usage is the preferred, 
idiomatic usage of Option.  The issue that SPARK-16694 should be dealing with 
is not a fundamental problem with `map` and `Unit`, but rather with problems 
caused by the not completely sound implementation of `view` in Scala.  There 
are known problems and ugly corner cases associated with `view`, and that's why 
I think Scala programmers would be well advised to avoid it when they can, or 
to use it with great caution when they must.  Alternatively, you can more 
safely use something like Josh Suereth's viewducers work: 
https://github.com/jsuereth/viewducers





[GitHub] spark pull request #14332: [SPARK-16694] [CORE] Use for/foreach rather than ...

2016-07-25 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/14332#discussion_r72014373
  
--- Diff: 
examples/src/main/scala/org/apache/spark/examples/ml/DataFrameExample.scala ---
@@ -54,14 +54,13 @@ object DataFrameExample {
   }
 }
 
-parser.parse(args, defaultParams).map { params =>
-  run(params)
-}.getOrElse {
-  sys.exit(1)
+parser.parse(args, defaultParams) match {
+  case Some(params) => run(params)
+  case _ => sys.exit(1)
 }
--- End diff --

It appears that you are reading what you want to see instead of what is 
really there.  All of the examples in the Scala API doc involve side-effects.  
The three examples are essentially equivalent in their intent and effects, and 
using pattern matching with `Option` is clearly called out as the least 
idiomatic.

`fold` isn't in question since it has already been ruled out by other Spark 
committers as beyond the ken of those unfamiliar with idiomatic functional 
programming.  I see no reason, however, to do anything different in Spark's 
code than what the Scala API declares to be the "most idiomatic way".





[GitHub] spark pull request #14332: [SPARK-16694] [CORE] Use for/foreach rather than ...

2016-07-24 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/14332#discussion_r7120
  
--- Diff: 
examples/src/main/scala/org/apache/spark/examples/ml/DataFrameExample.scala ---
@@ -54,14 +54,13 @@ object DataFrameExample {
   }
 }
 
-parser.parse(args, defaultParams).map { params =>
-  run(params)
-}.getOrElse {
-  sys.exit(1)
+parser.parse(args, defaultParams) match {
+  case Some(params) => run(params)
+  case _ => sys.exit(1)
 }
--- End diff --

The argument from consistency says to treat `Option` as a collection or 
monad, not as something special, and to treat `Unit` as just another type, not 
as something special.  Treated consistently, the return of the zero-value from a 
fold over a None is no more surprising than folding over an empty List 
producing the zero-value.  To a functional programmer aware of the semantics of 
fold, producing zeroes from folding over empty collections is every bit as 
explicit as an if-else or pattern match.

The argument about using `fold` with `Option` in Spark isn't going anywhere 
at this point, but you should look at the Scala API docs, which include the 
comment that "[a] less-idiomatic way to use scala.Option values is via pattern 
matching."  Also see other commentary, such as 
http://blog.originate.com/blog/2014/06/15/idiomatic-scala-your-options-do-not-match/
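
To make the consistency point concrete (a toy example, not Spark code):

```scala
// Toy example, not Spark code: fold treats the "empty" case uniformly.
val none: Option[Int] = None
val some: Option[Int] = Some(41)
val emptyList: List[Int] = Nil

val a = none.fold(0)(_ + 1)       // 0  -- the zero-value, exactly as with...
val b = emptyList.fold(0)(_ + _)  // 0  -- ...folding an empty List
val c = some.fold(0)(_ + 1)       // 42 -- the function applied to the contained value
```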





[GitHub] spark pull request #14332: [SPARK-16694] [CORE] Use for/foreach rather than ...

2016-07-24 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/14332#discussion_r71993558
  
--- Diff: 
examples/src/main/scala/org/apache/spark/examples/ml/DataFrameExample.scala ---
@@ -54,14 +54,13 @@ object DataFrameExample {
   }
 }
 
-parser.parse(args, defaultParams).map { params =>
-  run(params)
-}.getOrElse {
-  sys.exit(1)
+parser.parse(args, defaultParams) match {
+  case Some(params) => run(params)
+  case _ => sys.exit(1)
 }
--- End diff --

Oh, and the "less obvious" argument is why my opinion on `fold` with 
`Option` was rejected -- even though it is a perfectly logical and obvious 
thing to do for a functional programmer.





[GitHub] spark pull request #14332: [SPARK-16694] [CORE] Use for/foreach rather than ...

2016-07-24 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/14332#discussion_r71993490
  
--- Diff: 
examples/src/main/scala/org/apache/spark/examples/ml/DataFrameExample.scala ---
@@ -54,14 +54,13 @@ object DataFrameExample {
   }
 }
 
-parser.parse(args, defaultParams).map { params =>
-  run(params)
-}.getOrElse {
-  sys.exit(1)
+parser.parse(args, defaultParams) match {
+  case Some(params) => run(params)
+  case _ => sys.exit(1)
 }
--- End diff --

There is nothing incorrect about generating an expression of type Unit; 
what I am talking about is only pattern matching on Options, not the `view` 
issue; IJ has lots of opinions, not all of them correct.

What theoretical problem are you talking about with `fold` over `Option`?  
The `view` issue does not come into play since the pattern matching with Option 
construct is not even valid if you are dealing with `anOption.view`.





[GitHub] spark pull request #14332: [SPARK-16694] [CORE] Use for/foreach rather than ...

2016-07-24 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/14332#discussion_r71993163
  
--- Diff: 
examples/src/main/scala/org/apache/spark/examples/ml/DataFrameExample.scala ---
@@ -54,14 +54,13 @@ object DataFrameExample {
   }
 }
 
-parser.parse(args, defaultParams).map { params =>
-  run(params)
-}.getOrElse {
-  sys.exit(1)
+parser.parse(args, defaultParams) match {
+  case Some(params) => run(params)
+  case _ => sys.exit(1)
 }
--- End diff --

I wouldn't change these kinds of constructs.  It's almost purely a 
stylistic issue (for which I don't believe we have an officially declared 
preference), and 1) pattern matching over Options is poor form, IMO; 2) the 
pattern matching alternative here doesn't any more unambiguously imply or 
signal use of side effects.

If I had my way, we'd `fold` Options, but that's an argument I lost long 
ago:
```scala
val anOption: Option[T] = ...
anOption.fold { handleNone } { t => ... }
```





[GitHub] spark issue #14330: [SPARK-16693][SPARKR] Remove methods deprecated in 2.0.0...

2016-07-23 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/14330
  
See JIRA comment.





[GitHub] spark issue #14039: [SPARK-15896][SQL] Clean up shuffle files just after job...

2016-07-05 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/14039
  
I haven't got anything more concrete to offer at this time than the 
descriptions in the relevant JIRAs, but I do have this running in production 
with 1.6, and it does work.  Essentially, you build a cache in your application 
whose keys are a canonicalization of query fragments and whose values are RDDs 
associated with that fragment of the logical plan, and which produce the 
shuffle files.  For as long as you hold the references to those RDDs in your 
cache, Spark won't remove the shuffle files.  For as long as you have 
sufficient memory available to the OS, those shuffle files will be accessed via 
the OS buffer cache, which is actually pretty quick and doesn't require any 
Java heap management or garbage collection.  That was the original motivation 
behind using shuffle files in this way, before off-heap caching and unified 
memory management were available.  It's less necessary now (at least once I 
figure out how to do the mapping between logical plan fragments and tables 
cached off-heap), but it is still a valid alternative caching mechanism.
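
Roughly the shape of that application-side cache, heavily simplified -- the class, the canonicalization key, and the builder argument are all hypothetical here, not code from Spark or from the production system I mentioned:

```scala
import scala.collection.concurrent.TrieMap
import org.apache.spark.rdd.RDD

// Hypothetical sketch of the application-side cache described above.
class ShuffleReuseCache {
  private val cache = TrieMap.empty[String, RDD[_]]

  // Holding the RDD reference keeps its shuffle dependencies reachable, so
  // Spark won't clean up the shuffle files and later queries can reuse them.
  def getOrCompute(canonicalFragment: String)(build: => RDD[_]): RDD[_] =
    cache.getOrElseUpdate(canonicalFragment, build)
}
```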





[GitHub] spark issue #14039: [SPARK-15896][SQL] Clean up shuffle files just after job...

2016-07-04 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/14039
  
Actually, they can be reused -- not in Spark as distributed, but it is an 
open question whether reusing shuffle files within Spark SQL is something that 
we should be doing and want to support.  It can be an effective alternative 
means of caching.  https://issues.apache.org/jira/browse/SPARK-13756

Until that issue is definitively decided, we should not pre-empt the 
possibility with this PR.





[GitHub] spark issue #13685: [SPARK-15963][CORE] Catch `TaskKilledException` correctl...

2016-06-23 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/13685
  
LGTM.  Good work!





[GitHub] spark issue #13677: [SPARK 15926] Improve readability of DAGScheduler stage ...

2016-06-15 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/13677
  
LGTM, and runs without flakiness for me when rebased onto master with the 
https://github.com/apache/spark/pull/13688 HOTFIX.





[GitHub] spark issue #13688: [HOTFIX] [CORE] fix flaky BasicSchedulerIntegrationTest

2016-06-15 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/13688
  
As a HOTFIX, LGTM; but I agree that there is room for a follow-up.





[GitHub] spark issue #13677: [SPARK 15926] Improve readability of DAGScheduler stage ...

2016-06-14 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/13677
  
> One goal of this change is to make it clearer which functions may create 
new stages (as opposed to looking up stages that already exist).

Something that I have been looking at of late, and I know that @squito has 
looked at some, too.  In short, I'm pretty confident that we are doing some 
silliness around creating new stages instead of reusing already existing 
stages, then recognizing that all the tasks for the "new" stages are already 
completed (at least we're smart enough to reuse the map outputs), so the "new" 
stages just become "skipped".

I'll take a closer look at this tomorrow, and may have a follow-on PR in 
the not too distant future.





[GitHub] spark issue #13646: [SPARK-15927] Eliminate redundant DAGScheduler code.

2016-06-14 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/13646
  
LGTM, but I agree with Imran's renaming suggestion, and his new test looks 
good.





[GitHub] spark issue #13591: [Minor] Replace all occurrences of None: Option[X] with ...

2016-06-10 Thread markhamstra
Github user markhamstra commented on the issue:

https://github.com/apache/spark/pull/13591
  
Yeah, I wouldn't bother.  The way this PR does it is arguably better 
because it can be done consistently across all collections:
```scala
val l0 = List.empty[String]
val s0 = Set.empty[Int]
val o0 = Option.empty[Byte]
```
...but it's pretty trivial, and I don't buy the readability argument.





[GitHub] spark pull request: [SPARK-9876][SQL]: Update Parquet to 1.8.1.

2016-06-01 Thread markhamstra
Github user markhamstra commented on the pull request:

https://github.com/apache/spark/pull/13280
  
Oh, wait... sorry, I just realized that @liancheng said he also merged to 
branch-2.0.  +1 on reverting that.





[GitHub] spark pull request: [SPARK-9876][SQL]: Update Parquet to 1.8.1.

2016-06-01 Thread markhamstra
Github user markhamstra commented on the pull request:

https://github.com/apache/spark/pull/13280
  
@rxin Huh?  The merge was to master, not branch-2.0.  Doesn't that put it 
on the 2.1 track and not into 2.0.0?  I think that is all that Yin was saying, 
that @rdblue was mistaken about this change going into 2.0.





[GitHub] spark pull request: [SPARK-15176][Core] Add maxShares setting to P...

2016-05-27 Thread markhamstra
Github user markhamstra commented on the pull request:

https://github.com/apache/spark/pull/12951#issuecomment-30643
  
Added my comments to the JIRA.  In short, I think there is a legitimate use 
case for this, and there is a significant gap in our current fair-scheduling 
pool API.  Implementing a maxShare property is actually something that has been 
on my to-do list for a while.





[GitHub] spark pull request: [SPARK-10372] [CORE] basic test framework for ...

2016-05-24 Thread markhamstra
Github user markhamstra commented on the pull request:

https://github.com/apache/spark/pull/8559#issuecomment-221414334
  
@squito Yeah, that's fine.  I haven't gone through the new tests closely to 
make sure that they are doing what they say they are doing, but the changes to 
both non-test code and previous tests look safe. 





[GitHub] spark pull request: [SPARK-10372] [CORE] basic test framework for ...

2016-05-23 Thread markhamstra
Github user markhamstra commented on the pull request:

https://github.com/apache/spark/pull/8559#issuecomment-221001489
  
@squito I don't feel really strongly about it, either.  My only concern was 
for others adding tests needing a mock ExternalClusterManager in the future and 
not knowing which one to use, why they are different, whether any new mocking 
needs to be added to both, etc.

If someone does feel strongly about maintaining the separation, then we can 
put things back the way you had them. 





[GitHub] spark pull request: [SPARK-10372] [CORE] basic test framework for ...

2016-05-22 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/8559#discussion_r64170026
  
--- Diff: 
core/src/test/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager
 ---
@@ -1 +1,2 @@
-org.apache.spark.scheduler.DummyExternalClusterManager
\ No newline at end of file
+org.apache.spark.scheduler.DummyExternalClusterManager
+org.apache.spark.scheduler.MockExternalClusterManager
--- End diff --

Did you ever look at combining DummyExternalClusterManager and 
MockExternalClusterManager?  They are just two variations on a fake 
ExternalClusterManager for use in tests.  I realize that the focus of the tests 
for Dummy... and Mock... are different, so the two variations may not be easy 
or clean to combine, but if we could have just one fake ExternalClusterManager 
that still had a relatively clean implementation, I think that would be better 
than maintaining two.  OTOH, if combining them gets messy, then just go with 
what you've already got.




