spark git commit: Revert "[SPARK-17637][SCHEDULER] Packed scheduling for Spark tasks across executors"

2016-10-15 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master ed1463341 -> 72a6e7a57


Revert "[SPARK-17637][SCHEDULER] Packed scheduling for Spark tasks across 
executors"

This reverts commit ed1463341455830b8867b721a1b34f291139baf3.

The merged patch had obvious quality and documentation issues. The idea is
useful, and we should work towards improving its quality and merging it
again.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/72a6e7a5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/72a6e7a5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/72a6e7a5

Branch: refs/heads/master
Commit: 72a6e7a57a63aba69f26c84bf68a5fb213d2a521
Parents: ed14633
Author: Reynold Xin 
Authored: Sat Oct 15 22:31:37 2016 -0700
Committer: Reynold Xin 
Committed: Sat Oct 15 22:31:37 2016 -0700

--
 .../apache/spark/scheduler/TaskAssigner.scala   | 154 ---
 .../spark/scheduler/TaskSchedulerImpl.scala |  53 +++
 .../scheduler/TaskSchedulerImplSuite.scala  |  67 
 docs/configuration.md   |  11 --
 4 files changed, 19 insertions(+), 266 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/72a6e7a5/core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala
--
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala
deleted file mode 100644
index 62df965..0000000
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler
-
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.PriorityQueue
-import scala.util.Random
-
-import org.apache.spark.SparkConf
-
-case class OfferState(workOffer: WorkerOffer, var cores: Int) {
-  // Build a list of tasks to assign to each worker.
-  val tasks = new ArrayBuffer[TaskDescription](cores)
-}
-
-abstract class TaskAssigner(conf: SparkConf) {
-  var offer: Seq[OfferState] = _
-  val CPUS_PER_TASK = conf.getInt("spark.task.cpus", 1)
-
-  // The final assigned offer returned to TaskScheduler.
-  def tasks(): Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks)
-
-  // construct the assigner by the workoffer.
-  def construct(workOffer: Seq[WorkerOffer]): Unit = {
-    offer = workOffer.map(o => OfferState(o, o.cores))
-  }
-
-  // Invoked in each round of Taskset assignment to initialize the internal structure.
-  def init(): Unit
-
-  // Indicating whether there is offer available to be used by one round of Taskset assignment.
-  def hasNext(): Boolean
-
-  // Next available offer returned to one round of Taskset assignment.
-  def getNext(): OfferState
-
-  // Called by the TaskScheduler to indicate whether the current offer is accepted
-  // In order to decide whether the current is valid for the next offering.
-  def taskAssigned(assigned: Boolean): Unit
-
-  // Release internally maintained resources. Subclass is responsible to
-  // release its own private resources.
-  def reset: Unit = {
-    offer = null
-  }
-}
-
-class RoundRobinAssigner(conf: SparkConf) extends TaskAssigner(conf) {
-  var i = 0
-  override def construct(workOffer: Seq[WorkerOffer]): Unit = {
-    offer = Random.shuffle(workOffer.map(o => OfferState(o, o.cores)))
-  }
-  override def init(): Unit = {
-    i = 0
-  }
-  override def hasNext: Boolean = {
-    i < offer.size
-  }
-  override def getNext(): OfferState = {
-    offer(i)
-  }
-  override def taskAssigned(assigned: Boolean): Unit = {
-    i += 1
-  }
-  override def reset: Unit = {
-    super.reset
-    i = 0
-  }
-}
-
-class BalancedAssigner(conf: SparkConf) extends TaskAssigner(conf) {
-  var maxHeap: PriorityQueue[OfferState] = _
-  var current: OfferState = _
-
-  override def construct(workOffer: Seq[WorkerOffer]): Unit = {
-    offer = 
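
The diff is truncated here in the archive. For context, here is a minimal sketch of how a BalancedAssigner could complete the TaskAssigner contract above; the heap ordering, requeue condition, and per-round rebuild are assumptions, not the exact removed code:

```scala
import scala.collection.mutable.PriorityQueue

import org.apache.spark.SparkConf

// Sketch only (assumptions, not the removed code): a max-heap keyed on free
// cores hands out the most idle offer first, rebalancing as tasks land.
class BalancedAssigner(conf: SparkConf) extends TaskAssigner(conf) {
  var maxHeap: PriorityQueue[OfferState] = _
  var current: OfferState = _

  override def init(): Unit = {
    // Rebuild the heap each round; Ordering.by(_.cores) yields a max-heap.
    maxHeap = PriorityQueue(offer.filter(_.cores >= CPUS_PER_TASK): _*)(Ordering.by(_.cores))
  }

  override def hasNext: Boolean = maxHeap.nonEmpty

  override def getNext(): OfferState = {
    current = maxHeap.dequeue()
    current
  }

  override def taskAssigned(assigned: Boolean): Unit = {
    // Requeue only if a task landed and the offer still has spare cores;
    // otherwise drop the offer for the rest of this round.
    if (assigned && current.cores >= CPUS_PER_TASK) {
      maxHeap.enqueue(current)
    }
  }

  override def reset: Unit = {
    super.reset
    maxHeap = null
    current = null
  }
}
```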

spark git commit: [SPARK-17637][SCHEDULER] Packed scheduling for Spark tasks across executors

2016-10-15 Thread mridulm80
Repository: spark
Updated Branches:
  refs/heads/master 36d81c2c6 -> ed1463341


[SPARK-17637][SCHEDULER] Packed scheduling for Spark tasks across executors

## What changes were proposed in this pull request?

Restructure the code and implement two new task assigners.

PackedAssigner: tries to allocate tasks to the executors with the least
available cores, so that Spark can release reserved executors when dynamic
allocation is enabled.

BalancedAssigner: tries to allocate tasks to the executors with the most
available cores, in order to balance the workload across all executors.

By default, the original round-robin assigner is used.

We tested a pipeline; with dynamic allocation enabled, the new PackedAssigner
saved around 45% of the reserved CPU and memory.
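
A minimal sketch of the packing policy, built on the TaskAssigner interface shown in the diff below; the field names and round structure here are assumptions, not the merged code:

```scala
import org.apache.spark.SparkConf

// Sketch only: visit offers with the fewest free cores first, so tasks pack
// onto already-busy executors and fully idle ones can be reclaimed by
// dynamic allocation.
class PackedAssigner(conf: SparkConf) extends TaskAssigner(conf) {
  var sorted: Seq[OfferState] = _
  var i = 0

  override def init(): Unit = {
    i = 0
    // Ascending free cores: the most loaded executors come first.
    sorted = offer.filter(_.cores >= CPUS_PER_TASK).sortBy(_.cores)
  }

  override def hasNext: Boolean = i < sorted.size

  override def getNext(): OfferState = sorted(i)

  override def taskAssigned(assigned: Boolean): Unit = {
    // Keep packing the current offer while tasks land on it; move on once
    // it rejects a task or runs out of cores.
    if (!assigned || sorted(i).cores < CPUS_PER_TASK) {
      i += 1
    }
  }
}
```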

## How was this patch tested?

Both unit tests in TaskSchedulerImplSuite and manual tests in a production
pipeline.

Author: Zhan Zhang 

Closes #15218 from zhzhan/packed-scheduler.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ed146334
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ed146334
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ed146334

Branch: refs/heads/master
Commit: ed1463341455830b8867b721a1b34f291139baf3
Parents: 36d81c2
Author: Zhan Zhang 
Authored: Sat Oct 15 18:45:04 2016 -0700
Committer: Mridul Muralidharan 
Committed: Sat Oct 15 18:45:04 2016 -0700

--
 .../apache/spark/scheduler/TaskAssigner.scala   | 154 +++
 .../spark/scheduler/TaskSchedulerImpl.scala |  53 ---
 .../scheduler/TaskSchedulerImplSuite.scala  |  67 
 docs/configuration.md   |  11 ++
 4 files changed, 266 insertions(+), 19 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ed146334/core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala
--
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala
new file mode 100644
index 0000000..62df965
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.PriorityQueue
+import scala.util.Random
+
+import org.apache.spark.SparkConf
+
+case class OfferState(workOffer: WorkerOffer, var cores: Int) {
+  // Build a list of tasks to assign to each worker.
+  val tasks = new ArrayBuffer[TaskDescription](cores)
+}
+
+abstract class TaskAssigner(conf: SparkConf) {
+  var offer: Seq[OfferState] = _
+  val CPUS_PER_TASK = conf.getInt("spark.task.cpus", 1)
+
+  // The final assigned offer returned to TaskScheduler.
+  def tasks(): Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks)
+
+  // construct the assigner by the workoffer.
+  def construct(workOffer: Seq[WorkerOffer]): Unit = {
+    offer = workOffer.map(o => OfferState(o, o.cores))
+  }
+
+  // Invoked in each round of Taskset assignment to initialize the internal structure.
+  def init(): Unit
+
+  // Indicating whether there is offer available to be used by one round of Taskset assignment.
+  def hasNext(): Boolean
+
+  // Next available offer returned to one round of Taskset assignment.
+  def getNext(): OfferState
+
+  // Called by the TaskScheduler to indicate whether the current offer is accepted
+  // In order to decide whether the current is valid for the next offering.
+  def taskAssigned(assigned: Boolean): Unit
+
+  // Release internally maintained resources. Subclass is responsible to
+  // release its own private resources.
+  def reset: Unit = {
+    offer = null
+  }
+}
+
+class RoundRobinAssigner(conf: SparkConf) extends TaskAssigner(conf) {
+  var i = 0
+  override def 
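
The diff truncates here. For orientation, a rough sketch of the consuming side of this contract, i.e. one scheduling round in TaskSchedulerImpl; tryLaunchTask is a hypothetical stand-in for the real offer-matching logic, not an actual method:

```scala
// Sketch of one scheduling round driving the TaskAssigner protocol above.
// tryLaunchTask is hypothetical: it returns true if a task from the current
// TaskSet was placed on the given offer.
def offerRound(assigner: TaskAssigner, tryLaunchTask: OfferState => Boolean): Unit = {
  assigner.init()                        // reset per-round state (cursor, heap, ...)
  while (assigner.hasNext()) {
    val state = assigner.getNext()       // candidate offer chosen by the policy
    val launched = tryLaunchTask(state)
    if (launched) {
      state.cores -= assigner.CPUS_PER_TASK  // charge the placed task's cores
    }
    assigner.taskAssigned(launched)      // let the policy advance or requeue
  }
}
```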

spark git commit: [SPARK-17953][DOCUMENTATION] Fix typo in SparkSession scaladoc

2016-10-15 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 c53b83749 -> 2a1b10b64


[SPARK-17953][DOCUMENTATION] Fix typo in SparkSession scaladoc

## What changes were proposed in this pull request?

### Before:
```scala
SparkSession.builder()
  .master("local")
  .appName("Word Count")
  .config("spark.some.config.option", "some-value").
  .getOrCreate()
```

### After:
```scala
SparkSession.builder()
  .master("local")
  .appName("Word Count")
  .config("spark.some.config.option", "some-value")
  .getOrCreate()
```

There was one unexpected dot!

Author: Jun Kim 

Closes #15498 from tae-jun/SPARK-17953.

(cherry picked from commit 36d81c2c68ef4114592b069287743eb5cb078318)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2a1b10b6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2a1b10b6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2a1b10b6

Branch: refs/heads/branch-2.0
Commit: 2a1b10b649a8d4c077a0e19df976f1fd36b7e266
Parents: c53b837
Author: Jun Kim 
Authored: Sat Oct 15 00:36:55 2016 -0700
Committer: Reynold Xin 
Committed: Sat Oct 15 00:37:04 2016 -0700

--
 sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/2a1b10b6/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index c88206c..a7de115 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -64,7 +64,7 @@ import org.apache.spark.util.Utils
  *   SparkSession.builder()
  *     .master("local")
  *     .appName("Word Count")
- *     .config("spark.some.config.option", "some-value").
+ *     .config("spark.some.config.option", "some-value")
  *     .getOrCreate()
  * }}}
  */





spark git commit: [SPARK-17953][DOCUMENTATION] Fix typo in SparkSession scaladoc

2016-10-15 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 6ce1b675e -> 36d81c2c6


[SPARK-17953][DOCUMENTATION] Fix typo in SparkSession scaladoc

## What changes were proposed in this pull request?

### Before:
```scala
SparkSession.builder()
  .master("local")
  .appName("Word Count")
  .config("spark.some.config.option", "some-value").
  .getOrCreate()
```

### After:
```scala
SparkSession.builder()
  .master("local")
  .appName("Word Count")
  .config("spark.some.config.option", "some-value")
  .getOrCreate()
```

There was one unexpected dot!

Author: Jun Kim 

Closes #15498 from tae-jun/SPARK-17953.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/36d81c2c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/36d81c2c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/36d81c2c

Branch: refs/heads/master
Commit: 36d81c2c68ef4114592b069287743eb5cb078318
Parents: 6ce1b67
Author: Jun Kim 
Authored: Sat Oct 15 00:36:55 2016 -0700
Committer: Reynold Xin 
Committed: Sat Oct 15 00:36:55 2016 -0700

--
 sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/36d81c2c/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 137c426..baae550 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -64,7 +64,7 @@ import org.apache.spark.util.Utils
  *   SparkSession.builder()
  *     .master("local")
  *     .appName("Word Count")
- *     .config("spark.some.config.option", "some-value").
+ *     .config("spark.some.config.option", "some-value")
  *     .getOrCreate()
  * }}}
  */

