spark git commit: [SPARK-8099] set executor cores into system in yarn-cluster mode

2015-06-05 Thread sandy
Repository: spark
Updated Branches:
  refs/heads/master 4036d05ce -> 0992a0a77


[SPARK-8099] set executor cores into system in yarn-cluster mode

Author: Xutingjun 
Author: xutingjun 

Closes #6643 from XuTingjun/SPARK-8099 and squashes the following commits:

80b18cd [Xutingjun] change to STANDALONE | YARN
ce33148 [Xutingjun] set executor cores into system
e51cc9e [Xutingjun] set executor cores into system
0600861 [xutingjun] set executor cores into system


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0992a0a7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0992a0a7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0992a0a7

Branch: refs/heads/master
Commit: 0992a0a77d38081c6c206bb34333013125d85376
Parents: 4036d05
Author: Xutingjun 
Authored: Fri Jun 5 11:41:39 2015 -0700
Committer: Sandy Ryza 
Committed: Fri Jun 5 11:41:39 2015 -0700

--
 core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0992a0a7/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
--
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 3aa3f94..a0eae77 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -425,7 +425,6 @@ object SparkSubmit {
   // Yarn client only
   OptionAssigner(args.queue, YARN, CLIENT, sysProp = "spark.yarn.queue"),
   OptionAssigner(args.numExecutors, YARN, CLIENT, sysProp = "spark.executor.instances"),
-  OptionAssigner(args.executorCores, YARN, CLIENT, sysProp = "spark.executor.cores"),
   OptionAssigner(args.files, YARN, CLIENT, sysProp = "spark.yarn.dist.files"),
   OptionAssigner(args.archives, YARN, CLIENT, sysProp = "spark.yarn.dist.archives"),
   OptionAssigner(args.principal, YARN, CLIENT, sysProp = "spark.yarn.principal"),
@@ -446,7 +445,7 @@ object SparkSubmit {
   OptionAssigner(args.keytab, YARN, CLUSTER, clOption = "--keytab"),
 
   // Other options
-  OptionAssigner(args.executorCores, STANDALONE, ALL_DEPLOY_MODES,
+  OptionAssigner(args.executorCores, STANDALONE | YARN, ALL_DEPLOY_MODES,
 sysProp = "spark.executor.cores"),
   OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, ALL_DEPLOY_MODES,
 sysProp = "spark.executor.memory"),


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-7699] [CORE] Lazy start the scheduler for dynamic allocation

2015-06-05 Thread sandy
Repository: spark
Updated Branches:
  refs/heads/master 0992a0a77 -> 3f80bc841


[SPARK-7699] [CORE] Lazy start the scheduler for dynamic allocation

This patch proposes lazily starting the scheduler for dynamic allocation, to avoid rapidly 
ramping down the number of executors when the load is light.

This implementation will:
1. Start the scheduler immediately if `numExecutorsTarget` is 0; this is the expected behavior.
2. If `numExecutorsTarget` is not zero, defer starting the scheduler until that number of 
executors is satisfied. If the load is light, these initially allocated executors will last for 
at least 60 seconds, so the user has a window to submit a job without the executors being 
ramped down first.
3. If `numExecutorsTarget` is still not satisfied when the timeout elapses, resources are 
insufficient; the scheduler starts at the timeout rather than waiting indefinitely.

Please help to review, thanks a lot.
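
For illustration only, a simplified, self-contained sketch (not the actual ExecutorAllocationManager) of the gating idea: an initializing flag suppresses any ramp-down of the target until a stage is submitted or an executor idle timeout fires. The object name, the initial target of 4, and the markInitialized helper are made up for the example.

object LazyStartSketch {
  // Hypothetical, simplified version of the gate this patch adds.
  @volatile private var initializing: Boolean = true
  private var numExecutorsTarget: Int = 4 // assumed initial target for the example

  // Called when a stage is submitted or an executor idle timeout elapses.
  def markInitialized(): Unit = { initializing = false }

  // Returns the change in the executor target; 0 while still initializing.
  def updateTarget(maxNeeded: Int): Int = synchronized {
    if (initializing) {
      // Do not ramp the target down before the first job has had a chance to run.
      0
    } else if (maxNeeded < numExecutorsTarget) {
      val old = numExecutorsTarget
      numExecutorsTarget = maxNeeded
      numExecutorsTarget - old
    } else {
      0
    }
  }

  def main(args: Array[String]): Unit = {
    println(updateTarget(maxNeeded = 0)) // 0: still initializing, no ramp-down
    markInitialized()
    println(updateTarget(maxNeeded = 0)) // -4: now the target can be lowered
  }
}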

Author: jerryshao 

Closes #6430 from jerryshao/SPARK-7699 and squashes the following commits:

02cac8e [jerryshao] Address the comments
7242450 [jerryshao] Remove the useless import
ecc0b00 [jerryshao] Address the comments
6f75f00 [jerryshao] Style changes
8b8decc [jerryshao] change the test name
fb822ca [jerryshao] Change the solution according to comments
1cc74e5 [jerryshao] Lazy start the scheduler for dynamic allocation


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3f80bc84
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3f80bc84
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3f80bc84

Branch: refs/heads/master
Commit: 3f80bc841ab155925fb0530eef5927990f4a5793
Parents: 0992a0a
Author: jerryshao 
Authored: Fri Jun 5 12:28:37 2015 -0700
Committer: Sandy Ryza 
Committed: Fri Jun 5 12:28:37 2015 -0700

--
 .../spark/ExecutorAllocationManager.scala   | 17 +++-
 .../spark/ExecutorAllocationManagerSuite.scala  | 90 
 2 files changed, 89 insertions(+), 18 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/3f80bc84/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
--
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index f7323a4..9939103 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -150,6 +150,13 @@ private[spark] class ExecutorAllocationManager(
   // Metric source for ExecutorAllocationManager to expose internal status to MetricsSystem.
   val executorAllocationManagerSource = new ExecutorAllocationManagerSource
 
+  // Whether we are still waiting for the initial set of executors to be allocated.
+  // While this is true, we will not cancel outstanding executor requests. This is
+  // set to false when:
+  //   (1) a stage is submitted, or
+  //   (2) an executor idle timeout has elapsed.
+  @volatile private var initializing: Boolean = true
+
   /**
* Verify that the settings specified through the config are valid.
* If not, throw an appropriate exception.
@@ -240,6 +247,7 @@ private[spark] class ExecutorAllocationManager(
 removeTimes.retain { case (executorId, expireTime) =>
   val expired = now >= expireTime
   if (expired) {
+initializing = false
 removeExecutor(executorId)
   }
   !expired
@@ -261,7 +269,11 @@ private[spark] class ExecutorAllocationManager(
   private def updateAndSyncNumExecutorsTarget(now: Long): Int = synchronized {
 val maxNeeded = maxNumExecutorsNeeded
 
-if (maxNeeded < numExecutorsTarget) {
+if (initializing) {
+  // Do not change our target while we are still initializing,
+  // Otherwise the first job may have to ramp up unnecessarily
+  0
+} else if (maxNeeded < numExecutorsTarget) {
   // The target number exceeds the number we actually need, so stop adding new
   // executors and inform the cluster manager to cancel the extra pending requests
   val oldNumExecutorsTarget = numExecutorsTarget
@@ -271,7 +283,7 @@ private[spark] class ExecutorAllocationManager(
   // If the new target has not changed, avoid sending a message to the cluster manager
   if (numExecutorsTarget < oldNumExecutorsTarget) {
 client.requestTotalExecutors(numExecutorsTarget)
-logInfo(s"Lowering target number of executors to $numExecutorsTarget 
(previously " +
+logDebug(s"Lowering target number of executors to $numExecutorsTarget 
(previously " +
   s"$oldNumExecutorsTarget) because not all requested executors are 
actually needed")
   }
   numExecutorsTarget - oldNumExecutorsTarget
@@ -481,6 +493,7 @

spark git commit: [SPARK-9782] [YARN] Support YARN application tags via SparkConf

2015-08-18 Thread sandy
Repository: spark
Updated Branches:
  refs/heads/master 80cb25b22 -> 9b731fad2


[SPARK-9782] [YARN] Support YARN application tags via SparkConf

Add a new test case in yarn/ClientSuite which checks how the various SparkConf
and ClientArguments propagate into the ApplicationSubmissionContext.

Author: Dennis Huo 

Closes #8072 from dennishuo/dhuo-yarn-application-tags.
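
The interesting part of the Client change is the reflection guard: setApplicationTags only exists in Hadoop 2.4+, so the tags are applied only when the method is present. A minimal standalone sketch of that pattern follows (the object and helper name are hypothetical, not Spark code):

object ReflectiveSetterSketch {
  // Invoke target.setApplicationTags(tags) via reflection if the running YARN version
  // provides the method; return false (and skip the call) otherwise.
  def setTagsIfSupported(target: AnyRef, tags: java.util.Set[String]): Boolean = {
    try {
      val method = target.getClass.getMethod("setApplicationTags", classOf[java.util.Set[String]])
      method.invoke(target, tags)
      true
    } catch {
      case _: NoSuchMethodException => false
    }
  }

  def main(args: Array[String]): Unit = {
    // A plain Object has no setApplicationTags method, so this prints false.
    val tags = new java.util.HashSet[String](java.util.Arrays.asList("team:analytics", "nightly"))
    println(setTagsIfSupported(new Object, tags))
  }
}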


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9b731fad
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9b731fad
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9b731fad

Branch: refs/heads/master
Commit: 9b731fad2b43ca18f3c5274062d4c7bc2622ab72
Parents: 80cb25b
Author: Dennis Huo 
Authored: Tue Aug 18 14:34:20 2015 -0700
Committer: Sandy Ryza 
Committed: Tue Aug 18 14:34:20 2015 -0700

--
 docs/running-on-yarn.md |  8 +
 .../org/apache/spark/deploy/yarn/Client.scala   | 21 
 .../apache/spark/deploy/yarn/ClientSuite.scala  | 36 
 3 files changed, 65 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/9b731fad/docs/running-on-yarn.md
--
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index ec32c41..8ac26e9 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -320,6 +320,14 @@ If you need a reference to the proper location to put log files in the YARN so t
   
 
 
+  spark.yarn.tags
+  (none)
+  
+  Comma-separated list of strings to pass through as YARN application tags appearing
+  in YARN ApplicationReports, which can be used for filtering when querying YARN apps.
+  
+
+
   spark.yarn.keytab
   (none)
   

http://git-wip-us.apache.org/repos/asf/spark/blob/9b731fad/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
--
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 6d63dda..5c6a716 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -163,6 +163,23 @@ private[spark] class Client(
 appContext.setQueue(args.amQueue)
 appContext.setAMContainerSpec(containerContext)
 appContext.setApplicationType("SPARK")
+sparkConf.getOption(CONF_SPARK_YARN_APPLICATION_TAGS)
+  .map(StringUtils.getTrimmedStringCollection(_))
+  .filter(!_.isEmpty())
+  .foreach { tagCollection =>
+try {
+  // The setApplicationTags method was only introduced in Hadoop 2.4+, so we need to use
+  // reflection to set it, printing a warning if a tag was specified but the YARN version
+  // doesn't support it.
+  val method = appContext.getClass().getMethod(
+"setApplicationTags", classOf[java.util.Set[String]])
+  method.invoke(appContext, new java.util.HashSet[String](tagCollection))
+} catch {
+  case e: NoSuchMethodException =>
+logWarning(s"Ignoring $CONF_SPARK_YARN_APPLICATION_TAGS because this version of " +
+  "YARN does not support it")
+}
+  }
 sparkConf.getOption("spark.yarn.maxAppAttempts").map(_.toInt) match {
   case Some(v) => appContext.setMaxAppAttempts(v)
   case None => logDebug("spark.yarn.maxAppAttempts is not set. " +
@@ -987,6 +1004,10 @@ object Client extends Logging {
   // of the executors
   val CONF_SPARK_YARN_SECONDARY_JARS = "spark.yarn.secondary.jars"
 
+  // Comma-separated list of strings to pass through as YARN application tags appearing
+  // in YARN ApplicationReports, which can be used for filtering when querying YARN.
+  val CONF_SPARK_YARN_APPLICATION_TAGS = "spark.yarn.tags"
+
   // Staging directory is private! -> rwx
   val STAGING_DIR_PERMISSION: FsPermission =
 FsPermission.createImmutable(Integer.parseInt("700", 8).toShort)

http://git-wip-us.apache.org/repos/asf/spark/blob/9b731fad/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
--
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index 837f8d3..0a5402c 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -29,8 +29,11 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import or

spark git commit: [SPARK-7451] [YARN] Preemption of executors is counted as failure causing Spark job to fail

2015-05-08 Thread sandy
Repository: spark
Updated Branches:
  refs/heads/master 84bf931f3 -> b6c797b08


[SPARK-7451] [YARN] Preemption of executors is counted as failure causing Spark 
job to fail

Added a check to handle container exit status for the preemption scenario, log 
an INFO message in such cases and move on.
andrewor14

Author: Ashwin Shankar 

Closes #5993 from ashwinshankar77/SPARK-7451 and squashes the following commits:

90900cf [Ashwin Shankar] Fix log info message
cf8b6cf [Ashwin Shankar] Stop counting preemption of executors as failure
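
For context, a tiny sketch of the idea: treat the preempted exit status as benign instead of a failure. The numeric value below is an assumption based on YARN's ContainerExitStatus.PREEMPTED; the real patch references that constant directly and leaves the rest of the failure accounting unchanged.

object PreemptionSketch {
  // Assumed value: ContainerExitStatus.PREEMPTED is -102 in Hadoop 2.2+.
  val Preempted = -102

  // Sketch: preempted containers are logged and not counted toward executor failures.
  def countsAsFailure(exitStatus: Int): Boolean = exitStatus match {
    case 0 => false // clean exit
    case Preempted =>
      println("Container preempted, not counting as a failure")
      false
    case _ => true // everything else still counts against the failure limit
  }

  def main(args: Array[String]): Unit = {
    println(countsAsFailure(Preempted)) // false: preemption no longer fails the job
    println(countsAsFailure(-104))      // true
  }
}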


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b6c797b0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b6c797b0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b6c797b0

Branch: refs/heads/master
Commit: b6c797b08cbd08d7aab59ad0106af0f5f41ef186
Parents: 84bf931
Author: Ashwin Shankar 
Authored: Fri May 8 17:51:00 2015 -0700
Committer: Sandy Ryza 
Committed: Fri May 8 17:51:00 2015 -0700

--
 .../main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala  | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/b6c797b0/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
--
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index b8f42da..88d68d5 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -373,7 +373,9 @@ private[yarn] class YarnAllocator(
 // Hadoop 2.2.X added a ContainerExitStatus we should switch to use
 // there are some exit status' we shouldn't necessarily count against us, but for
 // now I think its ok as none of the containers are expected to exit
-if (completedContainer.getExitStatus == -103) { // vmem limit exceeded
+if (completedContainer.getExitStatus == ContainerExitStatus.PREEMPTED) {
+  logInfo("Container preempted: " + containerId)
+} else if (completedContainer.getExitStatus == -103) { // vmem limit exceeded
   logWarning(memLimitExceededLogMessage(
 completedContainer.getDiagnostics,
 VMEM_EXCEEDED_PATTERN))


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-7451] [YARN] Preemption of executors is counted as failure causing Spark job to fail

2015-05-08 Thread sandy
Repository: spark
Updated Branches:
  refs/heads/branch-1.4 85cab3482 -> 959c7b6ca


[SPARK-7451] [YARN] Preemption of executors is counted as failure causing Spark 
job to fail

Added a check to handle container exit status for the preemption scenario, log 
an INFO message in such cases and move on.
andrewor14

Author: Ashwin Shankar 

Closes #5993 from ashwinshankar77/SPARK-7451 and squashes the following commits:

90900cf [Ashwin Shankar] Fix log info message
cf8b6cf [Ashwin Shankar] Stop counting preemption of executors as failure
(cherry picked from commit b6c797b08cbd08d7aab59ad0106af0f5f41ef186)

Signed-off-by: Sandy Ryza 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/959c7b6c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/959c7b6c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/959c7b6c

Branch: refs/heads/branch-1.4
Commit: 959c7b6ca422082de9e89703e3c7b926d93d30d1
Parents: 85cab34
Author: Ashwin Shankar 
Authored: Fri May 8 17:51:00 2015 -0700
Committer: Sandy Ryza 
Committed: Fri May 8 17:51:46 2015 -0700

--
 .../main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala  | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/959c7b6c/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
--
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index b8f42da..88d68d5 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -373,7 +373,9 @@ private[yarn] class YarnAllocator(
 // Hadoop 2.2.X added a ContainerExitStatus we should switch to use
 // there are some exit status' we shouldn't necessarily count against us, but for
 // now I think its ok as none of the containers are expected to exit
-if (completedContainer.getExitStatus == -103) { // vmem limit exceeded
+if (completedContainer.getExitStatus == ContainerExitStatus.PREEMPTED) {
+  logInfo("Container preempted: " + containerId)
+} else if (completedContainer.getExitStatus == -103) { // vmem limit exceeded
   logWarning(memLimitExceededLogMessage(
 completedContainer.getDiagnostics,
 VMEM_EXCEEDED_PATTERN))


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-6470] [YARN] Add support for YARN node labels.

2015-05-11 Thread sandy
Repository: spark
Updated Branches:
  refs/heads/master 0a4844f90 -> 82fee9d9a


[SPARK-6470] [YARN] Add support for YARN node labels.

It is difficult to write a test for this because it relies on the latest version of YARN, but I 
verified manually that the patch does pass along the label expression on that version and that 
containers are successfully launched.

Author: Sandy Ryza 

Closes #5242 from sryza/sandy-spark-6470 and squashes the following commits:

6af87b9 [Sandy Ryza] Change info to warning
6e22d99 [Sandy Ryza] [YARN] SPARK-6470.  Add support for YARN node labels.
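
A condensed sketch of the reflection pattern this patch relies on: look up a constructor that only newer YARN versions provide, and fall back gracefully when it is absent. The generic helper below is hypothetical; the real code targets the ContainerRequest constructor specifically.

object OptionalConstructorSketch {
  import java.lang.reflect.Constructor

  // Return the matching constructor if this class provides it, None otherwise.
  def optionalConstructor[T](cls: Class[T], paramTypes: Class[_]*): Option[Constructor[T]] =
    try {
      Some(cls.getConstructor(paramTypes: _*))
    } catch {
      case _: NoSuchMethodException => None
    }

  def main(args: Array[String]): Unit = {
    // StringBuilder(String) exists; StringBuilder(java.util.List) does not.
    println(optionalConstructor(classOf[java.lang.StringBuilder], classOf[String]).isDefined)            // true
    println(optionalConstructor(classOf[java.lang.StringBuilder], classOf[java.util.List[_]]).isDefined) // false
  }
}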


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/82fee9d9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/82fee9d9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/82fee9d9

Branch: refs/heads/master
Commit: 82fee9d9aad2c9ba2fb4bd658579fe99218cafac
Parents: 0a4844f
Author: Sandy Ryza 
Authored: Mon May 11 12:09:39 2015 -0700
Committer: Sandy Ryza 
Committed: Mon May 11 12:09:39 2015 -0700

--
 docs/running-on-yarn.md |  9 ++
 .../spark/deploy/yarn/YarnAllocator.scala   | 31 +++-
 2 files changed, 39 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/82fee9d9/docs/running-on-yarn.md
--
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 4fb4a90..51c1339 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -220,6 +220,15 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
   Otherwise, the client process will exit after submission.
   
 
+
+  spark.yarn.executor.nodeLabelExpression
+  (none)
+  
+  A YARN node label expression that restricts the set of nodes executors will be scheduled on.
+  Only versions of YARN greater than or equal to 2.6 support node label expressions, so when
+  running against earlier versions, this property will be ignored.
+  
+
 
 
 # Launching Spark on YARN

http://git-wip-us.apache.org/repos/asf/spark/blob/82fee9d9/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
--
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 88d68d5..8a08f56 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -117,6 +117,24 @@ private[yarn] class YarnAllocator(
   // For testing
   private val launchContainers = sparkConf.getBoolean("spark.yarn.launchContainers", true)
 
+  private val labelExpression = sparkConf.getOption("spark.yarn.executor.nodeLabelExpression")
+
+  // ContainerRequest constructor that can take a node label expression. We grab it through
+  // reflection because it's only available in later versions of YARN.
+  private val nodeLabelConstructor = labelExpression.flatMap { expr =>
+try {
+  Some(classOf[ContainerRequest].getConstructor(classOf[Resource],
+classOf[Array[String]], classOf[Array[String]], classOf[Priority], classOf[Boolean],
+classOf[String]))
+} catch {
+  case e: NoSuchMethodException => {
+logWarning(s"Node label expression $expr will be ignored because YARN 
version on" +
+  " classpath does not support it.")
+None
+  }
+}
+  }
+
   def getNumExecutorsRunning: Int = numExecutorsRunning
 
   def getNumExecutorsFailed: Int = numExecutorsFailed
@@ -211,7 +229,7 @@ private[yarn] class YarnAllocator(
 s"cores and ${resource.getMemory} MB memory including $memoryOverhead 
MB overhead")
 
   for (i <- 0 until missing) {
-val request = new ContainerRequest(resource, null, null, RM_REQUEST_PRIORITY)
+val request = createContainerRequest(resource)
 amClient.addContainerRequest(request)
 val nodes = request.getNodes
 val hostStr = if (nodes == null || nodes.isEmpty) "Any" else nodes.last
@@ -231,6 +249,17 @@ private[yarn] class YarnAllocator(
   }
 
   /**
+   * Creates a container request, handling the reflection required to use YARN features that were
+   * added in recent versions.
+   */
+  private def createContainerRequest(resource: Resource): ContainerRequest = {
+nodeLabelConstructor.map { constructor =>
+  constructor.newInstance(resource, null, null, RM_REQUEST_PRIORITY, true: java.lang.Boolean,
+labelExpression.orNull)
+}.getOrElse(new ContainerRequest(resource, null, null, RM_REQUEST_PRIORITY))
+  }
+
+  /**
* Handle containers granted by the R

spark git commit: [SPARK-7515] [DOC] Update documentation for PySpark on YARN with cluster mode

2015-05-11 Thread sandy
Repository: spark
Updated Branches:
  refs/heads/master 7ce2a33c3 -> 6e9910c21


[SPARK-7515] [DOC] Update documentation for PySpark on YARN with cluster mode

Now that PySpark on YARN with cluster mode is supported, let's update the doc.

Author: Kousuke Saruta 

Closes #6040 from sarutak/update-doc-for-pyspark-on-yarn and squashes the 
following commits:

ad9f88c [Kousuke Saruta] Brushed up sentences
469fd2e [Kousuke Saruta] Merge branch 'master' of 
https://github.com/apache/spark into update-doc-for-pyspark-on-yarn
fcfdb92 [Kousuke Saruta] Updated doc for PySpark on YARN with cluster mode


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6e9910c2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6e9910c2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6e9910c2

Branch: refs/heads/master
Commit: 6e9910c21ada19ae14122b5f14a6a30845b98229
Parents: 7ce2a33
Author: Kousuke Saruta 
Authored: Mon May 11 14:19:11 2015 -0700
Committer: Sandy Ryza 
Committed: Mon May 11 14:19:11 2015 -0700

--
 docs/submitting-applications.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/6e9910c2/docs/submitting-applications.md
--
diff --git a/docs/submitting-applications.md b/docs/submitting-applications.md
index 3ecbf23..e586452 100644
--- a/docs/submitting-applications.md
+++ b/docs/submitting-applications.md
@@ -59,7 +59,7 @@ for applications that involve the REPL (e.g. Spark shell).
 Alternatively, if your application is submitted from a machine far from the worker machines (e.g.
 locally on your laptop), it is common to use `cluster` mode to minimize network latency between
 the drivers and the executors. Note that `cluster` mode is currently not supported for
-Mesos clusters or Python applications.
+Mesos clusters. Currently only YARN supports cluster mode for Python applications.
 
 For Python applications, simply pass a `.py` file in the place of `` instead of a JAR,
 and add Python `.zip`, `.egg` or `.py` files to the search path with `--py-files`.


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-4352] [YARN] [WIP] Incorporate locality preferences in dynamic allocation requests

2015-07-27 Thread sandy
Repository: spark
Updated Branches:
  refs/heads/master 2104931d7 -> ab6259566


[SPARK-4352] [YARN] [WIP] Incorporate locality preferences in dynamic 
allocation requests

Currently there is no locality preference for container requests in YARN mode, which can hurt 
performance when data has to be fetched remotely, so this proposes adding locality preferences 
to YARN dynamic allocation mode.

Ping sryza, please help to review, thanks a lot.

Author: jerryshao 

Closes #6394 from jerryshao/SPARK-4352 and squashes the following commits:

d45fecb [jerryshao] Add documents
6c3fe5c [jerryshao] Fix bug
8db6c0e [jerryshao] Further address the comments
2e2b2cb [jerryshao] Fix rebase compiling problem
ce5f096 [jerryshao] Fix style issue
7f7df95 [jerryshao] Fix rebase issue
9ca9e07 [jerryshao] Code refactor according to comments
d3e4236 [jerryshao] Further address the comments
5e7a593 [jerryshao] Fix bug introduced code rebase
9ca7783 [jerryshao] Style changes
08317f9 [jerryshao] code and comment refines
65b2423 [jerryshao] Further address the comments
a27c587 [jerryshao] address the comment
27faabc [jerryshao] redundant code remove
9ce06a1 [jerryshao] refactor the code
f5ba27b [jerryshao] Style fix
2c6cc8a [jerryshao] Fix bug and add unit tests
0757335 [jerryshao] Consider the distribution of existed containers to 
recalculate the new container requests
0ad66ff [jerryshao] Fix compile bugs
1c20381 [jerryshao] Minor fix
5ef2dc8 [jerryshao] Add docs and improve the code
3359814 [jerryshao] Fix rebase and test bugs
0398539 [jerryshao] reinitialize the new implementation
67596d6 [jerryshao] Still fix the code
654e1d2 [jerryshao] Fix some bugs
45b1c89 [jerryshao] Further polish the algorithm
dea0152 [jerryshao] Enable node locality information in YarnAllocator
74bbcc6 [jerryshao] Support node locality for dynamic allocation initial commit
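
At the interface level the change amounts to passing locality information along with the executor target. A rough sketch under that assumption (the trait and example below are hypothetical; the parameter names are my reading of the scaladoc in the diff, which is truncated in this archive):

// Simplified sketch of a locality-aware allocation-client interface; not the actual Spark trait.
trait LocalityAwareAllocationClient {
  // numExecutors: total executors we would like to have
  // localityAwareTasks: number of tasks in active stages that have locality preferences
  // hostToLocalTaskCount: preferred hosts mapped to how many such tasks want to run there
  def requestTotalExecutors(
      numExecutors: Int,
      localityAwareTasks: Int,
      hostToLocalTaskCount: Map[String, Int]): Boolean
}

// Example caller: an allocator could use hostToLocalTaskCount to place container requests
// on the preferred hosts first.
object LocalityRequestExample extends App {
  val client: LocalityAwareAllocationClient = new LocalityAwareAllocationClient {
    def requestTotalExecutors(n: Int, tasks: Int, hosts: Map[String, Int]): Boolean = {
      println(s"requesting $n executors, $tasks locality-aware tasks, preferred hosts: $hosts")
      true
    }
  }
  client.requestTotalExecutors(10, 20, Map("host1" -> 12, "host2" -> 8))
}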


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ab625956
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ab625956
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ab625956

Branch: refs/heads/master
Commit: ab625956616664c2b4861781a578311da75a9ae4
Parents: 2104931
Author: jerryshao 
Authored: Mon Jul 27 15:46:35 2015 -0700
Committer: Sandy Ryza 
Committed: Mon Jul 27 15:46:35 2015 -0700

--
 .../apache/spark/ExecutorAllocationClient.scala |  18 +-
 .../spark/ExecutorAllocationManager.scala   |  62 ++-
 .../scala/org/apache/spark/SparkContext.scala   |  25 ++-
 .../apache/spark/scheduler/DAGScheduler.scala   |  26 ++-
 .../org/apache/spark/scheduler/Stage.scala  |   7 +-
 .../org/apache/spark/scheduler/StageInfo.scala  |  13 +-
 .../cluster/CoarseGrainedClusterMessage.scala   |   6 +-
 .../cluster/CoarseGrainedSchedulerBackend.scala |  32 +++-
 .../cluster/YarnSchedulerBackend.scala  |   3 +-
 .../spark/ExecutorAllocationManagerSuite.scala  |  55 +-
 .../apache/spark/HeartbeatReceiverSuite.scala   |   7 +-
 .../spark/deploy/yarn/ApplicationMaster.scala   |   5 +-
 ...ityPreferredContainerPlacementStrategy.scala | 182 +++
 .../spark/deploy/yarn/YarnAllocator.scala   |  47 -
 .../yarn/ContainerPlacementStrategySuite.scala  | 125 +
 .../spark/deploy/yarn/YarnAllocatorSuite.scala  |  14 +-
 16 files changed, 578 insertions(+), 49 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ab625956/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
--
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
index 443830f..842bfdb 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
@@ -24,11 +24,23 @@ package org.apache.spark
 private[spark] trait ExecutorAllocationClient {
 
   /**
-   * Express a preference to the cluster manager for a given total number of executors.
-   * This can result in canceling pending requests or filing additional requests.
+   * Update the cluster manager on our scheduling needs. Three bits of information are included
+   * to help it make decisions.
+   * @param numExecutors The total number of executors we'd like to have. The cluster manager
+   * shouldn't kill any running executor to reach this number, but,
+   * if all existing executors were to die, this is the number of executors
+   * we'd want to be allocated.
+   * @param localityAwareTasks The number of tasks in all active stages that have a locality
+   *   preferences. This includes running, pending, and completed tasks.
+   * @param hos