[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

2014-07-14 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/900




[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

2014-07-14 Thread tgravescs
Github user tgravescs commented on the pull request:

https://github.com/apache/spark/pull/900#issuecomment-48955561
  
Looks good.  Thanks @li-zhihui 




[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

2014-07-13 Thread li-zhihui
Github user li-zhihui commented on the pull request:

https://github.com/apache/spark/pull/900#issuecomment-48869936
  
@tgravescs added a commit according to the comments.




[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

2014-07-11 Thread tgravescs
Github user tgravescs commented on a diff in the pull request:

https://github.com/apache/spark/pull/900#discussion_r14825405
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -244,6 +257,17 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, actorSystem: A
 throw new SparkException("Error notifying standalone scheduler's 
driver actor", e)
 }
   }
+
+  override def isReady(): Boolean = {
+if (ready) {
+  return true
+}
+if ((System.currentTimeMillis() - createTime) >= 
maxRegisteredWaitingTime) {
--- End diff --

It might be nice to have a log statement here saying the max time was hit, so we know when scheduling began when debugging a job.
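
For illustration only, a minimal sketch of how such a log line could fit into the isReady() shown above (field names are taken from the quoted diff; the message text is just an example, and it assumes the class mixes in Spark's Logging trait):

```scala
override def isReady(): Boolean = {
  if (ready) {
    return true
  }
  if ((System.currentTimeMillis() - createTime) >= maxRegisteredWaitingTime) {
    // Record when the time-based gate opened, so a job that starts with fewer
    // executors than expected can be traced back to this point in the logs.
    logInfo("SchedulerBackend is ready for scheduling: reached maxRegisteredWaitingTime of " +
      maxRegisteredWaitingTime + " ms")
    ready = true
    return true
  }
  false
}
```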




[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

2014-07-11 Thread tgravescs
Github user tgravescs commented on a diff in the pull request:

https://github.com/apache/spark/pull/900#discussion_r14823260
  
--- Diff: docs/configuration.md ---
@@ -699,6 +699,22 @@ Apart from these, the following properties are also 
available, and may be useful
 (in milliseconds)
   
 
+
+  spark.scheduler.minRegisteredExecutorsRatio
+  0
+  
+Submit tasks only after (registered executors / total expected 
executors)
+is equal to at least this value, which is double between 0 and 1.
+  
+
+
+  spark.scheduler.maxRegisteredExecutorsWaitingTime
+  3
+  
+Whatever (registered executors / total expected executors) is reached 
--- End diff --

I think we should clarify both of these a bit, because really scheduling starts when either one is hit, so I think adding a reference to maxRegisteredExecutorsWaitingTime in the description of minRegisteredExecutorsRatio would be good.

How about something like the below? Note I'm not a doc writer, so I'm fine with changing it.

For spark.scheduler.minRegisteredExecutorsRatio:
The minimum ratio of registered executors (registered executors / total 
expected executors) to wait for before scheduling begins. Specified as a double 
between 0 and 1. Regardless of whether the minimum ratio of executors has been 
reached, the maximum amount of time it will wait before scheduling begins is 
controlled by the config spark.scheduler.maxRegisteredExecutorsWaitingTime.

Then for spark.scheduler.maxRegisteredExecutorsWaitingTime:
Maximum amount of time to wait for executors to register before scheduling 
begins (in milliseconds). 
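
As a usage illustration (the property names follow the proposed docs; the app name and values here are made up, and the master is assumed to come from spark-submit):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Wait until at least 80% of the expected executors have registered before
// scheduling begins, but never wait more than 30 seconds in total.
val conf = new SparkConf()
  .setAppName("registration-gate-example")
  .set("spark.scheduler.minRegisteredExecutorsRatio", "0.8")
  .set("spark.scheduler.maxRegisteredExecutorsWaitingTime", "30000")

val sc = new SparkContext(conf)
```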




[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

2014-07-11 Thread li-zhihui
Github user li-zhihui commented on the pull request:

https://github.com/apache/spark/pull/900#issuecomment-48714143
  
Thanks @tgravescs 
I will file a new JIRA for handling Mesos and follow up on it after the PR is merged.





[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

2014-07-11 Thread li-zhihui
Github user li-zhihui commented on a diff in the pull request:

https://github.com/apache/spark/pull/900#discussion_r14813843
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -46,9 +46,19 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, actorSystem: A
 {
   // Use an atomic variable to track total number of cores in the cluster 
for simplicity and speed
   var totalCoreCount = new AtomicInteger(0)
+  var totalExpectedExecutors = new AtomicInteger(0)
   val conf = scheduler.sc.conf
   private val timeout = AkkaUtils.askTimeout(conf)
   private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
+  // Submit tasks only after (registered executors / total expected 
executors) 
+  // is equal to at least this value, that is double between 0 and 1.
+  var minRegisteredRatio = 
conf.getDouble("spark.scheduler.minRegisteredExecutorsRatio", 0)
--- End diff --

@tgravescs Done.




[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

2014-07-10 Thread tgravescs
Github user tgravescs commented on the pull request:

https://github.com/apache/spark/pull/900#issuecomment-48604243
  
postStartHook seems like a good place to put it. It will only be called once 
there, and as you said, yarn-cluster mode actually waits for the SparkContext 
to be initialized before allocating executors. 

It looks like we aren't handling Mesos. We should at least file a JIRA for 
this.
@li-zhihui did you look at Mesos at all?

For the YARN side where you added the TODOs about the sleep, I think we can 
leave them here, as there is another JIRA to remove them.




[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

2014-07-10 Thread tgravescs
Github user tgravescs commented on a diff in the pull request:

https://github.com/apache/spark/pull/900#discussion_r14764078
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -46,9 +46,19 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, actorSystem: A
 {
   // Use an atomic variable to track total number of cores in the cluster 
for simplicity and speed
   var totalCoreCount = new AtomicInteger(0)
+  var totalExpectedExecutors = new AtomicInteger(0)
   val conf = scheduler.sc.conf
   private val timeout = AkkaUtils.askTimeout(conf)
   private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
+  // Submit tasks only after (registered executors / total expected 
executors) 
+  // is equal to at least this value, that is double between 0 and 1.
+  var minRegisteredRatio = 
conf.getDouble("spark.scheduler.minRegisteredExecutorsRatio", 0)
--- End diff --

Please add the new configs to the user docs - see docs/configuration.md




[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

2014-06-29 Thread li-zhihui
Github user li-zhihui commented on the pull request:

https://github.com/apache/spark/pull/900#issuecomment-47490304
  
@tgravescs @kayousterhout 
It will lead to a logic deadlock in yarn-cluster mode if waitBackendReady 
is in TaskSchedulerImpl.start.

How about moving it (waitBackendReady) to postStartHook()?
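
For concreteness, a rough sketch of the postStartHook idea, assuming a small waitBackendReady helper that simply polls backend.isReady() (the actual names and polling interval in the PR may differ):

```scala
// In TaskSchedulerImpl (sketch): wait for the backend after the SparkContext
// has finished initializing, avoiding the yarn-cluster deadlock.
override def postStartHook() {
  waitBackendReady()
}

private def waitBackendReady() {
  while (!backend.isReady) {
    synchronized {
      this.wait(100)  // poll every 100 ms until the backend reports ready
    }
  }
}
```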




[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

2014-06-27 Thread li-zhihui
Github user li-zhihui commented on the pull request:

https://github.com/apache/spark/pull/900#issuecomment-47331430
  
@tgravescs @kayousterhout 
I moved waitBackendReady back to the submitTasks method, because waitBackendReady 
in the start method does not work in yarn-cluster mode (a NullPointerException 
because SparkContext initialization times out); yarn-client is OK.





[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

2014-06-27 Thread li-zhihui
Github user li-zhihui commented on the pull request:

https://github.com/apache/spark/pull/900#issuecomment-47330805
  
@tgravescs @kayousterhout 
I added a new commit according to the comments.





[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

2014-06-26 Thread li-zhihui
Github user li-zhihui commented on a diff in the pull request:

https://github.com/apache/spark/pull/900#discussion_r14280510
  
--- Diff: 
yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
 ---
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+import org.apache.spark.SparkContext
+import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.util.IntParam
+
+private[spark] class YarnClusterSchedulerBackend(
+scheduler: TaskSchedulerImpl,
+sc: SparkContext)
+  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) {
+
+  override def start() {
+super.start()
+var numExecutors = 2
+if (sc.getConf.contains("spark.executor.instances")) {
+  numExecutors = sc.getConf.getInt("spark.executor.instances", 2)
--- End diff --

Cool !




[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

2014-06-26 Thread kayousterhout
Github user kayousterhout commented on a diff in the pull request:

https://github.com/apache/spark/pull/900#discussion_r14280473
  
--- Diff: 
yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
 ---
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+import org.apache.spark.SparkContext
+import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.util.IntParam
+
+private[spark] class YarnClusterSchedulerBackend(
+scheduler: TaskSchedulerImpl,
+sc: SparkContext)
+  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) {
+
+  override def start() {
+super.start()
+var numExecutors = 2
+if (sc.getConf.contains("spark.executor.instances")) {
+  numExecutors = sc.getConf.getInt("spark.executor.instances", 2)
--- End diff --

(so you don't override it if spark.executor.instances is not already set)




[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

2014-06-26 Thread kayousterhout
Github user kayousterhout commented on a diff in the pull request:

https://github.com/apache/spark/pull/900#discussion_r14280463
  
--- Diff: 
yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
 ---
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+import org.apache.spark.SparkContext
+import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.util.IntParam
+
+private[spark] class YarnClusterSchedulerBackend(
+scheduler: TaskSchedulerImpl,
+sc: SparkContext)
+  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) {
+
+  override def start() {
+super.start()
+var numExecutors = 2
+if (sc.getConf.contains("spark.executor.instances")) {
+  numExecutors = sc.getConf.getInt("spark.executor.instances", 2)
--- End diff --

numExecutors = sc.getConf.getInt("spark.executor.instances", numExecutors)




[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

2014-06-26 Thread li-zhihui
Github user li-zhihui commented on a diff in the pull request:

https://github.com/apache/spark/pull/900#discussion_r14280444
  
--- Diff: 
yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
 ---
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+import org.apache.spark.SparkContext
+import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.util.IntParam
+
+private[spark] class YarnClusterSchedulerBackend(
+scheduler: TaskSchedulerImpl,
+sc: SparkContext)
+  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) {
+
+  override def start() {
+super.start()
+var numExecutors = 2
+if (sc.getConf.contains("spark.executor.instances")) {
+  numExecutors = sc.getConf.getInt("spark.executor.instances", 2)
--- End diff --

If we set numExecutors based on SPARK_EXECUTOR_INSTANCES first, then because 
sc.getConf.getInt("spark.executor.instances", 2) always returns a value, the 
value from SPARK_EXECUTOR_INSTANCES would be overridden whether or not 
spark.executor.instances is configured.




[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

2014-06-26 Thread kayousterhout
Github user kayousterhout commented on a diff in the pull request:

https://github.com/apache/spark/pull/900#discussion_r14280315
  
--- Diff: 
yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
 ---
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+import org.apache.spark.SparkContext
+import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.util.IntParam
+
+private[spark] class YarnClusterSchedulerBackend(
+scheduler: TaskSchedulerImpl,
+sc: SparkContext)
+  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) {
+
+  override def start() {
+super.start()
+var numExecutors = 2
--- End diff --

I agree that those other constants could be defined in a better way -- but 
my understanding is that this case is more extreme, because if you defined 
numExecutors to be something other than 2 here, there would actually be a bug, 
right, because the scheduler would be waiting for the wrong number of executors 
to start?

In any case, this PR doesn't need to fix all of the constants in the 
project, but any new constants added should be added properly.




[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

2014-06-26 Thread kayousterhout
Github user kayousterhout commented on a diff in the pull request:

https://github.com/apache/spark/pull/900#discussion_r14280284
  
--- Diff: 
yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
 ---
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+import org.apache.spark.SparkContext
+import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.util.IntParam
+
+private[spark] class YarnClusterSchedulerBackend(
+scheduler: TaskSchedulerImpl,
+sc: SparkContext)
+  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) {
+
+  override def start() {
+super.start()
+var numExecutors = 2
+if (sc.getConf.contains("spark.executor.instances")) {
+  numExecutors = sc.getConf.getInt("spark.executor.instances", 2)
+} else if (System.getenv("SPARK_EXECUTOR_INSTANCES") != null) {
+  
IntParam.unapply(System.getenv("SPARK_EXECUTOR_INSTANCES")).map(_.toInt).getOrElse(2)
--- End diff --

Ah cool that makes sense




[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

2014-06-26 Thread li-zhihui
Github user li-zhihui commented on a diff in the pull request:

https://github.com/apache/spark/pull/900#discussion_r14280169
  
--- Diff: 
yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
 ---
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+import org.apache.spark.SparkContext
+import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.util.IntParam
+
+private[spark] class YarnClusterSchedulerBackend(
+scheduler: TaskSchedulerImpl,
+sc: SparkContext)
+  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) {
+
+  override def start() {
+super.start()
+var numExecutors = 2
--- End diff --

@kayousterhout The constant (the default value of numExecutors) is not only 
defined in this class and ApplicationMasterArguments; it is also defined in 
ClientArguments. 

I guess the default values below come from the same consideration as numExecutors:

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L1232

If we want to make the value a constant, we should consider all of them.
So maybe we can add an object org.apache.spark.Constants to manage all 
constants.
@tgravescs 




[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

2014-06-26 Thread kayousterhout
Github user kayousterhout commented on a diff in the pull request:

https://github.com/apache/spark/pull/900#discussion_r14280133
  
--- Diff: 
yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
 ---
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+import org.apache.spark.SparkContext
+import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.util.IntParam
+
+private[spark] class YarnClusterSchedulerBackend(
+scheduler: TaskSchedulerImpl,
+sc: SparkContext)
+  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) {
+
+  override def start() {
+super.start()
+var numExecutors = 2
+if (sc.getConf.contains("spark.executor.instances")) {
+  numExecutors = sc.getConf.getInt("spark.executor.instances", 2)
--- End diff --

OK, but you can just reverse the order, right? Like, first set numExecutors 
based on SPARK_EXECUTOR_INSTANCES, and then after that have my proposed line.
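
In code, that reversed order would look roughly like this (a sketch; IntParam is the helper already imported in the quoted diff):

```scala
// Environment variable first, then let the system property win if it is set.
var numExecutors = 2
if (System.getenv("SPARK_EXECUTOR_INSTANCES") != null) {
  numExecutors = IntParam.unapply(System.getenv("SPARK_EXECUTOR_INSTANCES")).getOrElse(numExecutors)
}
numExecutors = sc.getConf.getInt("spark.executor.instances", numExecutors)
```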




[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

2014-06-26 Thread li-zhihui
Github user li-zhihui commented on a diff in the pull request:

https://github.com/apache/spark/pull/900#discussion_r14279974
  
--- Diff: 
yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
 ---
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+import org.apache.spark.SparkContext
+import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.util.IntParam
+
+private[spark] class YarnClusterSchedulerBackend(
+scheduler: TaskSchedulerImpl,
+sc: SparkContext)
+  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) {
+
+  override def start() {
+super.start()
+var numExecutors = 2
+if (sc.getConf.contains("spark.executor.instances")) {
+  numExecutors = sc.getConf.getInt("spark.executor.instances", 2)
--- End diff --

@kayousterhout There is a rule: system properties override environment 
variables. Eliminating the "if" would let the environment variable override the 
system property.

https://github.com/apache/spark/blob/master/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala#L62

BTW @tgravescs it seems this code goes against the rule:

https://github.com/apache/spark/blob/master/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala#L36




[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

2014-06-26 Thread tgravescs
Github user tgravescs commented on a diff in the pull request:

https://github.com/apache/spark/pull/900#discussion_r14265985
  
--- Diff: 
yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
 ---
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+import org.apache.spark.SparkContext
+import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.util.IntParam
+
+private[spark] class YarnClusterSchedulerBackend(
+scheduler: TaskSchedulerImpl,
+sc: SparkContext)
+  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) {
+
+  override def start() {
+super.start()
+var numExecutors = 2
+if (sc.getConf.contains("spark.executor.instances")) {
+  numExecutors = sc.getConf.getInt("spark.executor.instances", 2)
+} else if (System.getenv("SPARK_EXECUTOR_INSTANCES") != null) {
+  
IntParam.unapply(System.getenv("SPARK_EXECUTOR_INSTANCES")).map(_.toInt).getOrElse(2)
--- End diff --

Well, the SPARK_WORKER_INSTANCES env variable was only used in yarn-client 
mode, so it isn't strictly needed here since this class is the backend for 
yarn-cluster mode. I assume that is why it was removed. Something I hadn't 
thought of when you first asked.




[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

2014-06-26 Thread kayousterhout
Github user kayousterhout commented on a diff in the pull request:

https://github.com/apache/spark/pull/900#discussion_r14252164
  
--- Diff: 
yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
 ---
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+import org.apache.spark.SparkContext
+import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.util.IntParam
+
+private[spark] class YarnClusterSchedulerBackend(
+scheduler: TaskSchedulerImpl,
+sc: SparkContext)
+  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) {
+
+  override def start() {
+super.start()
+var numExecutors = 2
+if (sc.getConf.contains("spark.executor.instances")) {
+  numExecutors = sc.getConf.getInt("spark.executor.instances", 2)
+} else if (System.getenv("SPARK_EXECUTOR_INSTANCES") != null) {
+  
IntParam.unapply(System.getenv("SPARK_EXECUTOR_INSTANCES")).map(_.toInt).getOrElse(2)
--- End diff --

Also what about SPARK_WORKER_INSTANCES? My understanding is that checking 
that environment variable is necessary for backwards compatibility (correct me 
if I'm wrong here @tgravescs)




[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

2014-06-26 Thread kayousterhout
Github user kayousterhout commented on a diff in the pull request:

https://github.com/apache/spark/pull/900#discussion_r14252092
  
--- Diff: 
yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
 ---
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+import org.apache.spark.SparkContext
+import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.util.IntParam
+
+private[spark] class YarnClusterSchedulerBackend(
+scheduler: TaskSchedulerImpl,
+sc: SparkContext)
+  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) {
+
+  override def start() {
+super.start()
+var numExecutors = 2
+if (sc.getConf.contains("spark.executor.instances")) {
+  numExecutors = sc.getConf.getInt("spark.executor.instances", 2)
--- End diff --

can you eliminate the "if" here and just have
numExecutors = sc.getConf.getInt("spark.executor.instances", numExecutors)

(and same for the case below)




[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

2014-06-26 Thread kayousterhout
Github user kayousterhout commented on a diff in the pull request:

https://github.com/apache/spark/pull/900#discussion_r14252031
  
--- Diff: 
yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
 ---
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+import org.apache.spark.SparkContext
+import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.util.IntParam
+
+private[spark] class YarnClusterSchedulerBackend(
+scheduler: TaskSchedulerImpl,
+sc: SparkContext)
+  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) {
+
+  override def start() {
+super.start()
+var numExecutors = 2
--- End diff --

Can you read this from a constant (you'll need to extract the constant) in 
ApplicationMasterArguments, rather than defining the default in both places?




[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

2014-06-26 Thread tgravescs
Github user tgravescs commented on a diff in the pull request:

https://github.com/apache/spark/pull/900#discussion_r14245269
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -46,9 +46,18 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, actorSystem: A
 {
   // Use an atomic variable to track total number of cores in the cluster 
for simplicity and speed
   var totalCoreCount = new AtomicInteger(0)
+  var totalExpectedExecutors = new AtomicInteger(0)
   val conf = scheduler.sc.conf
   private val timeout = AkkaUtils.askTimeout(conf)
   private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
+  // Submit tasks only after (registered executors / total expected 
executors) 
+  // is equal to at least this value.
+  var minRegisteredRatio = 
conf.getDouble("spark.scheduler.minRegisteredExecutorsRatio", 0)
--- End diff --

We should add a check for > 1 and set it to 1 if it is over. I initially set it 
to 40 in a test, thinking that meant 40%. I guess the documentation will also 
clarify this.




[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

2014-06-26 Thread tgravescs
Github user tgravescs commented on a diff in the pull request:

https://github.com/apache/spark/pull/900#discussion_r14238510
  
--- Diff: 
yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
 ---
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+import org.apache.spark.SparkContext
+import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.util.IntParam
+
+private[spark] class YarnClusterSchedulerBackend(
+scheduler: TaskSchedulerImpl,
+sc: SparkContext)
+  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) {
+
+  override def start() {
+super.start()
+var numExecutors = 2
+if (sc.getConf.contains("spark.executor.instances")) {
+  numExecutors = sc.getConf.getInt("spark.executor.instances", 2)
+} else if (System.getenv("SPARK_EXECUTOR_INSTANCES") != null) {
+  
IntParam.unapply(System.getenv("SPARK_EXECUTOR_INSTANCES")).map(_.toInt).getOrElse(2)
--- End diff --

You aren't setting numExecutors. Also, doesn't IntParam already give you an 
Int (Option[Int]), so there's no need to do toInt again?
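
The corrected branch would look roughly like this (a sketch of the fix being asked for, mirroring the lines quoted above):

```scala
} else if (System.getenv("SPARK_EXECUTOR_INSTANCES") != null) {
  // IntParam.unapply already yields an Option[Int], so assign it directly
  // and keep the existing default as the fallback.
  numExecutors = IntParam.unapply(System.getenv("SPARK_EXECUTOR_INSTANCES")).getOrElse(2)
}
```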




[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

2014-06-26 Thread li-zhihui
Github user li-zhihui commented on a diff in the pull request:

https://github.com/apache/spark/pull/900#discussion_r14232018
  
--- Diff: 
yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
 ---
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+
+import org.apache.spark.{Logging, SparkContext}
+import org.apache.spark.deploy.yarn.ApplicationMasterArguments
+import org.apache.spark.scheduler.TaskSchedulerImpl
+
+import scala.collection.mutable.ArrayBuffer
+
+private[spark] class YarnClusterSchedulerBackend(
+scheduler: TaskSchedulerImpl,
+sc: SparkContext)
+  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
+  with Logging {
+
+  private[spark] def addArg(optionName: String, envVar: String, sysProp: 
String,
+  arrayBuf: ArrayBuffer[String]) {
+if (System.getenv(envVar) != null) {
+  arrayBuf += (optionName, System.getenv(envVar))
+} else if (sc.getConf.contains(sysProp)) {
+  arrayBuf += (optionName, sc.getConf.get(sysProp))
+}
+  }
+
+  override def start() {
+super.start()
+val argsArrayBuf = new ArrayBuffer[String]()
+List(("--num-executors", "SPARK_EXECUTOR_INSTANCES", 
"spark.executor.instances"),
+  ("--num-executors", "SPARK_WORKER_INSTANCES", 
"spark.worker.instances"))
+  .foreach { case (optName, envVar, sysProp) => addArg(optName, 
envVar, sysProp, argsArrayBuf) }
+val args = new ApplicationMasterArguments(argsArrayBuf.toArray)
+totalExecutors.set(args.numExecutors)
--- End diff --

@kayousterhout Done. 
About constants, maybe we can take another PR to manage constants for the 
whole project.




[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

2014-06-26 Thread li-zhihui
Github user li-zhihui commented on the pull request:

https://github.com/apache/spark/pull/900#issuecomment-47204723
  
@tgravescs @kayousterhout 
I added a new commit:

* Move waitBackendReady to TaskSchedulerImpl.start
* Code refactoring per @kayousterhout's comments




[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

2014-06-26 Thread li-zhihui
Github user li-zhihui commented on a diff in the pull request:

https://github.com/apache/spark/pull/900#discussion_r14231480
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -244,6 +255,17 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, actorSystem: A
 throw new SparkException("Error notifying standalone scheduler's 
driver actor", e)
 }
   }
+
+  override def isReady(): Boolean = {
+if (ready) {
--- End diff --

Thanks @pwendell @kayousterhout, I am now more thoughtful about this code's 
performance. ^_^ 
But we can't simply inline the code, because executorActor is a member of 
the inner class DriverActor. Although we could expose that member by adding some 
code, I'm not sure it is worth the cost.





[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

2014-06-25 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/900#discussion_r14225947
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -244,6 +255,17 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, actorSystem: A
 throw new SparkException("Error notifying standalone scheduler's 
driver actor", e)
 }
   }
+
+  override def isReady(): Boolean = {
+if (ready) {
--- End diff --

It would be simpler to just inline the following code. There is no valid 
performance argument for separating it.

```
(executorActor.size >= totalExecutors.get() * minRegisteredRatio)
```

Referencing the size of a HashMap in Scala is a constant-time operation. See


https://github.com/scala/scala/blob/v2.10.4/src/library/scala/collection/mutable/HashTable.scala#L48

`totalExecutors.get()` is also a constant time operation.

I don't see any performance argument for the current approach.
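
Inlined, the readiness check would read roughly as below (a sketch; as noted elsewhere in the thread, executorActor is a member of the inner DriverActor, so the real code may need an accessor):

```scala
// Ready as soon as the registered executors reach the configured fraction of
// the expected total; no separate helper or cached comparison is needed.
if (executorActor.size >= totalExecutors.get() * minRegisteredRatio) {
  ready = true
  return true
}
```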




[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

2014-06-25 Thread kayousterhout
Github user kayousterhout commented on the pull request:

https://github.com/apache/spark/pull/900#issuecomment-47189335
  
That seems like a good plan




[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

2014-06-25 Thread kayousterhout
Github user kayousterhout commented on a diff in the pull request:

https://github.com/apache/spark/pull/900#discussion_r14225862
  
--- Diff: 
yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
 ---
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+
+import org.apache.spark.{Logging, SparkContext}
+import org.apache.spark.deploy.yarn.ApplicationMasterArguments
+import org.apache.spark.scheduler.TaskSchedulerImpl
+
+import scala.collection.mutable.ArrayBuffer
+
+private[spark] class YarnClusterSchedulerBackend(
+scheduler: TaskSchedulerImpl,
+sc: SparkContext)
+  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
+  with Logging {
+
+  private[spark] def addArg(optionName: String, envVar: String, sysProp: 
String,
+  arrayBuf: ArrayBuffer[String]) {
+if (System.getenv(envVar) != null) {
+  arrayBuf += (optionName, System.getenv(envVar))
+} else if (sc.getConf.contains(sysProp)) {
+  arrayBuf += (optionName, sc.getConf.get(sysProp))
+}
+  }
+
+  override def start() {
+super.start()
+val argsArrayBuf = new ArrayBuffer[String]()
+List(("--num-executors", "SPARK_EXECUTOR_INSTANCES", 
"spark.executor.instances"),
+  ("--num-executors", "SPARK_WORKER_INSTANCES", 
"spark.worker.instances"))
+  .foreach { case (optName, envVar, sysProp) => addArg(optName, 
envVar, sysProp, argsArrayBuf) }
+val args = new ApplicationMasterArguments(argsArrayBuf.toArray)
+totalExecutors.set(args.numExecutors)
--- End diff --

Yeah, can you just do that (extract the value as a static constant)? Then 
this file can be reduced to just checking the two environment variables and the 
spark.executor.instances conf variable, and setting totalExecutors accordingly 
(spark.worker.instances shouldn't be used, I don't think -- see #1214).
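
A sketch of that extraction (the object-level constant name here is illustrative, not necessarily what the PR ended up using):

```scala
// In ApplicationMasterArguments (sketch): keep the default in one place.
object ApplicationMasterArguments {
  val DEFAULT_NUMBER_EXECUTORS = 2
}

// In YarnClusterSchedulerBackend.start() (sketch): start from the shared
// default and let the env var or conf setting override it.
var numExecutors = ApplicationMasterArguments.DEFAULT_NUMBER_EXECUTORS
```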





[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

2014-06-25 Thread li-zhihui
Github user li-zhihui commented on the pull request:

https://github.com/apache/spark/pull/900#issuecomment-47188350
  
@tgravescs @kayousterhout 
How about moving waitBackendReady to TaskSchedulerImpl.start? It will be 
called only once, at Spark initialization.




[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

2014-06-25 Thread li-zhihui
Github user li-zhihui commented on a diff in the pull request:

https://github.com/apache/spark/pull/900#discussion_r14225520
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
 ---
@@ -95,6 +95,7 @@ private[spark] class SparkDeploySchedulerBackend(
 
   override def executorAdded(fullId: String, workerId: String, hostPort: 
String, cores: Int,
 memory: Int) {
+totalExecutors.addAndGet(1)
--- End diff --

Yes.




[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

2014-06-25 Thread kayousterhout
Github user kayousterhout commented on a diff in the pull request:

https://github.com/apache/spark/pull/900#discussion_r14225485
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -244,6 +255,17 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, actorSystem: A
 throw new SparkException("Error notifying standalone scheduler's 
driver actor", e)
 }
   }
+
+  override def isReady(): Boolean = {
+if (ready) {
--- End diff --

Based on my experience profiling the Spark scheduler, things like this do 
not affect performance in any significant way, and in practice they are often 
optimized out by the JIT anyway, so we should opt for the more readable version.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

2014-06-25 Thread li-zhihui
Github user li-zhihui commented on a diff in the pull request:

https://github.com/apache/spark/pull/900#discussion_r14225397
  
--- Diff: 
yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala 
---
@@ -164,6 +164,7 @@ class ApplicationMaster(args: 
ApplicationMasterArguments, conf: Configuration,
 
   private def startUserClass(): Thread = {
 logInfo("Starting the user JAR in a separate Thread")
+System.setProperty("spark.executor.instances", 
args.numExecutors.toString)
--- End diff --

It's for yarn-cluster mode.
In yarn-cluster mode, the driver runs in a YARN container and loses the system 
properties that were set on the client.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

2014-06-25 Thread li-zhihui
Github user li-zhihui commented on a diff in the pull request:

https://github.com/apache/spark/pull/900#discussion_r14225319
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -244,6 +255,17 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, actorSystem: A
 throw new SparkException("Error notifying standalone scheduler's 
driver actor", e)
 }
   }
+
+  override def isReady(): Boolean = {
+if (ready) {
--- End diff --

@kayousterhout Right now the method is called every time tasks are submitted, 
so it can return quickly by caching the value of "ready".
If we move waitBackendReady to the scheduler backend's start, the method will be 
called only once, and then we should follow that idea.
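
For reference, a sketch of the caching pattern described here (names follow the 
quoted diff; this is an illustration, not the final code):

override def isReady(): Boolean = {
  if (ready) {
    return true  // a previous call already decided scheduling can start
  }
  if (executorActor.size >= totalExecutors.get() * minRegisteredRatio) {
    ready = true
    return true
  }
  if ((System.currentTimeMillis() - createTime) >= maxRegisteredWaitingTime) {
    ready = true  // stop waiting and proceed with whatever has registered
    return true
  }
  false
}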



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

2014-06-25 Thread li-zhihui
Github user li-zhihui commented on a diff in the pull request:

https://github.com/apache/spark/pull/900#discussion_r14225172
  
--- Diff: 
yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
 ---
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+
+import org.apache.spark.{Logging, SparkContext}
+import org.apache.spark.deploy.yarn.ApplicationMasterArguments
+import org.apache.spark.scheduler.TaskSchedulerImpl
+
+import scala.collection.mutable.ArrayBuffer
+
+private[spark] class YarnClusterSchedulerBackend(
+scheduler: TaskSchedulerImpl,
+sc: SparkContext)
+  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
+  with Logging {
+
+  private[spark] def addArg(optionName: String, envVar: String, sysProp: 
String,
+  arrayBuf: ArrayBuffer[String]) {
+if (System.getenv(envVar) != null) {
+  arrayBuf += (optionName, System.getenv(envVar))
+} else if (sc.getConf.contains(sysProp)) {
+  arrayBuf += (optionName, sc.getConf.get(sysProp))
+}
+  }
+
+  override def start() {
+super.start()
+val argsArrayBuf = new ArrayBuffer[String]()
+List(("--num-executors", "SPARK_EXECUTOR_INSTANCES", 
"spark.executor.instances"),
+  ("--num-executors", "SPARK_WORKER_INSTANCES", 
"spark.worker.instances"))
+  .foreach { case (optName, envVar, sysProp) => addArg(optName, 
envVar, sysProp, argsArrayBuf) }
+val args = new ApplicationMasterArguments(argsArrayBuf.toArray)
+totalExecutors.set(args.numExecutors)
--- End diff --

@kayousterhout If we removed the creation of ApplicationMasterArguments, we 
would have to assign the default value (2) of numExecutors in this class, which 
would duplicate the setting -- unless we extract the value as a static constant.
BTW, I think constant references are a little inconsistent across Spark.
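
A sketch of the "static constant" option (the companion object and constant name 
are assumptions here):

object ApplicationMasterArguments {
  // Single place that defines the default number of executors, so
  // YarnClusterSchedulerBackend does not have to duplicate the value 2.
  val DEFAULT_NUMBER_EXECUTORS = 2
}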


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

2014-06-25 Thread kayousterhout
Github user kayousterhout commented on the pull request:

https://github.com/apache/spark/pull/900#issuecomment-47148880
  
@tgravescs @li-zhihui I would be in favor of moving the wait earlier -- 
because currently every job will call waitBackendReady(), but really this 
should just be called once when the app is initialized (since once it is ready, 
it will never become un-ready with the current code).  If at some point this 
gets used to check the state of things between stages, as @tgravescs suggested, 
we can then move it to be checked before each stage.
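
A sketch of such a one-time wait (editorial illustration; the 100 ms polling 
interval is an assumption):

private def waitBackendReady(): Unit = {
  if (backend.isReady) {
    return
  }
  while (!backend.isReady) {
    synchronized {
      this.wait(100)  // re-check roughly every 100 ms until the backend is ready
    }
  }
}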


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

2014-06-25 Thread kayousterhout
Github user kayousterhout commented on a diff in the pull request:

https://github.com/apache/spark/pull/900#discussion_r14205738
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
 ---
@@ -95,6 +95,7 @@ private[spark] class SparkDeploySchedulerBackend(
 
   override def executorAdded(fullId: String, workerId: String, hostPort: 
String, cores: Int,
 memory: Int) {
+totalExecutors.addAndGet(1)
--- End diff --

Is there a race condition here, where isReady() can return true because 
totalExecutors has not been correctly set yet?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

2014-06-25 Thread kayousterhout
Github user kayousterhout commented on a diff in the pull request:

https://github.com/apache/spark/pull/900#discussion_r14205832
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -46,9 +46,17 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, actorSystem: A
 {
   // Use an atomic variable to track total number of cores in the cluster 
for simplicity and speed
   var totalCoreCount = new AtomicInteger(0)
+  var totalExecutors = new AtomicInteger(0)
--- End diff --

Or maybe it would be better to rename this totalExpectedExecutors?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

2014-06-25 Thread kayousterhout
Github user kayousterhout commented on a diff in the pull request:

https://github.com/apache/spark/pull/900#discussion_r14205805
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -46,9 +46,17 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, actorSystem: A
 {
   // Use an atomic variable to track total number of cores in the cluster 
for simplicity and speed
   var totalCoreCount = new AtomicInteger(0)
+  var totalExecutors = new AtomicInteger(0)
--- End diff --

Can you add a comment here saying this is the total number of executors we 
expect to be launched for this cluster?
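
A sketch combining this request with the rename suggested above:

// Total number of executors we expect to register for this application; the
// scheduler compares it against the number that have actually registered when
// deciding whether it is ready to submit tasks.
var totalExpectedExecutors = new AtomicInteger(0)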


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

2014-06-25 Thread kayousterhout
Github user kayousterhout commented on a diff in the pull request:

https://github.com/apache/spark/pull/900#discussion_r14205252
  
--- Diff: 
yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala 
---
@@ -164,6 +164,7 @@ class ApplicationMaster(args: 
ApplicationMasterArguments, conf: Configuration,
 
   private def startUserClass(): Thread = {
 logInfo("Starting the user JAR in a separate Thread")
+System.setProperty("spark.executor.instances", 
args.numExecutors.toString)
--- End diff --

Why do you need to set this here?  Is this for the case when 
args.numExecutors was set by SPARK_EXECUTOR_INSTANCES (since otherwise it seems 
like spark.executor.instances will already be set, right)?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

2014-06-25 Thread kayousterhout
Github user kayousterhout commented on a diff in the pull request:

https://github.com/apache/spark/pull/900#discussion_r14203837
  
--- Diff: 
yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
 ---
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+
+import org.apache.spark.{Logging, SparkContext}
+import org.apache.spark.deploy.yarn.ApplicationMasterArguments
+import org.apache.spark.scheduler.TaskSchedulerImpl
+
+import scala.collection.mutable.ArrayBuffer
+
+private[spark] class YarnClusterSchedulerBackend(
+scheduler: TaskSchedulerImpl,
+sc: SparkContext)
+  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
+  with Logging {
+
+  private[spark] def addArg(optionName: String, envVar: String, sysProp: 
String,
+  arrayBuf: ArrayBuffer[String]) {
+if (System.getenv(envVar) != null) {
+  arrayBuf += (optionName, System.getenv(envVar))
+} else if (sc.getConf.contains(sysProp)) {
+  arrayBuf += (optionName, sc.getConf.get(sysProp))
+}
+  }
+
+  override def start() {
+super.start()
+val argsArrayBuf = new ArrayBuffer[String]()
+List(("--num-executors", "SPARK_EXECUTOR_INSTANCES", 
"spark.executor.instances"),
+  ("--num-executors", "SPARK_WORKER_INSTANCES", 
"spark.worker.instances"))
+  .foreach { case (optName, envVar, sysProp) => addArg(optName, 
envVar, sysProp, argsArrayBuf) }
+val args = new ApplicationMasterArguments(argsArrayBuf.toArray)
+totalExecutors.set(args.numExecutors)
--- End diff --

Yeah, so my point was: can you just replace this code and the addArg() 
function with something simpler that avoids creating this list of options and 
passing them to ApplicationMasterArguments? That is, just check whether 
SPARK_EXECUTOR_INSTANCES/SPARK_WORKER_INSTANCES/spark.executor.instances/spark.worker.instances
 are set and, if so, set totalExecutors accordingly?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

2014-06-25 Thread kayousterhout
Github user kayousterhout commented on a diff in the pull request:

https://github.com/apache/spark/pull/900#discussion_r14202548
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -244,6 +255,17 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, actorSystem: A
 throw new SparkException("Error notifying standalone scheduler's 
driver actor", e)
 }
   }
+
+  override def isReady(): Boolean = {
+if (ready) {
--- End diff --

I think saving the value of "ready" makes the code a bit difficult to read 
here, in part because it doesn't actually signal whether the backend is ready 
(since isReady() could return true even when ready is false).  Can you just 
eliminate "ready" and move this line:

if (executorActor.size >= totalExecutors.get() * minRegisteredRatio) {

to here?
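
A sketch of the simplification being asked for (illustration only; names follow 
the quoted diff):

override def isReady(): Boolean = {
  // Ready as soon as enough executors have registered...
  if (executorActor.size >= totalExecutors.get() * minRegisteredRatio) {
    return true
  }
  // ...or once the maximum registration wait has elapsed.
  (System.currentTimeMillis() - createTime) >= maxRegisteredWaitingTime
}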



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

2014-06-25 Thread kayousterhout
Github user kayousterhout commented on a diff in the pull request:

https://github.com/apache/spark/pull/900#discussion_r14202320
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -46,9 +46,17 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, actorSystem: A
 {
   // Use an atomic variable to track total number of cores in the cluster 
for simplicity and speed
   var totalCoreCount = new AtomicInteger(0)
+  var totalExecutors = new AtomicInteger(0)
   val conf = scheduler.sc.conf
   private val timeout = AkkaUtils.askTimeout(conf)
   private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
+  // Submit tasks only after (registered executors / total executors) 
arrived the ratio.
--- End diff --

"arrived the ratio" --> "is equal to at least this value"


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

2014-06-25 Thread tgravescs
Github user tgravescs commented on the pull request:

https://github.com/apache/spark/pull/900#issuecomment-47104091
  
Thanks @li-zhihui. I was actually referring to modifying the user docs to 
add the new configs; look in docs/configuration.md.

It makes sense to move it down and get as much initialization work out of 
the way before waiting. To me, exactly which class it goes in depends on how we 
see it fitting and potentially being used in the future. You could, for 
instance, move it down into submitMissingTasks before the call to submitTasks 
and leave it in DAGScheduler instead.

I think for this PR, where we are just checking initially (at job submission) 
that we have enough executors, it doesn't matter too much. But in the future, if 
we want to check between stages or potentially when adding tasks, then it 
matters where it goes.

Perhaps @kayousterhout has an opinion on where it fits better?
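
A sketch of the submitMissingTasks placement mentioned above (illustration only; 
a waitBackendReady hook reachable from the DAGScheduler is an assumption):

// In DAGScheduler -- sketch only, not a working patch:
private def submitMissingTasks(stage: Stage, jobId: Int) {
  // ... existing logic that builds the TaskSet for this stage's missing partitions ...

  // Alternative placement for the wait: block right before the tasks are handed
  // to the task scheduler, instead of once per job submission.
  waitBackendReady()
  taskScheduler.submitTasks(taskSet)
}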



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---