spark git commit: [SPARK-11251] Fix page size calculation in local mode

2015-10-22 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 e405c2a1f -> a76cf51ed


[SPARK-11251] Fix page size calculation in local mode

```
// My machine only has 8 cores
$ bin/spark-shell --master local[32]
scala> val df = sc.parallelize(Seq((1, 1), (2, 2))).toDF("a", "b")
scala> df.as("x").join(df.as("y"), $"x.a" === $"y.a").count()

Caused by: java.io.IOException: Unable to acquire 2097152 bytes of memory
at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPage(UnsafeExternalSorter.java:351)
```
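
The root cause: the driver's execution-memory page size is derived from the number of cores, and in local mode the driver env was created with a core count of 0, so the sizing fell back to the machine's physical processor count. With `local[32]` on an 8-core machine the pages are sized for 8 concurrent tasks, and 32 tasks competing for them exhaust the pool, which is why the 2097152-byte (2 MB) allocation above fails. The sketch below is only an illustration of that kind of per-core sizing, with approximate names and constants rather than the exact Spark 1.5 code; the actual fix is to thread the `local[N]` core count through `SparkEnv.createDriverEnv`, as shown in the diff.

```
// Illustrative sketch of a per-core page-size calculation (approximate names and
// constants; not the exact Spark 1.5 source).
def defaultPageSize(maxExecutionMemory: Long, numCores: Int): Long = {
  val minPageSize = 1L * 1024 * 1024           // 1 MB floor
  val maxPageSize = 64L * minPageSize          // 64 MB ceiling
  val cores =
    if (numCores > 0) numCores
    else Runtime.getRuntime.availableProcessors()  // old local-mode fallback: physical cores
  val safetyFactor = 16                        // headroom for rounding and bookkeeping
  // Take the per-core share, round down to a power of two, clamp to [1 MB, 64 MB].
  val size = java.lang.Long.highestOneBit(math.max(1L, maxExecutionMemory / cores / safetyFactor))
  math.min(maxPageSize, math.max(minPageSize, size))
}
```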

Author: Andrew Or 

Closes #9209 from andrewor14/fix-local-page-size.

(cherry picked from commit 34e71c6d89c1f2b6236dbf0d75cd12da08003c84)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a76cf51e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a76cf51e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a76cf51e

Branch: refs/heads/branch-1.5
Commit: a76cf51ed91d99c88f301ec85f3cda1288bcf346
Parents: e405c2a
Author: Andrew Or 
Authored: Thu Oct 22 15:58:08 2015 -0700
Committer: Reynold Xin 
Committed: Thu Oct 22 15:58:17 2015 -0700

--
 .../scala/org/apache/spark/SparkContext.scala   | 48 ++--
 .../main/scala/org/apache/spark/SparkEnv.scala  |  4 +-
 .../OutputCommitCoordinatorSuite.scala  |  3 +-
 3 files changed, 40 insertions(+), 15 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/a76cf51e/core/src/main/scala/org/apache/spark/SparkContext.scala
--
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 2a2fa75..a8f6047 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -274,7 +274,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   conf: SparkConf,
   isLocal: Boolean,
   listenerBus: LiveListenerBus): SparkEnv = {
-SparkEnv.createDriverEnv(conf, isLocal, listenerBus)
+SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master))
   }
 
   private[spark] def env: SparkEnv = _env
@@ -2548,24 +2548,28 @@ object SparkContext extends Logging {
   }
 
   /**
+   * The number of driver cores to use for execution in local mode, 0 otherwise.
+   */
+  private[spark] def numDriverCores(master: String): Int = {
+def convertToInt(threads: String): Int = {
+  if (threads == "*") Runtime.getRuntime.availableProcessors() else threads.toInt
+}
+master match {
+  case "local" => 1
+  case SparkMasterRegex.LOCAL_N_REGEX(threads) => convertToInt(threads)
+  case SparkMasterRegex.LOCAL_N_FAILURES_REGEX(threads, _) => convertToInt(threads)
+  case _ => 0 // driver is not used for execution
+}
+  }
+
+  /**
* Create a task scheduler based on a given master URL.
* Return a 2-tuple of the scheduler backend and the task scheduler.
*/
   private def createTaskScheduler(
   sc: SparkContext,
   master: String): (SchedulerBackend, TaskScheduler) = {
-// Regular expression used for local[N] and local[*] master formats
-val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r
-// Regular expression for local[N, maxRetries], used in tests with failing tasks
-val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r
-// Regular expression for simulating a Spark cluster of [N, cores, memory] locally
-val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r
-// Regular expression for connecting to Spark deploy clusters
-val SPARK_REGEX = """spark://(.*)""".r
-// Regular expression for connection to Mesos cluster by mesos:// or zk:// url
-val MESOS_REGEX = """(mesos|zk)://.*""".r
-// Regular expression for connection to Simr cluster
-val SIMR_REGEX = """simr://(.*)""".r
+import SparkMasterRegex._
 
 // When running locally, don't try to re-execute tasks on failure.
 val MAX_LOCAL_TASK_FAILURES = 1
@@ -2707,6 +2711,24 @@ object SparkContext extends Logging {
 }
 
 /**
+ * A collection of regexes for extracting information from the master string.
+ */
+private object SparkMasterRegex {
+  // Regular expression used for local[N] and local[*] master formats
+  val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r
+  // Regular expression for local[N, maxRetries], used in tests with failing tasks
+  val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r
+  // Regular expression for simulating a Spark cluster of [N, cores, memory] locally
+  val 

spark git commit: [SPARK-11251] Fix page size calculation in local mode

2015-10-22 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 163d53e82 -> 34e71c6d8


[SPARK-11251] Fix page size calculation in local mode

```
// My machine only has 8 cores
$ bin/spark-shell --master local[32]
scala> val df = sc.parallelize(Seq((1, 1), (2, 2))).toDF("a", "b")
scala> df.as("x").join(df.as("y"), $"x.a" === $"y.a").count()

Caused by: java.io.IOException: Unable to acquire 2097152 bytes of memory
at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPage(UnsafeExternalSorter.java:351)
```
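
For reference, the mapping implemented by the new `SparkContext.numDriverCores` helper (added in the diff below) can be restated as a small standalone snippet. The real method is `private[spark]`, so this is only an illustration of how the master string resolves to a driver core count, not the patched code itself.

```
// Standalone restatement of the numDriverCores logic introduced by this patch (illustrative).
object NumDriverCoresDemo {
  private val LocalN         = """local\[([0-9]+|\*)\]""".r
  private val LocalNFailures = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r

  def numDriverCores(master: String): Int = {
    def toCores(threads: String): Int =
      if (threads == "*") Runtime.getRuntime.availableProcessors() else threads.toInt
    master match {
      case "local"                    => 1
      case LocalN(threads)            => toCores(threads)
      case LocalNFailures(threads, _) => toCores(threads)
      case _                          => 0  // driver is not used for execution
    }
  }

  def main(args: Array[String]): Unit = {
    // e.g. local -> 1, local[32] -> 32, local[*] -> number of physical cores,
    //      local[4, 2] -> 4, spark://host:7077 -> 0
    Seq("local", "local[32]", "local[*]", "local[4, 2]", "spark://host:7077")
      .foreach(m => println(s"$m -> ${numDriverCores(m)}"))
  }
}
```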

Author: Andrew Or 

Closes #9209 from andrewor14/fix-local-page-size.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/34e71c6d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/34e71c6d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/34e71c6d

Branch: refs/heads/master
Commit: 34e71c6d89c1f2b6236dbf0d75cd12da08003c84
Parents: 163d53e
Author: Andrew Or 
Authored: Thu Oct 22 15:58:08 2015 -0700
Committer: Reynold Xin 
Committed: Thu Oct 22 15:58:08 2015 -0700

--
 .../scala/org/apache/spark/SparkContext.scala   | 48 ++--
 .../main/scala/org/apache/spark/SparkEnv.scala  |  4 +-
 .../OutputCommitCoordinatorSuite.scala  |  3 +-
 3 files changed, 40 insertions(+), 15 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/34e71c6d/core/src/main/scala/org/apache/spark/SparkContext.scala
--
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index ccba3ed..a6857b4 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -269,7 +269,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   conf: SparkConf,
   isLocal: Boolean,
   listenerBus: LiveListenerBus): SparkEnv = {
-SparkEnv.createDriverEnv(conf, isLocal, listenerBus)
+SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master))
   }
 
   private[spark] def env: SparkEnv = _env
@@ -2561,24 +2561,28 @@ object SparkContext extends Logging {
   }
 
   /**
+   * The number of driver cores to use for execution in local mode, 0 otherwise.
+   */
+  private[spark] def numDriverCores(master: String): Int = {
+def convertToInt(threads: String): Int = {
+  if (threads == "*") Runtime.getRuntime.availableProcessors() else threads.toInt
+}
+master match {
+  case "local" => 1
+  case SparkMasterRegex.LOCAL_N_REGEX(threads) => convertToInt(threads)
+  case SparkMasterRegex.LOCAL_N_FAILURES_REGEX(threads, _) => convertToInt(threads)
+  case _ => 0 // driver is not used for execution
+}
+  }
+
+  /**
* Create a task scheduler based on a given master URL.
* Return a 2-tuple of the scheduler backend and the task scheduler.
*/
   private def createTaskScheduler(
   sc: SparkContext,
   master: String): (SchedulerBackend, TaskScheduler) = {
-// Regular expression used for local[N] and local[*] master formats
-val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r
-// Regular expression for local[N, maxRetries], used in tests with failing tasks
-val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r
-// Regular expression for simulating a Spark cluster of [N, cores, memory] locally
-val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r
-// Regular expression for connecting to Spark deploy clusters
-val SPARK_REGEX = """spark://(.*)""".r
-// Regular expression for connection to Mesos cluster by mesos:// or zk:// url
-val MESOS_REGEX = """(mesos|zk)://.*""".r
-// Regular expression for connection to Simr cluster
-val SIMR_REGEX = """simr://(.*)""".r
+import SparkMasterRegex._
 
 // When running locally, don't try to re-execute tasks on failure.
 val MAX_LOCAL_TASK_FAILURES = 1
@@ -2720,6 +2724,24 @@ object SparkContext extends Logging {
 }
 
 /**
+ * A collection of regexes for extracting information from the master string.
+ */
+private object SparkMasterRegex {
+  // Regular expression used for local[N] and local[*] master formats
+  val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r
+  // Regular expression for local[N, maxRetries], used in tests with failing tasks
+  val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r
+  // Regular expression for simulating a Spark cluster of [N, cores, memory] locally
+  val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r
+  // Regular expression for