spark git commit: [SPARK-11251] Fix page size calculation in local mode
Repository: spark
Updated Branches:
  refs/heads/master 163d53e82 -> 34e71c6d8

[SPARK-11251] Fix page size calculation in local mode

```
// My machine only has 8 cores
$ bin/spark-shell --master local[32]
scala> val df = sc.parallelize(Seq((1, 1), (2, 2))).toDF("a", "b")
scala> df.as("x").join(df.as("y"), $"x.a" === $"y.a").count()

Caused by: java.io.IOException: Unable to acquire 2097152 bytes of memory
  at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPage(UnsafeExternalSorter.java:351)
```

Author: Andrew Or

Closes #9209 from andrewor14/fix-local-page-size.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/34e71c6d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/34e71c6d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/34e71c6d

Branch: refs/heads/master
Commit: 34e71c6d89c1f2b6236dbf0d75cd12da08003c84
Parents: 163d53e
Author: Andrew Or
Authored: Thu Oct 22 15:58:08 2015 -0700
Committer: Reynold Xin
Committed: Thu Oct 22 15:58:08 2015 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala  | 48 ++--
 .../main/scala/org/apache/spark/SparkEnv.scala |  4 +-
 .../OutputCommitCoordinatorSuite.scala         |  3 +-
 3 files changed, 40 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
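The failure above comes from how Spark sizes its unsafe (Tungsten) memory pages: the page size is derived from the execution-memory budget divided by the number of cores, so that each concurrent task can hold a few pages at a time. Before this patch, the driver's SparkEnv was created without the requested local[N] thread count and the calculation fell back on the machine's physical core count, so `local[32]` on an 8-core box sized pages for 8 concurrent tasks while 32 were actually competing for them. The sketch below illustrates the shape of that heuristic; the 512 MiB budget, the safety factor of 16, and the 1 MiB/64 MiB clamps are illustrative assumptions, not the exact constants in Spark.

```scala
// Minimal sketch of a cores-based page-size heuristic (illustrative, not Spark's exact code).
object PageSizeSketch {
  private val MinPageSize = 1L << 20   // 1 MiB floor (assumed)
  private val MaxPageSize = 64L << 20  // 64 MiB ceiling (assumed)

  // Shrink the page size as the number of concurrent tasks (cores) grows,
  // so every task can still acquire a handful of pages from the shared budget.
  def pageSize(maxMemory: Long, cores: Int, safetyFactor: Int = 16): Long = {
    val raw = maxMemory / cores / safetyFactor
    val size = java.lang.Long.highestOneBit(math.max(1L, raw)) // round down to a power of 2
    math.min(MaxPageSize, math.max(MinPageSize, size))
  }

  def main(args: Array[String]): Unit = {
    val mem = 512L << 20 // assume a 512 MiB execution-memory budget
    // Before the fix: cores came from availableProcessors() (8 on the reporter's machine),
    // even though local[32] runs 32 concurrent tasks.
    println(pageSize(mem, cores = 8))  // 4194304 (4 MiB pages)
    // After the fix: the requested local[N] count is used instead.
    println(pageSize(mem, cores = 32)) // 1048576 (1 MiB pages)
  }
}
```

With the corrected core count the pages come out a quarter of the size, so all 32 concurrent tasks can acquire pages from the same budget instead of dying with "Unable to acquire 2097152 bytes of memory".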
http://git-wip-us.apache.org/repos/asf/spark/blob/34e71c6d/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index ccba3ed..a6857b4 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -269,7 +269,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       conf: SparkConf,
       isLocal: Boolean,
       listenerBus: LiveListenerBus): SparkEnv = {
-    SparkEnv.createDriverEnv(conf, isLocal, listenerBus)
+    SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master))
   }
 
   private[spark] def env: SparkEnv = _env
@@ -2561,24 +2561,28 @@ object SparkContext extends Logging {
   }
 
   /**
+   * The number of driver cores to use for execution in local mode, 0 otherwise.
+   */
+  private[spark] def numDriverCores(master: String): Int = {
+    def convertToInt(threads: String): Int = {
+      if (threads == "*") Runtime.getRuntime.availableProcessors() else threads.toInt
+    }
+    master match {
+      case "local" => 1
+      case SparkMasterRegex.LOCAL_N_REGEX(threads) => convertToInt(threads)
+      case SparkMasterRegex.LOCAL_N_FAILURES_REGEX(threads, _) => convertToInt(threads)
+      case _ => 0 // driver is not used for execution
+    }
+  }
+
+  /**
    * Create a task scheduler based on a given master URL.
    * Return a 2-tuple of the scheduler backend and the task scheduler.
    */
   private def createTaskScheduler(
       sc: SparkContext,
       master: String): (SchedulerBackend, TaskScheduler) = {
-    // Regular expression used for local[N] and local[*] master formats
-    val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r
-    // Regular expression for local[N, maxRetries], used in tests with failing tasks
-    val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r
-    // Regular expression for simulating a Spark cluster of [N, cores, memory] locally
-    val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r
-    // Regular expression for connecting to Spark deploy clusters
-    val SPARK_REGEX = """spark://(.*)""".r
-    // Regular expression for connection to Mesos cluster by mesos:// or zk:// url
-    val MESOS_REGEX = """(mesos|zk)://.*""".r
-    // Regular expression for connection to Simr cluster
-    val SIMR_REGEX = """simr://(.*)""".r
+    import SparkMasterRegex._
 
     // When running locally, don't try to re-execute tasks on failure.
     val MAX_LOCAL_TASK_FAILURES = 1
@@ -2720,6 +2724,24 @@ object SparkContext extends Logging {
 }
 
 /**
+ * A collection of regexes for extracting information from the master string.
+ */
+private object SparkMasterRegex {
+  // Regular expression used for local[N] and local[*] master formats
+  val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r
+  // Regular expression for local[N, maxRetries], used in tests with failing tasks
+  val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r
+  // Regular expression for simulating a Spark cluster of [N, cores, memory] locally
+  val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r
+  // Regular expression for connecting to Spark deploy clusters
+  val SPARK_REGEX = """spark://(.*)""".r
+  // Regular expression for connection to Mesos cluster by mesos:// or zk:// url
+  val MESOS_REGEX = """(mesos|zk)://.*""".r
+  // Regular expression for connection to Simr cluster
+  val SIMR_REGEX = """simr://(.*)""".r
+}
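Since both numDriverCores and createTaskScheduler now parse the master string, the patch centralizes the regexes in the shared SparkMasterRegex object above. The self-contained sketch below copies the two local-mode regexes and the numDriverCores logic from the diff to show how master strings map to driver core counts; the real numDriverCores is private[spark], so the standalone object here is purely illustrative.

```scala
// Illustrative copy of the local-mode master-string parsing from the patch above.
object MasterStringDemo {
  // Regular expression used for local[N] and local[*] master formats
  val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r
  // Regular expression for local[N, maxRetries], used in tests with failing tasks
  val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r

  def numDriverCores(master: String): Int = {
    def convertToInt(threads: String): Int =
      if (threads == "*") Runtime.getRuntime.availableProcessors() else threads.toInt
    master match {
      case "local" => 1
      case LOCAL_N_REGEX(threads) => convertToInt(threads)
      case LOCAL_N_FAILURES_REGEX(threads, _) => convertToInt(threads)
      case _ => 0 // driver is not used for execution
    }
  }

  def main(args: Array[String]): Unit = {
    Seq("local", "local[32]", "local[*]", "local[4, 2]", "spark://host:7077")
      .foreach(m => println(s"$m -> ${numDriverCores(m)} driver cores"))
    // local -> 1, local[32] -> 32, local[*] -> <physical core count>,
    // local[4, 2] -> 4, spark://host:7077 -> 0
  }
}
```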
spark git commit: [SPARK-11251] Fix page size calculation in local mode
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 e405c2a1f -> a76cf51ed

[SPARK-11251] Fix page size calculation in local mode

```
// My machine only has 8 cores
$ bin/spark-shell --master local[32]
scala> val df = sc.parallelize(Seq((1, 1), (2, 2))).toDF("a", "b")
scala> df.as("x").join(df.as("y"), $"x.a" === $"y.a").count()

Caused by: java.io.IOException: Unable to acquire 2097152 bytes of memory
  at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPage(UnsafeExternalSorter.java:351)
```

Author: Andrew Or

Closes #9209 from andrewor14/fix-local-page-size.

(cherry picked from commit 34e71c6d89c1f2b6236dbf0d75cd12da08003c84)
Signed-off-by: Reynold Xin

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a76cf51e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a76cf51e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a76cf51e

Branch: refs/heads/branch-1.5
Commit: a76cf51ed91d99c88f301ec85f3cda1288bcf346
Parents: e405c2a
Author: Andrew Or
Authored: Thu Oct 22 15:58:08 2015 -0700
Committer: Reynold Xin
Committed: Thu Oct 22 15:58:17 2015 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala  | 48 ++--
 .../main/scala/org/apache/spark/SparkEnv.scala |  4 +-
 .../OutputCommitCoordinatorSuite.scala         |  3 +-
 3 files changed, 40 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/a76cf51e/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 2a2fa75..a8f6047 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -274,7 +274,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       conf: SparkConf,
       isLocal: Boolean,
       listenerBus: LiveListenerBus): SparkEnv = {
-    SparkEnv.createDriverEnv(conf, isLocal, listenerBus)
+    SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master))
   }
 
   private[spark] def env: SparkEnv = _env
@@ -2548,24 +2548,28 @@ object SparkContext extends Logging {
   }
 
   /**
+   * The number of driver cores to use for execution in local mode, 0 otherwise.
+   */
+  private[spark] def numDriverCores(master: String): Int = {
+    def convertToInt(threads: String): Int = {
+      if (threads == "*") Runtime.getRuntime.availableProcessors() else threads.toInt
+    }
+    master match {
+      case "local" => 1
+      case SparkMasterRegex.LOCAL_N_REGEX(threads) => convertToInt(threads)
+      case SparkMasterRegex.LOCAL_N_FAILURES_REGEX(threads, _) => convertToInt(threads)
+      case _ => 0 // driver is not used for execution
+    }
+  }
+
+  /**
    * Create a task scheduler based on a given master URL.
    * Return a 2-tuple of the scheduler backend and the task scheduler.
    */
   private def createTaskScheduler(
       sc: SparkContext,
       master: String): (SchedulerBackend, TaskScheduler) = {
-    // Regular expression used for local[N] and local[*] master formats
-    val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r
-    // Regular expression for local[N, maxRetries], used in tests with failing tasks
-    val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r
-    // Regular expression for simulating a Spark cluster of [N, cores, memory] locally
-    val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r
-    // Regular expression for connecting to Spark deploy clusters
-    val SPARK_REGEX = """spark://(.*)""".r
-    // Regular expression for connection to Mesos cluster by mesos:// or zk:// url
-    val MESOS_REGEX = """(mesos|zk)://.*""".r
-    // Regular expression for connection to Simr cluster
-    val SIMR_REGEX = """simr://(.*)""".r
+    import SparkMasterRegex._
 
     // When running locally, don't try to re-execute tasks on failure.
     val MAX_LOCAL_TASK_FAILURES = 1
@@ -2707,6 +2711,24 @@ object SparkContext extends Logging {
 }
 
 /**
+ * A collection of regexes for extracting information from the master string.
+ */
+private object SparkMasterRegex {
+  // Regular expression used for local[N] and local[*] master formats
+  val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r
+  // Regular expression for local[N, maxRetries], used in tests with failing tasks
+  val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r
+  // Regular expression for simulating a Spark cluster of [N, cores, memory] locally
+  val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r
+  // Regular expression for connecting to Spark deploy clusters
+  val SPARK_REGEX = """spark://(.*)""".r
+  // Regular expression for connection to Mesos cluster by mesos:// or zk:// url
+  val MESOS_REGEX = """(mesos|zk)://.*""".r
+  // Regular expression for connection to Simr cluster
+  val SIMR_REGEX = """simr://(.*)""".r
+}