[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/900
Github user tgravescs commented on the pull request: https://github.com/apache/spark/pull/900#issuecomment-48955561

Looks good. Thanks @li-zhihui
Github user li-zhihui commented on the pull request: https://github.com/apache/spark/pull/900#issuecomment-48869936

@tgravescs I added a commit according to the comments.
Github user tgravescs commented on a diff in the pull request: https://github.com/apache/spark/pull/900#discussion_r14825405

--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---
@@ -244,6 +257,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
       throw new SparkException("Error notifying standalone scheduler's driver actor", e)
     }
   }
+
+  override def isReady(): Boolean = {
+    if (ready) {
+      return true
+    }
+    if ((System.currentTimeMillis() - createTime) >= maxRegisteredWaitingTime) {
--- End diff --

It might be nice to have a log statement here saying the max time was hit, so we know when scheduling began if debugging a job.
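For illustration, the readiness check being discussed (including the suggested log line) can be sketched as a standalone snippet. The field names mirror the diff above, but this is only an approximation of the idea, not the actual Spark class, and it prints rather than using Spark's logging:

```scala
import java.util.concurrent.atomic.AtomicInteger

// Standalone sketch of the readiness check plus the suggested "max time hit" message.
class ReadinessCheck(minRegisteredRatio: Double, maxRegisteredWaitingTime: Long) {
  val totalExpectedExecutors = new AtomicInteger(0)
  val registeredExecutors = new AtomicInteger(0)
  private val createTime = System.currentTimeMillis()
  private var ready = false

  def isReady(): Boolean = {
    if (ready) return true
    if (registeredExecutors.get() >= totalExpectedExecutors.get() * minRegisteredRatio) {
      ready = true
      return true
    }
    if (System.currentTimeMillis() - createTime >= maxRegisteredWaitingTime) {
      // The suggested log statement: record that the max wait was reached, so the
      // point where scheduling began is visible when debugging a job.
      println(s"Max registered-executors waiting time ($maxRegisteredWaitingTime ms) reached; starting scheduling")
      ready = true
      return true
    }
    false
  }
}
```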
Github user tgravescs commented on a diff in the pull request: https://github.com/apache/spark/pull/900#discussion_r14823260

--- Diff: docs/configuration.md ---
@@ -699,6 +699,22 @@ Apart from these, the following properties are also available, and may be useful
   (in milliseconds)
+
+  spark.scheduler.minRegisteredExecutorsRatio
+  0
+
+  Submit tasks only after (registered executors / total expected executors)
+  is equal to at least this value, which is double between 0 and 1.
+
+
+  spark.scheduler.maxRegisteredExecutorsWaitingTime
+  3
+
+  Whatever (registered executors / total expected executors) is reached
--- End diff --

I think we should clarify both of these a bit, because really scheduling starts when either one is hit, so adding a reference to maxRegisteredExecutorsWaitingTime in the description of minRegisteredExecutorsRatio would be good. How about something like below? Note I'm not a doc writer, so I'm fine with changing it.

For spark.scheduler.minRegisteredExecutorsRatio: The minimum ratio of registered executors (registered executors / total expected executors) to wait for before scheduling begins. Specified as a double between 0 and 1. Regardless of whether the minimum ratio of executors has been reached, the maximum amount of time it will wait before scheduling begins is controlled by the config spark.scheduler.maxRegisteredExecutorsWaitingTime.

Then for spark.scheduler.maxRegisteredExecutorsWaitingTime: Maximum amount of time to wait for executors to register before scheduling begins (in milliseconds).
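As a usage illustration (not part of the PR itself), a job that wants to wait for most of its executors would set the two properties like any other Spark configuration, e.g. in spark-shell or application code; the values below are only examples:

```scala
import org.apache.spark.SparkConf

// Example settings for the two properties discussed above.
val conf = new SparkConf()
  .setAppName("executor-registration-example")
  // Wait until at least 80% of the expected executors have registered...
  .set("spark.scheduler.minRegisteredExecutorsRatio", "0.8")
  // ...but never wait longer than 30 seconds (the value is in milliseconds).
  .set("spark.scheduler.maxRegisteredExecutorsWaitingTime", "30000")
```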
Github user li-zhihui commented on the pull request: https://github.com/apache/spark/pull/900#issuecomment-48714143

Thanks @tgravescs. I will file a new JIRA for handling Mesos and follow up on it after this PR is merged.
Github user li-zhihui commented on a diff in the pull request: https://github.com/apache/spark/pull/900#discussion_r14813843

--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---
@@ -46,9 +46,19 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
 {
   // Use an atomic variable to track total number of cores in the cluster for simplicity and speed
   var totalCoreCount = new AtomicInteger(0)
+  var totalExpectedExecutors = new AtomicInteger(0)
   val conf = scheduler.sc.conf
   private val timeout = AkkaUtils.askTimeout(conf)
   private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
+  // Submit tasks only after (registered executors / total expected executors)
+  // is equal to at least this value, that is double between 0 and 1.
+  var minRegisteredRatio = conf.getDouble("spark.scheduler.minRegisteredExecutorsRatio", 0)
--- End diff --

@tgravescs Done.
Github user tgravescs commented on the pull request: https://github.com/apache/spark/pull/900#issuecomment-48604243

postStartHook seems like a good place to put it. It will only be called once there, and as you said, yarn-cluster mode actually waits for the SparkContext to be initialized before allocating executors.

It looks like we aren't handling Mesos. We should at least file a JIRA for this. @li-zhihui did you look at Mesos at all?

For the YARN side where you added the TODOs about the sleep: I think we can leave them here, as there is another JIRA to remove them.
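For context, the idea being agreed on here can be sketched as below. The real TaskSchedulerImpl and SchedulerBackend are not used; the names only mirror the discussion, and the polling interval is an arbitrary choice:

```scala
// Standalone sketch: call waitBackendReady from postStartHook, which runs once after
// the SparkContext is fully initialized, instead of from start() or submitTasks().
trait BackendLike {
  def isReady(): Boolean
}

class SchedulerSketch(backend: BackendLike) {
  def postStartHook(): Unit = waitBackendReady()

  private def waitBackendReady(): Unit = {
    // Poll until enough executors have registered or the configured max wait has
    // elapsed (both conditions are folded into the backend's isReady()).
    while (!backend.isReady()) {
      Thread.sleep(100)
    }
  }
}
```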
Github user tgravescs commented on a diff in the pull request: https://github.com/apache/spark/pull/900#discussion_r14764078

--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---
@@ -46,9 +46,19 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
 {
   // Use an atomic variable to track total number of cores in the cluster for simplicity and speed
   var totalCoreCount = new AtomicInteger(0)
+  var totalExpectedExecutors = new AtomicInteger(0)
   val conf = scheduler.sc.conf
   private val timeout = AkkaUtils.askTimeout(conf)
   private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
+  // Submit tasks only after (registered executors / total expected executors)
+  // is equal to at least this value, that is double between 0 and 1.
+  var minRegisteredRatio = conf.getDouble("spark.scheduler.minRegisteredExecutorsRatio", 0)
--- End diff --

Please add the new configs to the user docs - see docs/configuration.md
Github user li-zhihui commented on the pull request: https://github.com/apache/spark/pull/900#issuecomment-47490304

@tgravescs @kayousterhout It leads to a logical deadlock in yarn-cluster mode if waitBackendReady is in TaskSchedulerImpl.start. How about moving it (waitBackendReady) to postStartHook()?
Github user li-zhihui commented on the pull request: https://github.com/apache/spark/pull/900#issuecomment-47331430

@tgravescs @kayousterhout I moved waitBackendReady back to the submitTasks method, because it (waitBackendReady in the start method) does not work in yarn-cluster mode (NullPointerException because SparkContext initialization times out); yarn-client mode is OK.
Github user li-zhihui commented on the pull request: https://github.com/apache/spark/pull/900#issuecomment-47330805

@tgravescs @kayousterhout I added a new commit according to the comments.
Github user li-zhihui commented on a diff in the pull request: https://github.com/apache/spark/pull/900#discussion_r14280510

--- Diff: yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala ---
@@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster + +import org.apache.spark.SparkContext +import org.apache.spark.scheduler.TaskSchedulerImpl +import org.apache.spark.util.IntParam + +private[spark] class YarnClusterSchedulerBackend( +scheduler: TaskSchedulerImpl, +sc: SparkContext) + extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) { + + override def start() { +super.start() +var numExecutors = 2 +if (sc.getConf.contains("spark.executor.instances")) { + numExecutors = sc.getConf.getInt("spark.executor.instances", 2)
--- End diff --

Cool !
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/900#discussion_r14280473

--- Diff: yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala ---
@@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster + +import org.apache.spark.SparkContext +import org.apache.spark.scheduler.TaskSchedulerImpl +import org.apache.spark.util.IntParam + +private[spark] class YarnClusterSchedulerBackend( +scheduler: TaskSchedulerImpl, +sc: SparkContext) + extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) { + + override def start() { +super.start() +var numExecutors = 2 +if (sc.getConf.contains("spark.executor.instances")) { + numExecutors = sc.getConf.getInt("spark.executor.instances", 2)
--- End diff --

(so you don't override it if spark.executor.instances is not already set)
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/900#discussion_r14280463

--- Diff: yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala ---
@@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster + +import org.apache.spark.SparkContext +import org.apache.spark.scheduler.TaskSchedulerImpl +import org.apache.spark.util.IntParam + +private[spark] class YarnClusterSchedulerBackend( +scheduler: TaskSchedulerImpl, +sc: SparkContext) + extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) { + + override def start() { +super.start() +var numExecutors = 2 +if (sc.getConf.contains("spark.executor.instances")) { + numExecutors = sc.getConf.getInt("spark.executor.instances", 2)
--- End diff --

numExecutors = sc.getConf.getInt("spark.executor.instances", numExecutors)
Github user li-zhihui commented on a diff in the pull request: https://github.com/apache/spark/pull/900#discussion_r14280444

--- Diff: yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala ---
@@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster + +import org.apache.spark.SparkContext +import org.apache.spark.scheduler.TaskSchedulerImpl +import org.apache.spark.util.IntParam + +private[spark] class YarnClusterSchedulerBackend( +scheduler: TaskSchedulerImpl, +sc: SparkContext) + extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) { + + override def start() { +super.start() +var numExecutors = 2 +if (sc.getConf.contains("spark.executor.instances")) { + numExecutors = sc.getConf.getInt("spark.executor.instances", 2)
--- End diff --

If we set numExecutors based on SPARK_EXECUTOR_INSTANCES first, then because sc.getConf.getInt("spark.executor.instances", 2) always returns a value, the SPARK_EXECUTOR_INSTANCES setting would be overridden regardless of whether spark.executor.instances is configured.
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/900#discussion_r14280315

--- Diff: yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala ---
@@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster + +import org.apache.spark.SparkContext +import org.apache.spark.scheduler.TaskSchedulerImpl +import org.apache.spark.util.IntParam + +private[spark] class YarnClusterSchedulerBackend( +scheduler: TaskSchedulerImpl, +sc: SparkContext) + extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) { + + override def start() { +super.start() +var numExecutors = 2
--- End diff --

I agree that those other constants could be defined in a better way -- but my understanding is that this case is more extreme, because if you defined numExecutors to be something other than 2 here, there would actually be a bug, right, because the scheduler would be waiting for the wrong number of executors to start? In any case, this PR doesn't need to fix all of the constants in the project, but any new constants added should be added properly.
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/900#discussion_r14280284

--- Diff: yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala ---
@@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster + +import org.apache.spark.SparkContext +import org.apache.spark.scheduler.TaskSchedulerImpl +import org.apache.spark.util.IntParam + +private[spark] class YarnClusterSchedulerBackend( +scheduler: TaskSchedulerImpl, +sc: SparkContext) + extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) { + + override def start() { +super.start() +var numExecutors = 2 +if (sc.getConf.contains("spark.executor.instances")) { + numExecutors = sc.getConf.getInt("spark.executor.instances", 2) +} else if (System.getenv("SPARK_EXECUTOR_INSTANCES") != null) { + IntParam.unapply(System.getenv("SPARK_EXECUTOR_INSTANCES")).map(_.toInt).getOrElse(2)
--- End diff --

Ah cool that makes sense
Github user li-zhihui commented on a diff in the pull request: https://github.com/apache/spark/pull/900#discussion_r14280169

--- Diff: yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala ---
@@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster + +import org.apache.spark.SparkContext +import org.apache.spark.scheduler.TaskSchedulerImpl +import org.apache.spark.util.IntParam + +private[spark] class YarnClusterSchedulerBackend( +scheduler: TaskSchedulerImpl, +sc: SparkContext) + extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) { + + override def start() { +super.start() +var numExecutors = 2
--- End diff --

@kayousterhout The constant (default value of numExecutors) is not only defined in this class and ApplicationMasterArguments; it is also defined in ClientArguments. I guess the default values below come from the same consideration as numExecutors:

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L1232
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L1232
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L1232

If we want to make the value a constant, we should consider all of them. So maybe we can add an object org.apache.spark.Constants to manage all constants. @tgravescs
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/900#discussion_r14280133

--- Diff: yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala ---
@@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster + +import org.apache.spark.SparkContext +import org.apache.spark.scheduler.TaskSchedulerImpl +import org.apache.spark.util.IntParam + +private[spark] class YarnClusterSchedulerBackend( +scheduler: TaskSchedulerImpl, +sc: SparkContext) + extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) { + + override def start() { +super.start() +var numExecutors = 2 +if (sc.getConf.contains("spark.executor.instances")) { + numExecutors = sc.getConf.getInt("spark.executor.instances", 2)
--- End diff --

Ok but you can just reverse the order right? Like, first set numExecutors based on SPARK_EXECUTOR_INSTANCES, and then after that have my proposed line
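A self-contained sketch of that ordering (the names follow the discussion; `confValue` stands in for reading spark.executor.instances from the SparkConf, and the hard-coded 2 is the default under debate):

```scala
// Read the environment variable first, then let the configuration property override it
// by passing the env-derived value as the default, so conf/system properties still win.
def resolveNumExecutors(envValue: Option[String], confValue: Option[Int]): Int = {
  val fromEnv = envValue.map(_.toInt).getOrElse(2) // 2 is the hard-coded default under discussion
  confValue.getOrElse(fromEnv)
}

// resolveNumExecutors(Some("4"), None)    == 4   (env only)
// resolveNumExecutors(Some("4"), Some(8)) == 8   (conf overrides env)
// resolveNumExecutors(None, None)         == 2   (fall back to the default)
```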
Github user li-zhihui commented on a diff in the pull request: https://github.com/apache/spark/pull/900#discussion_r14279974

--- Diff: yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala ---
@@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster + +import org.apache.spark.SparkContext +import org.apache.spark.scheduler.TaskSchedulerImpl +import org.apache.spark.util.IntParam + +private[spark] class YarnClusterSchedulerBackend( +scheduler: TaskSchedulerImpl, +sc: SparkContext) + extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) { + + override def start() { +super.start() +var numExecutors = 2 +if (sc.getConf.contains("spark.executor.instances")) { + numExecutors = sc.getConf.getInt("spark.executor.instances", 2)
--- End diff --

@kayousterhout There is a rule: system properties override environment variables. Eliminating the "if" would let the environment variable override the system property.
https://github.com/apache/spark/blob/master/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala#L62

BTW @tgravescs it seems this code goes against the rule:
https://github.com/apache/spark/blob/master/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala#L36
Github user tgravescs commented on a diff in the pull request: https://github.com/apache/spark/pull/900#discussion_r14265985

--- Diff: yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala ---
@@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster + +import org.apache.spark.SparkContext +import org.apache.spark.scheduler.TaskSchedulerImpl +import org.apache.spark.util.IntParam + +private[spark] class YarnClusterSchedulerBackend( +scheduler: TaskSchedulerImpl, +sc: SparkContext) + extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) { + + override def start() { +super.start() +var numExecutors = 2 +if (sc.getConf.contains("spark.executor.instances")) { + numExecutors = sc.getConf.getInt("spark.executor.instances", 2) +} else if (System.getenv("SPARK_EXECUTOR_INSTANCES") != null) { + IntParam.unapply(System.getenv("SPARK_EXECUTOR_INSTANCES")).map(_.toInt).getOrElse(2)
--- End diff --

Well, the SPARK_WORKER_INSTANCES env variable was only used in yarn-client mode, so it isn't strictly needed here since this class is the backend for yarn-cluster mode. I assume that is why it was removed. Something I hadn't thought of when you first asked.
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/900#discussion_r14252164

--- Diff: yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala ---
@@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster + +import org.apache.spark.SparkContext +import org.apache.spark.scheduler.TaskSchedulerImpl +import org.apache.spark.util.IntParam + +private[spark] class YarnClusterSchedulerBackend( +scheduler: TaskSchedulerImpl, +sc: SparkContext) + extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) { + + override def start() { +super.start() +var numExecutors = 2 +if (sc.getConf.contains("spark.executor.instances")) { + numExecutors = sc.getConf.getInt("spark.executor.instances", 2) +} else if (System.getenv("SPARK_EXECUTOR_INSTANCES") != null) { + IntParam.unapply(System.getenv("SPARK_EXECUTOR_INSTANCES")).map(_.toInt).getOrElse(2)
--- End diff --

Also what about SPARK_WORKER_INSTANCES? My understanding is that checking that environment variable is necessary for backwards compatibility (correct me if I'm wrong here @tgravescs)
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/900#discussion_r14252092

--- Diff: yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala ---
@@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster + +import org.apache.spark.SparkContext +import org.apache.spark.scheduler.TaskSchedulerImpl +import org.apache.spark.util.IntParam + +private[spark] class YarnClusterSchedulerBackend( +scheduler: TaskSchedulerImpl, +sc: SparkContext) + extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) { + + override def start() { +super.start() +var numExecutors = 2 +if (sc.getConf.contains("spark.executor.instances")) { + numExecutors = sc.getConf.getInt("spark.executor.instances", 2)
--- End diff --

can you eliminate the "if" here and just have numExecutors = sc.getConf.getInt("spark.executor.instances", numExecutors) (and same for the case below)
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/900#discussion_r14252031

--- Diff: yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala ---
@@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster + +import org.apache.spark.SparkContext +import org.apache.spark.scheduler.TaskSchedulerImpl +import org.apache.spark.util.IntParam + +private[spark] class YarnClusterSchedulerBackend( +scheduler: TaskSchedulerImpl, +sc: SparkContext) + extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) { + + override def start() { +super.start() +var numExecutors = 2
--- End diff --

Can you read this from a constant (you'll need to extract the constant) in ApplicationMasterArguments, rather than defining the default in both places?
Github user tgravescs commented on a diff in the pull request: https://github.com/apache/spark/pull/900#discussion_r14245269

--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---
@@ -46,9 +46,18 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
 {
   // Use an atomic variable to track total number of cores in the cluster for simplicity and speed
   var totalCoreCount = new AtomicInteger(0)
+  var totalExpectedExecutors = new AtomicInteger(0)
   val conf = scheduler.sc.conf
   private val timeout = AkkaUtils.askTimeout(conf)
   private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
+  // Submit tasks only after (registered executors / total expected executors)
+  // is equal to at least this value.
+  var minRegisteredRatio = conf.getDouble("spark.scheduler.minRegisteredExecutorsRatio", 0)
--- End diff --

We should add a check for > 1 and set to 1 if over. I initially set it to 40 in a test thinking that meant 40%. I guess the documentation will also clarify.
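A minimal sketch of the suggested bounds check, assuming the value has already been read from spark.scheduler.minRegisteredExecutorsRatio (the lower bound is an extra safety check, not something requested above):

```scala
// Clamp the configured ratio into [0, 1]; e.g. a user who writes 40 meaning "40%"
// ends up waiting for all executors rather than an impossible 4000%.
def clampRatio(configured: Double): Double = math.min(1.0, math.max(0.0, configured))

// clampRatio(40)  == 1.0
// clampRatio(0.4) == 0.4
// clampRatio(-1)  == 0.0
```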
Github user tgravescs commented on a diff in the pull request: https://github.com/apache/spark/pull/900#discussion_r14238510

--- Diff: yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala ---
@@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster + +import org.apache.spark.SparkContext +import org.apache.spark.scheduler.TaskSchedulerImpl +import org.apache.spark.util.IntParam + +private[spark] class YarnClusterSchedulerBackend( +scheduler: TaskSchedulerImpl, +sc: SparkContext) + extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) { + + override def start() { +super.start() +var numExecutors = 2 +if (sc.getConf.contains("spark.executor.instances")) { + numExecutors = sc.getConf.getInt("spark.executor.instances", 2) +} else if (System.getenv("SPARK_EXECUTOR_INSTANCES") != null) { + IntParam.unapply(System.getenv("SPARK_EXECUTOR_INSTANCES")).map(_.toInt).getOrElse(2)
--- End diff --

You aren't setting numExecutors. Also, doesn't IntParam already give you an Int (Option[Int]), so there's no need to do toInt again?
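A self-contained illustration of the two points above: the parsed value has to be assigned, and an extractor that already yields Option[Int] needs no further toInt. `IntLike` is only a stand-in for org.apache.spark.util.IntParam, and `numExecutorsFromEnv` is a hypothetical helper, not code from the PR:

```scala
object IntLike {
  // Mimics an extractor that parses a string into Option[Int].
  def unapply(value: String): Option[Int] =
    try Some(value.toInt) catch { case _: NumberFormatException => None }
}

def numExecutorsFromEnv(env: Option[String], default: Int = 2): Int =
  env.flatMap(IntLike.unapply).getOrElse(default) // assign the result; no extra toInt needed

// numExecutorsFromEnv(Some("5")) == 5
// numExecutorsFromEnv(None)      == 2
```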
Github user li-zhihui commented on a diff in the pull request: https://github.com/apache/spark/pull/900#discussion_r14232018

--- Diff: yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala ---
@@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster + + +import org.apache.spark.{Logging, SparkContext} +import org.apache.spark.deploy.yarn.ApplicationMasterArguments +import org.apache.spark.scheduler.TaskSchedulerImpl + +import scala.collection.mutable.ArrayBuffer + +private[spark] class YarnClusterSchedulerBackend( +scheduler: TaskSchedulerImpl, +sc: SparkContext) + extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) + with Logging { + + private[spark] def addArg(optionName: String, envVar: String, sysProp: String, + arrayBuf: ArrayBuffer[String]) { +if (System.getenv(envVar) != null) { + arrayBuf += (optionName, System.getenv(envVar)) +} else if (sc.getConf.contains(sysProp)) { + arrayBuf += (optionName, sc.getConf.get(sysProp)) +} + } + + override def start() { +super.start() +val argsArrayBuf = new ArrayBuffer[String]() +List(("--num-executors", "SPARK_EXECUTOR_INSTANCES", "spark.executor.instances"), + ("--num-executors", "SPARK_WORKER_INSTANCES", "spark.worker.instances")) + .foreach { case (optName, envVar, sysProp) => addArg(optName, envVar, sysProp, argsArrayBuf) } +val args = new ApplicationMasterArguments(argsArrayBuf.toArray) +totalExecutors.set(args.numExecutors)
--- End diff --

@kayousterhout Done. About constants, maybe we can take another PR to manage constants for the whole project.
Github user li-zhihui commented on the pull request: https://github.com/apache/spark/pull/900#issuecomment-47204723

@tgravescs @kayousterhout I added a new commit:
* Move waitBackendReady to TaskSchedulerImpl.start
* Code refactoring per @kayousterhout's comments
Github user li-zhihui commented on a diff in the pull request: https://github.com/apache/spark/pull/900#discussion_r14231480

--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---
@@ -244,6 +255,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
       throw new SparkException("Error notifying standalone scheduler's driver actor", e)
     }
   }
+
+  override def isReady(): Boolean = {
+    if (ready) {
--- End diff --

Thanks @pwendell @kayousterhout. I was thinking too much about this code's performance. ^_^ But we can't simply inline the code, because executorActor is a member of the inner class DriverActor. Although we could expose the member by adding some code, I'm not sure it's worth doing.
Github user pwendell commented on a diff in the pull request: https://github.com/apache/spark/pull/900#discussion_r14225947

--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---
@@ -244,6 +255,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
       throw new SparkException("Error notifying standalone scheduler's driver actor", e)
     }
   }
+
+  override def isReady(): Boolean = {
+    if (ready) {
--- End diff --

It would be simpler to just inline the following code. There is no valid performance argument for separating it.

```
(executorActor.size >= totalExecutors.get() * minRegisteredRatio)
```

Referencing the size of a HashMap in scala is a constant time operation. See https://github.com/scala/scala/blob/v2.10.4/src/library/scala/collection/mutable/HashTable.scala#L48

`totalExecutors.get()` is also a constant time operation. I don't see any performance argument for the current approach.
Github user kayousterhout commented on the pull request: https://github.com/apache/spark/pull/900#issuecomment-47189335

That seems like a good plan
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/900#discussion_r14225862

--- Diff: yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala ---
@@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster + + +import org.apache.spark.{Logging, SparkContext} +import org.apache.spark.deploy.yarn.ApplicationMasterArguments +import org.apache.spark.scheduler.TaskSchedulerImpl + +import scala.collection.mutable.ArrayBuffer + +private[spark] class YarnClusterSchedulerBackend( +scheduler: TaskSchedulerImpl, +sc: SparkContext) + extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) + with Logging { + + private[spark] def addArg(optionName: String, envVar: String, sysProp: String, + arrayBuf: ArrayBuffer[String]) { +if (System.getenv(envVar) != null) { + arrayBuf += (optionName, System.getenv(envVar)) +} else if (sc.getConf.contains(sysProp)) { + arrayBuf += (optionName, sc.getConf.get(sysProp)) +} + } + + override def start() { +super.start() +val argsArrayBuf = new ArrayBuffer[String]() +List(("--num-executors", "SPARK_EXECUTOR_INSTANCES", "spark.executor.instances"), + ("--num-executors", "SPARK_WORKER_INSTANCES", "spark.worker.instances")) + .foreach { case (optName, envVar, sysProp) => addArg(optName, envVar, sysProp, argsArrayBuf) } +val args = new ApplicationMasterArguments(argsArrayBuf.toArray) +totalExecutors.set(args.numExecutors)
--- End diff --

Yeah can you just do that (extract the value as a static constant)? Then this file can be reduced to just checking for the value of the two environment variables and for the spark.executor.instances conf variable and setting totalExecutors accordingly (spark.worker.instances shouldn't be used I don't think -- see #1214)
Github user li-zhihui commented on the pull request: https://github.com/apache/spark/pull/900#issuecomment-47188350

@tgravescs @kayousterhout How about moving waitBackendReady to TaskSchedulerImpl.start? It will be called only once, at Spark initialization.
Github user li-zhihui commented on a diff in the pull request: https://github.com/apache/spark/pull/900#discussion_r14225520

--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala ---
@@ -95,6 +95,7 @@ private[spark] class SparkDeploySchedulerBackend(
   override def executorAdded(fullId: String, workerId: String, hostPort: String, cores: Int,
     memory: Int) {
+    totalExecutors.addAndGet(1)
--- End diff --

Yes.
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/900#discussion_r14225485

--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---
@@ -244,6 +255,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
       throw new SparkException("Error notifying standalone scheduler's driver actor", e)
     }
   }
+
+  override def isReady(): Boolean = {
+    if (ready) {
--- End diff --

Based on my experience profiling the Spark scheduler, things like this do not affect performance in any significant way and in practice are often optimized out by JIT anyway, so we should opt for the more readable version
Github user li-zhihui commented on a diff in the pull request: https://github.com/apache/spark/pull/900#discussion_r14225397

--- Diff: yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
@@ -164,6 +164,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
   private def startUserClass(): Thread = {
     logInfo("Starting the user JAR in a separate Thread")
+    System.setProperty("spark.executor.instances", args.numExecutors.toString)
--- End diff --

It's for yarn-cluster mode. In yarn-cluster mode, the driver runs in a YARN container and loses the system configuration that was set on the client.
[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...
Github user li-zhihui commented on a diff in the pull request: https://github.com/apache/spark/pull/900#discussion_r14225319 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala --- @@ -244,6 +255,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A throw new SparkException("Error notifying standalone scheduler's driver actor", e) } } + + override def isReady(): Boolean = { +if (ready) { --- End diff -- @kayousterhout Right now the method is called on every task submission, so caching the value of "ready" lets it return quickly. If we move waitBackendReady into BackendScheduler.start, the method will be called only once, and then we should follow that idea.
[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...
Github user li-zhihui commented on a diff in the pull request: https://github.com/apache/spark/pull/900#discussion_r14225172 --- Diff: yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala --- @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster + + +import org.apache.spark.{Logging, SparkContext} +import org.apache.spark.deploy.yarn.ApplicationMasterArguments +import org.apache.spark.scheduler.TaskSchedulerImpl + +import scala.collection.mutable.ArrayBuffer + +private[spark] class YarnClusterSchedulerBackend( +scheduler: TaskSchedulerImpl, +sc: SparkContext) + extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) + with Logging { + + private[spark] def addArg(optionName: String, envVar: String, sysProp: String, + arrayBuf: ArrayBuffer[String]) { +if (System.getenv(envVar) != null) { + arrayBuf += (optionName, System.getenv(envVar)) +} else if (sc.getConf.contains(sysProp)) { + arrayBuf += (optionName, sc.getConf.get(sysProp)) +} + } + + override def start() { +super.start() +val argsArrayBuf = new ArrayBuffer[String]() +List(("--num-executors", "SPARK_EXECUTOR_INSTANCES", "spark.executor.instances"), + ("--num-executors", "SPARK_WORKER_INSTANCES", "spark.worker.instances")) + .foreach { case (optName, envVar, sysProp) => addArg(optName, envVar, sysProp, argsArrayBuf) } +val args = new ApplicationMasterArguments(argsArrayBuf.toArray) +totalExecutors.set(args.numExecutors) --- End diff -- @kayousterhout If we stop creating ApplicationMasterArguments, we would have to duplicate the default value of numExecutors (2) in this class, unless we extract it into a shared constant. BTW, I find the way constants are referenced in Spark a little confusing.
[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...
Github user kayousterhout commented on the pull request: https://github.com/apache/spark/pull/900#issuecomment-47148880 @tgravescs @li-zhihui I would be in favor of moving the wait earlier -- because currently every job will call waitBackendReady(), but really this should just be called once when the app is initialized (since once it is ready, it will never become un-ready with the current code). If at some point this gets used to check the state of things between stages, as @tgravescs suggested, we can then move it to be checked before each stage.
[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/900#discussion_r14205738 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala --- @@ -95,6 +95,7 @@ private[spark] class SparkDeploySchedulerBackend( override def executorAdded(fullId: String, workerId: String, hostPort: String, cores: Int, memory: Int) { +totalExecutors.addAndGet(1) --- End diff -- Is there a race condition here, where isReady() can return true because totalExecutors has not been correctly set yet?
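Spelling out the concern with a small illustration (not real scheduler code): the expected-executor count grows one executorAdded callback at a time, so a ratio check taken too early can pass while most executors are still uncounted.

    import java.util.concurrent.atomic.AtomicInteger

    object EarlyReadyCheckSketch extends App {
      val totalExecutors = new AtomicInteger(0)        // grows as executorAdded callbacks arrive
      val registeredExecutors = new AtomicInteger(0)
      val minRegisteredRatio = 0.8

      // Only the first of an eventual ten executors has been added and registered so far.
      totalExecutors.addAndGet(1)
      registeredExecutors.addAndGet(1)

      // 1 >= 1 * 0.8, so a check made now would report "ready" nine executors too soon.
      println(registeredExecutors.get() >= totalExecutors.get() * minRegisteredRatio)  // true
    }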
[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/900#discussion_r14205832 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala --- @@ -46,9 +46,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A { // Use an atomic variable to track total number of cores in the cluster for simplicity and speed var totalCoreCount = new AtomicInteger(0) + var totalExecutors = new AtomicInteger(0) --- End diff -- Or maybe it would be better to rename this totalExpectedExecutors?
[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/900#discussion_r14205805 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala --- @@ -46,9 +46,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A { // Use an atomic variable to track total number of cores in the cluster for simplicity and speed var totalCoreCount = new AtomicInteger(0) + var totalExecutors = new AtomicInteger(0) --- End diff -- Can you add a comment here saying this is the total number of executors we expect to be launched for this cluster?
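Taken together with the rename suggested above, the field could end up looking roughly like this (a sketch, assuming both suggestions are adopted):

    import java.util.concurrent.atomic.AtomicInteger

    class ExpectedExecutorsSketch {
      // Total number of executors we expect to be launched for this application; compared
      // against the number of registered executors to decide when task scheduling may begin.
      var totalExpectedExecutors = new AtomicInteger(0)
    }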
[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/900#discussion_r14205252 --- Diff: yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala --- @@ -164,6 +164,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, private def startUserClass(): Thread = { logInfo("Starting the user JAR in a separate Thread") +System.setProperty("spark.executor.instances", args.numExecutors.toString) --- End diff -- Why do you need to set this here? Is this for the case when args.numExecutors was set by SPARK_EXECUTOR_INSTANCES (since otherwise it seems like spark.executor.instances will already be set, right)?
[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/900#discussion_r14203837 --- Diff: yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala --- @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster + + +import org.apache.spark.{Logging, SparkContext} +import org.apache.spark.deploy.yarn.ApplicationMasterArguments +import org.apache.spark.scheduler.TaskSchedulerImpl + +import scala.collection.mutable.ArrayBuffer + +private[spark] class YarnClusterSchedulerBackend( +scheduler: TaskSchedulerImpl, +sc: SparkContext) + extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) + with Logging { + + private[spark] def addArg(optionName: String, envVar: String, sysProp: String, + arrayBuf: ArrayBuffer[String]) { +if (System.getenv(envVar) != null) { + arrayBuf += (optionName, System.getenv(envVar)) +} else if (sc.getConf.contains(sysProp)) { + arrayBuf += (optionName, sc.getConf.get(sysProp)) +} + } + + override def start() { +super.start() +val argsArrayBuf = new ArrayBuffer[String]() +List(("--num-executors", "SPARK_EXECUTOR_INSTANCES", "spark.executor.instances"), + ("--num-executors", "SPARK_WORKER_INSTANCES", "spark.worker.instances")) + .foreach { case (optName, envVar, sysProp) => addArg(optName, envVar, sysProp, argsArrayBuf) } +val args = new ApplicationMasterArguments(argsArrayBuf.toArray) +totalExecutors.set(args.numExecutors) --- End diff -- Yeah, so my point was: can you just replace this code and the addArg() function with something simpler that avoids creating this list of options and passing them to ApplicationMasterArguments? That is, just check whether SPARK_EXECUTOR_INSTANCES/SPARK_WORKER_INSTANCES/spark.executor.instances/spark.worker.instances are set and, if so, set totalExecutors accordingly?
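A minimal sketch of that simplification, assuming the same property names and the default of 2 mentioned in this thread (this is not the code that was eventually committed):

    import java.util.concurrent.atomic.AtomicInteger

    // Read the env vars / Spark properties directly and set the expected executor count,
    // without building ApplicationMasterArguments. `conf` stands in for the SparkConf here.
    class SimplifiedExecutorCountSketch(conf: Map[String, String]) {
      val totalExpectedExecutors = new AtomicInteger(0)

      def start(): Unit = {
        val fromEnv = sys.env.get("SPARK_EXECUTOR_INSTANCES")
          .orElse(sys.env.get("SPARK_WORKER_INSTANCES"))
        val fromConf = conf.get("spark.executor.instances")
          .orElse(conf.get("spark.worker.instances"))
        // Fall back to the same default (2) that ApplicationMasterArguments uses.
        val numExecutors = fromEnv.orElse(fromConf).map(_.toInt).getOrElse(2)
        totalExpectedExecutors.set(numExecutors)
      }
    }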
[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/900#discussion_r14202548 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala --- @@ -244,6 +255,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A throw new SparkException("Error notifying standalone scheduler's driver actor", e) } } + + override def isReady(): Boolean = { +if (ready) { --- End diff -- I think saving the value of "ready" makes the code a bit difficult to read here, in part because it doesn't actually signal whether the backend is ready (since isReady() could return true even when ready is false). Can you just eliminate "ready" and move this line: if (executorActor.size >= totalExecutors.get() * minRegisteredRatio) { to here?
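A sketch of the more direct version being asked for, with the cached flag removed and both checks evaluated inline on each call (field names follow the diff above; the surrounding backend is stubbed out):

    import java.util.concurrent.atomic.AtomicInteger
    import scala.collection.mutable

    class ReadyCheckSketch(minRegisteredRatio: Double, maxRegisteredWaitingTime: Long) {
      val totalExecutors = new AtomicInteger(0)
      val executorActor = new mutable.HashMap[String, AnyRef]()  // stand-in for the real actor map
      private val createTime = System.currentTimeMillis()

      def isReady(): Boolean = {
        if (executorActor.size >= totalExecutors.get() * minRegisteredRatio) {
          return true
        }
        // Otherwise start anyway once the maximum registration wait has elapsed.
        (System.currentTimeMillis() - createTime) >= maxRegisteredWaitingTime
      }
    }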
[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/900#discussion_r14202320 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala --- @@ -46,9 +46,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A { // Use an atomic variable to track total number of cores in the cluster for simplicity and speed var totalCoreCount = new AtomicInteger(0) + var totalExecutors = new AtomicInteger(0) val conf = scheduler.sc.conf private val timeout = AkkaUtils.askTimeout(conf) private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf) + // Submit tasks only after (registered executors / total executors) arrived the ratio. --- End diff -- "arrived the ratio" --> "is equal to at least this value"
[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...
Github user tgravescs commented on the pull request: https://github.com/apache/spark/pull/900#issuecomment-47104091 Thanks @li-zhihui. I was actually referring to modifying the user docs to add the new configs; look in docs/configuration.md. It makes sense to move it down and get as much initialization work out of the way before waiting. To me, exactly which class it goes in depends on how we see it fitting and potentially being used in the future. You could, for instance, move it down into submitMissingTasks before the call to submitTasks and leave it in DAGScheduler instead. I think for this PR, where we are just checking initially (at job submission) that we have enough executors, it doesn't matter too much. But in the future, if we want to check between stages or when adding tasks, then it matters where it goes. Perhaps @kayousterhout has an opinion on where it fits better?