[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-05-17 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/11157#issuecomment-219756624
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/58694/


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-05-17 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/11157#issuecomment-219756618
  
Merged build finished. Test PASSed.





[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-05-17 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/11157#issuecomment-219756200
  
**[Test build #58694 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/58694/consoleFull)** for PR 11157 at commit [`47d6a9f`](https://github.com/apache/spark/commit/47d6a9f376f7690e75de32c14742a1ff34a58280).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-05-17 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/11157#issuecomment-219716919
  
**[Test build #58694 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/58694/consoleFull)** for PR 11157 at commit [`47d6a9f`](https://github.com/apache/spark/commit/47d6a9f376f7690e75de32c14742a1ff34a58280).





[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-05-10 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/11157#discussion_r62743938
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala ---
@@ -395,24 +402,29 @@ private[spark] class CoarseMesosSchedulerBackend(
 
   val (afterCPUResources, cpuResourcesToUse) =
     partitionResources(resources, "cpus", taskCPUs)
-  val (resourcesLeft, memResourcesToUse) =
+  val (afterMemResources, memResourcesToUse) =
     partitionResources(afterCPUResources.asJava, "mem", taskMemory)
+  // process port offers
+  val (resourcesLeft, portResources) = getPortResources(afterMemResources)
--- End diff --

ok
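
The diff above threads an offer through successive partition steps (cpus, then mem, then ports), each producing a (remaining, consumed) pair. A minimal standalone sketch of that shape; `partition` and `carve` here are hypothetical stand-ins for illustration, not the PR's actual `partitionResources` implementation:

```scala
object OfferPartitionSketch {
  type Resources = Map[String, Double]

  // Split an offer into (remaining, consumed) for one named resource,
  // mirroring the (left, toUse) pair shape used in the diff above.
  def partition(offer: Resources, name: String, wanted: Double): (Resources, Resources) = {
    val available = offer.getOrElse(name, 0.0)
    val used = math.min(available, wanted)
    (offer.updated(name, available - used), Map(name -> used))
  }

  // Chain the steps the way the scheduler does: cpus first, then mem,
  // threading the shrinking "remaining" offer through each step.
  def carve(offer: Resources, cpus: Double, mem: Double): (Resources, Resources, Resources) = {
    val (afterCpu, cpuUse) = partition(offer, "cpus", cpus)
    val (left, memUse) = partition(afterCpu, "mem", mem)
    (left, cpuUse, memUse)
  }
}
```

Renaming `resourcesLeft` to `afterMemResources` in the diff makes this chaining explicit: each step's leftover feeds the next, and only the final leftover is the offer's true remainder.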





[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-05-10 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/11157#discussion_r6272
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala ---
@@ -356,4 +374,229 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
 
     sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForReachedMaxCores", "120s")
   }
 
+  /**
+   * Checks whether the executor ports fall within the offered list of port ranges.
+   *
+   * @param sc the Spark context
+   * @param ports the list of offered port ranges to check against
+   * @return true if the ports are within range, false otherwise
+   */
+  protected def checkPorts(sc: SparkContext, ports: List[(Long, Long)]): Boolean = {
+
+    def checkIfInRange(port: Int, ps: List[(Long, Long)]): Boolean = {
+      ps.exists(r => r._1 <= port && r._2 >= port)
+    }
+
+    val portsToCheck = List(sc.conf.getInt("spark.executor.port", 0),
+      sc.conf.getInt("spark.blockManager.port", 0))
+    val nonZeroPorts = portsToCheck.filter(_ != 0)
+    val withinRange = nonZeroPorts.forall(p => checkIfInRange(p, ports))
+    // make sure we have enough ports to allocate per offer
+    ports.map(r => r._2 - r._1 + 1).sum >= portsToCheck.size && withinRange
+  }
+
+  /**
+   * Partitions port resources.
+   *
+   * @param conf the Spark config
+   * @param ports the ports offered
+   * @return the resources left, the port resources to be used, and the list of assigned ports
+   */
+  def partitionPorts(
+      conf: SparkConf,
+      ports: List[Resource])
+    : (List[Resource], List[Resource], List[Long]) = {
+    val taskPortRanges = getRangeResourceWithRoleInfo(ports.asJava, "ports")
+    val portsToCheck = List(conf.getInt("spark.executor.port", 0).toLong,
+      conf.getInt("spark.blockManager.port", 0).toLong)
+    val nonZeroPorts = portsToCheck.filter(_ != 0)
+    // reserve non-zero ports first
+    val nonZeroResources = reservePorts(taskPortRanges, nonZeroPorts)
+    // reserve actual port numbers for zero ports - not set by the user
+    val numOfZeroPorts = portsToCheck.count(_ == 0)
+    val randPorts = pickRandomPortsFromRanges(nonZeroResources._1, numOfZeroPorts)
+    val zeroResources = reservePorts(nonZeroResources._1, randPorts)
+    val (resourcesLeft, resourcesToBeUsed) = createResources(nonZeroResources, zeroResources)
+    (resourcesLeft, resourcesToBeUsed, nonZeroPorts ++ randPorts)
+  }
+
+  private def createResources(
+      nonZero: (List[PortRangeResourceInfo], List[PortRangeResourceInfo]),
+      zero: (List[PortRangeResourceInfo], List[PortRangeResourceInfo]))
+    : (List[Resource], List[Resource]) = {
+    val resources = {
+      if (nonZero._2.isEmpty) { // no user ports were defined
+        (zero._1.flatMap { port => createMesosPortResource(port.value, Some(port.role)) },
+          zero._2.flatMap { port => createMesosPortResource(port.value, Some(port.role)) })
+      } else if (zero._2.isEmpty) { // no random ports were defined
+        (nonZero._1.flatMap { port => createMesosPortResource(port.value, Some(port.role)) },
+          nonZero._2.flatMap { port => createMesosPortResource(port.value, Some(port.role)) })
+      } else { // both user-defined and random ports were assigned
+        val left = zero._1.flatMap { port => createMesosPortResource(port.value, Some(port.role)) }
+        val used = nonZero._2.flatMap { port =>
+          createMesosPortResource(port.value, Some(port.role)) } ++
+          zero._2.flatMap { port => createMesosPortResource(port.value, Some(port.role)) }
+        (left, used)
+      }
+    }
+    resources
+  }
+
+  private case class PortRangeResourceInfo(role: String, value: List[(Long, Long)])
+
+  private def getRangeResourceWithRoleInfo(res: JList[Resource], name: String)
+    : List[PortRangeResourceInfo] = {
+    // A resource can appear multiple times in an offer, since it can come either from
+    // a specific role or from the wildcard role.
+    // Extract the role info and port ranges for every port resource in the offer.
+    res.asScala.filter(_.getName == name)
+      .map { res => PortRangeResourceInfo(res.getRole, res.getRanges.getRangeList.asScala
+        .map(r => (r.getBegin, r.getEnd)).toList) }.toList
+  }
+
+  /** Helper method to get a pair of assigned and remaining ports along with role info */
+  private def reservePorts(
+      availablePortRanges: List[PortRangeResourceInfo],
+      wantedPorts: List[Long])
+    : (List[PortRangeResourceInfo], List[PortRangeResourceInfo]) = {
+    if (wantedPorts.isEmpty) { // the port list is empty; we didn't consume any resources
+ 
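
The `checkPorts` logic quoted above can be exercised in isolation. A minimal, self-contained sketch, assuming the same semantics as the diff (an explicitly configured port must be covered by some offered range, and the offer must hold at least as many ports as Spark needs); the object and parameter names here are illustrative, not the PR's API:

```scala
object PortCheckSketch {
  // A port is usable if it falls inside any offered inclusive (begin, end) range.
  def checkIfInRange(port: Int, ranges: List[(Long, Long)]): Boolean =
    ranges.exists { case (begin, end) => begin <= port && port <= end }

  // Mirrors checkPorts: every non-zero (user-configured) port must be covered,
  // and the offered ranges together must hold enough ports for all we need,
  // including the zero entries that will be assigned randomly later.
  def checkPorts(configuredPorts: List[Int], offered: List[(Long, Long)]): Boolean = {
    val nonZero = configuredPorts.filter(_ != 0)
    val withinRange = nonZero.forall(p => checkIfInRange(p, offered))
    val capacity = offered.map { case (b, e) => e - b + 1 }.sum
    capacity >= configuredPorts.size && withinRange
  }
}
```

For example, with `spark.executor.port=7077` and `spark.blockManager.port` unset (0), an offer of the single range (7000, 8000) passes: 7077 is in range and the range holds far more than the two ports required.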

[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-05-10 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/11157#discussion_r62744256
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala ---

[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-05-10 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/11157#discussion_r62743925
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala ---
@@ -395,24 +402,29 @@ private[spark] class CoarseMesosSchedulerBackend(
 
   val (afterCPUResources, cpuResourcesToUse) =
--- End diff --

ok





[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-05-10 Thread skonto
Github user skonto commented on the pull request:

https://github.com/apache/spark/pull/11157#issuecomment-218277517
  
OK, but could you be more specific in your comments? It's not always clear what the problem is.





[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-05-10 Thread mgummelt
Github user mgummelt commented on the pull request:

https://github.com/apache/spark/pull/11157#issuecomment-218277321
  
So?





[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-05-10 Thread mgummelt
Github user mgummelt commented on a diff in the pull request:

https://github.com/apache/spark/pull/11157#discussion_r62743444
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala ---
@@ -46,6 +46,7 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
 
--- End diff --

The additions to this file seem overly complex for just grabbing port 
resources.  Maybe all this complexity is necessary, but I'm skeptical.  I 
started in on it and got lost.  I don't have time to review it all today.  
Please ensure there is no more room for simplification.
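
For readers trying to follow the reservation logic under review here, the core operation — carving explicitly requested ports out of the offered ranges — can be sketched standalone. These are hypothetical helper names for illustration only; the PR's `reservePorts` additionally threads Mesos role information through each step:

```scala
object PortReserveSketch {
  // Remove a single port from a list of inclusive ranges, splitting the
  // range that contains it and dropping any empty halves.
  def takePort(port: Long, ranges: List[(Long, Long)]): List[(Long, Long)] =
    ranges.flatMap { case (b, e) =>
      if (port < b || port > e) List((b, e))      // range untouched
      else List((b, port - 1), (port + 1, e))     // split around the port
        .filter { case (x, y) => x <= y }         // drop empty halves
    }

  // Reserve each wanted port in turn, threading the shrinking range list,
  // so the result is the "resources left" after all reservations.
  def reservePorts(ranges: List[(Long, Long)], wanted: List[Long]): List[(Long, Long)] =
    wanted.foldLeft(ranges)((rs, p) => takePort(p, rs))
}
```

Much of the complexity being discussed comes from doing this twice (once for user-configured ports, once for randomly picked ones) while preserving which role each resulting sub-range belongs to.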





[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-05-10 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/11157#discussion_r62743066
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala ---
@@ -356,4 +374,229 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
+  private case class PortRangeResourceInfo(role: String, value: List[(Long, Long)])
--- End diff --

ok





[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-05-10 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/11157#discussion_r62743126
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala ---

[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-05-10 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/11157#discussion_r62743042
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala ---
@@ -356,4 +374,229 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
+    val portsToCheck = List(conf.getInt("spark.executor.port", 0).toLong,
--- End diff --

which part? 





[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-05-10 Thread mgummelt
Github user mgummelt commented on a diff in the pull request:

https://github.com/apache/spark/pull/11157#discussion_r62742851
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala ---

[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-05-10 Thread mgummelt
Github user mgummelt commented on a diff in the pull request:

https://github.com/apache/spark/pull/11157#discussion_r62742808
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
 ---
@@ -356,4 +374,229 @@ private[mesos] trait MesosSchedulerUtils extends 
Logging {
 
sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForReachedMaxCores", 
"120s")
   }
 
+  /**
+   * Checks whether the executor ports fall within the offered port ranges.
+   *
+   * @param sc the Spark Context
+   * @param ports the list of ports to check
+   * @return true if the ports are within range, false otherwise
+   */
+  protected def checkPorts(sc: SparkContext, ports: List[(Long, Long)]): 
Boolean = {
+
+def checkIfInRange(port: Int, ps: List[(Long, Long)]): Boolean = {
+  ps.exists(r => r._1 <= port && r._2 >= port)
+}
+
+val portsToCheck = List(sc.conf.getInt("spark.executor.port", 0),
+  sc.conf.getInt("spark.blockManager.port", 0))
+val nonZeroPorts = portsToCheck.filter(_ != 0)
+val withinRange = nonZeroPorts.forall(p => checkIfInRange(p, ports))
+// make sure we have enough ports to allocate per offer
+ports.map(r => r._2 - r._1 + 1).sum >= portsToCheck.size && withinRange
+  }
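For readers skimming the thread, the check quoted above boils down to: a configured port of 0 means "unset", every non-zero configured port must fall inside some offered range, and the offer must contain at least as many ports as we need overall. A minimal language-neutral sketch in Python (names are mine, not from the patch):

```python
def check_ports(ports_to_check, offered_ranges):
    """Sketch of the checkPorts logic: offered_ranges is a list of
    inclusive (begin, end) pairs from the Mesos offer; 0 means 'unset'."""
    def in_range(port, ranges):
        return any(lo <= port <= hi for lo, hi in ranges)

    non_zero = [p for p in ports_to_check if p != 0]
    within_range = all(in_range(p, offered_ranges) for p in non_zero)
    # make sure we have enough ports to allocate per offer
    capacity = sum(hi - lo + 1 for lo, hi in offered_ranges)
    return capacity >= len(ports_to_check) and within_range
```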
+
+  /**
+   * Partitions port resources.
+   *
+   * @param conf the spark config
+   * @param ports the ports offered
+   * @return resources left, port resources to be used and the list of 
assigned ports
+   */
+  def partitionPorts(
+  conf: SparkConf,
+  ports: List[Resource])
+: (List[Resource], List[Resource], List[Long]) = {
+val taskPortRanges = getRangeResourceWithRoleInfo(ports.asJava, 
"ports")
+val portsToCheck = List(conf.getInt("spark.executor.port", 0).toLong,
+  conf.getInt("spark.blockManager.port", 0).toLong)
+val nonZeroPorts = portsToCheck.filter(_ != 0)
+// reserve non zero ports first
+val nonZeroResources = reservePorts(taskPortRanges, nonZeroPorts)
+// reserve actual port numbers for zero ports - not set by the user
+val numOfZeroPorts = portsToCheck.count(_ == 0)
+val randPorts = pickRandomPortsFromRanges(nonZeroResources._1, 
numOfZeroPorts)
+val zeroResources = reservePorts(nonZeroResources._1, randPorts)
+val (resourcesLeft, resourcesToBeUsed) = 
createResources(nonZeroResources, zeroResources)
+(resourcesLeft, resourcesToBeUsed, nonZeroPorts ++ randPorts)
+  }
+
+  private def createResources(
+  nonZero: (List[PortRangeResourceInfo], List[PortRangeResourceInfo]),
+  zero: (List[PortRangeResourceInfo], List[PortRangeResourceInfo]))
+: (List[Resource], List[Resource]) = {
+val resources = {
+  if (nonZero._2.isEmpty) { // no user ports were defined
+(zero._1.flatMap{port => createMesosPortResource(port.value, 
Some(port.role))},
+  zero._2.flatMap{port => createMesosPortResource(port.value, 
Some(port.role))})
+
+  } else if (zero._2.isEmpty) { // no random ports were defined
+(nonZero._1.flatMap{port => createMesosPortResource(port.value, 
Some(port.role))},
+  nonZero._2.flatMap{port => createMesosPortResource(port.value, 
Some(port.role))})
+  } else { // we have both user-defined and random ports
+val left = zero._1.flatMap{port => 
createMesosPortResource(port.value, Some(port.role))}
+val used = nonZero._2.flatMap{port =>
+  createMesosPortResource(port.value, Some(port.role))} ++
+  zero._2.flatMap{port => createMesosPortResource(port.value, 
Some(port.role))}
+(left, used)
+  }
+}
+resources
+  }
+
+  private case class PortRangeResourceInfo(role: String, value: 
List[(Long, Long)])
+
+  private def getRangeResourceWithRoleInfo(res: JList[Resource], name: 
String)
+: List[PortRangeResourceInfo] = {
+// A resource can have multiple values in the offer since it can 
either be from
+// a specific role or wildcard.
+// Extract role info and port range for every port resource in the 
offer
+res.asScala.filter(_.getName == name)
+  .map{res => PortRangeResourceInfo(res.getRole, 
res.getRanges.getRangeList.asScala
+.map(r => (r.getBegin, r.getEnd)).toList) }.toList
+  }
+
+  /** Helper method to get a pair of assigned and remaining ports along 
with role info */
+  private def reservePorts(
+  availablePortRanges: List[PortRangeResourceInfo],
+  wantedPorts: List[Long])
+: (List[PortRangeResourceInfo], List[PortRangeResourceInfo]) = {
+if (wantedPorts.isEmpty) { // the port list is empty; we didn't consume any resources
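The quoted diff is truncated just as the body of reservePorts begins. As a rough reconstruction of what such a helper has to do (my own sketch, not the patch's code, with the role bookkeeping omitted): each wanted port is carved out of the offered inclusive ranges, splitting a range in two when the port falls in its interior.

```python
def reserve_ports(available_ranges, wanted_ports):
    """Carve wanted_ports out of inclusive (begin, end) ranges.
    Returns (remaining_ranges, reserved_ranges)."""
    remaining = list(available_ranges)
    reserved = []
    for port in wanted_ports:
        for i, (lo, hi) in enumerate(remaining):
            if lo <= port <= hi:
                pieces = []
                if lo < port:
                    pieces.append((lo, port - 1))  # leftover below the port
                if port < hi:
                    pieces.append((port + 1, hi))  # leftover above the port
                remaining[i:i + 1] = pieces        # replace the split range
                reserved.append((port, port))
                break
    return remaining, reserved
```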

[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-05-10 Thread skonto
Github user skonto commented on the pull request:

https://github.com/apache/spark/pull/11157#issuecomment-218273995
  
But that means the admin has no way to restrict port ranges in the isolated 
namespace


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-05-10 Thread mgummelt
Github user mgummelt commented on the pull request:

https://github.com/apache/spark/pull/11157#issuecomment-218273778
  
@skonto The offered ports are host ports.  Executors launched in an 
isolated namespace won't touch host ports unless they implement port mapping, 
which is why we still need this PR (I think).





[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-05-10 Thread skonto
Github user skonto commented on the pull request:

https://github.com/apache/spark/pull/11157#issuecomment-218273216
  
What I was trying to say is that even separate tasks, each with its own 
namespace and isolated port numbers, need to honour what is offered to them 
rather than pick arbitrary ports. But I don't know how this is going to be 
implemented in 0.29. For example, if Task1 needs a random port it can just pick 
one, but if specific port ranges are offered due to restrictions on a slave, 
then a random port should be chosen from within those ranges.
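[Editor's note: picking a "random" port that still honours the offered ranges, as described above, amounts to sampling uniformly from the union of the ranges. A minimal illustration, not code from the PR:]

```python
import random

def pick_random_port(offered_ranges, rng=random):
    """Pick one port uniformly from a union of inclusive (begin, end) ranges."""
    total = sum(hi - lo + 1 for lo, hi in offered_ranges)
    offset = rng.randrange(total)  # 0-based offset into the flattened union
    for lo, hi in offered_ranges:
        size = hi - lo + 1
        if offset < size:
            return lo + offset
        offset -= size
```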





[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-05-10 Thread mgummelt
Github user mgummelt commented on a diff in the pull request:

https://github.com/apache/spark/pull/11157#discussion_r62733797
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
 ---
@@ -356,4 +374,229 @@ private[mesos] trait MesosSchedulerUtils extends 
Logging {
 
sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForReachedMaxCores", 
"120s")
   }
 
+  /**
+   * Partitions port resources.
+   *
+   * @param conf the spark config
+   * @param ports the ports offered
+   * @return resources left, port resources to be used and the list of 
assigned ports
+   */
+  def partitionPorts(
+  conf: SparkConf,
+  ports: List[Resource])
+: (List[Resource], List[Resource], List[Long]) = {
+val taskPortRanges = getRangeResourceWithRoleInfo(ports.asJava, 
"ports")
+val portsToCheck = List(conf.getInt("spark.executor.port", 0).toLong,
--- End diff --

plz factor this out.  It's used multiple times





[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-05-10 Thread mgummelt
Github user mgummelt commented on a diff in the pull request:

https://github.com/apache/spark/pull/11157#discussion_r62733341
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
 ---
@@ -356,4 +374,229 @@ private[mesos] trait MesosSchedulerUtils extends 
Logging {
 
sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForReachedMaxCores", 
"120s")
   }
 
+  private case class PortRangeResourceInfo(role: String, value: 
List[(Long, Long)])
--- End diff --

s/value/range





[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-05-10 Thread mgummelt
Github user mgummelt commented on a diff in the pull request:

https://github.com/apache/spark/pull/11157#discussion_r62731739
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
 ---
@@ -356,4 +374,229 @@ private[mesos] trait MesosSchedulerUtils extends 
Logging {
 
sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForReachedMaxCores", 
"120s")
   }
 
+  private def getRangeResourceWithRoleInfo(res: JList[Resource], name: 
String)
+: List[PortRangeResourceInfo] = {
+// A resource can have multiple values in the offer since it can 
either be from
+// a specific role or wildcard.
+// Extract role info and port range for every port resource in the 
offer
+res.asScala.filter(_.getName == name)
+  .map{res => PortRangeResourceInfo(res.getRole, 
res.getRanges.getRangeList.asScala
+.map(r => (r.getBegin, r.getEnd)).toList) }.toList
+  }
+
+  /** Helper method to get a pair of assigned and remaining ports along 
with role info */
+  private def reservePorts(
+  availablePortRanges: List[PortRangeResourceInfo],
+  wantedPorts: List[Long])
+: (List[PortRangeResourceInfo], List[PortRangeResourceInfo]) = {
+if (wantedPorts.isEmpty) { // the port list is empty; we didn't consume any resources

[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-05-10 Thread mgummelt
Github user mgummelt commented on a diff in the pull request:

https://github.com/apache/spark/pull/11157#discussion_r62731256
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
 ---
@@ -395,24 +402,29 @@ private[spark] class CoarseMesosSchedulerBackend(
 
   val (afterCPUResources, cpuResourcesToUse) =
--- End diff --

Can you factor this all out into a getResources() method?





[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-05-10 Thread mgummelt
Github user mgummelt commented on a diff in the pull request:

https://github.com/apache/spark/pull/11157#discussion_r62731081
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
 ---
@@ -395,24 +402,29 @@ private[spark] class CoarseMesosSchedulerBackend(
 
   val (afterCPUResources, cpuResourcesToUse) =
 partitionResources(resources, "cpus", taskCPUs)
-  val (resourcesLeft, memResourcesToUse) =
+  val (afterMemResources, memResourcesToUse) =
 partitionResources(afterCPUResources.asJava, "mem", taskMemory)
+  // process port offers
+  val (resourcesLeft, portResources) = 
getPortResources(afterMemResources)
--- End diff --

s/getPortResources/partitionPortResources





[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-05-10 Thread mgummelt
Github user mgummelt commented on the pull request:

https://github.com/apache/spark/pull/11157#issuecomment-218254187
  
We need to make this opt-in.  Many users run Mesos with IP-per-container, 
and thus don't utilize host ports.  DC/OS will soon.





[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-05-10 Thread mgummelt
Github user mgummelt commented on the pull request:

https://github.com/apache/spark/pull/11157#issuecomment-218243767
  
I'm not sure what you mean, but I just now learned that we likely will need 
this PR in order to support port mapping, even if we did use CNI.  I'll go 
ahead with the review.





[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-05-09 Thread skonto
Github user skonto commented on the pull request:

https://github.com/apache/spark/pull/11157#issuecomment-217958594
  
But for admin/security reasons you may want fixed port ranges to be offered, 
and thus specific ports defined for each task rather than randomly picked. 
Shouldn't we treat ports in that case as in this PR? 





[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-05-09 Thread mgummelt
Github user mgummelt commented on the pull request:

https://github.com/apache/spark/pull/11157#issuecomment-217930926
  
Port offers are only relevant to executors running in the host network 
space.  So if we launch all executors in an isolated namespace, we shouldn't 
have to worry about ports at all.





[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-05-07 Thread skonto
Github user skonto commented on the pull request:

https://github.com/apache/spark/pull/11157#issuecomment-217621412
  
OK, np. That also means that Mesos will offer the same port on the same 
slave multiple times, right?
So if, say, port 1000 is assigned to one executor, then in order to start 
another executor on the same slave with the same port, port 1000 needs to be 
offered again. Will that happen? 





[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-05-06 Thread mgummelt
Github user mgummelt commented on the pull request:

https://github.com/apache/spark/pull/11157#issuecomment-217528595
  
I just learned that Mesos 0.29 will have network isolation support, which 
will allow us to launch each container in its own network namespace.  What do 
you think about just waiting for that?  Then we don't have to think about port 
conflicts.





[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-05-06 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/11157#issuecomment-217503819
  
Merged build finished. Test PASSed.





[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-05-06 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/11157#issuecomment-217503821
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/58004/
Test PASSed.





[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-05-06 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/11157#issuecomment-217503581
  
**[Test build #58004 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/58004/consoleFull)**
 for PR 11157 at commit 
[`9c7cf33`](https://github.com/apache/spark/commit/9c7cf332ccf350e721d25b0070b7c2637261ccaf).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-05-06 Thread skonto
Github user skonto commented on the pull request:

https://github.com/apache/spark/pull/11157#issuecomment-217477341
  
@mgummelt ready.





[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-05-06 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/11157#issuecomment-217475441
  
**[Test build #58004 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/58004/consoleFull)**
 for PR 11157 at commit 
[`9c7cf33`](https://github.com/apache/spark/commit/9c7cf332ccf350e721d25b0070b7c2637261ccaf).





[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-05-06 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/11157#issuecomment-217465989
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/57992/
Test PASSed.





[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-05-06 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/11157#issuecomment-217465986
  
Merged build finished. Test PASSed.





[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-05-06 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/11157#issuecomment-217465665
  
**[Test build #57992 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/57992/consoleFull)**
 for PR 11157 at commit 
[`dba3e34`](https://github.com/apache/spark/commit/dba3e34c826ddfcaa096254e1f0d230c49b4349d).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-05-06 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/11157#issuecomment-217434915
  
**[Test build #57992 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/57992/consoleFull)**
 for PR 11157 at commit 
[`dba3e34`](https://github.com/apache/spark/commit/dba3e34c826ddfcaa096254e1f0d230c49b4349d).





[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-05-05 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/11157#discussion_r62158563
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
 ---
@@ -186,6 +190,12 @@ private[spark] class CoarseMesosSchedulerBackend(
 .setValue(value)
 .build())
 }
+
+// Set the ports to be pickedup by the executor
+environment.addVariables(Environment.Variable.newBuilder()
+  .setName("SPARK_MESOS_PREASSIGNED_PORTS")
--- End diff --

OK, I will give it a shot.





[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-05-05 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/11157#discussion_r62158547
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
 ---
@@ -376,24 +386,29 @@ private[spark] class CoarseMesosSchedulerBackend(
 
   val (afterCPUResources, cpuResourcesToUse) =
 partitionResources(resources, "cpus", taskCPUs)
-  val (resourcesLeft, memResourcesToUse) =
+  val (remainingMemResources, memResourcesToUse) =
--- End diff --

I will fix that.





[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-05-05 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/11157#discussion_r62158505
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
 ---
@@ -72,13 +71,13 @@ private[spark] class CoarseMesosSchedulerBackend(
   private val shuffleServiceEnabled = 
conf.getBoolean("spark.shuffle.service.enabled", false)
 
   // Cores we have acquired with each Mesos task ID
-  val coresByTaskId = new HashMap[String, Int]
+  val coresByTaskId = new mutable.HashMap[String, Int]
--- End diff --

Right now the code has this in the imports section:

import scala.collection.mutable
import scala.collection.mutable.{Buffer, HashMap, HashSet}

and below:
val coresByTaskId = new HashMap[String, Int]

So `mutable` is imported anyway; at least that's what my IDE shows.

I just made the `mutable` prefix explicit because it's cleaner.

As for the mutable vs. immutable question, mutable is already used and, IMHO,
needed, e.g.:

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala#L483
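The import style under discussion can be sketched as follows; this is a minimal
illustration of the `mutable.`-prefix convention, not the PR's actual class
(`CoresTracker` and `record` are made up for the example):

```scala
import scala.collection.mutable

object CoresTracker {
  // Qualifying the type with `mutable.` makes the mutability explicit at the
  // use site, instead of importing mutable.HashMap unqualified and relying on
  // the reader to notice it shadows the immutable default.
  val coresByTaskId = new mutable.HashMap[String, Int]

  def record(taskId: String, cores: Int): Unit =
    coresByTaskId(taskId) = cores
}
```

With the qualified form, a reviewer scanning the field declarations can see at a
glance which collections are mutable.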





[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-05-04 Thread mgummelt
Github user mgummelt commented on a diff in the pull request:

https://github.com/apache/spark/pull/11157#discussion_r62136262
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
 ---
@@ -217,7 +227,7 @@ private[spark] class CoarseMesosSchedulerBackend(
   command.addUris(CommandInfo.URI.newBuilder().setValue(uri.get))
 }
 
-conf.getOption("spark.mesos.uris").map { uris =>
+conf.getOption("spark.mesos.uris").foreach { uris =>
--- End diff --

:+1: 
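The `.map` → `.foreach` change in the diff above reflects the usual idiom for
consuming an `Option` purely for its side effects: `foreach` returns `Unit`,
while `map` would build and discard an `Option[Unit]`. A small sketch, where
`uris` is a stand-in for the `conf.getOption("spark.mesos.uris")` lookup:

```scala
import scala.collection.mutable

// Stand-in value; in the PR this comes from conf.getOption("spark.mesos.uris").
val uris: Option[String] = Some("http://a.jar,http://b.jar")

val added = mutable.Buffer[String]()
// `foreach` signals the intent: run a side effect if the value is present.
uris.foreach { s =>
  s.split(",").foreach(uri => added += uri.trim)
}
```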





[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-05-04 Thread mgummelt
Github user mgummelt commented on a diff in the pull request:

https://github.com/apache/spark/pull/11157#discussion_r62136229
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
 ---
@@ -186,6 +190,12 @@ private[spark] class CoarseMesosSchedulerBackend(
 .setValue(value)
 .build())
 }
+
+// Set the ports to be pickedup by the executor
+environment.addVariables(Environment.Variable.newBuilder()
+  .setName("SPARK_MESOS_PREASSIGNED_PORTS")
--- End diff --

I'd rather not introduce yet another env var.  Can't we just use the 
existing `SPARK_EXECUTOR_OPTS`?





[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-05-04 Thread mgummelt
Github user mgummelt commented on a diff in the pull request:

https://github.com/apache/spark/pull/11157#discussion_r62135944
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
 ---
@@ -376,24 +386,29 @@ private[spark] class CoarseMesosSchedulerBackend(
 
   val (afterCPUResources, cpuResourcesToUse) =
 partitionResources(resources, "cpus", taskCPUs)
-  val (resourcesLeft, memResourcesToUse) =
+  val (remainingMemResources, memResourcesToUse) =
--- End diff --

s/remainingMemResources/afterMemResources to be consistent w/ 
`afterCPUResources`





[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-05-04 Thread mgummelt
Github user mgummelt commented on a diff in the pull request:

https://github.com/apache/spark/pull/11157#discussion_r62135836
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
 ---
@@ -72,13 +71,13 @@ private[spark] class CoarseMesosSchedulerBackend(
   private val shuffleServiceEnabled = 
conf.getBoolean("spark.shuffle.service.enabled", false)
 
   // Cores we have acquired with each Mesos task ID
-  val coresByTaskId = new HashMap[String, Int]
+  val coresByTaskId = new mutable.HashMap[String, Int]
--- End diff --

Why make these mutable?  AFAICT, they don't need to be.





[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-04-21 Thread skonto
Github user skonto commented on the pull request:

https://github.com/apache/spark/pull/11157#issuecomment-213119956
  
Ready for review.





[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-04-21 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/11157#issuecomment-212979856
  
**[Test build #56542 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/56542/consoleFull)**
 for PR 11157 at commit 
[`0432771`](https://github.com/apache/spark/commit/043277133adfe849a87e653e3d81ebb79ef47226).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-04-21 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/11157#issuecomment-212980274
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/56542/
Test PASSed.





[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-04-21 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/11157#issuecomment-212980270
  
Merged build finished. Test PASSed.





[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-04-21 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/11157#issuecomment-212965783
  
Merged build finished. Test FAILed.





[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-04-21 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/11157#issuecomment-212965792
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/56541/
Test FAILed.





[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-04-21 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/11157#issuecomment-212965542
  
**[Test build #56541 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/56541/consoleFull)**
 for PR 11157 at commit 
[`a410473`](https://github.com/apache/spark/commit/a410473f9cf3248b8d399bda7dc94978f4d6b679).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-04-21 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/11157#issuecomment-212927264
  
**[Test build #56542 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/56542/consoleFull)**
 for PR 11157 at commit 
[`0432771`](https://github.com/apache/spark/commit/043277133adfe849a87e653e3d81ebb79ef47226).





[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-04-21 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/11157#issuecomment-212922415
  
**[Test build #56541 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/56541/consoleFull)**
 for PR 11157 at commit 
[`a410473`](https://github.com/apache/spark/commit/a410473f9cf3248b8d399bda7dc94978f4d6b679).





[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-04-21 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/11157#discussion_r60567877
  
--- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---
@@ -1978,57 +1978,134 @@ private[spark] object Utils extends Logging {
   }
 
   /**
-   * Attempt to start a service on the given port, or fail after a number 
of attempts.
-   * Each subsequent attempt uses 1 + the port used in the previous 
attempt (unless the port is 0).
-   *
-   * @param startPort The initial port to start the service on.
-   * @param startService Function to start service on a given port.
-   * This is expected to throw java.net.BindException 
on port collision.
-   * @param conf A SparkConf used to get the maximum number of retries 
when binding to a port.
-   * @param serviceName Name of the service.
-   * @return (service: T, port: Int)
-   */
-  def startServiceOnPort[T](
-  startPort: Int,
+* Attempt to start a service on the given port, or fail after a number 
of attempts.
+* Each subsequent attempt uses 1 + the port used in the previous 
attempt (unless the port is 0).
+* It takes into consideration port restrictions through the env var 
AVAILABLE_PORTS
+*
+* @param startPort The initial port to start the service on.
+* @param startService Function to start service on a given port.
+* This is expected to throw java.net.BindException 
on port collision.
+* @param conf A SparkConf used to get the maximum number of retries 
when binding to a port.
+* @param serviceName Name of the service.
+* @return (service: T, port: Int)
+*/
+  def startServiceOnPort[T](startPort: Int,
   startService: Int => (T, Int),
   conf: SparkConf,
   serviceName: String = ""): (T, Int) = {
 
+val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'"
+
+// define some helpers, they all share common data, maybe a service 
abstract class
+// for all services could be a good fit here.
+
+def portRangeToList(ranges: String): List[(Long, Long)] = {
+  if (ranges == "") {
+return List()
+  }
+  ranges.split(" ").map { r => val ret = r.substring(1, r.length - 
1).split(",")
+(ret(0).toLong, ret(1).toLong)
+  }.toList
+}
+
+def startOnce(tryPort: Int): (Option[T], Int) = {
+  val serviceString = if (serviceName.isEmpty) "" else s" 
'$serviceName'"
+  try {
+val (service, port) = startService(tryPort)
+logInfo(s"Successfully started service$serviceString on port 
$port.")
+(Some(service), port)
+  } catch {
+case e: Exception if isBindCollision(e) => 
logWarning(s"Service$serviceString " +
+  s"could not bind on port $tryPort. ")
+  (None, -1)
+  }
+}
+
+def retryPort(next: Int => Int, maxRetries: Int): (T, Int) = {
+
+  for (offset <- 0 until maxRetries) {
+val tryPort = next(offset)
+try {
+  val (service, port) = startService(tryPort)
+  logInfo(s"Successfully started service$serviceString on port 
$port.")
+  return (service, port)
+} catch {
+  case e: Exception if isBindCollision(e) =>
+if (offset >= maxRetries) {
+  val exceptionMessage =
+s"${e.getMessage}: Service$serviceString failed after 
$maxRetries retries!"
+  val exception = new BindException(exceptionMessage)
+  // restore original stack trace
+  exception.setStackTrace(e.getStackTrace)
+  throw exception
+}
+logWarning(s"Service$serviceString could not bind on port 
$tryPort.")
+}
+  }
+  // Should never happen
+  throw new SparkException(s"Failed to start service$serviceString on 
port $startPort")
+}
+
+def startFromAvailable(rand: Boolean = false): (T, Int) = {
+  val ports = portRangeToList(sys.env.get("AVAILABLE_RAND_PORTS").get)
--- End diff --

I will do that anyway, @tnachen...
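The `portRangeToList` helper in the diff above parses a space-separated list of
`(lo,hi)` port ranges, as passed through the environment variable. A
self-contained sketch of that parsing, assuming the input format shown in the
diff (the merged implementation may differ):

```scala
def portRangeToList(ranges: String): List[(Long, Long)] = {
  if (ranges.trim.isEmpty) {
    List()
  } else {
    // Each range looks like "(lo,hi)": strip the parentheses, then split the
    // two bounds on the comma.
    ranges.trim.split("\\s+").toList.map { r =>
      val bounds = r.substring(1, r.length - 1).split(",")
      (bounds(0).toLong, bounds(1).toLong)
    }
  }
}
```

Parsing into `(Long, Long)` pairs keeps the later containment checks simple: a
port fits an offer if some pair's bounds bracket it.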





[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-04-21 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/11157#discussion_r60561827
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
 ---
@@ -353,4 +371,247 @@ private[mesos] trait MesosSchedulerUtils extends 
Logging {
 
sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForUnmetConstraints", 
"120s")
   }
 
+  /**
+   * Checks executor ports if they are within some range of the offered 
list of ports ranges,
+   *
+   * @param sc the Spark Context
+   * @param ports the list of ports to check
+   * @param takenPorts ports already used for that slave
+   * @return true if ports are within range false otherwise
+   */
+  protected def checkPorts(sc: SparkContext, ports: List[(Long, Long)],
--- End diff --

ok





[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-04-21 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/11157#discussion_r60561580
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
 ---
@@ -154,7 +161,8 @@ private[spark] class CoarseMesosSchedulerBackend(
 startScheduler(driver)
   }
 
-  def createCommand(offer: Offer, numCores: Int, taskId: String): 
CommandInfo = {
+  def createCommand(offer: Offer, numCores: Int, taskId: String, 
randPorts: List[Long] = List())
--- End diff --

OK, I will fix it.





[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-04-21 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/11157#discussion_r60559591
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
 ---
@@ -75,6 +75,13 @@ private[spark] class CoarseMesosSchedulerBackend(
   val coresByTaskId = new HashMap[String, Int]
   var totalCoresAcquired = 0
 
+  // Ports acquired so far
+  // SlaveID ->  ports
+  val takenPortsPerSlave = new mutable.HashMap[String, List[Long]]
--- End diff --

I will remove the state.





[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-04-20 Thread skonto
Github user skonto commented on the pull request:

https://github.com/apache/spark/pull/11157#issuecomment-212429985
  
WIP





[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-04-20 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/11157#discussion_r60402294
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
 ---
@@ -353,4 +371,247 @@ private[mesos] trait MesosSchedulerUtils extends 
Logging {
 
sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForUnmetConstraints", 
"120s")
   }
 
+  /**
+   * Checks executor ports if they are within some range of the offered 
list of ports ranges,
+   *
+   * @param sc the Spark Context
+   * @param ports the list of ports to check
+   * @param takenPorts ports already used for that slave
+   * @return true if ports are within range false otherwise
+   */
+  protected def checkPorts(sc: SparkContext, ports: List[(Long, Long)],
+   takenPorts: List[Long] = List()): Boolean = {
+
+def checkIfInRange(port: Int, ps: List[(Long, Long)]): Boolean = {
+  ps.exists(r => r._1 <= port & r._2 >= port)
+}
+
+val portsToCheck = List(sc.conf.getInt("spark.executor.port", 0),
+  sc.conf.getInt("spark.blockManager.port", 0))
+val nonZeroPorts = portsToCheck.filter(_ != 0)
+
+// If we require a port that is taken we have to decline the offer 
since mesos
+// shares all port ranges on the slave
+val contained = for {port <- nonZeroPorts}
+  yield {
+takenPorts.contains(port)
+  }
+
+if (contained.contains(true)) {
+  return false
+}
+
+val withinRange = nonZeroPorts.forall(p => checkIfInRange(p, ports))
+
+// make sure we have enough ports to allocate per offer
+ports.map(r => r._2 - r._1 + 1).sum >= portsToCheck.size && withinRange
+  }
+
+  /**
+   * Partitions port resources.
+   *
+   * @param conf the spark config
+   * @param ports the ports offered
+   * @return resources left, port resources to be used and the list of 
assigned ports
+   */
+  def partitionPorts(
+  conf: SparkConf,
+  ports: List[Resource]): (List[Resource], List[Resource], List[Long]) 
= {
+
+val taskPortRanges = getRangeResourceWithRoleInfo(ports.asJava, 
"ports")
+
--- End diff --

I know there is https://github.com/databricks/scala-style-guide#blanklines,
but honestly, when multiple fields have no space in between, especially when
they don't form a logical group among other groups, readability is low... at
least that's my opinion...
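The core of the `checkPorts` logic quoted above is a range-containment predicate
plus a capacity check. A simplified sketch of just that part (not the PR's
exact code; the real version also declines offers whose ports are already taken
on that slave, and reads the ports from the Spark config):

```scala
// True if the port falls inside any offered (lo, hi) range.
def withinRanges(port: Long, ranges: List[(Long, Long)]): Boolean =
  ranges.exists { case (lo, hi) => lo <= port && hi >= port }

// True if every explicitly requested port is offered and the offer has
// enough total ports for all requests. A port of 0 means "any port".
def portsFit(requiredPorts: List[Long], offered: List[(Long, Long)]): Boolean = {
  val nonZero = requiredPorts.filter(_ != 0)
  nonZero.forall(withinRanges(_, offered)) &&
    offered.map { case (lo, hi) => hi - lo + 1 }.sum >= requiredPorts.size
}
```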





[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-04-20 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/11157#discussion_r60399111
  
--- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---
@@ -1978,57 +1978,134 @@ private[spark] object Utils extends Logging {
   }
 
   /**
-   * Attempt to start a service on the given port, or fail after a number 
of attempts.
-   * Each subsequent attempt uses 1 + the port used in the previous 
attempt (unless the port is 0).
-   *
-   * @param startPort The initial port to start the service on.
-   * @param startService Function to start service on a given port.
-   * This is expected to throw java.net.BindException 
on port collision.
-   * @param conf A SparkConf used to get the maximum number of retries 
when binding to a port.
-   * @param serviceName Name of the service.
-   * @return (service: T, port: Int)
-   */
-  def startServiceOnPort[T](
-  startPort: Int,
+* Attempt to start a service on the given port, or fail after a number 
of attempts.
+* Each subsequent attempt uses 1 + the port used in the previous 
attempt (unless the port is 0).
+* It takes into consideration port restrictions through the env var 
AVAILABLE_PORTS
+*
+* @param startPort The initial port to start the service on.
+* @param startService Function to start service on a given port.
+* This is expected to throw java.net.BindException 
on port collision.
+* @param conf A SparkConf used to get the maximum number of retries 
when binding to a port.
+* @param serviceName Name of the service.
+* @return (service: T, port: Int)
+*/
+  def startServiceOnPort[T](startPort: Int,
   startService: Int => (T, Int),
   conf: SparkConf,
   serviceName: String = ""): (T, Int) = {
 
+val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'"
+
+// define some helpers, they all share common data, maybe a service 
abstract class
+// for all services could be a good fit here.
+
+def portRangeToList(ranges: String): List[(Long, Long)] = {
+  if (ranges == "") {
+return List()
+  }
+  ranges.split(" ").map { r => val ret = r.substring(1, r.length - 
1).split(",")
+(ret(0).toLong, ret(1).toLong)
+  }.toList
+}
+
+def startOnce(tryPort: Int): (Option[T], Int) = {
+  val serviceString = if (serviceName.isEmpty) "" else s" 
'$serviceName'"
+  try {
+val (service, port) = startService(tryPort)
+logInfo(s"Successfully started service$serviceString on port 
$port.")
+(Some(service), port)
+  } catch {
+case e: Exception if isBindCollision(e) => 
logWarning(s"Service$serviceString " +
+  s"could not bind on port $tryPort. ")
+  (None, -1)
+  }
+}
+
+def retryPort(next: Int => Int, maxRetries: Int): (T, Int) = {
+
+  for (offset <- 0 until maxRetries) {
+val tryPort = next(offset)
+try {
+  val (service, port) = startService(tryPort)
+  logInfo(s"Successfully started service$serviceString on port 
$port.")
+  return (service, port)
+} catch {
+  case e: Exception if isBindCollision(e) =>
+if (offset >= maxRetries) {
+  val exceptionMessage =
+s"${e.getMessage}: Service$serviceString failed after 
$maxRetries retries!"
+  val exception = new BindException(exceptionMessage)
+  // restore original stack trace
+  exception.setStackTrace(e.getStackTrace)
+  throw exception
+}
+logWarning(s"Service$serviceString could not bind on port 
$tryPort.")
+}
+  }
+  // Should never happen
+  throw new SparkException(s"Failed to start service$serviceString on 
port $startPort")
+}
+
+def startFromAvailable(rand: Boolean = false): (T, Int) = {
+  val ports = portRangeToList(sys.env.get("AVAILABLE_RAND_PORTS").get)
--- End diff --

It will affect CoarseGrainedExecutorBackend, which is not Mesos-specific... 
I don't see a way to avoid touching some portion of the code that is not Mesos-only.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
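The patch above parses the port-restriction env var as a space-separated list of `(lo,hi)` pairs. A minimal, self-contained sketch of that parsing step (assuming the same `"(lo,hi) (lo,hi)"` format the diff implies, with no validation of malformed input):

```scala
// Sketch of the range-parsing logic from the patch: the env var is assumed
// to hold space-separated "(lo,hi)" pairs, e.g. "(31000,31100) (32000,32050)".
object PortRanges {
  def portRangeToList(ranges: String): List[(Long, Long)] = {
    if (ranges.trim.isEmpty) {
      List.empty
    } else {
      ranges.split(" ").toList.map { r =>
        // strip the surrounding parentheses, then split on the comma
        val bounds = r.substring(1, r.length - 1).split(",")
        (bounds(0).toLong, bounds(1).toLong)
      }
    }
  }
}
```

As written here (and in the diff), a malformed pair throws rather than being skipped, which is one of the reasons the reviewers suggest factoring these helpers into a shared abstraction.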

[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-04-19 Thread tnachen
Github user tnachen commented on a diff in the pull request:

https://github.com/apache/spark/pull/11157#discussion_r60310542
  
--- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---
@@ -1978,57 +1978,134 @@ private[spark] object Utils extends Logging {
   }
 
   /**
-   * Attempt to start a service on the given port, or fail after a number 
of attempts.
-   * Each subsequent attempt uses 1 + the port used in the previous 
attempt (unless the port is 0).
-   *
-   * @param startPort The initial port to start the service on.
-   * @param startService Function to start service on a given port.
-   * This is expected to throw java.net.BindException 
on port collision.
-   * @param conf A SparkConf used to get the maximum number of retries 
when binding to a port.
-   * @param serviceName Name of the service.
-   * @return (service: T, port: Int)
-   */
-  def startServiceOnPort[T](
-  startPort: Int,
+* Attempt to start a service on the given port, or fail after a number 
of attempts.
+* Each subsequent attempt uses 1 + the port used in the previous 
attempt (unless the port is 0).
+* It takes into consideration port restrictions through the env var 
AVAILABLE_PORTS
+*
+* @param startPort The initial port to start the service on.
+* @param startService Function to start service on a given port.
+* This is expected to throw java.net.BindException 
on port collision.
+* @param conf A SparkConf used to get the maximum number of retries 
when binding to a port.
+* @param serviceName Name of the service.
+* @return (service: T, port: Int)
+*/
+  def startServiceOnPort[T](startPort: Int,
   startService: Int => (T, Int),
   conf: SparkConf,
   serviceName: String = ""): (T, Int) = {
 
+val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'"
+
+// define some helpers, they all share common data, maybe a service 
abstract class
+// for all services could be a good fit here.
+
+def portRangeToList(ranges: String): List[(Long, Long)] = {
+  if (ranges == "") {
+return List()
+  }
+  ranges.split(" ").map { r => val ret = r.substring(1, r.length - 
1).split(",")
+(ret(0).toLong, ret(1).toLong)
+  }.toList
+}
+
+def startOnce(tryPort: Int): (Option[T], Int) = {
+  val serviceString = if (serviceName.isEmpty) "" else s" 
'$serviceName'"
+  try {
+val (service, port) = startService(tryPort)
+logInfo(s"Successfully started service$serviceString on port 
$port.")
+(Some(service), port)
+  } catch {
+case e: Exception if isBindCollision(e) => 
logWarning(s"Service$serviceString " +
+  s"could not bind on port $tryPort. ")
+  (None, -1)
+  }
+}
+
+def retryPort(next: Int => Int, maxRetries: Int): (T, Int) = {
+
+  for (offset <- 0 until maxRetries) {
+val tryPort = next(offset)
+try {
+  val (service, port) = startService(tryPort)
+  logInfo(s"Successfully started service$serviceString on port 
$port.")
+  return (service, port)
+} catch {
+  case e: Exception if isBindCollision(e) =>
+if (offset >= maxRetries) {
+  val exceptionMessage =
+s"${e.getMessage}: Service$serviceString failed after 
$maxRetries retries!"
+  val exception = new BindException(exceptionMessage)
+  // restore original stack trace
+  exception.setStackTrace(e.getStackTrace)
+  throw exception
+}
+logWarning(s"Service$serviceString could not bind on port 
$tryPort.")
+}
+  }
+  // Should never happen
+  throw new SparkException(s"Failed to start service$serviceString on 
port $startPort")
+}
+
+def startFromAvailable(rand: Boolean = false): (T, Int) = {
+  val ports = portRangeToList(sys.env.get("AVAILABLE_RAND_PORTS").get)
--- End diff --

I prefer the latter approach you mentioned, where you preassign them, since 
that only impacts the Mesos portion of the code base and we can 
potentially control it more. 


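The "preassign" approach favored above means picking concrete port numbers out of the offered Mesos ranges before launching the executor, so only the scheduler code needs to understand ranges. A hypothetical sketch of such a picker (`pickPortsFromRanges` is an illustrative name, not the patch's actual helper; a fixed seed is used here only to keep the example deterministic):

```scala
import scala.util.Random

// Hypothetical sketch: expand the offered (lo,hi) ranges into candidate
// ports, then pick `count` distinct ports at random to preassign.
object PortPicker {
  def pickPortsFromRanges(ranges: List[(Long, Long)],
                          count: Int,
                          rand: Random = new Random(42)): List[Long] = {
    // materialize every port covered by the offered ranges
    val all = ranges.flatMap { case (lo, hi) => lo to hi }
    // a shuffle yields distinct ports, so no port is assigned twice
    rand.shuffle(all).take(count)
  }
}
```

In a real scheduler the expansion would want guarding (Mesos port ranges can be large), but the shape of the idea is the same.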

[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-04-19 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/11157#discussion_r60301970
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
 ---
@@ -45,7 +45,8 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
 
   /**
* Creates a new MesosSchedulerDriver that communicates to the Mesos 
master.
-   * @param masterUrl The url to connect to Mesos master
+*
+* @param masterUrl The url to connect to Mesos master
--- End diff --

I guess not; I will fix it, thanks.





[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-04-19 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/11157#discussion_r60301895
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
 ---
@@ -333,7 +350,8 @@ private[mesos] trait MesosSchedulerUtils extends 
Logging {
   /**
* Return the amount of memory to allocate to each executor, taking into 
account
* container overheads.
-   * @param sc SparkContext to use to get 
`spark.mesos.executor.memoryOverhead` value
+*
+* @param sc SparkContext to use to get 
`spark.mesos.executor.memoryOverhead` value
--- End diff --

ok





[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-04-19 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/11157#discussion_r60301880
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
 ---
@@ -171,7 +186,8 @@ private[mesos] trait MesosSchedulerUtils extends 
Logging {
   /**
* Partition the existing set of resources into two groups, those 
remaining to be
* scheduled and those requested to be used for a new task.
-   * @param resources The full list of available resources
+*
+* @param resources The full list of available resources
--- End diff --

ok





[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-04-19 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/11157#discussion_r60301818
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
 ---
@@ -353,4 +371,247 @@ private[mesos] trait MesosSchedulerUtils extends 
Logging {
 
sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForUnmetConstraints", 
"120s")
   }
 
+  /**
+   * Checks executor ports if they are within some range of the offered 
list of ports ranges,
+   *
+   * @param sc the Spark Context
+   * @param ports the list of ports to check
+   * @param takenPorts ports already used for that slave
+   * @return true if ports are within range false otherwise
+   */
+  protected def checkPorts(sc: SparkContext, ports: List[(Long, Long)],
+   takenPorts: List[Long] = List()): Boolean = {
+
+def checkIfInRange(port: Int, ps: List[(Long, Long)]): Boolean = {
+  ps.exists(r => r._1 <= port & r._2 >= port)
+}
+
+val portsToCheck = List(sc.conf.getInt("spark.executor.port", 0),
+  sc.conf.getInt("spark.blockManager.port", 0))
+val nonZeroPorts = portsToCheck.filter(_ != 0)
+
+// If we require a port that is taken we have to decline the offer 
since mesos
+// shares all port ranges on the slave
+val contained = for {port <- nonZeroPorts}
+  yield {
+takenPorts.contains(port)
+  }
+
+if (contained.contains(true)) {
+  return false
+}
+
+val withinRange = nonZeroPorts.forall(p => checkIfInRange(p, ports))
+
+// make sure we have enough ports to allocate per offer
+ports.map(r => r._2 - r._1 + 1).sum >= portsToCheck.size && withinRange
+  }
+
+  /**
+   * Partitions port resources.
+   *
+   * @param conf the spark config
+   * @param ports the ports offered
+   * @return resources left, port resources to be used and the list of 
assigned ports
+   */
+  def partitionPorts(
+  conf: SparkConf,
+  ports: List[Resource]): (List[Resource], List[Resource], List[Long]) 
= {
+
+val taskPortRanges = getRangeResourceWithRoleInfo(ports.asJava, 
"ports")
+
+val portsToCheck = List(conf.getInt("spark.executor.port", 0).toLong,
+  conf.getInt("spark.blockManager.port", 0).toLong)
+
+val nonZeroPorts = portsToCheck.filter(_ != 0)
+
+// reserve non zero ports first
+
+val nonZeroResources = reservePorts(taskPortRanges, nonZeroPorts)
+
+// reserve actual port numbers for zero ports - not set by the user
+
+val numOfZeroPorts = portsToCheck.count(_ == 0)
+
+val randPorts = pickRandomPortsFromRanges(nonZeroResources._1, 
numOfZeroPorts)
+
+val zeroResources = reservePorts(nonZeroResources._1, randPorts)
+
+val (resourcesLeft, resourcesToBeUsed) = 
createResources(nonZeroResources, zeroResources)
+
+(resourcesLeft, resourcesToBeUsed, nonZeroPorts ++ randPorts)
+  }
+
+  private def createResources(
+  nonZero: (List[PortRangeResourceInfo], List[PortRangeResourceInfo]),
+  zero: (List[PortRangeResourceInfo], List[PortRangeResourceInfo]))
+  : (List[Resource], List[Resource]) = {
+
+val resources = {
+  if (nonZero._2.isEmpty) { // no user ports were defined
+(zero._1.flatMap{port => createMesosPortResource(port.value, 
Some(port.role))},
+  zero._2.flatMap{port => createMesosPortResource(port.value, 
Some(port.role))})
+
+  } else if (zero._2.isEmpty) { // no random ports were defined
+(nonZero._1.flatMap{port => createMesosPortResource(port.value, 
Some(port.role))},
+  nonZero._2.flatMap{port => createMesosPortResource(port.value, 
Some(port.role))})
+  }
+  else {  // we have user defined and random ports defined
+val left = zero._1.flatMap{port => 
createMesosPortResource(port.value, Some(port.role))}
+
+val used = nonZero._2.flatMap{port =>
+  createMesosPortResource(port.value, Some(port.role))} ++
+  zero._2.flatMap{port => createMesosPortResource(port.value, 
Some(port.role))}
+
+(left, used)
+  }
+}
+resources
+  }
+
+  private case class PortRangeResourceInfo(role: String, value: 
List[(Long, Long)])
+
+  private def getRangeResourceWithRoleInfo(res: JList[Resource], name: 
String)
+  : List[PortRangeResourceInfo] = {
+// A resource can have multiple values in the offer since it can 
either be from
+// a specific role or wildcard.
+res.asScala.filter(_.getName == name)
+  .map{res => PortRangeResourceInfo(res.getRole, 
res.getRanges.getRangeList.

[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-04-19 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/11157#discussion_r60301857
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
 ---
@@ -353,4 +371,247 @@ private[mesos] trait MesosSchedulerUtils extends 
Logging {
 
sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForUnmetConstraints", 
"120s")
   }
 
+  /**
+   * Checks executor ports if they are within some range of the offered 
list of ports ranges,
+   *
+   * @param sc the Spark Context
+   * @param ports the list of ports to check
+   * @param takenPorts ports already used for that slave
+   * @return true if ports are within range false otherwise
+   */
+  protected def checkPorts(sc: SparkContext, ports: List[(Long, Long)],
+   takenPorts: List[Long] = List()): Boolean = {
+
+def checkIfInRange(port: Int, ps: List[(Long, Long)]): Boolean = {
+  ps.exists(r => r._1 <= port & r._2 >= port)
+}
+
+val portsToCheck = List(sc.conf.getInt("spark.executor.port", 0),
+  sc.conf.getInt("spark.blockManager.port", 0))
+val nonZeroPorts = portsToCheck.filter(_ != 0)
+
+// If we require a port that is taken we have to decline the offer 
since mesos
+// shares all port ranges on the slave
+val contained = for {port <- nonZeroPorts}
--- End diff --

ok





[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-04-19 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/11157#discussion_r60301778
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
 ---
@@ -353,4 +371,247 @@ private[mesos] trait MesosSchedulerUtils extends 
Logging {
 
sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForUnmetConstraints", 
"120s")
   }
 
+  /**
+   * Checks executor ports if they are within some range of the offered 
list of ports ranges,
+   *
+   * @param sc the Spark Context
+   * @param ports the list of ports to check
+   * @param takenPorts ports already used for that slave
+   * @return true if ports are within range false otherwise
+   */
+  protected def checkPorts(sc: SparkContext, ports: List[(Long, Long)],
+   takenPorts: List[Long] = List()): Boolean = {
+
+def checkIfInRange(port: Int, ps: List[(Long, Long)]): Boolean = {
+  ps.exists(r => r._1 <= port & r._2 >= port)
+}
+
+val portsToCheck = List(sc.conf.getInt("spark.executor.port", 0),
+  sc.conf.getInt("spark.blockManager.port", 0))
+val nonZeroPorts = portsToCheck.filter(_ != 0)
+
+// If we require a port that is taken we have to decline the offer 
since mesos
+// shares all port ranges on the slave
+val contained = for {port <- nonZeroPorts}
+  yield {
+takenPorts.contains(port)
+  }
+
+if (contained.contains(true)) {
+  return false
+}
+
+val withinRange = nonZeroPorts.forall(p => checkIfInRange(p, ports))
+
+// make sure we have enough ports to allocate per offer
+ports.map(r => r._2 - r._1 + 1).sum >= portsToCheck.size && withinRange
+  }
+
+  /**
+   * Partitions port resources.
+   *
+   * @param conf the spark config
+   * @param ports the ports offered
+   * @return resources left, port resources to be used and the list of 
assigned ports
+   */
+  def partitionPorts(
+  conf: SparkConf,
+  ports: List[Resource]): (List[Resource], List[Resource], List[Long]) 
= {
--- End diff --

ok


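The `checkPorts` logic quoted in this hunk boils down to two conditions for a fixed (non-zero) executor port: some offered range must contain it, and it must not already be taken on that slave. A minimal standalone sketch of that check:

```scala
// Minimal sketch of the offer check in `checkPorts`: a requested port is
// acceptable only if an offered (lo, hi) range contains it and it has not
// already been handed out on that slave.
object PortCheck {
  def inRange(port: Long, ranges: List[(Long, Long)]): Boolean =
    ranges.exists { case (lo, hi) => lo <= port && port <= hi }

  def acceptable(requested: List[Long],
                 offered: List[(Long, Long)],
                 taken: List[Long]): Boolean =
    requested.forall(p => inRange(p, offered) && !taken.contains(p))
}
```

Note the sketch uses `&&` for the range test; the quoted diff uses the non-short-circuiting `&`, which is semantically equivalent on Booleans here but was worth a style flag.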



[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-04-19 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/11157#discussion_r60301695
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
 ---
@@ -353,4 +371,247 @@ private[mesos] trait MesosSchedulerUtils extends 
Logging {
 
sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForUnmetConstraints", 
"120s")
   }
 
+  /**
+   * Checks executor ports if they are within some range of the offered 
list of ports ranges,
+   *
+   * @param sc the Spark Context
+   * @param ports the list of ports to check
+   * @param takenPorts ports already used for that slave
+   * @return true if ports are within range false otherwise
+   */
+  protected def checkPorts(sc: SparkContext, ports: List[(Long, Long)],
+   takenPorts: List[Long] = List()): Boolean = {
+
+def checkIfInRange(port: Int, ps: List[(Long, Long)]): Boolean = {
+  ps.exists(r => r._1 <= port & r._2 >= port)
+}
+
+val portsToCheck = List(sc.conf.getInt("spark.executor.port", 0),
+  sc.conf.getInt("spark.blockManager.port", 0))
+val nonZeroPorts = portsToCheck.filter(_ != 0)
+
+// If we require a port that is taken we have to decline the offer 
since mesos
+// shares all port ranges on the slave
+val contained = for {port <- nonZeroPorts}
+  yield {
+takenPorts.contains(port)
+  }
+
+if (contained.contains(true)) {
+  return false
+}
+
+val withinRange = nonZeroPorts.forall(p => checkIfInRange(p, ports))
+
+// make sure we have enough ports to allocate per offer
+ports.map(r => r._2 - r._1 + 1).sum >= portsToCheck.size && withinRange
+  }
+
+  /**
+   * Partitions port resources.
+   *
+   * @param conf the spark config
+   * @param ports the ports offered
+   * @return resources left, port resources to be used and the list of 
assigned ports
+   */
+  def partitionPorts(
+  conf: SparkConf,
+  ports: List[Resource]): (List[Resource], List[Resource], List[Long]) 
= {
+
+val taskPortRanges = getRangeResourceWithRoleInfo(ports.asJava, 
"ports")
+
+val portsToCheck = List(conf.getInt("spark.executor.port", 0).toLong,
+  conf.getInt("spark.blockManager.port", 0).toLong)
+
+val nonZeroPorts = portsToCheck.filter(_ != 0)
+
+// reserve non zero ports first
+
+val nonZeroResources = reservePorts(taskPortRanges, nonZeroPorts)
+
+// reserve actual port numbers for zero ports - not set by the user
+
+val numOfZeroPorts = portsToCheck.count(_ == 0)
+
+val randPorts = pickRandomPortsFromRanges(nonZeroResources._1, 
numOfZeroPorts)
+
+val zeroResources = reservePorts(nonZeroResources._1, randPorts)
+
+val (resourcesLeft, resourcesToBeUsed) = 
createResources(nonZeroResources, zeroResources)
+
+(resourcesLeft, resourcesToBeUsed, nonZeroPorts ++ randPorts)
+  }
+
+  private def createResources(
+  nonZero: (List[PortRangeResourceInfo], List[PortRangeResourceInfo]),
+  zero: (List[PortRangeResourceInfo], List[PortRangeResourceInfo]))
+  : (List[Resource], List[Resource]) = {
+
+val resources = {
+  if (nonZero._2.isEmpty) { // no user ports were defined
+(zero._1.flatMap{port => createMesosPortResource(port.value, 
Some(port.role))},
+  zero._2.flatMap{port => createMesosPortResource(port.value, 
Some(port.role))})
+
+  } else if (zero._2.isEmpty) { // no random ports were defined
+(nonZero._1.flatMap{port => createMesosPortResource(port.value, 
Some(port.role))},
+  nonZero._2.flatMap{port => createMesosPortResource(port.value, 
Some(port.role))})
+  }
+  else {  // we have user defined and random ports defined
+val left = zero._1.flatMap{port => 
createMesosPortResource(port.value, Some(port.role))}
+
+val used = nonZero._2.flatMap{port =>
+  createMesosPortResource(port.value, Some(port.role))} ++
+  zero._2.flatMap{port => createMesosPortResource(port.value, 
Some(port.role))}
+
+(left, used)
+  }
+}
+resources
+  }
+
+  private case class PortRangeResourceInfo(role: String, value: 
List[(Long, Long)])
+
+  private def getRangeResourceWithRoleInfo(res: JList[Resource], name: 
String)
+  : List[PortRangeResourceInfo] = {
+// A resource can have multiple values in the offer since it can 
either be from
+// a specific role or wildcard.
+res.asScala.filter(_.getName == name)
+  .map{res => PortRangeResourceInfo(res.getRole, 
res.getRanges.getRangeList.

[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-04-19 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/11157#discussion_r60301651
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
 ---
@@ -353,4 +371,247 @@ private[mesos] trait MesosSchedulerUtils extends 
Logging {
 
sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForUnmetConstraints", 
"120s")
   }
 
+  /**
+   * Checks executor ports if they are within some range of the offered 
list of ports ranges,
+   *
+   * @param sc the Spark Context
+   * @param ports the list of ports to check
+   * @param takenPorts ports already used for that slave
+   * @return true if ports are within range false otherwise
+   */
+  protected def checkPorts(sc: SparkContext, ports: List[(Long, Long)],
+   takenPorts: List[Long] = List()): Boolean = {
+
+def checkIfInRange(port: Int, ps: List[(Long, Long)]): Boolean = {
+  ps.exists(r => r._1 <= port & r._2 >= port)
+}
+
+val portsToCheck = List(sc.conf.getInt("spark.executor.port", 0),
+  sc.conf.getInt("spark.blockManager.port", 0))
+val nonZeroPorts = portsToCheck.filter(_ != 0)
+
+// If we require a port that is taken we have to decline the offer 
since mesos
+// shares all port ranges on the slave
+val contained = for {port <- nonZeroPorts}
+  yield {
+takenPorts.contains(port)
+  }
+
+if (contained.contains(true)) {
+  return false
+}
+
+val withinRange = nonZeroPorts.forall(p => checkIfInRange(p, ports))
+
+// make sure we have enough ports to allocate per offer
+ports.map(r => r._2 - r._1 + 1).sum >= portsToCheck.size && withinRange
+  }
+
+  /**
+   * Partitions port resources.
+   *
+   * @param conf the spark config
+   * @param ports the ports offered
+   * @return resources left, port resources to be used and the list of 
assigned ports
+   */
+  def partitionPorts(
+  conf: SparkConf,
+  ports: List[Resource]): (List[Resource], List[Resource], List[Long]) 
= {
+
+val taskPortRanges = getRangeResourceWithRoleInfo(ports.asJava, 
"ports")
+
+val portsToCheck = List(conf.getInt("spark.executor.port", 0).toLong,
+  conf.getInt("spark.blockManager.port", 0).toLong)
+
+val nonZeroPorts = portsToCheck.filter(_ != 0)
+
+// reserve non zero ports first
+
+val nonZeroResources = reservePorts(taskPortRanges, nonZeroPorts)
+
+// reserve actual port numbers for zero ports - not set by the user
+
+val numOfZeroPorts = portsToCheck.count(_ == 0)
+
+val randPorts = pickRandomPortsFromRanges(nonZeroResources._1, 
numOfZeroPorts)
+
+val zeroResources = reservePorts(nonZeroResources._1, randPorts)
+
+val (resourcesLeft, resourcesToBeUsed) = 
createResources(nonZeroResources, zeroResources)
+
+(resourcesLeft, resourcesToBeUsed, nonZeroPorts ++ randPorts)
+  }
+
+  private def createResources(
+  nonZero: (List[PortRangeResourceInfo], List[PortRangeResourceInfo]),
+  zero: (List[PortRangeResourceInfo], List[PortRangeResourceInfo]))
+  : (List[Resource], List[Resource]) = {
+
+val resources = {
+  if (nonZero._2.isEmpty) { // no user ports were defined
+(zero._1.flatMap{port => createMesosPortResource(port.value, 
Some(port.role))},
+  zero._2.flatMap{port => createMesosPortResource(port.value, 
Some(port.role))})
+
+  } else if (zero._2.isEmpty) { // no random ports were defined
+(nonZero._1.flatMap{port => createMesosPortResource(port.value, 
Some(port.role))},
+  nonZero._2.flatMap{port => createMesosPortResource(port.value, 
Some(port.role))})
+  }
+  else {  // we have user defined and random ports defined
+val left = zero._1.flatMap{port => 
createMesosPortResource(port.value, Some(port.role))}
+
+val used = nonZero._2.flatMap{port =>
+  createMesosPortResource(port.value, Some(port.role))} ++
+  zero._2.flatMap{port => createMesosPortResource(port.value, 
Some(port.role))}
+
+(left, used)
+  }
+}
+resources
+  }
+
+  private case class PortRangeResourceInfo(role: String, value: 
List[(Long, Long)])
+
+  private def getRangeResourceWithRoleInfo(res: JList[Resource], name: 
String)
+  : List[PortRangeResourceInfo] = {
+// A resource can have multiple values in the offer since it can 
either be from
+// a specific role or wildcard.
+res.asScala.filter(_.getName == name)
+  .map{res => PortRangeResourceInfo(res.getRole, 
res.getRanges.getRangeList.

[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-04-19 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/11157#discussion_r60301627
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
 ---
@@ -353,4 +371,247 @@ private[mesos] trait MesosSchedulerUtils extends 
Logging {
 
sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForUnmetConstraints", 
"120s")
   }
 
+  /**
+   * Checks executor ports if they are within some range of the offered 
list of ports ranges,
+   *
+   * @param sc the Spark Context
+   * @param ports the list of ports to check
+   * @param takenPorts ports already used for that slave
+   * @return true if ports are within range false otherwise
+   */
+  protected def checkPorts(sc: SparkContext, ports: List[(Long, Long)],
+   takenPorts: List[Long] = List()): Boolean = {
+
+def checkIfInRange(port: Int, ps: List[(Long, Long)]): Boolean = {
+  ps.exists(r => r._1 <= port & r._2 >= port)
+}
+
+val portsToCheck = List(sc.conf.getInt("spark.executor.port", 0),
+  sc.conf.getInt("spark.blockManager.port", 0))
+val nonZeroPorts = portsToCheck.filter(_ != 0)
+
+// If we require a port that is taken we have to decline the offer 
since mesos
+// shares all port ranges on the slave
+val contained = for {port <- nonZeroPorts}
+  yield {
+takenPorts.contains(port)
+  }
+
+if (contained.contains(true)) {
+  return false
+}
+
+val withinRange = nonZeroPorts.forall(p => checkIfInRange(p, ports))
+
+// make sure we have enough ports to allocate per offer
+ports.map(r => r._2 - r._1 + 1).sum >= portsToCheck.size && withinRange
+  }
+
+  /**
+   * Partitions port resources.
+   *
+   * @param conf the spark config
+   * @param ports the ports offered
+   * @return resources left, port resources to be used and the list of 
assigned ports
+   */
+  def partitionPorts(
+  conf: SparkConf,
+  ports: List[Resource]): (List[Resource], List[Resource], List[Long]) 
= {
+
+val taskPortRanges = getRangeResourceWithRoleInfo(ports.asJava, 
"ports")
+
+val portsToCheck = List(conf.getInt("spark.executor.port", 0).toLong,
+  conf.getInt("spark.blockManager.port", 0).toLong)
+
+val nonZeroPorts = portsToCheck.filter(_ != 0)
+
+// reserve non zero ports first
+
+val nonZeroResources = reservePorts(taskPortRanges, nonZeroPorts)
+
+// reserve actual port numbers for zero ports - not set by the user
+
+val numOfZeroPorts = portsToCheck.count(_ == 0)
+
+val randPorts = pickRandomPortsFromRanges(nonZeroResources._1, 
numOfZeroPorts)
+
+val zeroResources = reservePorts(nonZeroResources._1, randPorts)
+
+val (resourcesLeft, resourcesToBeUsed) = 
createResources(nonZeroResources, zeroResources)
+
+(resourcesLeft, resourcesToBeUsed, nonZeroPorts ++ randPorts)
+  }
+
+  private def createResources(
+  nonZero: (List[PortRangeResourceInfo], List[PortRangeResourceInfo]),
+  zero: (List[PortRangeResourceInfo], List[PortRangeResourceInfo]))
+  : (List[Resource], List[Resource]) = {
+
+val resources = {
+  if (nonZero._2.isEmpty) { // no user ports were defined
+(zero._1.flatMap{port => createMesosPortResource(port.value, 
Some(port.role))},
+  zero._2.flatMap{port => createMesosPortResource(port.value, 
Some(port.role))})
+
+  } else if (zero._2.isEmpty) { // no random ports were defined
+(nonZero._1.flatMap{port => createMesosPortResource(port.value, 
Some(port.role))},
+  nonZero._2.flatMap{port => createMesosPortResource(port.value, 
Some(port.role))})
+  }
+  else {  // we have user defined and random ports defined
+val left = zero._1.flatMap{port => 
createMesosPortResource(port.value, Some(port.role))}
+
+val used = nonZero._2.flatMap{port =>
+  createMesosPortResource(port.value, Some(port.role))} ++
+  zero._2.flatMap{port => createMesosPortResource(port.value, 
Some(port.role))}
+
+(left, used)
+  }
+}
+resources
+  }
+
+  private case class PortRangeResourceInfo(role: String, value: 
List[(Long, Long)])
+
+  private def getRangeResourceWithRoleInfo(res: JList[Resource], name: 
String)
+  : List[PortRangeResourceInfo] = {
+// A resource can have multiple values in the offer since it can 
either be from
+// a specific role or wildcard.
+res.asScala.filter(_.getName == name)
+  .map{res => PortRangeResourceInfo(res.getRole, 
res.getRanges.getRangeList.

[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-04-19 Thread skonto
Github user skonto commented on the pull request:

https://github.com/apache/spark/pull/11157#issuecomment-212104928
  
Thanks guys, I will work on the above, add more info in the docs, rebase, 
and come back.





[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-04-19 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/11157#discussion_r60301588
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
 ---
@@ -353,4 +371,247 @@ private[mesos] trait MesosSchedulerUtils extends 
Logging {
 
sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForUnmetConstraints", 
"120s")
   }
 
+  /**
+   * Checks whether the executor ports fall within the offered port ranges.
+   *
+   * @param sc the SparkContext
+   * @param ports the offered port ranges to check against
+   * @param takenPorts ports already used on that slave
+   * @return true if all ports are within range, false otherwise
+   */
+  protected def checkPorts(sc: SparkContext, ports: List[(Long, Long)],
+   takenPorts: List[Long] = List()): Boolean = {
+
+def checkIfInRange(port: Int, ps: List[(Long, Long)]): Boolean = {
+  ps.exists(r => r._1 <= port && r._2 >= port)
+}
+
+val portsToCheck = List(sc.conf.getInt("spark.executor.port", 0),
+  sc.conf.getInt("spark.blockManager.port", 0))
+val nonZeroPorts = portsToCheck.filter(_ != 0)
+
+// If we require a port that is taken we have to decline the offer 
since mesos
+// shares all port ranges on the slave
+val contained = for {port <- nonZeroPorts}
+  yield {
+takenPorts.contains(port)
+  }
+
+if (contained.contains(true)) {
+  return false
+}
+
+val withinRange = nonZeroPorts.forall(p => checkIfInRange(p, ports))
+
+// make sure we have enough ports to allocate per offer
+ports.map(r => r._2 - r._1 + 1).sum >= portsToCheck.size && withinRange
+  }
+
+  /**
+   * Partitions port resources.
+   *
+   * @param conf the spark config
+   * @param ports the ports offered
+   * @return resources left, port resources to be used and the list of 
assigned ports
+   */
+  def partitionPorts(
+  conf: SparkConf,
+  ports: List[Resource]): (List[Resource], List[Resource], List[Long]) 
= {
+
+val taskPortRanges = getRangeResourceWithRoleInfo(ports.asJava, 
"ports")
+
+val portsToCheck = List(conf.getInt("spark.executor.port", 0).toLong,
+  conf.getInt("spark.blockManager.port", 0).toLong)
+
+val nonZeroPorts = portsToCheck.filter(_ != 0)
+
+// reserve non zero ports first
+
+val nonZeroResources = reservePorts(taskPortRanges, nonZeroPorts)
+
+// reserve actual port numbers for zero ports - not set by the user
+
+val numOfZeroPorts = portsToCheck.count(_ == 0)
+
+val randPorts = pickRandomPortsFromRanges(nonZeroResources._1, 
numOfZeroPorts)
+
+val zeroResources = reservePorts(nonZeroResources._1, randPorts)
+
+val (resourcesLeft, resourcesToBeUsed) = 
createResources(nonZeroResources, zeroResources)
+
+(resourcesLeft, resourcesToBeUsed, nonZeroPorts ++ randPorts)
+  }
+
+  private def createResources(
+  nonZero: (List[PortRangeResourceInfo], List[PortRangeResourceInfo]),
+  zero: (List[PortRangeResourceInfo], List[PortRangeResourceInfo]))
+  : (List[Resource], List[Resource]) = {
+
+val resources = {
+  if (nonZero._2.isEmpty) { // no user ports were defined
+(zero._1.flatMap{port => createMesosPortResource(port.value, 
Some(port.role))},
+  zero._2.flatMap{port => createMesosPortResource(port.value, 
Some(port.role))})
+
+  } else if (zero._2.isEmpty) { // no random ports were defined
+(nonZero._1.flatMap{port => createMesosPortResource(port.value, 
Some(port.role))},
+  nonZero._2.flatMap{port => createMesosPortResource(port.value, 
Some(port.role))})
+  }
+  else {  // we have user defined and random ports defined
+val left = zero._1.flatMap{port => 
createMesosPortResource(port.value, Some(port.role))}
+
+val used = nonZero._2.flatMap{port =>
+  createMesosPortResource(port.value, Some(port.role))} ++
+  zero._2.flatMap{port => createMesosPortResource(port.value, 
Some(port.role))}
+
+(left, used)
+  }
+}
+resources
+  }
+
+  private case class PortRangeResourceInfo(role: String, value: 
List[(Long, Long)])
+
+  private def getRangeResourceWithRoleInfo(res: JList[Resource], name: 
String)
+  : List[PortRangeResourceInfo] = {
+// A resource can have multiple values in the offer since it can 
either be from
+// a specific role or wildcard.
+res.asScala.filter(_.getName == name)
+  .map{res => PortRangeResourceInfo(res.getRole, 
res.getRanges.getRangeList.

[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-04-19 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/11157#discussion_r60301605
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
 ---
@@ -353,4 +371,247 @@ private[mesos] trait MesosSchedulerUtils extends 
Logging {
 
sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForUnmetConstraints", 
"120s")
   }
 
+  /**
+   * Checks whether the executor ports fall within the offered port ranges.
+   *
+   * @param sc the SparkContext
+   * @param ports the offered port ranges to check against
+   * @param takenPorts ports already used on that slave
+   * @return true if all ports are within range, false otherwise
+   */
+  protected def checkPorts(sc: SparkContext, ports: List[(Long, Long)],
+   takenPorts: List[Long] = List()): Boolean = {
+
+def checkIfInRange(port: Int, ps: List[(Long, Long)]): Boolean = {
+  ps.exists(r => r._1 <= port && r._2 >= port)
+}
+
+val portsToCheck = List(sc.conf.getInt("spark.executor.port", 0),
+  sc.conf.getInt("spark.blockManager.port", 0))
+val nonZeroPorts = portsToCheck.filter(_ != 0)
+
+// If we require a port that is taken we have to decline the offer 
since mesos
+// shares all port ranges on the slave
+val contained = for {port <- nonZeroPorts}
+  yield {
+takenPorts.contains(port)
+  }
+
+if (contained.contains(true)) {
+  return false
+}
+
+val withinRange = nonZeroPorts.forall(p => checkIfInRange(p, ports))
+
+// make sure we have enough ports to allocate per offer
+ports.map(r => r._2 - r._1 + 1).sum >= portsToCheck.size && withinRange
+  }
+
+  /**
+   * Partitions port resources.
+   *
+   * @param conf the spark config
+   * @param ports the ports offered
+   * @return resources left, port resources to be used and the list of 
assigned ports
+   */
+  def partitionPorts(
+  conf: SparkConf,
+  ports: List[Resource]): (List[Resource], List[Resource], List[Long]) 
= {
+
+val taskPortRanges = getRangeResourceWithRoleInfo(ports.asJava, 
"ports")
+
+val portsToCheck = List(conf.getInt("spark.executor.port", 0).toLong,
+  conf.getInt("spark.blockManager.port", 0).toLong)
+
+val nonZeroPorts = portsToCheck.filter(_ != 0)
+
+// reserve non zero ports first
+
+val nonZeroResources = reservePorts(taskPortRanges, nonZeroPorts)
+
+// reserve actual port numbers for zero ports - not set by the user
+
+val numOfZeroPorts = portsToCheck.count(_ == 0)
+
+val randPorts = pickRandomPortsFromRanges(nonZeroResources._1, 
numOfZeroPorts)
+
+val zeroResources = reservePorts(nonZeroResources._1, randPorts)
+
+val (resourcesLeft, resourcesToBeUsed) = 
createResources(nonZeroResources, zeroResources)
+
+(resourcesLeft, resourcesToBeUsed, nonZeroPorts ++ randPorts)
+  }
+
+  private def createResources(
+  nonZero: (List[PortRangeResourceInfo], List[PortRangeResourceInfo]),
+  zero: (List[PortRangeResourceInfo], List[PortRangeResourceInfo]))
+  : (List[Resource], List[Resource]) = {
+
+val resources = {
+  if (nonZero._2.isEmpty) { // no user ports were defined
+(zero._1.flatMap{port => createMesosPortResource(port.value, 
Some(port.role))},
+  zero._2.flatMap{port => createMesosPortResource(port.value, 
Some(port.role))})
+
+  } else if (zero._2.isEmpty) { // no random ports were defined
+(nonZero._1.flatMap{port => createMesosPortResource(port.value, 
Some(port.role))},
+  nonZero._2.flatMap{port => createMesosPortResource(port.value, 
Some(port.role))})
+  }
+  else {  // we have user defined and random ports defined
+val left = zero._1.flatMap{port => 
createMesosPortResource(port.value, Some(port.role))}
+
+val used = nonZero._2.flatMap{port =>
+  createMesosPortResource(port.value, Some(port.role))} ++
+  zero._2.flatMap{port => createMesosPortResource(port.value, 
Some(port.role))}
+
+(left, used)
+  }
+}
+resources
+  }
+
+  private case class PortRangeResourceInfo(role: String, value: 
List[(Long, Long)])
+
+  private def getRangeResourceWithRoleInfo(res: JList[Resource], name: 
String)
+  : List[PortRangeResourceInfo] = {
+// A resource can have multiple values in the offer since it can 
either be from
+// a specific role or wildcard.
+res.asScala.filter(_.getName == name)
+  .map{res => PortRangeResourceInfo(res.getRole, 
res.getRanges.getRangeList.

[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-04-19 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/11157#discussion_r60301402
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
 ---
@@ -353,4 +371,247 @@ private[mesos] trait MesosSchedulerUtils extends 
Logging {
 
sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForUnmetConstraints", 
"120s")
   }
 
+  /**
+   * Checks whether the executor ports fall within the offered port ranges.
+   *
+   * @param sc the SparkContext
+   * @param ports the offered port ranges to check against
+   * @param takenPorts ports already used on that slave
+   * @return true if all ports are within range, false otherwise
+   */
+  protected def checkPorts(sc: SparkContext, ports: List[(Long, Long)],
+   takenPorts: List[Long] = List()): Boolean = {
+
+def checkIfInRange(port: Int, ps: List[(Long, Long)]): Boolean = {
+  ps.exists(r => r._1 <= port && r._2 >= port)
+}
+
+val portsToCheck = List(sc.conf.getInt("spark.executor.port", 0),
+  sc.conf.getInt("spark.blockManager.port", 0))
+val nonZeroPorts = portsToCheck.filter(_ != 0)
+
+// If we require a port that is taken we have to decline the offer 
since mesos
+// shares all port ranges on the slave
+val contained = for {port <- nonZeroPorts}
+  yield {
+takenPorts.contains(port)
+  }
+
+if (contained.contains(true)) {
+  return false
+}
+
+val withinRange = nonZeroPorts.forall(p => checkIfInRange(p, ports))
+
+// make sure we have enough ports to allocate per offer
+ports.map(r => r._2 - r._1 + 1).sum >= portsToCheck.size && withinRange
+  }
+
+  /**
+   * Partitions port resources.
+   *
+   * @param conf the spark config
+   * @param ports the ports offered
+   * @return resources left, port resources to be used and the list of 
assigned ports
+   */
+  def partitionPorts(
+  conf: SparkConf,
+  ports: List[Resource]): (List[Resource], List[Resource], List[Long]) 
= {
+
+val taskPortRanges = getRangeResourceWithRoleInfo(ports.asJava, 
"ports")
+
--- End diff --

ok
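
A standalone sketch of the `checkPorts` logic quoted above may make the accept/decline rule easier to follow. This is an illustrative model, not the PR's exact code: it takes plain arguments instead of a `SparkContext`, and the parameter names are mine.

```scala
// Simplified model of the quoted checkPorts: accept an offer only if every
// user-configured port is inside some offered range, none of them is already
// taken on the slave, and the offer has enough ports overall.
def checkPorts(
    configuredPorts: List[Long],        // e.g. spark.executor.port, spark.blockManager.port
    offeredRanges: List[(Long, Long)],  // inclusive port ranges from the offer
    takenPorts: List[Long] = Nil): Boolean = {

  def inRange(port: Long): Boolean =
    offeredRanges.exists { case (lo, hi) => lo <= port && port <= hi }

  val nonZero = configuredPorts.filter(_ != 0)  // 0 means "pick any port"

  // Decline if any required port is already used on the slave, since Mesos
  // shares all port ranges on the slave.
  if (nonZero.exists(takenPorts.contains)) return false

  // Make sure the offer has enough ports to cover every service we must bind.
  val enoughPorts =
    offeredRanges.map { case (lo, hi) => hi - lo + 1 }.sum >= configuredPorts.size

  nonZero.forall(inRange) && enoughPorts
}
```

For example, `checkPorts(List(7077L, 0L), List((7000L, 8000L)))` accepts the offer, while the same call with `takenPorts = List(7077L)` declines it.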





[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-04-19 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/11157#discussion_r60301188
  
--- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---
@@ -1978,57 +1978,134 @@ private[spark] object Utils extends Logging {
   }
 
   /**
-   * Attempt to start a service on the given port, or fail after a number 
of attempts.
-   * Each subsequent attempt uses 1 + the port used in the previous 
attempt (unless the port is 0).
-   *
-   * @param startPort The initial port to start the service on.
-   * @param startService Function to start service on a given port.
-   * This is expected to throw java.net.BindException 
on port collision.
-   * @param conf A SparkConf used to get the maximum number of retries 
when binding to a port.
-   * @param serviceName Name of the service.
-   * @return (service: T, port: Int)
-   */
-  def startServiceOnPort[T](
-  startPort: Int,
+* Attempt to start a service on the given port, or fail after a number 
of attempts.
+* Each subsequent attempt uses 1 + the port used in the previous 
attempt (unless the port is 0).
+* It takes port restrictions into consideration via the env var AVAILABLE_PORTS.
+*
+* @param startPort The initial port to start the service on.
+* @param startService Function to start service on a given port.
+* This is expected to throw java.net.BindException 
on port collision.
+* @param conf A SparkConf used to get the maximum number of retries 
when binding to a port.
+* @param serviceName Name of the service.
+* @return (service: T, port: Int)
+*/
+  def startServiceOnPort[T](startPort: Int,
   startService: Int => (T, Int),
   conf: SparkConf,
   serviceName: String = ""): (T, Int) = {
 
+val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'"
+
+// define some helpers, they all share common data, maybe a service 
abstract class
+// for all services could be a good fit here.
+
+def portRangeToList(ranges: String): List[(Long, Long)] = {
+  if (ranges == "") {
+return List()
+  }
+  ranges.split(" ").map { r => val ret = r.substring(1, r.length - 
1).split(",")
+(ret(0).toLong, ret(1).toLong)
+  }.toList
+}
+
+def startOnce(tryPort: Int): (Option[T], Int) = {
+  val serviceString = if (serviceName.isEmpty) "" else s" 
'$serviceName'"
+  try {
+val (service, port) = startService(tryPort)
+logInfo(s"Successfully started service$serviceString on port 
$port.")
+(Some(service), port)
+  } catch {
+case e: Exception if isBindCollision(e) => 
logWarning(s"Service$serviceString " +
+  s"could not bind on port $tryPort. ")
+  (None, -1)
+  }
+}
+
+def retryPort(next: Int => Int, maxRetries: Int): (T, Int) = {
+
+  for (offset <- 0 until maxRetries) {
+val tryPort = next(offset)
+try {
+  val (service, port) = startService(tryPort)
+  logInfo(s"Successfully started service$serviceString on port 
$port.")
+  return (service, port)
+} catch {
+  case e: Exception if isBindCollision(e) =>
+if (offset >= maxRetries) {
+  val exceptionMessage =
+s"${e.getMessage}: Service$serviceString failed after 
$maxRetries retries!"
+  val exception = new BindException(exceptionMessage)
+  // restore original stack trace
+  exception.setStackTrace(e.getStackTrace)
+  throw exception
+}
+logWarning(s"Service$serviceString could not bind on port 
$tryPort.")
+}
+  }
+  // Should never happen
+  throw new SparkException(s"Failed to start service$serviceString on 
port $startPort")
+}
+
+def startFromAvailable(rand: Boolean = false): (T, Int) = {
+  val ports = portRangeToList(sys.env.get("AVAILABLE_RAND_PORTS").get)
--- End diff --

I will rename the env variable and discuss other approaches if you want.





[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-04-19 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/11157#discussion_r60299848
  
--- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---
@@ -1978,57 +1978,134 @@ private[spark] object Utils extends Logging {
   }
 
   /**
-   * Attempt to start a service on the given port, or fail after a number 
of attempts.
-   * Each subsequent attempt uses 1 + the port used in the previous 
attempt (unless the port is 0).
-   *
-   * @param startPort The initial port to start the service on.
-   * @param startService Function to start service on a given port.
-   * This is expected to throw java.net.BindException 
on port collision.
-   * @param conf A SparkConf used to get the maximum number of retries 
when binding to a port.
-   * @param serviceName Name of the service.
-   * @return (service: T, port: Int)
-   */
-  def startServiceOnPort[T](
-  startPort: Int,
+* Attempt to start a service on the given port, or fail after a number 
of attempts.
+* Each subsequent attempt uses 1 + the port used in the previous 
attempt (unless the port is 0).
+* It takes port restrictions into consideration via the env var AVAILABLE_PORTS.
+*
+* @param startPort The initial port to start the service on.
+* @param startService Function to start service on a given port.
+* This is expected to throw java.net.BindException 
on port collision.
+* @param conf A SparkConf used to get the maximum number of retries 
when binding to a port.
+* @param serviceName Name of the service.
+* @return (service: T, port: Int)
+*/
+  def startServiceOnPort[T](startPort: Int,
   startService: Int => (T, Int),
   conf: SparkConf,
   serviceName: String = ""): (T, Int) = {
 
+val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'"
+
+// define some helpers, they all share common data, maybe a service 
abstract class
+// for all services could be a good fit here.
+
+def portRangeToList(ranges: String): List[(Long, Long)] = {
+  if (ranges == "") {
+return List()
+  }
+  ranges.split(" ").map { r => val ret = r.substring(1, r.length - 
1).split(",")
+(ret(0).toLong, ret(1).toLong)
+  }.toList
+}
+
+def startOnce(tryPort: Int): (Option[T], Int) = {
+  val serviceString = if (serviceName.isEmpty) "" else s" 
'$serviceName'"
+  try {
+val (service, port) = startService(tryPort)
+logInfo(s"Successfully started service$serviceString on port 
$port.")
+(Some(service), port)
+  } catch {
+case e: Exception if isBindCollision(e) => 
logWarning(s"Service$serviceString " +
+  s"could not bind on port $tryPort. ")
+  (None, -1)
+  }
+}
+
+def retryPort(next: Int => Int, maxRetries: Int): (T, Int) = {
+
+  for (offset <- 0 until maxRetries) {
+val tryPort = next(offset)
+try {
+  val (service, port) = startService(tryPort)
+  logInfo(s"Successfully started service$serviceString on port 
$port.")
+  return (service, port)
+} catch {
+  case e: Exception if isBindCollision(e) =>
+if (offset >= maxRetries) {
+  val exceptionMessage =
+s"${e.getMessage}: Service$serviceString failed after 
$maxRetries retries!"
+  val exception = new BindException(exceptionMessage)
+  // restore original stack trace
+  exception.setStackTrace(e.getStackTrace)
+  throw exception
+}
+logWarning(s"Service$serviceString could not bind on port 
$tryPort.")
+}
+  }
+  // Should never happen
+  throw new SparkException(s"Failed to start service$serviceString on 
port $startPort")
+}
+
+def startFromAvailable(rand: Boolean = false): (T, Int) = {
+  val ports = portRangeToList(sys.env.get("AVAILABLE_RAND_PORTS").get)
--- End diff --

It is called when AVAILABLE_RAND_PORTS is set. If spark.executor.port or
spark.blockManager.port is set to a specific value, that value is used, so
there is no collision. If they are not set, startFromAvailable(true) ->
retryPort(tryPort, maxRetries) is called: the former shuffles the ports and
passes them down to the latter, and retryPort keeps the old logic of trying
each port until a free one is found.
Another approach would be to explicitly preassign spark.executor.port and
spark.blockManager.port from the offered values and carry those values in
AVAILABLE_RAND_PORTS.
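
The flow described here can be sketched as follows. This is an illustrative model under stated assumptions: `bind` is a hypothetical stand-in for `startService` that throws on collision, and the shuffling models `startFromAvailable(rand = true)` handing candidates to the retry loop.

```scala
import scala.util.Random

// Expand (lo,hi) ranges into candidate ports, shuffle them, and try each
// until one binds -- a rough model of startFromAvailable(rand = true)
// delegating to the retry logic.
def startFromAvailable(
    ranges: List[(Long, Long)],
    bind: Long => Long,              // returns the bound port, throws on collision
    rng: Random = new Random(42)): Long = {
  val candidates = rng.shuffle(ranges.flatMap { case (lo, hi) => lo to hi })
  candidates.iterator
    .map(p => scala.util.Try(bind(p)))
    .collectFirst { case scala.util.Success(port) => port }
    .getOrElse(throw new java.net.BindException("no free port in offered ranges"))
}
```

A caller that simulates two already-taken ports still ends up bound to one of the free ports in the offered range.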
   

[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-04-19 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/11157#discussion_r60296064
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
 ---
@@ -353,4 +371,247 @@ private[mesos] trait MesosSchedulerUtils extends 
Logging {
 
sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForUnmetConstraints", 
"120s")
   }
 
+  /**
+   * Checks whether the executor ports fall within the offered port ranges.
+   *
+   * @param sc the SparkContext
+   * @param ports the offered port ranges to check against
+   * @param takenPorts ports already used on that slave
+   * @return true if all ports are within range, false otherwise
+   */
+  protected def checkPorts(sc: SparkContext, ports: List[(Long, Long)],
+   takenPorts: List[Long] = List()): Boolean = {
+
+def checkIfInRange(port: Int, ps: List[(Long, Long)]): Boolean = {
+  ps.exists(r => r._1 <= port && r._2 >= port)
+}
+
+val portsToCheck = List(sc.conf.getInt("spark.executor.port", 0),
+  sc.conf.getInt("spark.blockManager.port", 0))
+val nonZeroPorts = portsToCheck.filter(_ != 0)
+
+// If we require a port that is taken we have to decline the offer 
since mesos
+// shares all port ranges on the slave
+val contained = for {port <- nonZeroPorts}
+  yield {
+takenPorts.contains(port)
+  }
+
+if (contained.contains(true)) {
+  return false
+}
+
+val withinRange = nonZeroPorts.forall(p => checkIfInRange(p, ports))
+
+// make sure we have enough ports to allocate per offer
+ports.map(r => r._2 - r._1 + 1).sum >= portsToCheck.size && withinRange
+  }
+
+  /**
+   * Partitions port resources.
+   *
+   * @param conf the spark config
+   * @param ports the ports offered
+   * @return resources left, port resources to be used and the list of 
assigned ports
+   */
+  def partitionPorts(
+  conf: SparkConf,
+  ports: List[Resource]): (List[Resource], List[Resource], List[Long]) 
= {
+
+val taskPortRanges = getRangeResourceWithRoleInfo(ports.asJava, 
"ports")
+
+val portsToCheck = List(conf.getInt("spark.executor.port", 0).toLong,
+  conf.getInt("spark.blockManager.port", 0).toLong)
+
+val nonZeroPorts = portsToCheck.filter(_ != 0)
+
+// reserve non zero ports first
+
+val nonZeroResources = reservePorts(taskPortRanges, nonZeroPorts)
+
+// reserve actual port numbers for zero ports - not set by the user
+
+val numOfZeroPorts = portsToCheck.count(_ == 0)
+
+val randPorts = pickRandomPortsFromRanges(nonZeroResources._1, 
numOfZeroPorts)
--- End diff --

Just keep the semantics the same for 0. 
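
Keeping the semantics of 0 (let the scheduler pick a random port) only requires drawing distinct ports from the leftover ranges. A minimal illustrative version of such a helper is sketched below; it is not the PR's actual `pickRandomPortsFromRanges`, just the idea behind it.

```scala
import scala.util.Random

// Draw n distinct ports at random from the given inclusive ranges,
// preserving the usual "port 0 means pick one for me" semantics.
def pickRandomPortsFromRanges(
    ranges: List[(Long, Long)],
    n: Int,
    rng: Random = new Random()): List[Long] = {
  val all = ranges.flatMap { case (lo, hi) => lo to hi }
  require(all.size >= n, s"offer has only ${all.size} ports, need $n")
  rng.shuffle(all).take(n)  // all ports are distinct, so the picks are too
}
```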





[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-04-19 Thread tnachen
Github user tnachen commented on a diff in the pull request:

https://github.com/apache/spark/pull/11157#discussion_r60257784
  
--- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---
@@ -1978,57 +1978,134 @@ private[spark] object Utils extends Logging {
   }
 
   /**
-   * Attempt to start a service on the given port, or fail after a number 
of attempts.
-   * Each subsequent attempt uses 1 + the port used in the previous 
attempt (unless the port is 0).
-   *
-   * @param startPort The initial port to start the service on.
-   * @param startService Function to start service on a given port.
-   * This is expected to throw java.net.BindException 
on port collision.
-   * @param conf A SparkConf used to get the maximum number of retries 
when binding to a port.
-   * @param serviceName Name of the service.
-   * @return (service: T, port: Int)
-   */
-  def startServiceOnPort[T](
-  startPort: Int,
+* Attempt to start a service on the given port, or fail after a number 
of attempts.
+* Each subsequent attempt uses 1 + the port used in the previous 
attempt (unless the port is 0).
+* It takes port restrictions into consideration via the env var AVAILABLE_PORTS.
+*
+* @param startPort The initial port to start the service on.
+* @param startService Function to start service on a given port.
+* This is expected to throw java.net.BindException 
on port collision.
+* @param conf A SparkConf used to get the maximum number of retries 
when binding to a port.
+* @param serviceName Name of the service.
+* @return (service: T, port: Int)
+*/
+  def startServiceOnPort[T](startPort: Int,
   startService: Int => (T, Int),
   conf: SparkConf,
   serviceName: String = ""): (T, Int) = {
 
+val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'"
+
+// define some helpers, they all share common data, maybe a service 
abstract class
+// for all services could be a good fit here.
+
+def portRangeToList(ranges: String): List[(Long, Long)] = {
+  if (ranges == "") {
+return List()
+  }
+  ranges.split(" ").map { r => val ret = r.substring(1, r.length - 
1).split(",")
+(ret(0).toLong, ret(1).toLong)
+  }.toList
+}
+
+def startOnce(tryPort: Int): (Option[T], Int) = {
+  val serviceString = if (serviceName.isEmpty) "" else s" 
'$serviceName'"
+  try {
+val (service, port) = startService(tryPort)
+logInfo(s"Successfully started service$serviceString on port 
$port.")
+(Some(service), port)
+  } catch {
+case e: Exception if isBindCollision(e) => 
logWarning(s"Service$serviceString " +
+  s"could not bind on port $tryPort. ")
+  (None, -1)
+  }
+}
+
+def retryPort(next: Int => Int, maxRetries: Int): (T, Int) = {
+
+  for (offset <- 0 until maxRetries) {
+val tryPort = next(offset)
+try {
+  val (service, port) = startService(tryPort)
+  logInfo(s"Successfully started service$serviceString on port 
$port.")
+  return (service, port)
+} catch {
+  case e: Exception if isBindCollision(e) =>
+if (offset >= maxRetries) {
+  val exceptionMessage =
+s"${e.getMessage}: Service$serviceString failed after 
$maxRetries retries!"
+  val exception = new BindException(exceptionMessage)
+  // restore original stack trace
+  exception.setStackTrace(e.getStackTrace)
+  throw exception
+}
+logWarning(s"Service$serviceString could not bind on port 
$tryPort.")
+}
+  }
+  // Should never happen
+  throw new SparkException(s"Failed to start service$serviceString on 
port $startPort")
+}
+
+def startFromAvailable(rand: Boolean = false): (T, Int) = {
+  val ports = portRangeToList(sys.env.get("AVAILABLE_RAND_PORTS").get)
--- End diff --

Btw if startFromAvailable is called multiple times we might clash ports 
right?
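
One way to rule out the clash raised here is to carve each assigned port out of the remaining ranges before the next assignment, in the spirit of the PR's `reservePorts` helper. A simplified single-port sketch, assuming inclusive and non-overlapping ranges:

```scala
// Carve a single port out of a list of inclusive ranges, returning what is
// left. Reserving from the leftover list each time means repeated calls
// cannot hand out the same port twice.
def reservePort(ranges: List[(Long, Long)], port: Long): List[(Long, Long)] =
  ranges.flatMap { case (lo, hi) =>
    if (port < lo || port > hi) List((lo, hi))      // range untouched
    else List((lo, port - 1), (port + 1, hi))       // split around the port
      .filter { case (a, b) => a <= b }             // drop empty halves
  }
```

For example, reserving 55001 from the range (55000, 55010) leaves (55000, 55000) and (55002, 55010).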





[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-04-19 Thread tnachen
Github user tnachen commented on a diff in the pull request:

https://github.com/apache/spark/pull/11157#discussion_r60257720
  
--- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---
@@ -1978,57 +1978,134 @@ private[spark] object Utils extends Logging {
   }
 
   /**
-   * Attempt to start a service on the given port, or fail after a number 
of attempts.
-   * Each subsequent attempt uses 1 + the port used in the previous 
attempt (unless the port is 0).
-   *
-   * @param startPort The initial port to start the service on.
-   * @param startService Function to start service on a given port.
-   * This is expected to throw java.net.BindException 
on port collision.
-   * @param conf A SparkConf used to get the maximum number of retries 
when binding to a port.
-   * @param serviceName Name of the service.
-   * @return (service: T, port: Int)
-   */
-  def startServiceOnPort[T](
-  startPort: Int,
+* Attempt to start a service on the given port, or fail after a number 
of attempts.
+* Each subsequent attempt uses 1 + the port used in the previous 
attempt (unless the port is 0).
+* It takes port restrictions into consideration via the env var AVAILABLE_PORTS.
+*
+* @param startPort The initial port to start the service on.
+* @param startService Function to start service on a given port.
+* This is expected to throw java.net.BindException 
on port collision.
+* @param conf A SparkConf used to get the maximum number of retries 
when binding to a port.
+* @param serviceName Name of the service.
+* @return (service: T, port: Int)
+*/
+  def startServiceOnPort[T](startPort: Int,
   startService: Int => (T, Int),
   conf: SparkConf,
   serviceName: String = ""): (T, Int) = {
 
+val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'"
+
+// define some helpers, they all share common data, maybe a service 
abstract class
+// for all services could be a good fit here.
+
+def portRangeToList(ranges: String): List[(Long, Long)] = {
+  if (ranges == "") {
+return List()
+  }
+  ranges.split(" ").map { r => val ret = r.substring(1, r.length - 
1).split(",")
+(ret(0).toLong, ret(1).toLong)
+  }.toList
+}
+
+def startOnce(tryPort: Int): (Option[T], Int) = {
+  val serviceString = if (serviceName.isEmpty) "" else s" 
'$serviceName'"
+  try {
+val (service, port) = startService(tryPort)
+logInfo(s"Successfully started service$serviceString on port 
$port.")
+(Some(service), port)
+  } catch {
+case e: Exception if isBindCollision(e) => 
logWarning(s"Service$serviceString " +
+  s"could not bind on port $tryPort. ")
+  (None, -1)
+  }
+}
+
+def retryPort(next: Int => Int, maxRetries: Int): (T, Int) = {
+
+  for (offset <- 0 until maxRetries) {
+val tryPort = next(offset)
+try {
+  val (service, port) = startService(tryPort)
+  logInfo(s"Successfully started service$serviceString on port 
$port.")
+  return (service, port)
+} catch {
+  case e: Exception if isBindCollision(e) =>
+if (offset >= maxRetries) {
+  val exceptionMessage =
+s"${e.getMessage}: Service$serviceString failed after 
$maxRetries retries!"
+  val exception = new BindException(exceptionMessage)
+  // restore original stack trace
+  exception.setStackTrace(e.getStackTrace)
+  throw exception
+}
+logWarning(s"Service$serviceString could not bind on port 
$tryPort.")
+}
+  }
+  // Should never happen
+  throw new SparkException(s"Failed to start service$serviceString on 
port $startPort")
+}
+
+def startFromAvailable(rand: Boolean = false): (T, Int) = {
+  val ports = portRangeToList(sys.env.get("AVAILABLE_RAND_PORTS").get)
--- End diff --

I'm not sure this is the best approach yet, but regardless we should name 
spark specific env variable with SPARK_* (SPARK_AVAILABLE_PORTS). 
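
Whatever the variable ends up being named (`SPARK_AVAILABLE_PORTS` as suggested), the quoted `portRangeToList` expects a space-separated list of `(lo,hi)` pairs. A slightly hardened standalone version of that parser, shown purely as an illustration:

```scala
// Parse a value like "(31000,31005) (32000,32002)" into inclusive ranges.
// Returns Nil for an empty or whitespace-only value instead of failing.
def portRangeToList(ranges: String): List[(Long, Long)] =
  ranges.trim match {
    case "" => Nil
    case s =>
      s.split("\\s+").toList.map { r =>
        val parts = r.stripPrefix("(").stripSuffix(")").split(",")
        (parts(0).trim.toLong, parts(1).trim.toLong)
      }
  }
```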



[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-04-19 Thread tnachen
Github user tnachen commented on a diff in the pull request:

https://github.com/apache/spark/pull/11157#discussion_r60255016
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala ---
@@ -353,4 +371,247 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
 
     sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForUnmetConstraints", "120s")
   }
 
+  /**
+   * Checks executor ports if they are within some range of the offered list of ports ranges,
+   *
+   * @param sc the Spark Context
+   * @param ports the list of ports to check
+   * @param takenPorts ports already used for that slave
+   * @return true if ports are within range false otherwise
+   */
+  protected def checkPorts(sc: SparkContext, ports: List[(Long, Long)],
+      takenPorts: List[Long] = List()): Boolean = {
+
+    def checkIfInRange(port: Int, ps: List[(Long, Long)]): Boolean = {
+      ps.exists(r => r._1 <= port & r._2 >= port)
+    }
+
+    val portsToCheck = List(sc.conf.getInt("spark.executor.port", 0),
+      sc.conf.getInt("spark.blockManager.port", 0))
+    val nonZeroPorts = portsToCheck.filter(_ != 0)
+
+    // If we require a port that is taken we have to decline the offer since mesos
+    // shares all port ranges on the slave
+    val contained = for {port <- nonZeroPorts}
+      yield {
+        takenPorts.contains(port)
+      }
+
+    if (contained.contains(true)) {
+      return false
+    }
+
+    val withinRange = nonZeroPorts.forall(p => checkIfInRange(p, ports))
+
+    // make sure we have enough ports to allocate per offer
+    ports.map(r => r._2 - r._1 + 1).sum >= portsToCheck.size && withinRange
+  }
+
+  /**
+   * Partitions port resources.
+   *
+   * @param conf the spark config
+   * @param ports the ports offered
+   * @return resources left, port resources to be used and the list of assigned ports
+   */
+  def partitionPorts(
+      conf: SparkConf,
+      ports: List[Resource]): (List[Resource], List[Resource], List[Long]) = {
+
+    val taskPortRanges = getRangeResourceWithRoleInfo(ports.asJava, "ports")
+
+    val portsToCheck = List(conf.getInt("spark.executor.port", 0).toLong,
+      conf.getInt("spark.blockManager.port", 0).toLong)
+
+    val nonZeroPorts = portsToCheck.filter(_ != 0)
+
+    // reserve non zero ports first
+
+    val nonZeroResources = reservePorts(taskPortRanges, nonZeroPorts)
+
+    // reserve actual port numbers for zero ports - not set by the user
+
+    val numOfZeroPorts = portsToCheck.count(_ == 0)
+
+    val randPorts = pickRandomPortsFromRanges(nonZeroResources._1, numOfZeroPorts)
--- End diff --

What's the intention behind picking random ports? Why not just pick the first few?
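
A deterministic "first few" selection, as the comment suggests, might look like the following sketch. The helper name below is hypothetical (the PR itself uses `pickRandomPortsFromRanges`); one possible motivation for randomness is reducing the chance of repeated collisions across offers, but that is not stated in the PR:

```scala
// Deterministic alternative to random selection: take the first `count`
// ports, in order, from a list of inclusive (start, end) port ranges.
def pickFirstPortsFromRanges(ranges: List[(Long, Long)], count: Int): List[Long] =
  ranges.iterator
    .flatMap { case (lo, hi) => (lo to hi).iterator }
    .take(count)
    .toList
```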





[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-04-19 Thread tnachen
Github user tnachen commented on a diff in the pull request:

https://github.com/apache/spark/pull/11157#discussion_r60254817
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala ---
@@ -353,4 +371,247 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
 
     sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForUnmetConstraints", "120s")
   }
 
+  /**
+   * Checks executor ports if they are within some range of the offered list of ports ranges,
+   *
+   * @param sc the Spark Context
+   * @param ports the list of ports to check
+   * @param takenPorts ports already used for that slave
+   * @return true if ports are within range false otherwise
+   */
+  protected def checkPorts(sc: SparkContext, ports: List[(Long, Long)],
+      takenPorts: List[Long] = List()): Boolean = {
+
+    def checkIfInRange(port: Int, ps: List[(Long, Long)]): Boolean = {
+      ps.exists(r => r._1 <= port & r._2 >= port)
+    }
+
+    val portsToCheck = List(sc.conf.getInt("spark.executor.port", 0),
+      sc.conf.getInt("spark.blockManager.port", 0))
+    val nonZeroPorts = portsToCheck.filter(_ != 0)
+
+    // If we require a port that is taken we have to decline the offer since mesos
+    // shares all port ranges on the slave
+    val contained = for {port <- nonZeroPorts}
+      yield {
+        takenPorts.contains(port)
+      }
+
+    if (contained.contains(true)) {
+      return false
+    }
+
+    val withinRange = nonZeroPorts.forall(p => checkIfInRange(p, ports))
+
+    // make sure we have enough ports to allocate per offer
+    ports.map(r => r._2 - r._1 + 1).sum >= portsToCheck.size && withinRange
+  }
+
+  /**
+   * Partitions port resources.
+   *
+   * @param conf the spark config
+   * @param ports the ports offered
+   * @return resources left, port resources to be used and the list of assigned ports
+   */
+  def partitionPorts(
+      conf: SparkConf,
+      ports: List[Resource]): (List[Resource], List[Resource], List[Long]) = {
+
+    val taskPortRanges = getRangeResourceWithRoleInfo(ports.asJava, "ports")
+
+    val portsToCheck = List(conf.getInt("spark.executor.port", 0).toLong,
+      conf.getInt("spark.blockManager.port", 0).toLong)
+
+    val nonZeroPorts = portsToCheck.filter(_ != 0)
+
+    // reserve non zero ports first
+
+    val nonZeroResources = reservePorts(taskPortRanges, nonZeroPorts)
+
+    // reserve actual port numbers for zero ports - not set by the user
+
+    val numOfZeroPorts = portsToCheck.count(_ == 0)
+
+    val randPorts = pickRandomPortsFromRanges(nonZeroResources._1, numOfZeroPorts)
+
+    val zeroResources = reservePorts(nonZeroResources._1, randPorts)
+
+    val (resourcesLeft, resourcesToBeUsed) = createResources(nonZeroResources, zeroResources)
+
+    (resourcesLeft, resourcesToBeUsed, nonZeroPorts ++ randPorts)
+  }
+
+  private def createResources(
+      nonZero: (List[PortRangeResourceInfo], List[PortRangeResourceInfo]),
+      zero: (List[PortRangeResourceInfo], List[PortRangeResourceInfo]))
+    : (List[Resource], List[Resource]) = {
+
+    val resources = {
+      if (nonZero._2.isEmpty) { // no user ports were defined
+        (zero._1.flatMap{port => createMesosPortResource(port.value, Some(port.role))},
+          zero._2.flatMap{port => createMesosPortResource(port.value, Some(port.role))})
+
+      } else if (zero._2.isEmpty) { // no random ports were defined
+        (nonZero._1.flatMap{port => createMesosPortResource(port.value, Some(port.role))},
+          nonZero._2.flatMap{port => createMesosPortResource(port.value, Some(port.role))})
+      }
+      else {  // we have user defined and random ports defined
+        val left = zero._1.flatMap{port => createMesosPortResource(port.value, Some(port.role))}
+
+        val used = nonZero._2.flatMap{port => createMesosPortResource(port.value, Some(port.role))} ++
+          zero._2.flatMap{port => createMesosPortResource(port.value, Some(port.role))}
+
+        (left, used)
+      }
+    }
+    resources
+  }
+
+  private case class PortRangeResourceInfo(role: String, value: List[(Long, Long)])
+
+  private def getRangeResourceWithRoleInfo(res: JList[Resource], name: String)
+    : List[PortRangeResourceInfo] = {
+    // A resource can have multiple values in the offer since it can either be from
+    // a specific role or wildcard.
+    res.asScala.filter(_.getName == name)
+      .map{res => PortRangeResourceInfo(res.getRole, res.getRanges.getRangeList

[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-04-19 Thread tnachen
Github user tnachen commented on a diff in the pull request:

https://github.com/apache/spark/pull/11157#discussion_r60254758
  

[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-04-19 Thread tnachen
Github user tnachen commented on a diff in the pull request:

https://github.com/apache/spark/pull/11157#discussion_r60254665
  

[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-04-19 Thread tnachen
Github user tnachen commented on a diff in the pull request:

https://github.com/apache/spark/pull/11157#discussion_r60254778
  

[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-04-19 Thread tnachen
Github user tnachen commented on a diff in the pull request:

https://github.com/apache/spark/pull/11157#discussion_r60254689
  

[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-04-19 Thread tnachen
Github user tnachen commented on a diff in the pull request:

https://github.com/apache/spark/pull/11157#discussion_r60254568
  

[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-04-19 Thread tnachen
Github user tnachen commented on a diff in the pull request:

https://github.com/apache/spark/pull/11157#discussion_r60254354
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala ---
[...]
+    val taskPortRanges = getRangeResourceWithRoleInfo(ports.asJava, "ports")
+
--- End diff --

Remove all the whitespace in between





[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-04-19 Thread tnachen
Github user tnachen commented on a diff in the pull request:

https://github.com/apache/spark/pull/11157#discussion_r60254407
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala ---
[...]
+  def partitionPorts(
+      conf: SparkConf,
+      ports: List[Resource]): (List[Resource], List[Resource], List[Long]) = {
--- End diff --

Please specify return type
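
For readers following along, a toy partitioning routine with an explicit return type might look like the following sketch. All names here (`PartitionPortsSketch`, `partition`) are illustrative, not the PR's actual code; it carves the requested number of ports out of offered `(start, end)` ranges and returns the leftover ranges plus the assigned ports.

```scala
object PartitionPortsSketch {
  // Explicit return type, per the review comment: leftover ranges and assigned ports.
  def partition(
      ranges: List[(Long, Long)],
      count: Int): (List[(Long, Long)], List[Long]) = {
    // Expand the offered ranges into individual ports and take the first `count`.
    val allPorts = ranges.flatMap { case (lo, hi) => lo to hi }
    val (assigned, rest) = allPorts.splitAt(count)
    // Collapse the leftover ports back into contiguous ranges.
    val leftover = rest.foldLeft(List.empty[(Long, Long)]) {
      case ((lo, hi) :: tail, p) if p == hi + 1 => (lo, p) :: tail
      case (acc, p) => (p, p) :: acc
    }.reverse
    (leftover, assigned)
  }

  def main(args: Array[String]): Unit = {
    val (left, assigned) = partition(List((31000L, 31005L)), 2)
    println(assigned) // List(31000, 31001)
    println(left)     // List((31002,31005))
  }
}
```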





[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-04-19 Thread tnachen
Github user tnachen commented on a diff in the pull request:

https://github.com/apache/spark/pull/11157#discussion_r60253926
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
 ---
@@ -353,4 +371,247 @@ private[mesos] trait MesosSchedulerUtils extends 
Logging {
 
sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForUnmetConstraints", 
"120s")
   }
 
+  /**
+   * Checks executor ports if they are within some range of the offered 
list of ports ranges,
+   *
+   * @param sc the Spark Context
+   * @param ports the list of ports to check
+   * @param takenPorts ports already used for that slave
+   * @return true if ports are within range false otherwise
+   */
+  protected def checkPorts(sc: SparkContext, ports: List[(Long, Long)],
+   takenPorts: List[Long] = List()): Boolean = {
+
+def checkIfInRange(port: Int, ps: List[(Long, Long)]): Boolean = {
+  ps.exists(r => r._1 <= port & r._2 >= port)
+}
+
+val portsToCheck = List(sc.conf.getInt("spark.executor.port", 0),
+  sc.conf.getInt("spark.blockManager.port", 0))
+val nonZeroPorts = portsToCheck.filter(_ != 0)
+
+// If we require a port that is taken we have to decline the offer 
since mesos
+// shares all port ranges on the slave
+val contained = for {port <- nonZeroPorts}
--- End diff --

How about just:
if (nonZeroPorts.exists(port => takenPorts.contains(port))) {
  return false
}
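
For context, a minimal self-contained sketch (names are illustrative) showing that the suggested `exists` form gives the same answer as the original for/yield version, while also short-circuiting on the first taken port:

```scala
object PortCheckSketch {
  def main(args: Array[String]): Unit = {
    val takenPorts = List(7077L, 4040L)   // ports already used on the slave
    val nonZeroPorts = List(4040L, 5050L) // ports the executor requires

    // Original formulation: materialize a List[Boolean], then scan it.
    val contained = for { port <- nonZeroPorts } yield takenPorts.contains(port)
    val rejectOld = contained.contains(true)

    // Suggested formulation: stops at the first taken port.
    val rejectNew = nonZeroPorts.exists(port => takenPorts.contains(port))

    assert(rejectOld == rejectNew)
    println(rejectNew) // true: 4040 is already taken
  }
}
```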





[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-04-19 Thread tnachen
Github user tnachen commented on a diff in the pull request:

https://github.com/apache/spark/pull/11157#discussion_r60253411
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
 ---
@@ -353,4 +371,247 @@ private[mesos] trait MesosSchedulerUtils extends 
Logging {
 
sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForUnmetConstraints", 
"120s")
   }
 
+  /**
+   * Checks executor ports if they are within some range of the offered 
list of ports ranges,
+   *
+   * @param sc the Spark Context
+   * @param ports the list of ports to check
+   * @param takenPorts ports already used for that slave
+   * @return true if ports are within range false otherwise
+   */
+  protected def checkPorts(sc: SparkContext, ports: List[(Long, Long)],
--- End diff --

Put each input parameter on its own line.





[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-04-19 Thread tnachen
Github user tnachen commented on a diff in the pull request:

https://github.com/apache/spark/pull/11157#discussion_r60253302
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
 ---
@@ -171,7 +186,8 @@ private[mesos] trait MesosSchedulerUtils extends 
Logging {
   /**
* Partition the existing set of resources into two groups, those 
remaining to be
* scheduled and those requested to be used for a new task.
-   * @param resources The full list of available resources
+*
+* @param resources The full list of available resources
--- End diff --

Same





[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-04-19 Thread tnachen
Github user tnachen commented on a diff in the pull request:

https://github.com/apache/spark/pull/11157#discussion_r60253331
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
 ---
@@ -333,7 +350,8 @@ private[mesos] trait MesosSchedulerUtils extends 
Logging {
   /**
* Return the amount of memory to allocate to each executor, taking into 
account
* container overheads.
-   * @param sc SparkContext to use to get 
`spark.mesos.executor.memoryOverhead` value
+*
+* @param sc SparkContext to use to get 
`spark.mesos.executor.memoryOverhead` value
--- End diff --

Same





[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-04-19 Thread tnachen
Github user tnachen commented on a diff in the pull request:

https://github.com/apache/spark/pull/11157#discussion_r60253191
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
 ---
@@ -45,7 +45,8 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
 
   /**
* Creates a new MesosSchedulerDriver that communicates to the Mesos 
master.
-   * @param masterUrl The url to connect to Mesos master
+*
+* @param masterUrl The url to connect to Mesos master
--- End diff --

Is this intentional?





[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-04-19 Thread tnachen
Github user tnachen commented on a diff in the pull request:

https://github.com/apache/spark/pull/11157#discussion_r60253095
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
 ---
@@ -154,7 +161,8 @@ private[spark] class CoarseMesosSchedulerBackend(
 startScheduler(driver)
   }
 
-  def createCommand(offer: Offer, numCores: Int, taskId: String): 
CommandInfo = {
+  def createCommand(offer: Offer, numCores: Int, taskId: String, 
randPorts: List[Long] = List())
--- End diff --

I think that, according to Spark style, we should put each method parameter on its own line in this case.





[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-04-19 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/11157#discussion_r60218121
  
--- Diff: core/src/main/scala/org/apache/spark/SparkEnv.scala ---
@@ -244,6 +244,7 @@ object SparkEnv extends Logging {
   conf.set("spark.driver.port", rpcEnv.address.port.toString)
 } else if (rpcEnv.address != null) {
   conf.set("spark.executor.port", rpcEnv.address.port.toString)
+  logInfo(s"Starting executor with port: 
${rpcEnv.address.port.toString}")
--- End diff --

Sure.





[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-04-19 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/11157#discussion_r60218050
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
 ---
@@ -75,6 +75,13 @@ private[spark] class CoarseMesosSchedulerBackend(
   val coresByTaskId = new HashMap[String, Int]
   var totalCoresAcquired = 0
 
+  // Ports acquired so far
+  // SlaveID ->  ports
+  val takenPortsPerSlave = new mutable.HashMap[String, List[Long]]
--- End diff --

For now it is used by the checkPorts utility method only, to check for the case 
where spark.executor.port or spark.blockManager.port have been defined and 
reserved on a slave; if so, those ports limit the number of executors, as we 
have discussed. So I keep track of what is allocated so I can deny an offer 
fast. Of course, I could just check whether those values are contained in the 
offered ports anyway. I guess I could refactor this part and omit this state 
keeping, if I am not missing anything here.
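
A stateless alternative along the lines described above could be sketched as follows (all names here are hypothetical, not from the PR): rather than tracking per-slave state, simply test whether each required port falls inside one of the offered ranges.

```scala
object OfferPortsSketch {
  // True if `port` lies inside any of the offered (start, end) ranges.
  def withinRanges(port: Long, ranges: List[(Long, Long)]): Boolean =
    ranges.exists { case (lo, hi) => lo <= port && port <= hi }

  // Accept the offer only if every required port is covered by the offer;
  // a port reserved elsewhere will simply not appear in the offered ranges.
  def acceptOffer(requiredPorts: List[Long], offeredRanges: List[(Long, Long)]): Boolean =
    requiredPorts.forall(p => withinRanges(p, offeredRanges))

  def main(args: Array[String]): Unit = {
    val offered = List((31000L, 32000L))
    println(acceptOffer(List(31500L), offered)) // true
    println(acceptOffer(List(4040L), offered))  // false
  }
}
```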





[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-04-19 Thread skonto
Github user skonto commented on the pull request:

https://github.com/apache/spark/pull/11157#issuecomment-211870773
  
Thanks @mgummelt. Yes, I follow that. Here is the document: 
https://docs.google.com/document/d/1UUPQBtDCuo5M10REDRJiBFa0Vws0inemxcOP9LjTACw/edit?usp=sharing






[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...

2016-04-19 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/11157#discussion_r60215292
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
 ---
@@ -154,7 +161,8 @@ private[spark] class CoarseMesosSchedulerBackend(
 startScheduler(driver)
   }
 
-  def createCommand(offer: Offer, numCores: Int, taskId: String): 
CommandInfo = {
+  def createCommand(offer: Offer, numCores: Int, taskId: String, 
randPorts: List[Long] = List())
--- End diff --

Sure




