spark git commit: Revert "[SPARK-17637][SCHEDULER] Packed scheduling for Spark tasks across executors"
Repository: spark Updated Branches: refs/heads/master ed1463341 -> 72a6e7a57 Revert "[SPARK-17637][SCHEDULER] Packed scheduling for Spark tasks across executors" This reverts commit ed1463341455830b8867b721a1b34f291139baf3. The merged patch had obvious quality and documentation issues. The idea is useful, and we should work towards improving its quality and merging it in again. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/72a6e7a5 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/72a6e7a5 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/72a6e7a5 Branch: refs/heads/master Commit: 72a6e7a57a63aba69f26c84bf68a5fb213d2a521 Parents: ed14633 Author: Reynold Xin Authored: Sat Oct 15 22:31:37 2016 -0700 Committer: Reynold Xin Committed: Sat Oct 15 22:31:37 2016 -0700 -- .../apache/spark/scheduler/TaskAssigner.scala | 154 --- .../spark/scheduler/TaskSchedulerImpl.scala | 53 +++ .../scheduler/TaskSchedulerImplSuite.scala | 67 docs/configuration.md | 11 -- 4 files changed, 19 insertions(+), 266 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/72a6e7a5/core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala deleted file mode 100644 index 62df965..000 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala +++ /dev/null @@ -1,154 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.scheduler - -import scala.collection.mutable.ArrayBuffer -import scala.collection.mutable.PriorityQueue -import scala.util.Random - -import org.apache.spark.SparkConf - -case class OfferState(workOffer: WorkerOffer, var cores: Int) { - // Build a list of tasks to assign to each worker. - val tasks = new ArrayBuffer[TaskDescription](cores) -} - -abstract class TaskAssigner(conf: SparkConf) { - var offer: Seq[OfferState] = _ - val CPUS_PER_TASK = conf.getInt("spark.task.cpus", 1) - - // The final assigned offer returned to TaskScheduler. - def tasks(): Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks) - - // construct the assigner by the workoffer. - def construct(workOffer: Seq[WorkerOffer]): Unit = { -offer = workOffer.map(o => OfferState(o, o.cores)) - } - - // Invoked in each round of Taskset assignment to initialize the internal structure. - def init(): Unit - - // Indicating whether there is offer available to be used by one round of Taskset assignment. - def hasNext(): Boolean - - // Next available offer returned to one round of Taskset assignment. - def getNext(): OfferState - - // Called by the TaskScheduler to indicate whether the current offer is accepted - // In order to decide whether the current is valid for the next offering. - def taskAssigned(assigned: Boolean): Unit - - // Release internally maintained resources. Subclass is responsible to - // release its own private resources. 
- def reset: Unit = { -offer = null - } -} - -class RoundRobinAssigner(conf: SparkConf) extends TaskAssigner(conf) { - var i = 0 - override def construct(workOffer: Seq[WorkerOffer]): Unit = { -offer = Random.shuffle(workOffer.map(o => OfferState(o, o.cores))) - } - override def init(): Unit = { -i = 0 - } - override def hasNext: Boolean = { -i < offer.size - } - override def getNext(): OfferState = { -offer(i) - } - override def taskAssigned(assigned: Boolean): Unit = { -i += 1 - } - override def reset: Unit = { -super.reset -i = 0 - } -} - -class BalancedAssigner(conf: SparkConf) extends TaskAssigner(conf) { - var maxHeap: PriorityQueue[OfferState] = _ - var current: OfferState = _ - - override def construct(workOffer: Seq[WorkerOffer]): Unit = { -offer =
spark git commit: [SPARK-17637][SCHEDULER] Packed scheduling for Spark tasks across executors
Repository: spark Updated Branches: refs/heads/master 36d81c2c6 -> ed1463341 [SPARK-17637][SCHEDULER] Packed scheduling for Spark tasks across executors ## What changes were proposed in this pull request? Restructure the code and implement two new task assigners. PackedAssigner: tries to allocate tasks to the executors with the fewest available cores, so that Spark can release reserved executors when dynamic allocation is enabled. BalancedAssigner: tries to allocate tasks to the executors with the most available cores in order to balance the workload across all executors. By default, the original round-robin assigner is used. We tested a pipeline, and the new PackedAssigner saved around 45% of the reserved CPU and memory with dynamic allocation enabled. ## How was this patch tested? (Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests) Both unit tests in TaskSchedulerImplSuite and manual tests in a production pipeline. Author: Zhan Zhang Closes #15218 from zhzhan/packed-scheduler. 
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ed146334 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ed146334 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ed146334 Branch: refs/heads/master Commit: ed1463341455830b8867b721a1b34f291139baf3 Parents: 36d81c2 Author: Zhan Zhang Authored: Sat Oct 15 18:45:04 2016 -0700 Committer: Mridul Muralidharan Committed: Sat Oct 15 18:45:04 2016 -0700 -- .../apache/spark/scheduler/TaskAssigner.scala | 154 +++ .../spark/scheduler/TaskSchedulerImpl.scala | 53 --- .../scheduler/TaskSchedulerImplSuite.scala | 67 docs/configuration.md | 11 ++ 4 files changed, 266 insertions(+), 19 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ed146334/core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala new file mode 100644 index 000..62df965 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.scheduler + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.PriorityQueue +import scala.util.Random + +import org.apache.spark.SparkConf + +case class OfferState(workOffer: WorkerOffer, var cores: Int) { + // Build a list of tasks to assign to each worker. + val tasks = new ArrayBuffer[TaskDescription](cores) +} + +abstract class TaskAssigner(conf: SparkConf) { + var offer: Seq[OfferState] = _ + val CPUS_PER_TASK = conf.getInt("spark.task.cpus", 1) + + // The final assigned offer returned to TaskScheduler. + def tasks(): Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks) + + // construct the assigner by the workoffer. + def construct(workOffer: Seq[WorkerOffer]): Unit = { +offer = workOffer.map(o => OfferState(o, o.cores)) + } + + // Invoked in each round of Taskset assignment to initialize the internal structure. + def init(): Unit + + // Indicating whether there is offer available to be used by one round of Taskset assignment. + def hasNext(): Boolean + + // Next available offer returned to one round of Taskset assignment. + def getNext(): OfferState + + // Called by the TaskScheduler to indicate whether the current offer is accepted + // In order to decide whether the current is valid for the next offering. + def taskAssigned(assigned: Boolean): Unit + + // Release internally maintained resources. Subclass is responsible to + // release its own private resources. + def reset: Unit = { +offer = null + } +} + +class RoundRobinAssigner(conf: SparkConf) extends TaskAssigner(conf) { + var i = 0 + override def
spark git commit: [SPARK-17953][DOCUMENTATION] Fix typo in SparkSession scaladoc
Repository: spark Updated Branches: refs/heads/branch-2.0 c53b83749 -> 2a1b10b64 [SPARK-17953][DOCUMENTATION] Fix typo in SparkSession scaladoc ## What changes were proposed in this pull request? ### Before: ```scala SparkSession.builder() .master("local") .appName("Word Count") .config("spark.some.config.option", "some-value"). .getOrCreate() ``` ### After: ```scala SparkSession.builder() .master("local") .appName("Word Count") .config("spark.some.config.option", "some-value") .getOrCreate() ``` There was one unexpected dot! Author: Jun Kim Closes #15498 from tae-jun/SPARK-17953. (cherry picked from commit 36d81c2c68ef4114592b069287743eb5cb078318) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2a1b10b6 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2a1b10b6 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2a1b10b6 Branch: refs/heads/branch-2.0 Commit: 2a1b10b649a8d4c077a0e19df976f1fd36b7e266 Parents: c53b837 Author: Jun Kim Authored: Sat Oct 15 00:36:55 2016 -0700 Committer: Reynold Xin Committed: Sat Oct 15 00:37:04 2016 -0700 -- sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/2a1b10b6/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index c88206c..a7de115 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -64,7 +64,7 @@ import org.apache.spark.util.Utils * SparkSession.builder() * .master("local") * .appName("Word Count") - * .config("spark.some.config.option", "some-value"). 
+ * .config("spark.some.config.option", "some-value") * .getOrCreate() * }}} */ - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-17953][DOCUMENTATION] Fix typo in SparkSession scaladoc
Repository: spark Updated Branches: refs/heads/master 6ce1b675e -> 36d81c2c6 [SPARK-17953][DOCUMENTATION] Fix typo in SparkSession scaladoc ## What changes were proposed in this pull request? ### Before: ```scala SparkSession.builder() .master("local") .appName("Word Count") .config("spark.some.config.option", "some-value"). .getOrCreate() ``` ### After: ```scala SparkSession.builder() .master("local") .appName("Word Count") .config("spark.some.config.option", "some-value") .getOrCreate() ``` There was one unexpected dot! Author: Jun Kim Closes #15498 from tae-jun/SPARK-17953. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/36d81c2c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/36d81c2c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/36d81c2c Branch: refs/heads/master Commit: 36d81c2c68ef4114592b069287743eb5cb078318 Parents: 6ce1b67 Author: Jun Kim Authored: Sat Oct 15 00:36:55 2016 -0700 Committer: Reynold Xin Committed: Sat Oct 15 00:36:55 2016 -0700 -- sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/36d81c2c/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index 137c426..baae550 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -64,7 +64,7 @@ import org.apache.spark.util.Utils * SparkSession.builder() * .master("local") * .appName("Word Count") - * .config("spark.some.config.option", "some-value"). + * .config("spark.some.config.option", "some-value") * .getOrCreate() * }}} */ - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org