[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...

2017-09-27 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/19194#discussion_r141482741
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -512,6 +535,9 @@ private[spark] class TaskSetManager(
   serializedTask)
   }
 } else {
+  if (runningTasks >= maxConcurrentTasks) {
+logDebug("Already running max. no. of concurrent tasks.")
--- End diff --

I'll make this change and also update the comments to explain the behavior so far. I'm also not clear from the earlier reply what the resolution was for accounting for the activeJobId. Do you still have any input?


---




[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...

2017-09-27 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19194#discussion_r141475205
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -512,6 +535,9 @@ private[spark] class TaskSetManager(
   serializedTask)
   }
 } else {
+  if (runningTasks >= maxConcurrentTasks) {
+logDebug("Already running max. no. of concurrent tasks.")
--- End diff --

Can you update this message to include the jobGroup and the full config name?

Also, I think it would be good to log this at the info level once (not on every `resourceOffer`, just the first time we hit the limit for this TaskSet). I can see users hitting this limit inadvertently, so it would be good to make it clear.
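A minimal, self-contained sketch of that suggestion, assuming a hypothetical helper rather than the PR's actual `TaskSetManager` change (class, field, and method names here are made up for illustration; only the proposed `spark.job.<group>.maxConcurrentTasks` key comes from this PR):

```scala
import org.slf4j.LoggerFactory

// Hypothetical helper: log the limit once at INFO (with the job group and the
// full config name), then drop to DEBUG for subsequent offers.
class ConcurrencyLimitLogging(jobGroup: String, maxConcurrentTasks: Int) {
  private val log = LoggerFactory.getLogger(classOf[ConcurrencyLimitLogging])
  private var limitAlreadyLogged = false

  def onLimitHit(): Unit = {
    if (!limitAlreadyLogged) {
      log.info(s"Job group '$jobGroup' reached its limit of $maxConcurrentTasks concurrent " +
        s"tasks (spark.job.$jobGroup.maxConcurrentTasks); further resource offers will be " +
        "rejected for this TaskSet.")
      limitAlreadyLogged = true
    } else {
      log.debug(s"Already running the maximum of $maxConcurrentTasks concurrent tasks " +
        s"for job group '$jobGroup'.")
    }
  }
}
```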


---




[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...

2017-09-21 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/19194#discussion_r140361162
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -619,6 +625,47 @@ private[spark] class ExecutorAllocationManager(
 // place the executors.
 private val stageIdToExecutorPlacementHints = new mutable.HashMap[Int, 
(Int, Map[String, Int])]
 
+override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+  jobStart.stageInfos.foreach(stageInfo => 
stageIdToJobId(stageInfo.stageId) = jobStart.jobId)
+
+  var jobGroupId = if (jobStart.properties != null) {
+jobStart.properties.getProperty(SparkContext.SPARK_JOB_GROUP_ID)
+  } else {
+null
+  }
+
+  val maxConTasks = if (jobGroupId != null &&
+conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) {
+conf.get(s"spark.job.$jobGroupId.maxConcurrentTasks").toInt
+  } else {
+Int.MaxValue
+  }
+
+  if (maxConTasks <= 0) {
+throw new IllegalArgumentException(
+  "Maximum Concurrent Tasks should be set greater than 0 for the 
job to progress.")
+  }
+
+  if (jobGroupId == null || 
!conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) {
+jobGroupId = DEFAULT_JOB_GROUP
+  }
+
+  jobIdToJobGroup(jobStart.jobId) = jobGroupId
+  if (!jobGroupToMaxConTasks.contains(jobGroupId)) {
--- End diff --

@squito Thanks for elaborating on this. I believe the last paragraph of your explanation summarizes your viewpoint. In a way that makes some sense: if you want to increase your max concurrent tasks, you know what you are doing, so if you see weird behavior in other threads that you created, it's fine. However, when you don't control the thread creation, I feel it's best to set the value just once to avoid the weirdness. It's much easier to use a different job group than to explain one more weird behavior.


---




[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...

2017-09-21 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/19194#discussion_r140340018
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -619,6 +625,47 @@ private[spark] class ExecutorAllocationManager(
 // place the executors.
 private val stageIdToExecutorPlacementHints = new mutable.HashMap[Int, 
(Int, Map[String, Int])]
 
+override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+  jobStart.stageInfos.foreach(stageInfo => 
stageIdToJobId(stageInfo.stageId) = jobStart.jobId)
+
+  var jobGroupId = if (jobStart.properties != null) {
+jobStart.properties.getProperty(SparkContext.SPARK_JOB_GROUP_ID)
+  } else {
+null
+  }
+
+  val maxConTasks = if (jobGroupId != null &&
+conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) {
+conf.get(s"spark.job.$jobGroupId.maxConcurrentTasks").toInt
+  } else {
+Int.MaxValue
+  }
+
+  if (maxConTasks <= 0) {
+throw new IllegalArgumentException(
+  "Maximum Concurrent Tasks should be set greater than 0 for the 
job to progress.")
+  }
+
+  if (jobGroupId == null || 
!conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) {
+jobGroupId = DEFAULT_JOB_GROUP
+  }
+
+  jobIdToJobGroup(jobStart.jobId) = jobGroupId
+  if (!jobGroupToMaxConTasks.contains(jobGroupId)) {
--- End diff --

Do we even need mutable state associated with a job group? Some things 
would be a lot simpler if maxConTasks could only be set when the job group is 
created; and if you need a different number of maxConTasks, then you have to 
use a different job group.
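A small self-contained sketch of that model, assuming a hypothetical registry that binds a limit to a job group exactly once; nothing here is part of the PR, it only illustrates the "different limit means different group" rule:

```scala
import scala.collection.concurrent.TrieMap

// Hypothetical registry: a job group's limit is fixed at registration time and
// can never be rebound; a different limit requires a different group name.
object JobGroupLimits {
  private val limits = TrieMap.empty[String, Int]

  def register(jobGroup: String, maxConcurrentTasks: Int): Unit = {
    require(maxConcurrentTasks > 0, "maxConcurrentTasks must be greater than 0")
    limits.putIfAbsent(jobGroup, maxConcurrentTasks).foreach { existing =>
      require(existing == maxConcurrentTasks,
        s"Job group '$jobGroup' is already limited to $existing concurrent tasks; " +
          "use a different job group for a different limit.")
    }
  }

  // Unregistered groups are effectively unlimited.
  def limitFor(jobGroup: String): Int = limits.getOrElse(jobGroup, Int.MaxValue)
}
```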


---




[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...

2017-09-21 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19194#discussion_r140337893
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -619,6 +625,47 @@ private[spark] class ExecutorAllocationManager(
 // place the executors.
 private val stageIdToExecutorPlacementHints = new mutable.HashMap[Int, 
(Int, Map[String, Int])]
 
+override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+  jobStart.stageInfos.foreach(stageInfo => 
stageIdToJobId(stageInfo.stageId) = jobStart.jobId)
+
+  var jobGroupId = if (jobStart.properties != null) {
+jobStart.properties.getProperty(SparkContext.SPARK_JOB_GROUP_ID)
+  } else {
+null
+  }
+
+  val maxConTasks = if (jobGroupId != null &&
+conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) {
+conf.get(s"spark.job.$jobGroupId.maxConcurrentTasks").toInt
+  } else {
+Int.MaxValue
+  }
+
+  if (maxConTasks <= 0) {
+throw new IllegalArgumentException(
+  "Maximum Concurrent Tasks should be set greater than 0 for the 
job to progress.")
+  }
+
+  if (jobGroupId == null || 
!conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) {
+jobGroupId = DEFAULT_JOB_GROUP
+  }
+
+  jobIdToJobGroup(jobStart.jobId) = jobGroupId
+  if (!jobGroupToMaxConTasks.contains(jobGroupId)) {
--- End diff --

Sorry, I am doing a really bad job explaining my concerns.

Of course, if you have multiple threads racing to set the value for the same job group, it's unpredictable which value you'll get. Nothing we can do about that; it's not the scenario I'm talking about.

Here's the step-by-step (I hope I understand the proposed change correctly):

(1) T1: Set jobgroup = "foo". Set maxConTasks=10.
(2) T1: Launch job 1.1. Uses maxConTasks=10.
(3) T2: Set jobgroup = "foo".
(4) T2: Launch job 2.1. Uses maxConTasks=10.
(5) T1: Finish job 1.1. Do *not* remove the entry for `jobGroupToMaxConTasks("foo")`, because there is still another job running for this job group in T2 (https://github.com/apache/spark/pull/19194/files#diff-b096353602813e47074ace09a3890d56R664).
(6) T1: Set maxConTasks=20.
(7) T1: Launch job 1.2. Uses maxConTasks=10, because `jobGroupToMaxConTasks.contains("foo")`, so we don't reset the value.
(8) T2: Finish job 2.1. Again, do not remove `jobGroupToMaxConTasks("foo")`, as T1 is still running a job in this job group.
(9) T2: Set maxConTasks=20.
(10) T2: Run job 2.2. Uses maxConTasks=10, because `jobGroupToMaxConTasks.contains("foo")`.

As long as there is a job running in T2 when T1 finishes its job (or vice versa), we never remove the prior value, and so never update it when starting a new job. We could be stuck with maxConTasks=10 in both threads indefinitely, even though both threads have set maxConTasks=20.

If you remove that `if (!jobGroupToMaxConTasks.contains(jobGroupId))`, then when job 1.2 starts you'd use the new value of maxConTasks=20. The only weird thing is that job 2.1 suddenly switches mid-flight to using maxConTasks=20 as well. But that seems more reasonable to me.
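To make the difference concrete, here is a small self-contained simulation of the two update policies (the PR's "set only if absent" versus the suggested unconditional assignment). It is not Spark code, just the map semantics under discussion:

```scala
import scala.collection.mutable

object JobGroupLimitUpdatePolicies {
  val jobGroupToMaxConTasks = mutable.HashMap[String, Int]()

  // Current PR behavior: keep the first value for as long as the entry exists.
  def onJobStartGuarded(jobGroup: String, maxConTasks: Int): Unit = {
    if (!jobGroupToMaxConTasks.contains(jobGroup)) {
      jobGroupToMaxConTasks(jobGroup) = maxConTasks
    }
  }

  // Suggested behavior: always take the conf value at job submission time.
  def onJobStartOverwrite(jobGroup: String, maxConTasks: Int): Unit = {
    jobGroupToMaxConTasks(jobGroup) = maxConTasks
  }

  def main(args: Array[String]): Unit = {
    onJobStartGuarded("foo", 10)           // step (2): job 1.1 starts with maxConTasks=10
    // Steps (5)-(7): job 1.1 finishes, but job 2.1 keeps the "foo" entry alive,
    // so the new setting of 20 is silently ignored when job 1.2 starts.
    onJobStartGuarded("foo", 20)
    println(jobGroupToMaxConTasks("foo"))  // prints 10: stuck with the stale limit

    // With the unconditional assignment, job 1.2 picks up the new limit (and any
    // job still running in the group switches to it mid-flight as well).
    onJobStartOverwrite("foo", 20)
    println(jobGroupToMaxConTasks("foo"))  // prints 20
  }
}
```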


---




[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...

2017-09-21 Thread tgravescs
Github user tgravescs commented on a diff in the pull request:

https://github.com/apache/spark/pull/19194#discussion_r140332294
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -619,6 +625,47 @@ private[spark] class ExecutorAllocationManager(
 // place the executors.
 private val stageIdToExecutorPlacementHints = new mutable.HashMap[Int, 
(Int, Map[String, Int])]
 
+override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+  jobStart.stageInfos.foreach(stageInfo => 
stageIdToJobId(stageInfo.stageId) = jobStart.jobId)
+
+  var jobGroupId = if (jobStart.properties != null) {
+jobStart.properties.getProperty(SparkContext.SPARK_JOB_GROUP_ID)
+  } else {
+null
+  }
+
+  val maxConTasks = if (jobGroupId != null &&
+conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) {
+conf.get(s"spark.job.$jobGroupId.maxConcurrentTasks").toInt
+  } else {
+Int.MaxValue
+  }
+
+  if (maxConTasks <= 0) {
+throw new IllegalArgumentException(
+  "Maximum Concurrent Tasks should be set greater than 0 for the 
job to progress.")
+  }
+
+  if (jobGroupId == null || 
!conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) {
+jobGroupId = DEFAULT_JOB_GROUP
+  }
+
+  jobIdToJobGroup(jobStart.jobId) = jobGroupId
+  if (!jobGroupToMaxConTasks.contains(jobGroupId)) {
--- End diff --

If we are talking about jobs within the same job group, it seems like this would be very timing-dependent as to what number you would get if you start allowing it to be changed in real time. Let's say you have one thread and set the job group. If all the jobs within that group are launched serially then everything is easy, and allowing the value to be changed can make sense. But if from that thread you spawn other threads to launch jobs in parallel (which would still be in that same job group) and each of those sets it differently, how do you know you will get the right number for each of those jobs? The two threads could race to set the conf, and if both set it right before launching, you are going to get one of the settings for both launches, whereas one might have expected a different setting.

@squito does this cover the scenario you are referring to?

While both of those cases might be rare, I would lean towards making it more predictable and only setting the value once, rather than having the user get something they don't expect. But either approach could probably be documented away if we see the serial scenario being more beneficial.

Ideally it would be nice to set this at the stage level, but that is a lot more difficult.


---




[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...

2017-09-21 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/19194#discussion_r140321640
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -619,6 +625,47 @@ private[spark] class ExecutorAllocationManager(
 // place the executors.
 private val stageIdToExecutorPlacementHints = new mutable.HashMap[Int, 
(Int, Map[String, Int])]
 
+override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+  jobStart.stageInfos.foreach(stageInfo => 
stageIdToJobId(stageInfo.stageId) = jobStart.jobId)
+
+  var jobGroupId = if (jobStart.properties != null) {
+jobStart.properties.getProperty(SparkContext.SPARK_JOB_GROUP_ID)
+  } else {
+null
+  }
+
+  val maxConTasks = if (jobGroupId != null &&
+conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) {
+conf.get(s"spark.job.$jobGroupId.maxConcurrentTasks").toInt
+  } else {
+Int.MaxValue
+  }
+
+  if (maxConTasks <= 0) {
+throw new IllegalArgumentException(
+  "Maximum Concurrent Tasks should be set greater than 0 for the 
job to progress.")
+  }
+
+  if (jobGroupId == null || 
!conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) {
+jobGroupId = DEFAULT_JOB_GROUP
+  }
+
+  jobIdToJobGroup(jobStart.jobId) = jobGroupId
+  if (!jobGroupToMaxConTasks.contains(jobGroupId)) {
--- End diff --

Okay. We did consider job-server style deployments; however, I am not able to follow the example/scenario you mentioned.

When you say that different sets of users are assigned different groups, they have to be served by different threads, because we can only set the job group on a per-thread basis.

While it is possible and valid for different threads to belong to the same job group, you cannot have multiple job groups active simultaneously for the same thread.

So in the scenario where you change maxConcTasks for a given jobGroup in one thread, the change would be visible in the other thread as well, because both of them belong to the same job group. In this case, it seems that it simply wasn't configured correctly. I am still not able to follow how setting the value every single time helps here.


---




[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...

2017-09-21 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19194#discussion_r140296863
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -619,6 +625,47 @@ private[spark] class ExecutorAllocationManager(
 // place the executors.
 private val stageIdToExecutorPlacementHints = new mutable.HashMap[Int, 
(Int, Map[String, Int])]
 
+override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+  jobStart.stageInfos.foreach(stageInfo => 
stageIdToJobId(stageInfo.stageId) = jobStart.jobId)
+
+  var jobGroupId = if (jobStart.properties != null) {
+jobStart.properties.getProperty(SparkContext.SPARK_JOB_GROUP_ID)
+  } else {
+null
+  }
+
+  val maxConTasks = if (jobGroupId != null &&
+conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) {
+conf.get(s"spark.job.$jobGroupId.maxConcurrentTasks").toInt
+  } else {
+Int.MaxValue
+  }
+
+  if (maxConTasks <= 0) {
+throw new IllegalArgumentException(
+  "Maximum Concurrent Tasks should be set greater than 0 for the 
job to progress.")
+  }
+
+  if (jobGroupId == null || 
!conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) {
+jobGroupId = DEFAULT_JOB_GROUP
+  }
+
+  jobIdToJobGroup(jobStart.jobId) = jobGroupId
+  if (!jobGroupToMaxConTasks.contains(jobGroupId)) {
--- End diff --

You could submit jobs concurrently from two different threads. I didn't describe it very well -- say both threads are in the same job group. Each thread can only run one job at a time, but between the two threads there may always be some active job for the job group the entire time.

There are real use cases like this -- in "job-server" style deployments there is a long-running Spark context (most likely with cached RDDs) accepting requests from multiple users, perhaps sitting behind an HTTP server. Maybe some group of users is always put into one job group (e.g., there is a low-priority group and a high-priority group of users). You might process more than one job for each group at a time, and there is a never-ending stream of jobs.

(Again, it's not the most common scenario, but we might as well have it behave correctly.)
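A minimal runnable sketch of that pattern, with two handler threads sharing one SparkContext and one job group; the `spark.job.<group>.maxConcurrentTasks` key is the one proposed in this PR, not a released Spark setting:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object SharedJobGroupServer {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[4]")
      .setAppName("shared-job-group")
      .set("spark.job.lowPriority.maxConcurrentTasks", "2") // proposed key, illustration only
    val sc = new SparkContext(conf)

    // Job groups are per-thread local properties, so each "request handler"
    // thread sets the group for itself.
    def handler(requestId: Int): Thread = new Thread(new Runnable {
      override def run(): Unit = {
        sc.setJobGroup("lowPriority", s"request $requestId", interruptOnCancel = false)
        sc.parallelize(1 to 1000, numSlices = 20).map(_ * 2).count()
      }
    })

    // Between the two threads there can always be some active job for the group.
    val threads = Seq(handler(1), handler(2))
    threads.foreach(_.start())
    threads.foreach(_.join())
    sc.stop()
  }
}
```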


---




[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...

2017-09-21 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/19194#discussion_r140293577
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -619,6 +625,47 @@ private[spark] class ExecutorAllocationManager(
 // place the executors.
 private val stageIdToExecutorPlacementHints = new mutable.HashMap[Int, 
(Int, Map[String, Int])]
 
+override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+  jobStart.stageInfos.foreach(stageInfo => 
stageIdToJobId(stageInfo.stageId) = jobStart.jobId)
+
+  var jobGroupId = if (jobStart.properties != null) {
+jobStart.properties.getProperty(SparkContext.SPARK_JOB_GROUP_ID)
+  } else {
+null
+  }
+
+  val maxConTasks = if (jobGroupId != null &&
+conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) {
+conf.get(s"spark.job.$jobGroupId.maxConcurrentTasks").toInt
+  } else {
+Int.MaxValue
+  }
+
+  if (maxConTasks <= 0) {
+throw new IllegalArgumentException(
+  "Maximum Concurrent Tasks should be set greater than 0 for the 
job to progress.")
+  }
+
+  if (jobGroupId == null || 
!conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) {
+jobGroupId = DEFAULT_JOB_GROUP
+  }
+
+  jobIdToJobGroup(jobStart.jobId) = jobGroupId
+  if (!jobGroupToMaxConTasks.contains(jobGroupId)) {
--- End diff --

I understand your reasoning about setting maxConTasks every time a job is submitted. However, I am not able to understand the scenario you described. If a job completes and a new one kicks off immediately, how does the new job partially overlap? It's only when all the stages and underlying tasks for the previous job have finished that we mark it as complete, so a new job won't overlap with a completed one. Am I missing something here?


---




[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...

2017-09-20 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19194#discussion_r140136101
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -619,6 +625,47 @@ private[spark] class ExecutorAllocationManager(
 // place the executors.
 private val stageIdToExecutorPlacementHints = new mutable.HashMap[Int, 
(Int, Map[String, Int])]
 
+override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+  jobStart.stageInfos.foreach(stageInfo => 
stageIdToJobId(stageInfo.stageId) = jobStart.jobId)
+
+  var jobGroupId = if (jobStart.properties != null) {
+jobStart.properties.getProperty(SparkContext.SPARK_JOB_GROUP_ID)
+  } else {
+null
+  }
+
+  val maxConTasks = if (jobGroupId != null &&
+conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) {
+conf.get(s"spark.job.$jobGroupId.maxConcurrentTasks").toInt
+  } else {
+Int.MaxValue
+  }
+
+  if (maxConTasks <= 0) {
+throw new IllegalArgumentException(
+  "Maximum Concurrent Tasks should be set greater than 0 for the 
job to progress.")
+  }
+
+  if (jobGroupId == null || 
!conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) {
+jobGroupId = DEFAULT_JOB_GROUP
+  }
+
+  jobIdToJobGroup(jobStart.jobId) = jobGroupId
+  if (!jobGroupToMaxConTasks.contains(jobGroupId)) {
--- End diff --

Let me put my concern another way:

Why don't you remove the `if (!jobGroupToMaxConTasks.contains(jobGroupId))` and just always make the assignment `jobGroupToMaxConTasks(jobGroupId) = maxConTasks`?

That is simpler to reason about, and it has all the properties we want. I agree the scenario I'm describing is pretty weird, but the only difference I see between your version and this one is in that scenario, and it's probably not the behavior we want there.


---




[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...

2017-09-20 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/19194#discussion_r140122744
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala ---
@@ -1255,6 +1255,97 @@ class TaskSetManagerSuite extends SparkFunSuite with 
LocalSparkContext with Logg
 assert(taskOption4.get.addedJars === addedJarsMidTaskSet)
   }
 
+  test("limit max concurrent running tasks in a job group when configured 
") {
+val conf = new SparkConf().
+  set(config.BLACKLIST_ENABLED, true).
+  set("spark.job.testJobGroup.maxConcurrentTasks", "2") // Limit max 
concurrent tasks to 2
+
+sc = new SparkContext("local", "test", conf)
+sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", 
"host2"))
+val props = new Properties();
+props.setProperty(SparkContext.SPARK_JOB_GROUP_ID, "testJobGroup") // 
set the job group
+
+val tasks = Array.tabulate[Task[_]](10) { i =>
+  new FakeTask(0, i, Nil)
+}
+val tsm = new TaskSetManager(sched, new TaskSet(tasks, 0, 0, 0, 
props), 2)
+
+// make some offers to our taskset
+var taskDescs = Seq(
+  "exec1" -> "host1",
+  "exec2" -> "host1"
+).flatMap { case (exec, host) =>
+  // offer each executor twice (simulating 2 cores per executor)
+  (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, 
TaskLocality.ANY)}
+}
+assert(taskDescs.size === 2) // Out of the 4 offers, tsm can accept up 
to maxConcurrentTasks.
+
+// make 4 more offers
+val taskDescs2 = Seq(
+  "exec1" -> "host1",
+  "exec2" -> "host1"
+).flatMap { case (exec, host) =>
+  (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, 
TaskLocality.ANY)}
+}
+assert(taskDescs2.size === 0) // tsm doesn't accept any as it is 
already running at max tasks
+
+// inform tsm that one task has completed
+val directTaskResult = createTaskResult(0)
+tsm.handleSuccessfulTask(taskDescs(0).taskId, directTaskResult)
+
+// make 4 more offers after previous task completed
+taskDescs = Seq(
+  "exec1" -> "host1",
+  "exec2" -> "host1"
+).flatMap { case (exec, host) =>
+  (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, 
TaskLocality.ANY)}
+}
+assert(taskDescs.size === 1) // tsm accepts one as it can run one more 
task
+  }
+
+  test("do not limit max concurrent running tasks in a job group by 
default") {
--- End diff --

The previous test covers all the necessary checks introduced by the change. I added this one to cover the default scenario when no job group is specified; I can do away with it.


---




[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...

2017-09-20 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/19194#discussion_r140124886
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -619,6 +625,47 @@ private[spark] class ExecutorAllocationManager(
 // place the executors.
 private val stageIdToExecutorPlacementHints = new mutable.HashMap[Int, 
(Int, Map[String, Int])]
 
+override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+  jobStart.stageInfos.foreach(stageInfo => 
stageIdToJobId(stageInfo.stageId) = jobStart.jobId)
+
+  var jobGroupId = if (jobStart.properties != null) {
+jobStart.properties.getProperty(SparkContext.SPARK_JOB_GROUP_ID)
+  } else {
+null
+  }
+
+  val maxConTasks = if (jobGroupId != null &&
+conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) {
+conf.get(s"spark.job.$jobGroupId.maxConcurrentTasks").toInt
+  } else {
+Int.MaxValue
+  }
+
+  if (maxConTasks <= 0) {
+throw new IllegalArgumentException(
+  "Maximum Concurrent Tasks should be set greater than 0 for the 
job to progress.")
+  }
+
+  if (jobGroupId == null || 
!conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) {
+jobGroupId = DEFAULT_JOB_GROUP
+  }
+
+  jobIdToJobGroup(jobStart.jobId) = jobGroupId
+  if (!jobGroupToMaxConTasks.contains(jobGroupId)) {
--- End diff --

From my understanding, and in the context of the current code, you group jobs together when you want their concurrency to be limited. If you want different concurrency limits for different jobs, you would set them in a different job group altogether.

If there are multiple jobs in the same job group running concurrently and one of them sets a different value, which one wins for the existing jobs and the new job? If we wanted a different value for every job, the user would need a way to identify a Spark job in their application code, probably by a job id. Only by identifying a job would the user be able to set the config for that job. This cannot be known a priori, and I don't know of an easy way for the user to discover which underlying Spark job corresponds to an action. Hence we apply the setting at the job group level, which lets the user control concurrency without having to know Spark's underlying job details.

Let me know if anything is unclear here or if you have more questions. 
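For reference, a hedged sketch of the user-facing flow being described above: the user only picks a group name and sets the proposed `spark.job.<group>.maxConcurrentTasks` key (not a released setting), and never needs to know Spark's internal job ids:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object JobGroupLimitUsage {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[4]")
      .setAppName("job-group-limit-usage")
      .set("spark.job.reporting.maxConcurrentTasks", "4") // proposed key from this PR
    val sc = new SparkContext(conf)

    // Every job triggered from this thread belongs to the "reporting" group, so
    // all the actions below share the same cap without the user ever referring
    // to an individual Spark job id.
    sc.setJobGroup("reporting", "daily reports", interruptOnCancel = false)
    val data = sc.parallelize(1 to 10000, numSlices = 50)
    data.map(_ % 7).countByValue()
    data.filter(_ % 2 == 0).count()

    sc.stop()
  }
}
```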


---




[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...

2017-09-20 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/19194#discussion_r140122769
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala ---
@@ -1255,6 +1255,97 @@ class TaskSetManagerSuite extends SparkFunSuite with 
LocalSparkContext with Logg
 assert(taskOption4.get.addedJars === addedJarsMidTaskSet)
   }
 
+  test("limit max concurrent running tasks in a job group when configured 
") {
+val conf = new SparkConf().
+  set(config.BLACKLIST_ENABLED, true).
+  set("spark.job.testJobGroup.maxConcurrentTasks", "2") // Limit max 
concurrent tasks to 2
+
+sc = new SparkContext("local", "test", conf)
+sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", 
"host2"))
+val props = new Properties();
+props.setProperty(SparkContext.SPARK_JOB_GROUP_ID, "testJobGroup") // 
set the job group
+
+val tasks = Array.tabulate[Task[_]](10) { i =>
+  new FakeTask(0, i, Nil)
+}
+val tsm = new TaskSetManager(sched, new TaskSet(tasks, 0, 0, 0, 
props), 2)
+
+// make some offers to our taskset
+var taskDescs = Seq(
+  "exec1" -> "host1",
+  "exec2" -> "host1"
+).flatMap { case (exec, host) =>
+  // offer each executor twice (simulating 2 cores per executor)
+  (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, 
TaskLocality.ANY)}
+}
+assert(taskDescs.size === 2) // Out of the 4 offers, tsm can accept up 
to maxConcurrentTasks.
+
+// make 4 more offers
+val taskDescs2 = Seq(
+  "exec1" -> "host1",
+  "exec2" -> "host1"
+).flatMap { case (exec, host) =>
+  (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, 
TaskLocality.ANY)}
+}
+assert(taskDescs2.size === 0) // tsm doesn't accept any as it is 
already running at max tasks
+
+// inform tsm that one task has completed
+val directTaskResult = createTaskResult(0)
+tsm.handleSuccessfulTask(taskDescs(0).taskId, directTaskResult)
+
+// make 4 more offers after previous task completed
+taskDescs = Seq(
+  "exec1" -> "host1",
+  "exec2" -> "host1"
+).flatMap { case (exec, host) =>
+  (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, 
TaskLocality.ANY)}
+}
+assert(taskDescs.size === 1) // tsm accepts one as it can run one more 
task
+  }
+
+  test("do not limit max concurrent running tasks in a job group by 
default") {
+val conf = new SparkConf().
+  set(config.BLACKLIST_ENABLED, true)
+
+sc = new SparkContext("local", "test", conf)
+sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", 
"host2"))
+
+val tasks = Array.tabulate[Task[_]](10) { i =>
+  new FakeTask(0, i, Nil)
+}
+val tsm = new TaskSetManager(sched, new TaskSet(tasks, 0, 0, 0, null), 
2)
+
+// make 5 offers to our taskset
+var taskDescs = Seq(
+  "exec1" -> "host1",
+  "exec2" -> "host2"
+).flatMap { case (exec, host) =>
+  // offer each executor twice (simulating 3 cores per executor)
--- End diff --

Okay. This won't be needed, as I will be removing the test case.


---




[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...

2017-09-20 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/19194#discussion_r140123047
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -758,11 +825,52 @@ private[spark] class ExecutorAllocationManager(
   allocationManager.synchronized {
 stageIdToNumSpeculativeTasks(stageId) =
   stageIdToNumSpeculativeTasks.getOrElse(stageId, 0) + 1
+maxConcurrentTasks = getMaxConTasks
+logDebug(s"Setting max concurrent tasks to $maxConcurrentTasks on 
spec. task submitted.")
 allocationManager.onSchedulerBacklogged()
   }
 }
 
 /**
+ * Calculate the maximum no. of concurrent tasks that can run 
currently.
+ */
+def getMaxConTasks(): Int = {
+  // We can limit the no. of concurrent tasks by a job group. A job 
group can have multiple jobs
+  // with multiple stages. We need to get all the active stages 
belonging to a job group to
+  // calculate the total no. of pending + running tasks to decide the 
maximum no. of executors
+  // we need at that time to serve the outstanding tasks. This is 
capped by the minimum no. of
+  // outstanding tasks and the max concurrent limit specified for the 
job group if any.
+
+  def getIncompleteTasksForStage(stageId: Int, numTasks: Int): Int = {
+totalPendingTasks(stageId) + totalRunningTasks(stageId)
+  }
+
+  def sumIncompleteTasksForStages: (Int, (Int, Int)) => Int = 
(totalTasks, stageToNumTasks) => {
+val activeTasks = getIncompleteTasksForStage(stageToNumTasks._1, 
stageToNumTasks._2)
+sumOrMax(totalTasks, activeTasks)
+  }
+  // Get the total running & pending tasks for all stages in a job 
group.
+  def getIncompleteTasksForJobGroup(stagesItr: mutable.HashMap[Int, 
Int]): Int = {
+stagesItr.foldLeft(0)(sumIncompleteTasksForStages)
+  }
+
+  def sumIncompleteTasksForJobGroup: (Int, (String, 
mutable.HashMap[Int, Int])) => Int = {
+(maxConTasks, x) => {
+  val totalIncompleteTasksForJobGroup = 
getIncompleteTasksForJobGroup(x._2)
+  val maxTasks = Math.min(jobGroupToMaxConTasks(x._1), 
totalIncompleteTasksForJobGroup)
+  sumOrMax(maxConTasks, maxTasks)
+}
+  }
+
+  def sumOrMax(a: Int, b: Int): Int = if (doesSumOverflow(a, b)) 
Int.MaxValue else (a + b)
+
+  def doesSumOverflow(a: Int, b: Int): Boolean = b > (Int.MaxValue - a)
+
+  val stagesByJobGroup = stageIdToNumTasks.groupBy(x => 
jobIdToJobGroup(stageIdToJobId(x._1)))
--- End diff --

I like the idea. I think this can be done; I will update the PR.


---




[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...

2017-09-19 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19194#discussion_r139819360
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala ---
@@ -1255,6 +1255,97 @@ class TaskSetManagerSuite extends SparkFunSuite with 
LocalSparkContext with Logg
 assert(taskOption4.get.addedJars === addedJarsMidTaskSet)
   }
 
+  test("limit max concurrent running tasks in a job group when configured 
") {
+val conf = new SparkConf().
+  set(config.BLACKLIST_ENABLED, true).
+  set("spark.job.testJobGroup.maxConcurrentTasks", "2") // Limit max 
concurrent tasks to 2
+
+sc = new SparkContext("local", "test", conf)
+sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", 
"host2"))
+val props = new Properties();
+props.setProperty(SparkContext.SPARK_JOB_GROUP_ID, "testJobGroup") // 
set the job group
+
+val tasks = Array.tabulate[Task[_]](10) { i =>
+  new FakeTask(0, i, Nil)
+}
+val tsm = new TaskSetManager(sched, new TaskSet(tasks, 0, 0, 0, 
props), 2)
+
+// make some offers to our taskset
+var taskDescs = Seq(
+  "exec1" -> "host1",
+  "exec2" -> "host1"
+).flatMap { case (exec, host) =>
+  // offer each executor twice (simulating 2 cores per executor)
+  (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, 
TaskLocality.ANY)}
+}
+assert(taskDescs.size === 2) // Out of the 4 offers, tsm can accept up 
to maxConcurrentTasks.
+
+// make 4 more offers
+val taskDescs2 = Seq(
+  "exec1" -> "host1",
+  "exec2" -> "host1"
+).flatMap { case (exec, host) =>
+  (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, 
TaskLocality.ANY)}
+}
+assert(taskDescs2.size === 0) // tsm doesn't accept any as it is 
already running at max tasks
+
+// inform tsm that one task has completed
+val directTaskResult = createTaskResult(0)
+tsm.handleSuccessfulTask(taskDescs(0).taskId, directTaskResult)
+
+// make 4 more offers after previous task completed
+taskDescs = Seq(
+  "exec1" -> "host1",
+  "exec2" -> "host1"
+).flatMap { case (exec, host) =>
+  (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, 
TaskLocality.ANY)}
+}
+assert(taskDescs.size === 1) // tsm accepts one as it can run one more 
task
+  }
+
+  test("do not limit max concurrent running tasks in a job group by 
default") {
+val conf = new SparkConf().
+  set(config.BLACKLIST_ENABLED, true)
+
+sc = new SparkContext("local", "test", conf)
+sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", 
"host2"))
+
+val tasks = Array.tabulate[Task[_]](10) { i =>
+  new FakeTask(0, i, Nil)
+}
+val tsm = new TaskSetManager(sched, new TaskSet(tasks, 0, 0, 0, null), 
2)
+
+// make 5 offers to our taskset
+var taskDescs = Seq(
+  "exec1" -> "host1",
+  "exec2" -> "host2"
+).flatMap { case (exec, host) =>
+  // offer each executor twice (simulating 3 cores per executor)
--- End diff --

Update the comments:
"5 offers" -> "6 offers"
"twice" -> "three times"


---




[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...

2017-09-19 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19194#discussion_r139819793
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala ---
@@ -1255,6 +1255,97 @@ class TaskSetManagerSuite extends SparkFunSuite with 
LocalSparkContext with Logg
 assert(taskOption4.get.addedJars === addedJarsMidTaskSet)
   }
 
+  test("limit max concurrent running tasks in a job group when configured 
") {
+val conf = new SparkConf().
+  set(config.BLACKLIST_ENABLED, true).
+  set("spark.job.testJobGroup.maxConcurrentTasks", "2") // Limit max 
concurrent tasks to 2
+
+sc = new SparkContext("local", "test", conf)
+sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", 
"host2"))
+val props = new Properties();
+props.setProperty(SparkContext.SPARK_JOB_GROUP_ID, "testJobGroup") // 
set the job group
+
+val tasks = Array.tabulate[Task[_]](10) { i =>
+  new FakeTask(0, i, Nil)
+}
+val tsm = new TaskSetManager(sched, new TaskSet(tasks, 0, 0, 0, 
props), 2)
+
+// make some offers to our taskset
+var taskDescs = Seq(
+  "exec1" -> "host1",
+  "exec2" -> "host1"
+).flatMap { case (exec, host) =>
+  // offer each executor twice (simulating 2 cores per executor)
+  (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, 
TaskLocality.ANY)}
+}
+assert(taskDescs.size === 2) // Out of the 4 offers, tsm can accept up 
to maxConcurrentTasks.
+
+// make 4 more offers
+val taskDescs2 = Seq(
+  "exec1" -> "host1",
+  "exec2" -> "host1"
+).flatMap { case (exec, host) =>
+  (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, 
TaskLocality.ANY)}
+}
+assert(taskDescs2.size === 0) // tsm doesn't accept any as it is 
already running at max tasks
+
+// inform tsm that one task has completed
+val directTaskResult = createTaskResult(0)
+tsm.handleSuccessfulTask(taskDescs(0).taskId, directTaskResult)
+
+// make 4 more offers after previous task completed
+taskDescs = Seq(
+  "exec1" -> "host1",
+  "exec2" -> "host1"
+).flatMap { case (exec, host) =>
+  (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, 
TaskLocality.ANY)}
+}
+assert(taskDescs.size === 1) // tsm accepts one as it can run one more 
task
+  }
+
+  test("do not limit max concurrent running tasks in a job group by 
default") {
--- End diff --

I don't think this test really adds anything beyond other tests in this 
suite.


---




[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...

2017-09-19 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19194#discussion_r139821899
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -619,6 +625,47 @@ private[spark] class ExecutorAllocationManager(
 // place the executors.
 private val stageIdToExecutorPlacementHints = new mutable.HashMap[Int, 
(Int, Map[String, Int])]
 
+override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+  jobStart.stageInfos.foreach(stageInfo => 
stageIdToJobId(stageInfo.stageId) = jobStart.jobId)
+
+  var jobGroupId = if (jobStart.properties != null) {
+jobStart.properties.getProperty(SparkContext.SPARK_JOB_GROUP_ID)
+  } else {
+null
+  }
+
+  val maxConTasks = if (jobGroupId != null &&
+conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) {
+conf.get(s"spark.job.$jobGroupId.maxConcurrentTasks").toInt
+  } else {
+Int.MaxValue
+  }
+
+  if (maxConTasks <= 0) {
+throw new IllegalArgumentException(
+  "Maximum Concurrent Tasks should be set greater than 0 for the 
job to progress.")
+  }
+
+  if (jobGroupId == null || 
!conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) {
+jobGroupId = DEFAULT_JOB_GROUP
+  }
+
+  jobIdToJobGroup(jobStart.jobId) = jobGroupId
+  if (!jobGroupToMaxConTasks.contains(jobGroupId)) {
--- End diff --

This is probably a weird/unusual situation, but is this really the behavior you want if there are multiple jobs submitted for the same job group? Wouldn't you just take the conf for the job group at the time each job was submitted?

Worst case with this approach: say you are *always* submitting multiple jobs for each job group; when one finishes, you immediately start another one, so that the new one partially overlaps the old one. Then even if you change the conf, all jobs will keep using the old value forever.


---




[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...

2017-09-19 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19194#discussion_r139818031
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -758,11 +825,52 @@ private[spark] class ExecutorAllocationManager(
   allocationManager.synchronized {
 stageIdToNumSpeculativeTasks(stageId) =
   stageIdToNumSpeculativeTasks.getOrElse(stageId, 0) + 1
+maxConcurrentTasks = getMaxConTasks
+logDebug(s"Setting max concurrent tasks to $maxConcurrentTasks on 
spec. task submitted.")
 allocationManager.onSchedulerBacklogged()
   }
 }
 
 /**
+ * Calculate the maximum no. of concurrent tasks that can run 
currently.
+ */
+def getMaxConTasks(): Int = {
+  // We can limit the no. of concurrent tasks by a job group. A job 
group can have multiple jobs
+  // with multiple stages. We need to get all the active stages 
belonging to a job group to
+  // calculate the total no. of pending + running tasks to decide the 
maximum no. of executors
+  // we need at that time to serve the outstanding tasks. This is 
capped by the minimum no. of
+  // outstanding tasks and the max concurrent limit specified for the 
job group if any.
+
+  def getIncompleteTasksForStage(stageId: Int, numTasks: Int): Int = {
+totalPendingTasks(stageId) + totalRunningTasks(stageId)
+  }
+
+  def sumIncompleteTasksForStages: (Int, (Int, Int)) => Int = 
(totalTasks, stageToNumTasks) => {
+val activeTasks = getIncompleteTasksForStage(stageToNumTasks._1, 
stageToNumTasks._2)
+sumOrMax(totalTasks, activeTasks)
+  }
+  // Get the total running & pending tasks for all stages in a job 
group.
+  def getIncompleteTasksForJobGroup(stagesItr: mutable.HashMap[Int, 
Int]): Int = {
+stagesItr.foldLeft(0)(sumIncompleteTasksForStages)
+  }
+
+  def sumIncompleteTasksForJobGroup: (Int, (String, 
mutable.HashMap[Int, Int])) => Int = {
+(maxConTasks, x) => {
+  val totalIncompleteTasksForJobGroup = 
getIncompleteTasksForJobGroup(x._2)
+  val maxTasks = Math.min(jobGroupToMaxConTasks(x._1), 
totalIncompleteTasksForJobGroup)
+  sumOrMax(maxConTasks, maxTasks)
+}
+  }
+
+  def sumOrMax(a: Int, b: Int): Int = if (doesSumOverflow(a, b)) 
Int.MaxValue else (a + b)
+
+  def doesSumOverflow(a: Int, b: Int): Boolean = b > (Int.MaxValue - a)
+
+  val stagesByJobGroup = stageIdToNumTasks.groupBy(x => 
jobIdToJobGroup(stageIdToJobId(x._1)))
--- End diff --

You could just store `stageIdToJobGroupId`. That simplifies this a bit, and then I think you don't need to store `jobIdToJobGroup` at all.
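A rough self-contained sketch of that simplification, assuming the listener recorded the job group directly per stage in `onJobStart` (the field names below are hypothetical stand-ins, not the PR's code):

```scala
import scala.collection.mutable

object StageToJobGroupSketch {
  // One map from stage to job group replaces the stageIdToJobId + jobIdToJobGroup pair.
  val stageIdToNumTasks = mutable.HashMap[Int, Int]()
  val stageIdToJobGroupId = mutable.HashMap[Int, String]()

  // Grouping the active stages by job group then needs a single lookup per stage.
  def stagesByJobGroup = stageIdToNumTasks.groupBy { case (stageId, _) => stageIdToJobGroupId(stageId) }

  def main(args: Array[String]): Unit = {
    stageIdToNumTasks ++= Seq(0 -> 10, 1 -> 10, 2 -> 20)
    stageIdToJobGroupId ++= Seq(0 -> "group1", 1 -> "group1", 2 -> "group2")
    // Total tasks per group, e.g. Map(group1 -> 20, group2 -> 20)
    println(stagesByJobGroup.map { case (group, stages) => group -> stages.values.sum })
  }
}
```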


---




[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...

2017-09-15 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/19194#discussion_r139163830
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala ---
@@ -1255,6 +1255,97 @@ class TaskSetManagerSuite extends SparkFunSuite with 
LocalSparkContext with Logg
 assert(taskOption4.get.addedJars === addedJarsMidTaskSet)
   }
 
+  test("limit max concurrent running tasks in a job group when configured 
") {
+val conf = new SparkConf().
+  set(config.BLACKLIST_ENABLED, true).
+  set("spark.job.testJobGroup.maxConcurrentTasks", "2") // Limit max 
concurrent tasks to 2
+
+sc = new SparkContext("local", "test", conf)
+sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", 
"host2"))
+val props = new Properties();
+props.setProperty(SparkContext.SPARK_JOB_GROUP_ID, "testJobGroup") // 
set the job group
+
+val tasks = Array.tabulate[Task[_]](10) { i =>
+  new FakeTask(0, i, Nil)
+}
+val tsm = new TaskSetManager(sched, new TaskSet(tasks, 0, 0, 0, 
props), 2)
+
+// make some offers to our taskset
+var taskDescs = Seq(
+  "exec1" -> "host1",
+  "exec2" -> "host1"
+).flatMap { case (exec, host) =>
+  // offer each executor twice (simulating 2 cores per executor)
+  (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, 
TaskLocality.ANY)}
+}
+assert(taskDescs.size === 2) // Out of the 4 offers, tsm can accept up 
to maxConcurrentTasks.
+
+// make 4 more offers
+val taskDescs2 = Seq(
+  "exec1" -> "host1",
+  "exec2" -> "host1"
+).flatMap { case (exec, host) =>
+  (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, 
TaskLocality.ANY)}
+}
+assert(taskDescs2.size === 0) // tsm doesn't accept any as it is 
already running at max tasks
+
+// inform tsm that one task has completed
+val directTaskResult = createTaskResult(0)
+tsm.handleSuccessfulTask(taskDescs(0).taskId, directTaskResult)
+
+// make 4 more offers after previous task completed
+taskDescs = Seq(
+  "exec1" -> "host1",
+  "exec2" -> "host1"
+).flatMap { case (exec, host) =>
+  (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, 
TaskLocality.ANY)}
+}
+assert(taskDescs.size === 1) // tsm accepts one as it can run one more 
task
+  }
+
+  test("do not limit max concurrent running tasks in a job group by 
default") {
+val conf = new SparkConf().
+  set(config.BLACKLIST_ENABLED, true)
+
+sc = new SparkContext("local", "test", conf)
+sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", 
"host2"))
+
+val tasks = Array.tabulate[Task[_]](10) { i =>
+  new FakeTask(0, i, Nil)
+}
+val tsm = new TaskSetManager(sched, new TaskSet(tasks, 0, 0, 0, null), 
2)
+
+// make 5 offers to our taskset
+var taskDescs = Seq(
+  "exec1" -> "host1",
+  "exec2" -> "host2"
+).flatMap { case (exec, host) =>
+  // offer each executor twice (simulating 2 cores per executor)
--- End diff --

Yes, it should be 2.


---




[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...

2017-09-15 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/19194#discussion_r139163463
  
--- Diff: 
core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala ---
@@ -210,22 +216,282 @@ class ExecutorAllocationManagerSuite
 assert(numExecutorsToAdd(manager) === 1)
 
 // Verify that running a task doesn't affect the target
-sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, 
createTaskInfo(0, 0, "executor-1")))
+sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, 
createTaskInfo(0, 0, "executor-1")))
 assert(numExecutorsTarget(manager) === 5)
 assert(addExecutors(manager) === 0)
 assert(numExecutorsToAdd(manager) === 1)
 
 // Verify that running a speculative task doesn't affect the target
-sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, 
createTaskInfo(0, 0, "executor-2", true)))
+sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, 
createTaskInfo(0, 0, "executor-2", true)))
 assert(numExecutorsTarget(manager) === 5)
 assert(addExecutors(manager) === 0)
 assert(numExecutorsToAdd(manager) === 1)
   }
 
+  test("add executors capped by max concurrent tasks for a job group with 
single core executors") {
+val conf = new SparkConf()
+  .setMaster("myDummyLocalExternalClusterManager")
+  .setAppName("test-executor-allocation-manager")
+  .set("spark.dynamicAllocation.enabled", "true")
+  .set("spark.dynamicAllocation.testing", "true")
+  .set("spark.job.group1.maxConcurrentTasks", "2")
+  .set("spark.job.group2.maxConcurrentTasks", "5")
+val sc = new SparkContext(conf)
+contexts += sc
+sc.setJobGroup("group1", "", false)
+
+val manager = sc.executorAllocationManager.get
+val stages = Seq(createStageInfo(0, 10), createStageInfo(1, 10))
+// Submit the job and stage start/submit events
+sc.listenerBus.postToAll(SparkListenerJobStart(0, 0, stages, 
sc.getLocalProperties))
+sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(0)))
+
+// Verify that we're capped at number of max concurrent tasks in the 
stage
+assert(maxNumExecutorsNeeded(manager) === 2)
+
+// Submit another stage in the same job
+sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(1)))
+assert(maxNumExecutorsNeeded(manager) === 2)
+
+sc.listenerBus.postToAll(SparkListenerStageCompleted(stages(0)))
+sc.listenerBus.postToAll(SparkListenerStageCompleted(stages(1)))
+sc.listenerBus.postToAll(SparkListenerJobEnd(0, 10, JobSucceeded))
+
+// Submit a new job in the same job group
+val stage2 = createStageInfo(2, 20)
+sc.listenerBus.postToAll(SparkListenerJobStart(1, 0, Seq(stage2), 
sc.getLocalProperties))
+sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage2))
+assert(maxNumExecutorsNeeded(manager) === 2)
+
+sc.listenerBus.postToAll(SparkListenerStageCompleted(stage2))
+sc.listenerBus.postToAll(SparkListenerJobEnd(1, 10, JobSucceeded))
+
+// Set another jobGroup
+sc.setJobGroup("group2", "", false)
+
+val stage3 = createStageInfo(3, 20)
+sc.listenerBus.postToAll(SparkListenerJobStart(2, 0, Seq(stage3), 
sc.getLocalProperties))
+sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage3))
+assert(maxNumExecutorsNeeded(manager) === 5)
+
+sc.listenerBus.postToAll(SparkListenerStageCompleted(stage3))
+sc.listenerBus.postToAll(SparkListenerJobEnd(2, 10, JobSucceeded))
+
+// Clear jobGroup
+sc.clearJobGroup()
+
+val stage4 = createStageInfo(4, 50)
+sc.listenerBus.postToAll(SparkListenerJobStart(2, 0, Seq(stage4), 
sc.getLocalProperties))
+sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage4))
+assert(maxNumExecutorsNeeded(manager) === 50)
+  }
+
+  test("add executors capped by max concurrent tasks for a job group with 
multi cores executors") {
+val conf = new SparkConf()
+  .setMaster("myDummyLocalExternalClusterManager")
+  .setAppName("test-executor-allocation-manager")
+  .set("spark.dynamicAllocation.enabled", "true")
+  .set("spark.dynamicAllocation.testing", "true")
+  .set("spark.job.group1.maxConcurrentTasks", "2")
+  .set("spark.job.group2.maxConcurrentTasks", "5")
+  .set("spark.executor.cores", "3")
+val sc = new SparkContext(conf)
+contexts += sc
+sc.setJobGroup("group1", "", false)
+
+val manager = sc.executorAllocationManager.get
+val stages = Seq(createStageInfo(0, 10), createStageInfo(1, 10))
+// Submit the job and stage start/submit events
+sc.listenerBus.postToAll(SparkListenerJobStart(0, 0

[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...

2017-09-15 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/19194#discussion_r139163529
  
--- Diff: 
core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala ---
@@ -210,22 +216,282 @@ class ExecutorAllocationManagerSuite
 assert(numExecutorsToAdd(manager) === 1)
 
 // Verify that running a task doesn't affect the target
-sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, 
createTaskInfo(0, 0, "executor-1")))
+sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, 
createTaskInfo(0, 0, "executor-1")))
 assert(numExecutorsTarget(manager) === 5)
 assert(addExecutors(manager) === 0)
 assert(numExecutorsToAdd(manager) === 1)
 
 // Verify that running a speculative task doesn't affect the target
-sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, 
createTaskInfo(0, 0, "executor-2", true)))
+sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, 
createTaskInfo(0, 0, "executor-2", true)))
 assert(numExecutorsTarget(manager) === 5)
 assert(addExecutors(manager) === 0)
 assert(numExecutorsToAdd(manager) === 1)
   }
 
+  test("add executors capped by max concurrent tasks for a job group with 
single core executors") {
+val conf = new SparkConf()
+  .setMaster("myDummyLocalExternalClusterManager")
+  .setAppName("test-executor-allocation-manager")
+  .set("spark.dynamicAllocation.enabled", "true")
+  .set("spark.dynamicAllocation.testing", "true")
+  .set("spark.job.group1.maxConcurrentTasks", "2")
+  .set("spark.job.group2.maxConcurrentTasks", "5")
+val sc = new SparkContext(conf)
+contexts += sc
+sc.setJobGroup("group1", "", false)
+
+val manager = sc.executorAllocationManager.get
+val stages = Seq(createStageInfo(0, 10), createStageInfo(1, 10))
+// Submit the job and stage start/submit events
+sc.listenerBus.postToAll(SparkListenerJobStart(0, 0, stages, 
sc.getLocalProperties))
+sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(0)))
+
+// Verify that we're capped at number of max concurrent tasks in the 
stage
+assert(maxNumExecutorsNeeded(manager) === 2)
+
+// Submit another stage in the same job
+sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(1)))
+assert(maxNumExecutorsNeeded(manager) === 2)
+
+sc.listenerBus.postToAll(SparkListenerStageCompleted(stages(0)))
+sc.listenerBus.postToAll(SparkListenerStageCompleted(stages(1)))
+sc.listenerBus.postToAll(SparkListenerJobEnd(0, 10, JobSucceeded))
+
+// Submit a new job in the same job group
+val stage2 = createStageInfo(2, 20)
+sc.listenerBus.postToAll(SparkListenerJobStart(1, 0, Seq(stage2), 
sc.getLocalProperties))
+sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage2))
+assert(maxNumExecutorsNeeded(manager) === 2)
+
+sc.listenerBus.postToAll(SparkListenerStageCompleted(stage2))
+sc.listenerBus.postToAll(SparkListenerJobEnd(1, 10, JobSucceeded))
+
+// Set another jobGroup
+sc.setJobGroup("group2", "", false)
+
+val stage3 = createStageInfo(3, 20)
+sc.listenerBus.postToAll(SparkListenerJobStart(2, 0, Seq(stage3), 
sc.getLocalProperties))
+sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage3))
+assert(maxNumExecutorsNeeded(manager) === 5)
+
+sc.listenerBus.postToAll(SparkListenerStageCompleted(stage3))
+sc.listenerBus.postToAll(SparkListenerJobEnd(2, 10, JobSucceeded))
+
+// Clear jobGroup
+sc.clearJobGroup()
+
+val stage4 = createStageInfo(4, 50)
+sc.listenerBus.postToAll(SparkListenerJobStart(2, 0, Seq(stage4), 
sc.getLocalProperties))
+sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage4))
+assert(maxNumExecutorsNeeded(manager) === 50)
+  }
+
+  test("add executors capped by max concurrent tasks for a job group with 
multi cores executors") {
+val conf = new SparkConf()
+  .setMaster("myDummyLocalExternalClusterManager")
+  .setAppName("test-executor-allocation-manager")
+  .set("spark.dynamicAllocation.enabled", "true")
+  .set("spark.dynamicAllocation.testing", "true")
+  .set("spark.job.group1.maxConcurrentTasks", "2")
+  .set("spark.job.group2.maxConcurrentTasks", "5")
+  .set("spark.executor.cores", "3")
+val sc = new SparkContext(conf)
+contexts += sc
+sc.setJobGroup("group1", "", false)
+
+val manager = sc.executorAllocationManager.get
+val stages = Seq(createStageInfo(0, 10), createStageInfo(1, 10))
+// Submit the job and stage start/submit events
+sc.listenerBus.postToAll(SparkListenerJobStart(0, 0

[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...

2017-09-15 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/19194#discussion_r139163109
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -758,11 +812,58 @@ private[spark] class ExecutorAllocationManager(
   allocationManager.synchronized {
 stageIdToNumSpeculativeTasks(stageId) =
   stageIdToNumSpeculativeTasks.getOrElse(stageId, 0) + 1
+maxConcurrentTasks = getMaxConTasks
+logDebug(s"Setting max concurrent tasks to $maxConcurrentTasks on 
spec. task submitted.")
 allocationManager.onSchedulerBacklogged()
   }
 }
 
 /**
+ * Calculate the maximum no. of concurrent tasks that can run 
currently.
+ */
+def getMaxConTasks(): Int = {
+  // We can limit the no. of concurrent tasks by a job group. A job 
group can have multiple jobs
+  // with multiple stages. We need to get all the active stages 
belonging to a job group to
+  // calculate the total no. of pending + running tasks to decide the 
maximum no. of executors
+  // we need at that time to serve the outstanding tasks. This is 
capped by the minimum of no.
+  // of outstanding tasks and the max concurrent limit specified for 
the job group if any.
+
+  def getIncompleteTasksForStage(stageId: Int, numTasks: Int): Int = {
+var runningTasks = 0
+if (stageIdToTaskIndices.contains(stageId)) {
+  runningTasks =
+stageIdToTaskIndices(stageId).size - 
stageIdToCompleteTaskCount.getOrElse(stageId, 0)
--- End diff --

Yes, nice catch. We do need to account for all the tasks for a stage, and this should include speculative ones as well.
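A hedged, self-contained sketch of that accounting, using stand-ins for the counters that appear in the quoted diffs (`stageIdToNumTasks`, `stageIdToNumSpeculativeTasks`, `stageIdToCompleteTaskCount`); the PR's eventual implementation may combine them differently:

```scala
import scala.collection.mutable

object IncompleteTaskAccounting {
  // Stand-ins for the allocation listener's per-stage counters.
  val stageIdToNumTasks = mutable.HashMap[Int, Int]()
  val stageIdToNumSpeculativeTasks = mutable.HashMap[Int, Int]()
  val stageIdToCompleteTaskCount = mutable.HashMap[Int, Int]()

  // Incomplete work for a stage = all tasks submitted for it, including
  // speculative attempts, minus those that have already completed.
  def getIncompleteTasksForStage(stageId: Int): Int = {
    val total = stageIdToNumTasks.getOrElse(stageId, 0) +
      stageIdToNumSpeculativeTasks.getOrElse(stageId, 0)
    math.max(total - stageIdToCompleteTaskCount.getOrElse(stageId, 0), 0)
  }
}
```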


---




[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...

2017-09-15 Thread dhruve
Github user dhruve commented on a diff in the pull request:

https://github.com/apache/spark/pull/19194#discussion_r139162871
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -758,11 +812,58 @@ private[spark] class ExecutorAllocationManager(
   allocationManager.synchronized {
 stageIdToNumSpeculativeTasks(stageId) =
   stageIdToNumSpeculativeTasks.getOrElse(stageId, 0) + 1
+maxConcurrentTasks = getMaxConTasks
+logDebug(s"Setting max concurrent tasks to $maxConcurrentTasks on 
spec. task submitted.")
 allocationManager.onSchedulerBacklogged()
   }
 }
 
 /**
+ * Calculate the maximum no. of concurrent tasks that can run 
currently.
+ */
+def getMaxConTasks(): Int = {
+  // We can limit the no. of concurrent tasks by a job group. A job 
group can have multiple jobs
+  // with multiple stages. We need to get all the active stages 
belonging to a job group to
+  // calculate the total no. of pending + running tasks to decide the 
maximum no. of executors
+  // we need at that time to serve the outstanding tasks. This is 
capped by the minimum of no.
--- End diff --

okay.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...

2017-09-15 Thread tgravescs
Github user tgravescs commented on a diff in the pull request:

https://github.com/apache/spark/pull/19194#discussion_r139157280
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala ---
@@ -1255,6 +1255,97 @@ class TaskSetManagerSuite extends SparkFunSuite with 
LocalSparkContext with Logg
 assert(taskOption4.get.addedJars === addedJarsMidTaskSet)
   }
 
+  test("limit max concurrent running tasks in a job group when configured 
") {
+val conf = new SparkConf().
+  set(config.BLACKLIST_ENABLED, true).
+  set("spark.job.testJobGroup.maxConcurrentTasks", "2") // Limit max 
concurrent tasks to 2
+
+sc = new SparkContext("local", "test", conf)
+sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", 
"host2"))
+val props = new Properties();
+props.setProperty(SparkContext.SPARK_JOB_GROUP_ID, "testJobGroup") // 
set the job group
+
+val tasks = Array.tabulate[Task[_]](10) { i =>
+  new FakeTask(0, i, Nil)
+}
+val tsm = new TaskSetManager(sched, new TaskSet(tasks, 0, 0, 0, 
props), 2)
+
+// make some offers to our taskset
+var taskDescs = Seq(
+  "exec1" -> "host1",
+  "exec2" -> "host1"
+).flatMap { case (exec, host) =>
+  // offer each executor twice (simulating 2 cores per executor)
+  (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, 
TaskLocality.ANY)}
+}
+assert(taskDescs.size === 2) // Out of the 4 offers, tsm can accept up 
to maxConcurrentTasks.
+
+// make 4 more offers
+val taskDescs2 = Seq(
+  "exec1" -> "host1",
+  "exec2" -> "host1"
+).flatMap { case (exec, host) =>
+  (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, 
TaskLocality.ANY)}
+}
+assert(taskDescs2.size === 0) // tsm doesn't accept any as it is 
already running at max tasks
+
+// inform tsm that one task has completed
+val directTaskResult = createTaskResult(0)
+tsm.handleSuccessfulTask(taskDescs(0).taskId, directTaskResult)
+
+// make 4 more offers after previous task completed
+taskDescs = Seq(
+  "exec1" -> "host1",
+  "exec2" -> "host1"
+).flatMap { case (exec, host) =>
+  (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, 
TaskLocality.ANY)}
+}
+assert(taskDescs.size === 1) // tsm accepts one as it can run one more 
task
+  }
+
+  test("do not limit max concurrent running tasks in a job group by 
default") {
+val conf = new SparkConf().
+  set(config.BLACKLIST_ENABLED, true)
+
+sc = new SparkContext("local", "test", conf)
+sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", 
"host2"))
+
+val tasks = Array.tabulate[Task[_]](10) { i =>
+  new FakeTask(0, i, Nil)
+}
+val tsm = new TaskSetManager(sched, new TaskSet(tasks, 0, 0, 0, null), 
2)
+
+// make 5 offers to our taskset
+var taskDescs = Seq(
+  "exec1" -> "host1",
+  "exec2" -> "host2"
+).flatMap { case (exec, host) =>
+  // offer each executor twice (simulating 2 cores per executor)
--- End diff --

I think you are doing 3 cores per executor here.
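
For reference, a hedged fragment of how a 3-core executor is typically simulated in this style of test; it assumes the `tsm` setup from the quoted test above and is not the final test code:

```
// Simulate 3 cores per executor by offering each executor three times per round.
// `tsm` is the TaskSetManager built earlier in the quoted test (assumed setup).
val coresPerExecutor = 3
val taskDescs = Seq(
  "exec1" -> "host1",
  "exec2" -> "host2"
).flatMap { case (exec, host) =>
  (0 until coresPerExecutor).flatMap { _ =>
    tsm.resourceOffer(exec, host, TaskLocality.ANY)
  }
}
```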


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...

2017-09-15 Thread tgravescs
Github user tgravescs commented on a diff in the pull request:

https://github.com/apache/spark/pull/19194#discussion_r139156205
  
--- Diff: 
core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala ---
@@ -210,22 +216,282 @@ class ExecutorAllocationManagerSuite
 assert(numExecutorsToAdd(manager) === 1)
 
 // Verify that running a task doesn't affect the target
-sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, 
createTaskInfo(0, 0, "executor-1")))
+sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, 
createTaskInfo(0, 0, "executor-1")))
 assert(numExecutorsTarget(manager) === 5)
 assert(addExecutors(manager) === 0)
 assert(numExecutorsToAdd(manager) === 1)
 
 // Verify that running a speculative task doesn't affect the target
-sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, 
createTaskInfo(0, 0, "executor-2", true)))
+sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, 
createTaskInfo(0, 0, "executor-2", true)))
 assert(numExecutorsTarget(manager) === 5)
 assert(addExecutors(manager) === 0)
 assert(numExecutorsToAdd(manager) === 1)
   }
 
+  test("add executors capped by max concurrent tasks for a job group with 
single core executors") {
+val conf = new SparkConf()
+  .setMaster("myDummyLocalExternalClusterManager")
+  .setAppName("test-executor-allocation-manager")
+  .set("spark.dynamicAllocation.enabled", "true")
+  .set("spark.dynamicAllocation.testing", "true")
+  .set("spark.job.group1.maxConcurrentTasks", "2")
+  .set("spark.job.group2.maxConcurrentTasks", "5")
+val sc = new SparkContext(conf)
+contexts += sc
+sc.setJobGroup("group1", "", false)
+
+val manager = sc.executorAllocationManager.get
+val stages = Seq(createStageInfo(0, 10), createStageInfo(1, 10))
+// Submit the job and stage start/submit events
+sc.listenerBus.postToAll(SparkListenerJobStart(0, 0, stages, 
sc.getLocalProperties))
+sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(0)))
+
+// Verify that we're capped at number of max concurrent tasks in the 
stage
+assert(maxNumExecutorsNeeded(manager) === 2)
+
+// Submit another stage in the same job
+sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(1)))
+assert(maxNumExecutorsNeeded(manager) === 2)
+
+sc.listenerBus.postToAll(SparkListenerStageCompleted(stages(0)))
+sc.listenerBus.postToAll(SparkListenerStageCompleted(stages(1)))
+sc.listenerBus.postToAll(SparkListenerJobEnd(0, 10, JobSucceeded))
+
+// Submit a new job in the same job group
+val stage2 = createStageInfo(2, 20)
+sc.listenerBus.postToAll(SparkListenerJobStart(1, 0, Seq(stage2), 
sc.getLocalProperties))
+sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage2))
+assert(maxNumExecutorsNeeded(manager) === 2)
+
+sc.listenerBus.postToAll(SparkListenerStageCompleted(stage2))
+sc.listenerBus.postToAll(SparkListenerJobEnd(1, 10, JobSucceeded))
+
+// Set another jobGroup
+sc.setJobGroup("group2", "", false)
+
+val stage3 = createStageInfo(3, 20)
+sc.listenerBus.postToAll(SparkListenerJobStart(2, 0, Seq(stage3), 
sc.getLocalProperties))
+sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage3))
+assert(maxNumExecutorsNeeded(manager) === 5)
+
+sc.listenerBus.postToAll(SparkListenerStageCompleted(stage3))
+sc.listenerBus.postToAll(SparkListenerJobEnd(2, 10, JobSucceeded))
+
+// Clear jobGroup
+sc.clearJobGroup()
+
+val stage4 = createStageInfo(4, 50)
+sc.listenerBus.postToAll(SparkListenerJobStart(2, 0, Seq(stage4), 
sc.getLocalProperties))
+sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage4))
+assert(maxNumExecutorsNeeded(manager) === 50)
+  }
+
+  test("add executors capped by max concurrent tasks for a job group with 
multi cores executors") {
+val conf = new SparkConf()
+  .setMaster("myDummyLocalExternalClusterManager")
+  .setAppName("test-executor-allocation-manager")
+  .set("spark.dynamicAllocation.enabled", "true")
+  .set("spark.dynamicAllocation.testing", "true")
+  .set("spark.job.group1.maxConcurrentTasks", "2")
+  .set("spark.job.group2.maxConcurrentTasks", "5")
+  .set("spark.executor.cores", "3")
+val sc = new SparkContext(conf)
+contexts += sc
+sc.setJobGroup("group1", "", false)
+
+val manager = sc.executorAllocationManager.get
+val stages = Seq(createStageInfo(0, 10), createStageInfo(1, 10))
+// Submit the job and stage start/submit events
+sc.listenerBus.postToAll(SparkListenerJobStart(0

[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...

2017-09-15 Thread tgravescs
Github user tgravescs commented on a diff in the pull request:

https://github.com/apache/spark/pull/19194#discussion_r139156014
  
--- Diff: 
core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala ---
@@ -210,22 +216,282 @@ class ExecutorAllocationManagerSuite
 assert(numExecutorsToAdd(manager) === 1)
 
 // Verify that running a task doesn't affect the target
-sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, 
createTaskInfo(0, 0, "executor-1")))
+sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, 
createTaskInfo(0, 0, "executor-1")))
 assert(numExecutorsTarget(manager) === 5)
 assert(addExecutors(manager) === 0)
 assert(numExecutorsToAdd(manager) === 1)
 
 // Verify that running a speculative task doesn't affect the target
-sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, 
createTaskInfo(0, 0, "executor-2", true)))
+sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, 
createTaskInfo(0, 0, "executor-2", true)))
 assert(numExecutorsTarget(manager) === 5)
 assert(addExecutors(manager) === 0)
 assert(numExecutorsToAdd(manager) === 1)
   }
 
+  test("add executors capped by max concurrent tasks for a job group with 
single core executors") {
+val conf = new SparkConf()
+  .setMaster("myDummyLocalExternalClusterManager")
+  .setAppName("test-executor-allocation-manager")
+  .set("spark.dynamicAllocation.enabled", "true")
+  .set("spark.dynamicAllocation.testing", "true")
+  .set("spark.job.group1.maxConcurrentTasks", "2")
+  .set("spark.job.group2.maxConcurrentTasks", "5")
+val sc = new SparkContext(conf)
+contexts += sc
+sc.setJobGroup("group1", "", false)
+
+val manager = sc.executorAllocationManager.get
+val stages = Seq(createStageInfo(0, 10), createStageInfo(1, 10))
+// Submit the job and stage start/submit events
+sc.listenerBus.postToAll(SparkListenerJobStart(0, 0, stages, 
sc.getLocalProperties))
+sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(0)))
+
+// Verify that we're capped at number of max concurrent tasks in the 
stage
+assert(maxNumExecutorsNeeded(manager) === 2)
+
+// Submit another stage in the same job
+sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(1)))
+assert(maxNumExecutorsNeeded(manager) === 2)
+
+sc.listenerBus.postToAll(SparkListenerStageCompleted(stages(0)))
+sc.listenerBus.postToAll(SparkListenerStageCompleted(stages(1)))
+sc.listenerBus.postToAll(SparkListenerJobEnd(0, 10, JobSucceeded))
+
+// Submit a new job in the same job group
+val stage2 = createStageInfo(2, 20)
+sc.listenerBus.postToAll(SparkListenerJobStart(1, 0, Seq(stage2), 
sc.getLocalProperties))
+sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage2))
+assert(maxNumExecutorsNeeded(manager) === 2)
+
+sc.listenerBus.postToAll(SparkListenerStageCompleted(stage2))
+sc.listenerBus.postToAll(SparkListenerJobEnd(1, 10, JobSucceeded))
+
+// Set another jobGroup
+sc.setJobGroup("group2", "", false)
+
+val stage3 = createStageInfo(3, 20)
+sc.listenerBus.postToAll(SparkListenerJobStart(2, 0, Seq(stage3), 
sc.getLocalProperties))
+sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage3))
+assert(maxNumExecutorsNeeded(manager) === 5)
+
+sc.listenerBus.postToAll(SparkListenerStageCompleted(stage3))
+sc.listenerBus.postToAll(SparkListenerJobEnd(2, 10, JobSucceeded))
+
+// Clear jobGroup
+sc.clearJobGroup()
+
+val stage4 = createStageInfo(4, 50)
+sc.listenerBus.postToAll(SparkListenerJobStart(2, 0, Seq(stage4), 
sc.getLocalProperties))
+sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage4))
+assert(maxNumExecutorsNeeded(manager) === 50)
+  }
+
+  test("add executors capped by max concurrent tasks for a job group with 
multi cores executors") {
+val conf = new SparkConf()
+  .setMaster("myDummyLocalExternalClusterManager")
+  .setAppName("test-executor-allocation-manager")
+  .set("spark.dynamicAllocation.enabled", "true")
+  .set("spark.dynamicAllocation.testing", "true")
+  .set("spark.job.group1.maxConcurrentTasks", "2")
+  .set("spark.job.group2.maxConcurrentTasks", "5")
+  .set("spark.executor.cores", "3")
+val sc = new SparkContext(conf)
+contexts += sc
+sc.setJobGroup("group1", "", false)
+
+val manager = sc.executorAllocationManager.get
+val stages = Seq(createStageInfo(0, 10), createStageInfo(1, 10))
+// Submit the job and stage start/submit events
+sc.listenerBus.postToAll(SparkListenerJobStart(0

[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...

2017-09-15 Thread tgravescs
Github user tgravescs commented on a diff in the pull request:

https://github.com/apache/spark/pull/19194#discussion_r139155530
  
--- Diff: 
core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala ---
@@ -210,22 +216,282 @@ class ExecutorAllocationManagerSuite
 assert(numExecutorsToAdd(manager) === 1)
 
 // Verify that running a task doesn't affect the target
-sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, 
createTaskInfo(0, 0, "executor-1")))
+sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, 
createTaskInfo(0, 0, "executor-1")))
 assert(numExecutorsTarget(manager) === 5)
 assert(addExecutors(manager) === 0)
 assert(numExecutorsToAdd(manager) === 1)
 
 // Verify that running a speculative task doesn't affect the target
-sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, 
createTaskInfo(0, 0, "executor-2", true)))
+sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, 
createTaskInfo(0, 0, "executor-2", true)))
 assert(numExecutorsTarget(manager) === 5)
 assert(addExecutors(manager) === 0)
 assert(numExecutorsToAdd(manager) === 1)
   }
 
+  test("add executors capped by max concurrent tasks for a job group with 
single core executors") {
+val conf = new SparkConf()
+  .setMaster("myDummyLocalExternalClusterManager")
+  .setAppName("test-executor-allocation-manager")
+  .set("spark.dynamicAllocation.enabled", "true")
+  .set("spark.dynamicAllocation.testing", "true")
+  .set("spark.job.group1.maxConcurrentTasks", "2")
+  .set("spark.job.group2.maxConcurrentTasks", "5")
+val sc = new SparkContext(conf)
+contexts += sc
+sc.setJobGroup("group1", "", false)
+
+val manager = sc.executorAllocationManager.get
+val stages = Seq(createStageInfo(0, 10), createStageInfo(1, 10))
+// Submit the job and stage start/submit events
+sc.listenerBus.postToAll(SparkListenerJobStart(0, 0, stages, 
sc.getLocalProperties))
+sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(0)))
+
+// Verify that we're capped at number of max concurrent tasks in the 
stage
+assert(maxNumExecutorsNeeded(manager) === 2)
+
+// Submit another stage in the same job
+sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(1)))
+assert(maxNumExecutorsNeeded(manager) === 2)
+
+sc.listenerBus.postToAll(SparkListenerStageCompleted(stages(0)))
+sc.listenerBus.postToAll(SparkListenerStageCompleted(stages(1)))
+sc.listenerBus.postToAll(SparkListenerJobEnd(0, 10, JobSucceeded))
+
+// Submit a new job in the same job group
+val stage2 = createStageInfo(2, 20)
+sc.listenerBus.postToAll(SparkListenerJobStart(1, 0, Seq(stage2), 
sc.getLocalProperties))
+sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage2))
+assert(maxNumExecutorsNeeded(manager) === 2)
+
+sc.listenerBus.postToAll(SparkListenerStageCompleted(stage2))
+sc.listenerBus.postToAll(SparkListenerJobEnd(1, 10, JobSucceeded))
+
+// Set another jobGroup
+sc.setJobGroup("group2", "", false)
+
+val stage3 = createStageInfo(3, 20)
+sc.listenerBus.postToAll(SparkListenerJobStart(2, 0, Seq(stage3), 
sc.getLocalProperties))
+sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage3))
+assert(maxNumExecutorsNeeded(manager) === 5)
+
+sc.listenerBus.postToAll(SparkListenerStageCompleted(stage3))
+sc.listenerBus.postToAll(SparkListenerJobEnd(2, 10, JobSucceeded))
+
+// Clear jobGroup
+sc.clearJobGroup()
+
+val stage4 = createStageInfo(4, 50)
+sc.listenerBus.postToAll(SparkListenerJobStart(2, 0, Seq(stage4), 
sc.getLocalProperties))
+sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage4))
+assert(maxNumExecutorsNeeded(manager) === 50)
+  }
+
+  test("add executors capped by max concurrent tasks for a job group with 
multi cores executors") {
+val conf = new SparkConf()
+  .setMaster("myDummyLocalExternalClusterManager")
+  .setAppName("test-executor-allocation-manager")
+  .set("spark.dynamicAllocation.enabled", "true")
+  .set("spark.dynamicAllocation.testing", "true")
+  .set("spark.job.group1.maxConcurrentTasks", "2")
+  .set("spark.job.group2.maxConcurrentTasks", "5")
+  .set("spark.executor.cores", "3")
+val sc = new SparkContext(conf)
+contexts += sc
+sc.setJobGroup("group1", "", false)
+
+val manager = sc.executorAllocationManager.get
+val stages = Seq(createStageInfo(0, 10), createStageInfo(1, 10))
+// Submit the job and stage start/submit events
+sc.listenerBus.postToAll(SparkListenerJobStart(0

[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...

2017-09-15 Thread tgravescs
Github user tgravescs commented on a diff in the pull request:

https://github.com/apache/spark/pull/19194#discussion_r139149660
  
--- Diff: 
core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala ---
@@ -210,22 +216,282 @@ class ExecutorAllocationManagerSuite
 assert(numExecutorsToAdd(manager) === 1)
 
 // Verify that running a task doesn't affect the target
-sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, 
createTaskInfo(0, 0, "executor-1")))
+sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, 
createTaskInfo(0, 0, "executor-1")))
 assert(numExecutorsTarget(manager) === 5)
 assert(addExecutors(manager) === 0)
 assert(numExecutorsToAdd(manager) === 1)
 
 // Verify that running a speculative task doesn't affect the target
-sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, 
createTaskInfo(0, 0, "executor-2", true)))
+sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, 
createTaskInfo(0, 0, "executor-2", true)))
 assert(numExecutorsTarget(manager) === 5)
 assert(addExecutors(manager) === 0)
 assert(numExecutorsToAdd(manager) === 1)
   }
 
+  test("add executors capped by max concurrent tasks for a job group with 
single core executors") {
+val conf = new SparkConf()
+  .setMaster("myDummyLocalExternalClusterManager")
+  .setAppName("test-executor-allocation-manager")
+  .set("spark.dynamicAllocation.enabled", "true")
+  .set("spark.dynamicAllocation.testing", "true")
+  .set("spark.job.group1.maxConcurrentTasks", "2")
+  .set("spark.job.group2.maxConcurrentTasks", "5")
+val sc = new SparkContext(conf)
+contexts += sc
+sc.setJobGroup("group1", "", false)
+
+val manager = sc.executorAllocationManager.get
+val stages = Seq(createStageInfo(0, 10), createStageInfo(1, 10))
+// Submit the job and stage start/submit events
+sc.listenerBus.postToAll(SparkListenerJobStart(0, 0, stages, 
sc.getLocalProperties))
+sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(0)))
+
+// Verify that we're capped at number of max concurrent tasks in the 
stage
+assert(maxNumExecutorsNeeded(manager) === 2)
+
+// Submit another stage in the same job
+sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(1)))
+assert(maxNumExecutorsNeeded(manager) === 2)
+
+sc.listenerBus.postToAll(SparkListenerStageCompleted(stages(0)))
+sc.listenerBus.postToAll(SparkListenerStageCompleted(stages(1)))
+sc.listenerBus.postToAll(SparkListenerJobEnd(0, 10, JobSucceeded))
+
+// Submit a new job in the same job group
+val stage2 = createStageInfo(2, 20)
+sc.listenerBus.postToAll(SparkListenerJobStart(1, 0, Seq(stage2), 
sc.getLocalProperties))
+sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage2))
+assert(maxNumExecutorsNeeded(manager) === 2)
+
+sc.listenerBus.postToAll(SparkListenerStageCompleted(stage2))
+sc.listenerBus.postToAll(SparkListenerJobEnd(1, 10, JobSucceeded))
+
+// Set another jobGroup
+sc.setJobGroup("group2", "", false)
+
+val stage3 = createStageInfo(3, 20)
+sc.listenerBus.postToAll(SparkListenerJobStart(2, 0, Seq(stage3), 
sc.getLocalProperties))
+sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage3))
+assert(maxNumExecutorsNeeded(manager) === 5)
+
+sc.listenerBus.postToAll(SparkListenerStageCompleted(stage3))
+sc.listenerBus.postToAll(SparkListenerJobEnd(2, 10, JobSucceeded))
+
+// Clear jobGroup
+sc.clearJobGroup()
+
+val stage4 = createStageInfo(4, 50)
+sc.listenerBus.postToAll(SparkListenerJobStart(2, 0, Seq(stage4), 
sc.getLocalProperties))
+sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage4))
+assert(maxNumExecutorsNeeded(manager) === 50)
+  }
+
+  test("add executors capped by max concurrent tasks for a job group with 
multi cores executors") {
+val conf = new SparkConf()
+  .setMaster("myDummyLocalExternalClusterManager")
+  .setAppName("test-executor-allocation-manager")
+  .set("spark.dynamicAllocation.enabled", "true")
+  .set("spark.dynamicAllocation.testing", "true")
+  .set("spark.job.group1.maxConcurrentTasks", "2")
+  .set("spark.job.group2.maxConcurrentTasks", "5")
+  .set("spark.executor.cores", "3")
+val sc = new SparkContext(conf)
+contexts += sc
+sc.setJobGroup("group1", "", false)
+
+val manager = sc.executorAllocationManager.get
+val stages = Seq(createStageInfo(0, 10), createStageInfo(1, 10))
+// Submit the job and stage start/submit events
+sc.listenerBus.postToAll(SparkListenerJobStart(0

[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...

2017-09-14 Thread tgravescs
Github user tgravescs commented on a diff in the pull request:

https://github.com/apache/spark/pull/19194#discussion_r138993979
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -758,11 +812,58 @@ private[spark] class ExecutorAllocationManager(
   allocationManager.synchronized {
 stageIdToNumSpeculativeTasks(stageId) =
   stageIdToNumSpeculativeTasks.getOrElse(stageId, 0) + 1
+maxConcurrentTasks = getMaxConTasks
+logDebug(s"Setting max concurrent tasks to $maxConcurrentTasks on 
spec. task submitted.")
 allocationManager.onSchedulerBacklogged()
   }
 }
 
 /**
+ * Calculate the maximum no. of concurrent tasks that can run 
currently.
+ */
+def getMaxConTasks(): Int = {
+  // We can limit the no. of concurrent tasks by a job group. A job 
group can have multiple jobs
+  // with multiple stages. We need to get all the active stages 
belonging to a job group to
+  // calculate the total no. of pending + running tasks to decide the 
maximum no. of executors
+  // we need at that time to serve the outstanding tasks. This is 
capped by the minimum of no.
+  // of outstanding tasks and the max concurrent limit specified for 
the job group if any.
+
+  def getIncompleteTasksForStage(stageId: Int, numTasks: Int): Int = {
+var runningTasks = 0
+if (stageIdToTaskIndices.contains(stageId)) {
+  runningTasks =
+stageIdToTaskIndices(stageId).size - 
stageIdToCompleteTaskCount.getOrElse(stageId, 0)
+}
+
+totalPendingTasks(stageId) + runningTasks
+  }
+
+  def sumIncompleteTasksForStages: (Int, (Int, Int)) => Int = (a, b) 
=> {
--- End diff --

it would be nice to name the variables something more useful than `a` and `b`.
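
A hedged sketch of the same fold written with descriptive names (the stub helper and the body shown are illustrative, not the final code):

```
// Stub standing in for the helper defined just above in the quoted diff.
def getIncompleteTasksForStage(stageId: Int, numTasks: Int): Int = numTasks

// Same shape as the function in the diff, with descriptive names instead of a/b:
// the accumulator is the running total, the tuple is a (stageId, numTasks) entry.
val sumIncompleteTasksForStages: (Int, (Int, Int)) => Int =
  (totalIncompleteTasks, stageEntry) => {
    val (stageId, numTasks) = stageEntry
    totalIncompleteTasks + getIncompleteTasksForStage(stageId, numTasks)
  }

// Example: Seq(0 -> 10, 1 -> 20).foldLeft(0)(sumIncompleteTasksForStages) == 30
```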


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...

2017-09-14 Thread tgravescs
Github user tgravescs commented on a diff in the pull request:

https://github.com/apache/spark/pull/19194#discussion_r138987948
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -758,11 +812,58 @@ private[spark] class ExecutorAllocationManager(
   allocationManager.synchronized {
 stageIdToNumSpeculativeTasks(stageId) =
   stageIdToNumSpeculativeTasks.getOrElse(stageId, 0) + 1
+maxConcurrentTasks = getMaxConTasks
+logDebug(s"Setting max concurrent tasks to $maxConcurrentTasks on 
spec. task submitted.")
 allocationManager.onSchedulerBacklogged()
   }
 }
 
 /**
+ * Calculate the maximum no. of concurrent tasks that can run 
currently.
+ */
+def getMaxConTasks(): Int = {
+  // We can limit the no. of concurrent tasks by a job group. A job 
group can have multiple jobs
+  // with multiple stages. We need to get all the active stages 
belonging to a job group to
+  // calculate the total no. of pending + running tasks to decide the 
maximum no. of executors
+  // we need at that time to serve the outstanding tasks. This is 
capped by the minimum of no.
+  // of outstanding tasks and the max concurrent limit specified for 
the job group if any.
+
+  def getIncompleteTasksForStage(stageId: Int, numTasks: Int): Int = {
+var runningTasks = 0
+if (stageIdToTaskIndices.contains(stageId)) {
+  runningTasks =
+stageIdToTaskIndices(stageId).size - 
stageIdToCompleteTaskCount.getOrElse(stageId, 0)
--- End diff --

the running task count here is missing the speculative tasks.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...

2017-09-14 Thread tgravescs
Github user tgravescs commented on a diff in the pull request:

https://github.com/apache/spark/pull/19194#discussion_r138980299
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -758,11 +812,58 @@ private[spark] class ExecutorAllocationManager(
   allocationManager.synchronized {
 stageIdToNumSpeculativeTasks(stageId) =
   stageIdToNumSpeculativeTasks.getOrElse(stageId, 0) + 1
+maxConcurrentTasks = getMaxConTasks
+logDebug(s"Setting max concurrent tasks to $maxConcurrentTasks on 
spec. task submitted.")
 allocationManager.onSchedulerBacklogged()
   }
 }
 
 /**
+ * Calculate the maximum no. of concurrent tasks that can run 
currently.
+ */
+def getMaxConTasks(): Int = {
+  // We can limit the no. of concurrent tasks by a job group. A job 
group can have multiple jobs
+  // with multiple stages. We need to get all the active stages 
belonging to a job group to
+  // calculate the total no. of pending + running tasks to decide the 
maximum no. of executors
+  // we need at that time to serve the outstanding tasks. This is 
capped by the minimum of no.
--- End diff --

remove the "of" in "capped by the minimum of no."


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...

2017-09-11 Thread dhruve
GitHub user dhruve opened a pull request:

https://github.com/apache/spark/pull/19194

[SPARK-20589] Allow limiting task concurrency per stage

## What changes were proposed in this pull request?
This change allows the user to specify the maximum number of tasks running in 
a given job group. (See the JIRA comments for more context on why this is 
implemented at the job group level rather than the stage level.) The change is 
useful when the user wants to avoid overwhelming (effectively DoS-ing) an 
external service accessed from many executors at once, without needing to 
repartition or coalesce existing RDDs.

This change introduces a new user-level configuration, 
`spark.job.[userJobGroup].maxConcurrentTasks`, which caps the number of tasks 
executing concurrently at any given point in time.

The user can use the feature by setting the appropriate jobGroup and 
passing the conf:

```
conf.set("spark.job.group1.maxConcurrentTasks", "10")
...
sc.setJobGroup("group1", "", false)
sc.parallelize(1 to 10, 10).map(x => x + 1).count
sc.clearJobGroup
```

### Changes proposed in this fix
This change limits the number of tasks (and, in turn, the number of executors 
to be acquired) that can run simultaneously in a given job group and its 
subsequent jobs and stages, provided the appropriate job group and max 
concurrency configs are set.
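
As a rough illustration of the executor-side effect described above (a sketch of the relationship the tests exercise, not the actual implementation):

```
// Needed executors under the cap: the outstanding work is first capped by the
// job group's maxConcurrentTasks, then divided by cores per executor (rounded up).
def maxExecutorsNeeded(pendingTasks: Int,
                       runningTasks: Int,
                       maxConcurrentTasks: Int,
                       coresPerExecutor: Int): Int = {
  val effectiveTasks = math.min(pendingTasks + runningTasks, maxConcurrentTasks)
  (effectiveTasks + coresPerExecutor - 1) / coresPerExecutor
}

// e.g. 10 outstanding tasks with a cap of 2:
//   1-core executors -> maxExecutorsNeeded(10, 0, 2, 1) == 2
//   3-core executors -> maxExecutorsNeeded(10, 0, 2, 3) == 1
```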

## How was this patch tested?
Ran unit tests and multiple manual tests with various combinations of the 
following (one such combination is sketched after this list):
- single/multiple/no job groups
- executors with single/multi cores
- dynamic allocation on/off
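
One such manual-test combination, sketched here with illustrative values (the group name "etl", the cap of 4 and the local[8] master are assumptions, not values from the patch):

```
import org.apache.spark.{SparkConf, SparkContext}

// Hedged sketch of a manual check: one capped job group followed by an uncapped run.
val conf = new SparkConf()
  .setMaster("local[8]")
  .setAppName("max-concurrent-tasks-manual-check")
  .set("spark.job.etl.maxConcurrentTasks", "4")
val sc = new SparkContext(conf)

sc.setJobGroup("etl", "capped group", interruptOnCancel = false)
sc.parallelize(1 to 100, 20).map(_ * 2).count()  // expected to run at most 4 tasks at a time

sc.clearJobGroup()
sc.parallelize(1 to 100, 20).map(_ * 2).count()  // no cap applies here

sc.stop()
```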


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dhruve/spark impr/SPARK-20589

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19194.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19194


commit 4281151df9010b4e9fe91e588c07e872b8e0dd69
Author: Dhruve Ashar 
Date:   2017-09-11T16:45:49Z

[SPARK-20589] Allow limiting task concurrency per stage




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org