[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/11157#issuecomment-219756624 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/58694/ --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/11157#issuecomment-219756618 Merged build finished. Test PASSed.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/11157#issuecomment-219756200 **[Test build #58694 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/58694/consoleFull)** for PR 11157 at commit [`47d6a9f`](https://github.com/apache/spark/commit/47d6a9f376f7690e75de32c14742a1ff34a58280). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/11157#issuecomment-219716919 **[Test build #58694 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/58694/consoleFull)** for PR 11157 at commit [`47d6a9f`](https://github.com/apache/spark/commit/47d6a9f376f7690e75de32c14742a1ff34a58280).
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/11157#discussion_r62743938

--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala ---
@@ -395,24 +402,29 @@ private[spark] class CoarseMesosSchedulerBackend(
         val (afterCPUResources, cpuResourcesToUse) =
           partitionResources(resources, "cpus", taskCPUs)
-        val (resourcesLeft, memResourcesToUse) =
+        val (afterMemResources, memResourcesToUse) =
           partitionResources(afterCPUResources.asJava, "mem", taskMemory)
+        // process port offers
+        val (resourcesLeft, portResources) = getPortResources(afterMemResources)

--- End diff --

ok
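The chained `partitionResources` calls in the hunk above peel one resource type at a time off the offer: each step returns the resources left in the offer plus the slice to consume for the task. A minimal sketch of that pattern, using a simplified `(name, amount)` pair in place of the Mesos `Resource` protobuf (the names here are illustrative, not Spark's actual implementation):

```scala
// Simplified stand-in for a Mesos scalar resource: a name and an amount.
case class Res(name: String, amount: Double)

// Split `wanted` units of resource `name` off the offer, returning
// (resources left in the offer, resources consumed for the task).
def partitionResources(offer: List[Res], name: String, wanted: Double): (List[Res], List[Res]) = {
  var remaining = wanted
  val left = scala.collection.mutable.ListBuffer[Res]()
  val used = scala.collection.mutable.ListBuffer[Res]()
  for (r <- offer) {
    if (r.name != name || remaining == 0) {
      left += r // not the resource we want, or we already have enough
    } else {
      val take = math.min(r.amount, remaining)
      remaining -= take
      if (take > 0) used += Res(r.name, take)
      if (r.amount - take > 0) left += Res(r.name, r.amount - take)
    }
  }
  (left.toList, used.toList)
}

// Chain the steps as the backend does: cpus first, then mem.
val offer = List(Res("cpus", 4.0), Res("mem", 4096.0))
val (afterCpu, cpuUse) = partitionResources(offer, "cpus", 1.0)
val (afterMem, memUse) = partitionResources(afterCpu, "mem", 1024.0)
```

Each step threads the leftover list into the next call, which is why the patch renames the intermediate value to `afterMemResources` before the ports step consumes it.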
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/11157#discussion_r6272

--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala ---
@@ -356,4 +374,229 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
     sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForReachedMaxCores", "120s")
   }

+  /**
+   * Checks whether the executor ports are within some range of the offered list of port ranges.
+   *
+   * @param sc the Spark context
+   * @param ports the list of port ranges to check against
+   * @return true if the ports are within range, false otherwise
+   */
+  protected def checkPorts(sc: SparkContext, ports: List[(Long, Long)]): Boolean = {
+
+    def checkIfInRange(port: Int, ps: List[(Long, Long)]): Boolean = {
+      ps.exists(r => r._1 <= port && r._2 >= port)
+    }
+
+    val portsToCheck = List(sc.conf.getInt("spark.executor.port", 0),
+      sc.conf.getInt("spark.blockManager.port", 0))
+    val nonZeroPorts = portsToCheck.filter(_ != 0)
+    val withinRange = nonZeroPorts.forall(p => checkIfInRange(p, ports))
+    // make sure we have enough ports to allocate per offer
+    ports.map(r => r._2 - r._1 + 1).sum >= portsToCheck.size && withinRange
+  }
+
+  /**
+   * Partitions port resources.
+   *
+   * @param conf the Spark config
+   * @param ports the port resources offered
+   * @return resources left, port resources to be used, and the list of assigned ports
+   */
+  def partitionPorts(
+      conf: SparkConf,
+      ports: List[Resource])
+    : (List[Resource], List[Resource], List[Long]) = {
+    val taskPortRanges = getRangeResourceWithRoleInfo(ports.asJava, "ports")
+    val portsToCheck = List(conf.getInt("spark.executor.port", 0).toLong,
+      conf.getInt("spark.blockManager.port", 0).toLong)
+    val nonZeroPorts = portsToCheck.filter(_ != 0)
+    // reserve non-zero ports first
+    val nonZeroResources = reservePorts(taskPortRanges, nonZeroPorts)
+    // reserve actual port numbers for zero ports - not set by the user
+    val numOfZeroPorts = portsToCheck.count(_ == 0)
+    val randPorts = pickRandomPortsFromRanges(nonZeroResources._1, numOfZeroPorts)
+    val zeroResources = reservePorts(nonZeroResources._1, randPorts)
+    val (resourcesLeft, resourcesToBeUsed) = createResources(nonZeroResources, zeroResources)
+    (resourcesLeft, resourcesToBeUsed, nonZeroPorts ++ randPorts)
+  }
+
+  private def createResources(
+      nonZero: (List[PortRangeResourceInfo], List[PortRangeResourceInfo]),
+      zero: (List[PortRangeResourceInfo], List[PortRangeResourceInfo]))
+    : (List[Resource], List[Resource]) = {
+    val resources = {
+      if (nonZero._2.isEmpty) { // no user ports were defined
+        (zero._1.flatMap { port => createMesosPortResource(port.value, Some(port.role)) },
+          zero._2.flatMap { port => createMesosPortResource(port.value, Some(port.role)) })
+      } else if (zero._2.isEmpty) { // no random ports were defined
+        (nonZero._1.flatMap { port => createMesosPortResource(port.value, Some(port.role)) },
+          nonZero._2.flatMap { port => createMesosPortResource(port.value, Some(port.role)) })
+      } else { // we have both user-defined and random ports
+        val left = zero._1.flatMap { port => createMesosPortResource(port.value, Some(port.role)) }
+        val used = nonZero._2.flatMap { port =>
+            createMesosPortResource(port.value, Some(port.role)) } ++
+          zero._2.flatMap { port => createMesosPortResource(port.value, Some(port.role)) }
+        (left, used)
+      }
+    }
+    resources
+  }
+
+  private case class PortRangeResourceInfo(role: String, value: List[(Long, Long)])
+
+  private def getRangeResourceWithRoleInfo(res: JList[Resource], name: String)
+    : List[PortRangeResourceInfo] = {
+    // A resource can appear multiple times in the offer, since it can come either
+    // from a specific role or from the wildcard role.
+    // Extract the role info and port range for every port resource in the offer.
+    res.asScala.filter(_.getName == name)
+      .map { res => PortRangeResourceInfo(res.getRole, res.getRanges.getRangeList.asScala
+        .map(r => (r.getBegin, r.getEnd)).toList) }.toList
+  }
+
+  /** Helper method to get a pair of assigned and remaining ports along with role info. */
+  private def reservePorts(
+      availablePortRanges: List[PortRangeResourceInfo],
+      wantedPorts: List[Long])
+    : (List[PortRangeResourceInfo], List[PortRangeResourceInfo]) = {
+    if (wantedPorts.isEmpty) { // the port list is empty, so we didn't consume any resources
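The `checkPorts` logic quoted above reduces to two conditions: every explicitly configured (non-zero) port must fall inside some offered range, and the offer must contain at least as many ports as the task needs overall. A standalone sketch of that check, with the `SparkContext` lookups replaced by an explicit list of configured ports (an assumed simplification, not the patch's exact code):

```scala
// Offered port ranges are inclusive on both ends, e.g. (31000, 32000).
def inRange(port: Long, ranges: List[(Long, Long)]): Boolean =
  ranges.exists { case (lo, hi) => lo <= port && port <= hi }

// configured: the ports read from spark.executor.port and
// spark.blockManager.port, where 0 means "pick one for me".
def portsSatisfiable(configured: List[Long], ranges: List[(Long, Long)]): Boolean = {
  val nonZero = configured.filter(_ != 0)
  // every user-pinned port must sit inside some offered range
  val withinRange = nonZero.forall(p => inRange(p, ranges))
  // and the offer must hold enough ports for everything we need to allocate
  val enough = ranges.map { case (lo, hi) => hi - lo + 1 }.sum >= configured.size
  enough && withinRange
}
```

Note that on `Boolean` operands the patch's `&` and the idiomatic `&&` produce the same result; `&&` just short-circuits.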
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/11157#discussion_r62744256

--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala ---
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/11157#discussion_r62743925

--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala ---
@@ -395,24 +402,29 @@ private[spark] class CoarseMesosSchedulerBackend(
         val (afterCPUResources, cpuResourcesToUse) =

--- End diff --

ok
Github user skonto commented on the pull request: https://github.com/apache/spark/pull/11157#issuecomment-218277517 OK, but could you be more specific in your comments? It's not always clear what the problem is.
Github user mgummelt commented on the pull request: https://github.com/apache/spark/pull/11157#issuecomment-218277321 So?
Github user mgummelt commented on a diff in the pull request: https://github.com/apache/spark/pull/11157#discussion_r62743444

--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala ---
@@ -46,6 +46,7 @@ private[mesos] trait MesosSchedulerUtils extends Logging {

--- End diff --

The additions to this file seem overly complex for just grabbing port resources. Maybe all this complexity is necessary, but I'm skeptical. I started in on it and got lost. I don't have time to review it all today. Please ensure there is no more room for simplification.
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/11157#discussion_r62743066

--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala ---
+  private case class PortRangeResourceInfo(role: String, value: List[(Long, Long)])

--- End diff --

ok
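The `PortRangeResourceInfo` case class commented on above pairs each offered port range with the Mesos role it was offered under, so that a reserved port can be handed back as a resource carrying the same role. A hedged sketch of reserving a single port out of such role-tagged ranges, splitting the containing range around it (the names and the single-port signature are illustrative; the patch's `reservePorts` works on a list of wanted ports):

```scala
// Port ranges tagged with the Mesos role they were offered under.
case class PortRanges(role: String, ranges: List[(Long, Long)])

// Carve one wanted port out of the first entry whose ranges contain it,
// returning (ranges left available, the reserved port as a singleton range).
def reserve(avail: List[PortRanges], port: Long): (List[PortRanges], Option[PortRanges]) = {
  avail.find(pr => pr.ranges.exists { case (lo, hi) => lo <= port && port <= hi }) match {
    case None => (avail, None) // port not in any offered range
    case Some(hit) =>
      // split the containing range around the taken port, dropping empty halves
      val rest = hit.ranges.flatMap { case (lo, hi) =>
        if (port < lo || port > hi) List((lo, hi))
        else List((lo, port - 1), (port + 1, hi)).filter { case (a, b) => a <= b }
      }
      val newAvail = avail.map(pr => if (pr eq hit) pr.copy(ranges = rest) else pr)
      (newAvail, Some(PortRanges(hit.role, List((port, port)))))
  }
}

val (leftOver, reserved) = reserve(List(PortRanges("*", List((31000L, 31005L)))), 31002L)
```

Keeping the role alongside the range is what lets the leftover resources and the consumed resources both be rebuilt with the correct role, as `createResources` does in the patch.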
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/11157#discussion_r62743126

--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala ---
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/11157#discussion_r62743042

--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala ---
+    val portsToCheck = List(conf.getInt("spark.executor.port", 0).toLong,

--- End diff --

which part?
Github user mgummelt commented on a diff in the pull request: https://github.com/apache/spark/pull/11157#discussion_r62742851

--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala ---
Github user mgummelt commented on a diff in the pull request:
https://github.com/apache/spark/pull/11157#discussion_r62742808

--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala ---
@@ -356,4 +374,229 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
     sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForReachedMaxCores", "120s")
   }

+  /**
+   * Checks executor ports if they are within some range of the offered list of ports ranges,
+   *
+   * @param sc the Spark Context
+   * @param ports the list of ports to check
+   * @return true if ports are within range false otherwise
+   */
+  protected def checkPorts(sc: SparkContext, ports: List[(Long, Long)]): Boolean = {
+
+    def checkIfInRange(port: Int, ps: List[(Long, Long)]): Boolean = {
+      ps.exists(r => r._1 <= port & r._2 >= port)
+    }
+
+    val portsToCheck = List(sc.conf.getInt("spark.executor.port", 0),
+      sc.conf.getInt("spark.blockManager.port", 0))
+    val nonZeroPorts = portsToCheck.filter(_ != 0)
+    val withinRange = nonZeroPorts.forall(p => checkIfInRange(p, ports))
+    // make sure we have enough ports to allocate per offer
+    ports.map(r => r._2 - r._1 + 1).sum >= portsToCheck.size && withinRange
+  }
+
+  /**
+   * Partitions port resources.
+   *
+   * @param conf the spark config
+   * @param ports the ports offered
+   * @return resources left, port resources to be used and the list of assigned ports
+   */
+  def partitionPorts(
+      conf: SparkConf,
+      ports: List[Resource])
+    : (List[Resource], List[Resource], List[Long]) = {
+    val taskPortRanges = getRangeResourceWithRoleInfo(ports.asJava, "ports")
+    val portsToCheck = List(conf.getInt("spark.executor.port", 0).toLong,
+      conf.getInt("spark.blockManager.port", 0).toLong)
+    val nonZeroPorts = portsToCheck.filter(_ != 0)
+    // reserve non zero ports first
+    val nonZeroResources = reservePorts(taskPortRanges, nonZeroPorts)
+    // reserve actual port numbers for zero ports - not set by the user
+    val numOfZeroPorts = portsToCheck.count(_ == 0)
+    val randPorts = pickRandomPortsFromRanges(nonZeroResources._1, numOfZeroPorts)
+    val zeroResources = reservePorts(nonZeroResources._1, randPorts)
+    val (resourcesLeft, resourcesToBeUsed) = createResources(nonZeroResources, zeroResources)
+    (resourcesLeft, resourcesToBeUsed, nonZeroPorts ++ randPorts)
+  }
+
+  private def createResources(
+      nonZero: (List[PortRangeResourceInfo], List[PortRangeResourceInfo]),
+      zero: (List[PortRangeResourceInfo], List[PortRangeResourceInfo]))
+    : (List[Resource], List[Resource]) = {
+    val resources = {
+      if (nonZero._2.isEmpty) { // no user ports were defined
+        (zero._1.flatMap { port => createMesosPortResource(port.value, Some(port.role)) },
+          zero._2.flatMap { port => createMesosPortResource(port.value, Some(port.role)) })
+      } else if (zero._2.isEmpty) { // no random ports were defined
+        (nonZero._1.flatMap { port => createMesosPortResource(port.value, Some(port.role)) },
+          nonZero._2.flatMap { port => createMesosPortResource(port.value, Some(port.role)) })
+      } else { // we have user defined and random ports defined
+        val left = zero._1.flatMap { port => createMesosPortResource(port.value, Some(port.role)) }
+        val used = nonZero._2.flatMap { port =>
+            createMesosPortResource(port.value, Some(port.role)) } ++
+          zero._2.flatMap { port => createMesosPortResource(port.value, Some(port.role)) }
+        (left, used)
+      }
+    }
+    resources
+  }
+
+  private case class PortRangeResourceInfo(role: String, value: List[(Long, Long)])
+
+  private def getRangeResourceWithRoleInfo(res: JList[Resource], name: String)
+    : List[PortRangeResourceInfo] = {
+    // A resource can have multiple values in the offer since it can either be from
+    // a specific role or wildcard.
+    // Extract role info and port range for every port resource in the offer
+    res.asScala.filter(_.getName == name)
+      .map { res => PortRangeResourceInfo(res.getRole, res.getRanges.getRangeList.asScala
+        .map(r => (r.getBegin, r.getEnd)).toList) }.toList
+  }
+
+  /** Helper method to get a pair of assigned and remaining ports along with role info */
+  private def reservePorts(
+      availablePortRanges: List[PortRangeResourceInfo],
+      wantedPorts: List[Long])
+    : (List[PortRangeResourceInfo], List[PortRangeResourceInfo]) = {
+    if (wantedPorts.isEmpty) { // port list is empty we didnt consume any resources
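The range check in the quoted checkPorts can be exercised on its own. The following is a minimal, self-contained sketch of the same logic; `PortCheckSketch` and its plain `List[Long]` interface are illustrative stand-ins, not the PR's actual `MesosSchedulerUtils` API:

```scala
// Sketch of the checkPorts idea: every non-zero requested port must fall
// inside some offered (begin, end) range, and the offer must contain at
// least as many ports as we request in total.
object PortCheckSketch {
  def checkIfInRange(port: Long, ranges: List[(Long, Long)]): Boolean =
    ranges.exists { case (begin, end) => begin <= port && port <= end }

  def checkPorts(requested: List[Long], offered: List[(Long, Long)]): Boolean = {
    val nonZero = requested.filter(_ != 0)                 // 0 means "pick any port"
    val withinRange = nonZero.forall(p => checkIfInRange(p, offered))
    offered.map { case (b, e) => e - b + 1 }.sum >= requested.size && withinRange
  }
}
```

For instance, requesting 7077 against an offer of (7000, 8000) passes, while requesting 9999 does not.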
Github user skonto commented on the pull request:
https://github.com/apache/spark/pull/11157#issuecomment-218273995

But that means the admin has no control to restrict ranges on the isolated namespace.

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
Github user mgummelt commented on the pull request:
https://github.com/apache/spark/pull/11157#issuecomment-218273778

@skonto The offered ports are host ports. Executors launched in an isolated namespace won't touch host ports unless they implement port mapping, which is why we still need this PR (I think).
Github user skonto commented on the pull request:
https://github.com/apache/spark/pull/11157#issuecomment-218273216

What I was trying to say is that even separate tasks with their own namespaces and isolated port numbers need to honour what is offered to them and not pick random ports. But I don't know how this is going to be implemented in 0.29. For example, if Task1 needs a random port it can just pick one, but if specific port ranges are offered due to restrictions on a slave, then the random port should be chosen from within those ranges.
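The behaviour described above, drawing a random port only from the ranges an offer actually contains, can be sketched as follows; `RandomPortSketch` and `pickRandomPort` are hypothetical names, not the PR's `pickRandomPortsFromRanges`:

```scala
import scala.util.Random

// Sketch: pick a port uniformly at random from a list of offered
// (begin, end) ranges, never from outside them.
object RandomPortSketch {
  def pickRandomPort(ranges: List[(Long, Long)], rnd: Random = new Random): Long = {
    require(ranges.nonEmpty, "need at least one offered range")
    val total = ranges.map { case (b, e) => e - b + 1 }.sum
    // uniform offset into the concatenation of all ranges
    var remaining = (rnd.nextDouble() * total).toLong
    for ((b, e) <- ranges) {
      val size = e - b + 1
      if (remaining < size) return b + remaining
      remaining -= size
    }
    ranges.head._1 // unreachable for well-formed ranges
  }
}
```

Weighting the offset by range size keeps the choice uniform across all offered ports rather than across ranges.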
Github user mgummelt commented on a diff in the pull request:
https://github.com/apache/spark/pull/11157#discussion_r62733797

--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala ---
[... quoted diff snipped; same checkPorts/partitionPorts hunk as above ...]

+    val portsToCheck = List(conf.getInt("spark.executor.port", 0).toLong,
+      conf.getInt("spark.blockManager.port", 0).toLong)

--- End diff --

Please factor this out; it's used multiple times.
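One possible shape for the factoring the reviewer asks for, with a plain `Map[String, String]` standing in for `SparkConf` so the sketch is self-contained; the helper names are hypothetical, not from the PR:

```scala
// Sketch: pull the duplicated port-config lookups into one place.
object PortConfSketch {
  type Conf = Map[String, String] // stand-in for SparkConf

  /** The two configurable ports, with 0 (= "pick any") as the default. */
  def portValuesFromConfig(conf: Conf): List[Long] =
    List("spark.executor.port", "spark.blockManager.port")
      .map(key => conf.getOrElse(key, "0").toLong)

  /** Only the ports the user actually fixed. */
  def nonZeroPortValuesFromConfig(conf: Conf): List[Long] =
    portValuesFromConfig(conf).filter(_ != 0)
}
```

Both checkPorts and partitionPorts could then call the same helper instead of rebuilding the list.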
Github user mgummelt commented on a diff in the pull request:
https://github.com/apache/spark/pull/11157#discussion_r62733341

--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala ---
[... quoted diff snipped; same checkPorts/partitionPorts hunk as above ...]

+  private case class PortRangeResourceInfo(role: String, value: List[(Long, Long)])

--- End diff --

s/value/range
Github user mgummelt commented on a diff in the pull request:
https://github.com/apache/spark/pull/11157#discussion_r62731739

--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala ---
[... quoted diff snipped; same checkPorts/partitionPorts hunk as above, ending at ...]

+  /** Helper method to get a pair of assigned and remaining ports along with role info */
+  private def reservePorts(
+      availablePortRanges: List[PortRangeResourceInfo],
+      wantedPorts: List[Long])
+    : (List[PortRangeResourceInfo], List[PortRangeResourceInfo]) = {
+    if (wantedPorts.isEmpty) { // port list is empty we didnt consume any resources
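The core step a reservePorts-style helper needs, removing one reserved port from a list of offered ranges, can be sketched independently; `RangeSplitSketch` is an illustrative name, not code from the PR:

```scala
// Sketch: reserving a single port from a range list splits the
// containing range in two and drops any empty halves.
object RangeSplitSketch {
  def removePort(ranges: List[(Long, Long)], port: Long): List[(Long, Long)] =
    ranges.flatMap { case (b, e) =>
      if (port < b || port > e) {
        List((b, e)) // range untouched by this reservation
      } else {
        // split around the reserved port; filter discards empty halves
        List((b, port - 1), (port + 1, e)).filter { case (x, y) => x <= y }
      }
    }
}
```

Reserving 105 from (100, 110) leaves (100, 104) and (106, 110); reserving the only port of a single-port range leaves nothing.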
Github user mgummelt commented on a diff in the pull request:
https://github.com/apache/spark/pull/11157#discussion_r62731256

--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala ---
@@ -395,24 +402,29 @@ private[spark] class CoarseMesosSchedulerBackend(
       val (afterCPUResources, cpuResourcesToUse) =

--- End diff --

Can you factor this all out into a getResources() method?
Github user mgummelt commented on a diff in the pull request:
https://github.com/apache/spark/pull/11157#discussion_r62731081

--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala ---
@@ -395,24 +402,29 @@ private[spark] class CoarseMesosSchedulerBackend(
       val (afterCPUResources, cpuResourcesToUse) =
         partitionResources(resources, "cpus", taskCPUs)
-      val (resourcesLeft, memResourcesToUse) =
+      val (afterMemResources, memResourcesToUse) =
         partitionResources(afterCPUResources.asJava, "mem", taskMemory)
+      // process port offers
+      val (resourcesLeft, portResources) = getPortResources(afterMemResources)

--- End diff --

s/getPortResources/partitionPortResources
Github user mgummelt commented on the pull request:
https://github.com/apache/spark/pull/11157#issuecomment-218254187

We need to make this opt-in. Many users run Mesos with IP-per-container, and thus don't utilize host ports. DC/OS will soon.
Github user mgummelt commented on the pull request:
https://github.com/apache/spark/pull/11157#issuecomment-218243767

I'm not sure what you mean, but I just now learned that we likely will need this PR in order to support port mapping, even if we did use CNI. I'll go ahead with the review.
Github user skonto commented on the pull request:
https://github.com/apache/spark/pull/11157#issuecomment-217958594

But for admin/security reasons you may want to have fixed port ranges offered, and thus define specific ports for each task rather than picking them randomly. Shouldn't we treat ports in that case as in this PR?
Github user mgummelt commented on the pull request:
https://github.com/apache/spark/pull/11157#issuecomment-217930926

Port offers are only relevant to executors running in the host network space. So if we launch all executors in an isolated namespace, we shouldn't have to worry about ports at all.
Github user skonto commented on the pull request:
https://github.com/apache/spark/pull/11157#issuecomment-217621412

OK, no problem. That also means that Mesos will offer the same port on the same slave multiple times, right? So if we have, say, port 1000 assigned to one executor, then in order to start another executor on the same slave with the same port, port 1000 needs to be offered again. Will that happen?
Github user mgummelt commented on the pull request:
https://github.com/apache/spark/pull/11157#issuecomment-217528595

I just learned that Mesos 0.29 will have network isolation support, which will allow us to launch each container in its own network namespace. What do you think about just waiting for that? Then we don't have to think about port conflicts.
Github user AmplabJenkins commented on the pull request:
https://github.com/apache/spark/pull/11157#issuecomment-217503819

Merged build finished. Test PASSed.
Github user AmplabJenkins commented on the pull request:
https://github.com/apache/spark/pull/11157#issuecomment-217503821

Test PASSed. Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/58004/
Github user SparkQA commented on the pull request:
https://github.com/apache/spark/pull/11157#issuecomment-217503581

**[Test build #58004 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/58004/consoleFull)** for PR 11157 at commit [`9c7cf33`](https://github.com/apache/spark/commit/9c7cf332ccf350e721d25b0070b7c2637261ccaf).

* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
Github user skonto commented on the pull request:
https://github.com/apache/spark/pull/11157#issuecomment-217477341

@mgummelt ready.
Github user SparkQA commented on the pull request:
https://github.com/apache/spark/pull/11157#issuecomment-217475441

**[Test build #58004 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/58004/consoleFull)** for PR 11157 at commit [`9c7cf33`](https://github.com/apache/spark/commit/9c7cf332ccf350e721d25b0070b7c2637261ccaf).
Github user AmplabJenkins commented on the pull request:
https://github.com/apache/spark/pull/11157#issuecomment-217465989

Test PASSed. Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/57992/
Github user AmplabJenkins commented on the pull request:
https://github.com/apache/spark/pull/11157#issuecomment-217465986

Merged build finished. Test PASSed.
Github user SparkQA commented on the pull request:
https://github.com/apache/spark/pull/11157#issuecomment-217465665

**[Test build #57992 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/57992/consoleFull)** for PR 11157 at commit [`dba3e34`](https://github.com/apache/spark/commit/dba3e34c826ddfcaa096254e1f0d230c49b4349d).

* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
Github user SparkQA commented on the pull request:
https://github.com/apache/spark/pull/11157#issuecomment-217434915

**[Test build #57992 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/57992/consoleFull)** for PR 11157 at commit [`dba3e34`](https://github.com/apache/spark/commit/dba3e34c826ddfcaa096254e1f0d230c49b4349d).
Github user skonto commented on a diff in the pull request:
https://github.com/apache/spark/pull/11157#discussion_r62158563

--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala ---
@@ -186,6 +190,12 @@ private[spark] class CoarseMesosSchedulerBackend(
           .setValue(value)
           .build())
       }
+
+      // Set the ports to be picked up by the executor
+      environment.addVariables(Environment.Variable.newBuilder()
+        .setName("SPARK_MESOS_PREASSIGNED_PORTS")

--- End diff --

OK, I will give it a shot.
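Passing pre-assigned ports to the executor through an environment variable, as the quoted diff does with SPARK_MESOS_PREASSIGNED_PORTS, implies an encode step on the scheduler side and a decode step on the executor side. A minimal sketch, assuming a simple comma-separated encoding (the PR's actual wire format is not shown in the quoted hunk):

```scala
// Sketch: round-trip a port list through a single env-var string.
object PortEnvSketch {
  def encode(ports: List[Long]): String = ports.mkString(",")

  def decode(value: String): List[Long] =
    if (value.isEmpty) Nil
    else value.split(",").map(_.trim.toLong).toList
}
```

The empty-string case matters: an executor launched with no pre-assigned ports should decode to an empty list, not fail on `toLong`.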
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/11157#discussion_r62158547
--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala ---
@@ -376,24 +386,29 @@ private[spark] class CoarseMesosSchedulerBackend(
      val (afterCPUResources, cpuResourcesToUse) =
        partitionResources(resources, "cpus", taskCPUs)
-      val (resourcesLeft, memResourcesToUse) =
+      val (remainingMemResources, memResourcesToUse) =
--- End diff --
I will fix that.
[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/11157#discussion_r62158505
--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala ---
@@ -72,13 +71,13 @@ private[spark] class CoarseMesosSchedulerBackend(
  private val shuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)
  // Cores we have acquired with each Mesos task ID
-  val coresByTaskId = new HashMap[String, Int]
+  val coresByTaskId = new mutable.HashMap[String, Int]
--- End diff --
Right now the code has, in the imports section:
import scala.collection.mutable
import scala.collection.mutable.{Buffer, HashMap, HashSet}
and below:
val coresByTaskId = new HashMap[String, Int]
So mutable is used anyway, at least that is what my IDE shows. I just made the mutable explicit because it's cleaner. As for the mutable vs. immutable problem, mutable is already used and IMHO needed, e.g. https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala#L483
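The style point in the reply above can be seen in a tiny standalone sketch (the object and field names are illustrative, not the actual backend fields): importing `scala.collection.mutable` and qualifying the type at the declaration site makes the mutability explicit to readers, even though `HashMap` imported from `scala.collection.mutable` refers to the same class.

```scala
import scala.collection.mutable

object CoresExample {
  // Qualifying the type as mutable.HashMap makes the mutability visible
  // at the use site, which is the style argument being made above.
  val coresByTaskId = new mutable.HashMap[String, Int]

  def main(args: Array[String]): Unit = {
    coresByTaskId("task-1") = 4        // in-place update, only legal on a mutable map
    assert(coresByTaskId.get("task-1").contains(4))
    println("ok")
  }
}
```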
[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...
Github user mgummelt commented on a diff in the pull request: https://github.com/apache/spark/pull/11157#discussion_r62136262
--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala ---
@@ -217,7 +227,7 @@ private[spark] class CoarseMesosSchedulerBackend(
      command.addUris(CommandInfo.URI.newBuilder().setValue(uri.get))
    }
-    conf.getOption("spark.mesos.uris").map { uris =>
+    conf.getOption("spark.mesos.uris").foreach { uris =>
--- End diff --
:+1:
[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...
Github user mgummelt commented on a diff in the pull request: https://github.com/apache/spark/pull/11157#discussion_r62136229
--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala ---
@@ -186,6 +190,12 @@ private[spark] class CoarseMesosSchedulerBackend(
        .setValue(value)
        .build())
    }
+
+    // Set the ports to be picked up by the executor
+    environment.addVariables(Environment.Variable.newBuilder()
+      .setName("SPARK_MESOS_PREASSIGNED_PORTS")
--- End diff --
I'd rather not introduce yet another env var. Can't we just use the existing `SPARK_EXECUTOR_OPTS`?
[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...
Github user mgummelt commented on a diff in the pull request: https://github.com/apache/spark/pull/11157#discussion_r62135944
--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala ---
@@ -376,24 +386,29 @@ private[spark] class CoarseMesosSchedulerBackend(
      val (afterCPUResources, cpuResourcesToUse) =
        partitionResources(resources, "cpus", taskCPUs)
-      val (resourcesLeft, memResourcesToUse) =
+      val (remainingMemResources, memResourcesToUse) =
--- End diff --
s/remainingMemResources/afterMemResources to be consistent w/ `afterCPUResources`
[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...
Github user mgummelt commented on a diff in the pull request: https://github.com/apache/spark/pull/11157#discussion_r62135836
--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala ---
@@ -72,13 +71,13 @@ private[spark] class CoarseMesosSchedulerBackend(
  private val shuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)
  // Cores we have acquired with each Mesos task ID
-  val coresByTaskId = new HashMap[String, Int]
+  val coresByTaskId = new mutable.HashMap[String, Int]
--- End diff --
Why make these mutable? AFAICT, they don't need to be.
[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...
Github user skonto commented on the pull request: https://github.com/apache/spark/pull/11157#issuecomment-213119956 Ready for review.
[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/11157#issuecomment-212979856 **[Test build #56542 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/56542/consoleFull)** for PR 11157 at commit [`0432771`](https://github.com/apache/spark/commit/043277133adfe849a87e653e3d81ebb79ef47226). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/11157#issuecomment-212980274 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/56542/
[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/11157#issuecomment-212980270 Merged build finished. Test PASSed.
[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/11157#issuecomment-212965783 Merged build finished. Test FAILed.
[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/11157#issuecomment-212965792 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/56541/
[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/11157#issuecomment-212965542 **[Test build #56541 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/56541/consoleFull)** for PR 11157 at commit [`a410473`](https://github.com/apache/spark/commit/a410473f9cf3248b8d399bda7dc94978f4d6b679). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/11157#issuecomment-212927264 **[Test build #56542 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/56542/consoleFull)** for PR 11157 at commit [`0432771`](https://github.com/apache/spark/commit/043277133adfe849a87e653e3d81ebb79ef47226).
[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/11157#issuecomment-212922415 **[Test build #56541 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/56541/consoleFull)** for PR 11157 at commit [`a410473`](https://github.com/apache/spark/commit/a410473f9cf3248b8d399bda7dc94978f4d6b679).
[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/11157#discussion_r60567877
--- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---
@@ -1978,57 +1978,134 @@ private[spark] object Utils extends Logging {
   }

   /**
-  * Attempt to start a service on the given port, or fail after a number of attempts.
-  * Each subsequent attempt uses 1 + the port used in the previous attempt (unless the port is 0).
-  *
-  * @param startPort The initial port to start the service on.
-  * @param startService Function to start service on a given port.
-  *        This is expected to throw java.net.BindException on port collision.
-  * @param conf A SparkConf used to get the maximum number of retries when binding to a port.
-  * @param serviceName Name of the service.
-  * @return (service: T, port: Int)
-  */
-  def startServiceOnPort[T](
-      startPort: Int,
+  * Attempt to start a service on the given port, or fail after a number of attempts.
+  * Each subsequent attempt uses 1 + the port used in the previous attempt (unless the port is 0).
+  * It takes into consideration port restrictions through the env var AVAILABLE_PORTS
+  *
+  * @param startPort The initial port to start the service on.
+  * @param startService Function to start service on a given port.
+  *        This is expected to throw java.net.BindException on port collision.
+  * @param conf A SparkConf used to get the maximum number of retries when binding to a port.
+  * @param serviceName Name of the service.
+  * @return (service: T, port: Int)
+  */
+  def startServiceOnPort[T](startPort: Int,
      startService: Int => (T, Int),
      conf: SparkConf,
      serviceName: String = ""): (T, Int) = {
+    val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'"
+
+    // define some helpers, they all share common data, maybe a service abstract class
+    // for all services could be a good fit here.
+
+    def portRangeToList(ranges: String): List[(Long, Long)] = {
+      if (ranges == "") {
+        return List()
+      }
+      ranges.split(" ").map { r =>
+        val ret = r.substring(1, r.length - 1).split(",")
+        (ret(0).toLong, ret(1).toLong)
+      }.toList
+    }
+
+    def startOnce(tryPort: Int): (Option[T], Int) = {
+      val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'"
+      try {
+        val (service, port) = startService(tryPort)
+        logInfo(s"Successfully started service$serviceString on port $port.")
+        (Some(service), port)
+      } catch {
+        case e: Exception if isBindCollision(e) =>
+          logWarning(s"Service$serviceString could not bind on port $tryPort. ")
+          (None, -1)
+      }
+    }
+
+    def retryPort(next: Int => Int, maxRetries: Int): (T, Int) = {
+      for (offset <- 0 until maxRetries) {
+        val tryPort = next(offset)
+        try {
+          val (service, port) = startService(tryPort)
+          logInfo(s"Successfully started service$serviceString on port $port.")
+          return (service, port)
+        } catch {
+          case e: Exception if isBindCollision(e) =>
+            if (offset >= maxRetries) {
+              val exceptionMessage =
+                s"${e.getMessage}: Service$serviceString failed after $maxRetries retries!"
+              val exception = new BindException(exceptionMessage)
+              // restore original stack trace
+              exception.setStackTrace(e.getStackTrace)
+              throw exception
+            }
+            logWarning(s"Service$serviceString could not bind on port $tryPort.")
+        }
+      }
+      // Should never happen
+      throw new SparkException(s"Failed to start service$serviceString on port $startPort")
+    }
+
+    def startFromAvailable(rand: Boolean = false): (T, Int) = {
+      val ports = portRangeToList(sys.env.get("AVAILABLE_RAND_PORTS").get)
--- End diff --
I will do that anyway, @tnachen...
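The `portRangeToList` helper quoted in the diff above parses a space-separated list of `(lo,hi)` pairs (the format the env var is assumed to carry) into `(Long, Long)` tuples. Here is a minimal standalone sketch of that parsing logic; the object name is illustrative and not part of the PR:

```scala
object PortRangeParser {
  // Parse a space-separated list of "(lo,hi)" pairs, e.g. "(1000,2000) (3000,4000)",
  // into (Long, Long) tuples. Mirrors the portRangeToList helper quoted above.
  def portRangeToList(ranges: String): List[(Long, Long)] =
    if (ranges.isEmpty) Nil
    else ranges.split(" ").map { r =>
      // strip the surrounding parentheses, then split on the comma
      val bounds = r.substring(1, r.length - 1).split(",")
      (bounds(0).toLong, bounds(1).toLong)
    }.toList

  def main(args: Array[String]): Unit = {
    assert(portRangeToList("") == Nil)
    assert(portRangeToList("(1000,2000) (3000,4000)") ==
      List((1000L, 2000L), (3000L, 4000L)))
    println("ok")
  }
}
```

Note the helper assumes well-formed input; a malformed pair such as `"(1000)"` would throw, which is one reason reviewers below preferred keeping this logic inside the Mesos-specific code path.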
[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/11157#discussion_r60561827
--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala ---
@@ -353,4 +371,247 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
    sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForUnmetConstraints", "120s")
  }
+  /**
+   * Checks executor ports if they are within some range of the offered list of ports ranges,
+   *
+   * @param sc the Spark Context
+   * @param ports the list of ports to check
+   * @param takenPorts ports already used for that slave
+   * @return true if ports are within range false otherwise
+   */
+  protected def checkPorts(sc: SparkContext, ports: List[(Long, Long)],
--- End diff --
OK.
[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/11157#discussion_r60561580
--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala ---
@@ -154,7 +161,8 @@ private[spark] class CoarseMesosSchedulerBackend(
    startScheduler(driver)
  }
-  def createCommand(offer: Offer, numCores: Int, taskId: String): CommandInfo = {
+  def createCommand(offer: Offer, numCores: Int, taskId: String, randPorts: List[Long] = List())
--- End diff --
OK, I will fix it.
[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/11157#discussion_r60559591
--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala ---
@@ -75,6 +75,13 @@ private[spark] class CoarseMesosSchedulerBackend(
  val coresByTaskId = new HashMap[String, Int]
  var totalCoresAcquired = 0
+  // Ports acquired so far
+  // SlaveID -> ports
+  val takenPortsPerSlave = new mutable.HashMap[String, List[Long]]
--- End diff --
Will remove the states.
[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...
Github user skonto commented on the pull request: https://github.com/apache/spark/pull/11157#issuecomment-212429985 WIP
[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/11157#discussion_r60402294
--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala ---
@@ -353,4 +371,247 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
    sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForUnmetConstraints", "120s")
  }
+  /**
+   * Checks executor ports if they are within some range of the offered list of ports ranges,
+   *
+   * @param sc the Spark Context
+   * @param ports the list of ports to check
+   * @param takenPorts ports already used for that slave
+   * @return true if ports are within range false otherwise
+   */
+  protected def checkPorts(sc: SparkContext, ports: List[(Long, Long)],
+      takenPorts: List[Long] = List()): Boolean = {
+
+    def checkIfInRange(port: Int, ps: List[(Long, Long)]): Boolean = {
+      ps.exists(r => r._1 <= port & r._2 >= port)
+    }
+
+    val portsToCheck = List(sc.conf.getInt("spark.executor.port", 0),
+      sc.conf.getInt("spark.blockManager.port", 0))
+    val nonZeroPorts = portsToCheck.filter(_ != 0)
+
+    // If we require a port that is taken we have to decline the offer since mesos
+    // shares all port ranges on the slave
+    val contained = for { port <- nonZeroPorts }
+      yield {
+        takenPorts.contains(port)
+      }
+
+    if (contained.contains(true)) {
+      return false
+    }
+
+    val withinRange = nonZeroPorts.forall(p => checkIfInRange(p, ports))
+
+    // make sure we have enough ports to allocate per offer
+    ports.map(r => r._2 - r._1 + 1).sum >= portsToCheck.size && withinRange
+  }
+
+  /**
+   * Partitions port resources.
+   *
+   * @param conf the spark config
+   * @param ports the ports offered
+   * @return resources left, port resources to be used and the list of assigned ports
+   */
+  def partitionPorts(
+      conf: SparkConf,
+      ports: List[Resource]): (List[Resource], List[Resource], List[Long]) = {
+
+    val taskPortRanges = getRangeResourceWithRoleInfo(ports.asJava, "ports")
+
--- End diff --
I know there is https://github.com/databricks/scala-style-guide#blanklines, but honestly, when multiple fields have no space in between, especially when they don't form a logical group among other groups, readability is low... at least in my opinion...
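The `checkPorts` hunk quoted above declines a Mesos offer when a required, explicitly configured port is already taken on the slave or falls outside every offered port range. A simplified standalone sketch of that decision logic follows; the names and signature are illustrative (the real method also reads `spark.executor.port` and `spark.blockManager.port` from the `SparkContext` conf rather than taking a list):

```scala
object PortCheck {
  // True if the port lies inside at least one offered (lo, hi) range, inclusive.
  def withinRange(port: Long, ranges: List[(Long, Long)]): Boolean =
    ranges.exists { case (lo, hi) => lo <= port && port <= hi }

  // required: ports the app insists on (0 means "any port").
  // offered: port ranges in the Mesos offer. taken: ports already used on that slave.
  def checkPorts(required: List[Long],
                 offered: List[(Long, Long)],
                 taken: List[Long]): Boolean = {
    val nonZero = required.filter(_ != 0)
    // A required port that is already taken forces us to decline the offer,
    // since Mesos shares all port ranges on the slave.
    if (nonZero.exists(taken.contains)) return false
    val fits = nonZero.forall(withinRange(_, offered))
    // Also make sure the offer holds enough ports for everything we must place.
    offered.map { case (lo, hi) => hi - lo + 1 }.sum >= required.size && fits
  }

  def main(args: Array[String]): Unit = {
    assert(checkPorts(List(7077L, 0L), List((7000L, 8000L)), Nil))   // fits
    assert(!checkPorts(List(7077L), List((1L, 100L)), Nil))          // out of range
    assert(!checkPorts(List(7077L), List((7000L, 8000L)), List(7077L))) // taken
    println("ok")
  }
}
```

One design note: the quoted PR code uses the bitwise `&` in `checkIfInRange`; on `Boolean` it evaluates both sides but is otherwise equivalent to `&&`, which the sketch uses for idiomatic short-circuiting.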
[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/11157#discussion_r60399111
--- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---
(quotes the same `startServiceOnPort` hunk shown earlier, ending at the line `val ports = portRangeToList(sys.env.get("AVAILABLE_RAND_PORTS").get)`)
--- End diff --
It will affect CoarseGrainedExecutorBackend, which is not Mesos-specific... I don't see a way to avoid touching some portion that is not Mesos-only.
[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...
Github user tnachen commented on a diff in the pull request: https://github.com/apache/spark/pull/11157#discussion_r60310542

--- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---

@@ -1978,57 +1978,134 @@ private[spark] object Utils extends Logging {
   }

   /**
-   * Attempt to start a service on the given port, or fail after a number of attempts.
-   * Each subsequent attempt uses 1 + the port used in the previous attempt (unless the port is 0).
-   *
-   * @param startPort The initial port to start the service on.
-   * @param startService Function to start service on a given port.
-   *                     This is expected to throw java.net.BindException on port collision.
-   * @param conf A SparkConf used to get the maximum number of retries when binding to a port.
-   * @param serviceName Name of the service.
-   * @return (service: T, port: Int)
-   */
-  def startServiceOnPort[T](
-      startPort: Int,
+  * Attempt to start a service on the given port, or fail after a number of attempts.
+  * Each subsequent attempt uses 1 + the port used in the previous attempt (unless the port is 0).
+  * It takes into consideration port restrictions through the env var AVAILABLE_PORTS
+  *
+  * @param startPort The initial port to start the service on.
+  * @param startService Function to start service on a given port.
+  *        This is expected to throw java.net.BindException on port collision.
+  * @param conf A SparkConf used to get the maximum number of retries when binding to a port.
+  * @param serviceName Name of the service.
+  * @return (service: T, port: Int)
+  */
+  def startServiceOnPort[T](startPort: Int,
       startService: Int => (T, Int),
       conf: SparkConf,
       serviceName: String = ""): (T, Int) = {
+    val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'"
+
+    // define some helpers, they all share common data, maybe a service abstract class
+    // for all services could be a good fit here.
+
+    def portRangeToList(ranges: String): List[(Long, Long)] = {
+      if (ranges == "") {
+        return List()
+      }
+      ranges.split(" ").map { r =>
+        val ret = r.substring(1, r.length - 1).split(",")
+        (ret(0).toLong, ret(1).toLong)
+      }.toList
+    }
+
+    def startOnce(tryPort: Int): (Option[T], Int) = {
+      val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'"
+      try {
+        val (service, port) = startService(tryPort)
+        logInfo(s"Successfully started service$serviceString on port $port.")
+        (Some(service), port)
+      } catch {
+        case e: Exception if isBindCollision(e) =>
+          logWarning(s"Service$serviceString could not bind on port $tryPort.")
+          (None, -1)
+      }
+    }
+
+    def retryPort(next: Int => Int, maxRetries: Int): (T, Int) = {
+      for (offset <- 0 until maxRetries) {
+        val tryPort = next(offset)
+        try {
+          val (service, port) = startService(tryPort)
+          logInfo(s"Successfully started service$serviceString on port $port.")
+          return (service, port)
+        } catch {
+          case e: Exception if isBindCollision(e) =>
+            if (offset >= maxRetries) {
+              val exceptionMessage =
+                s"${e.getMessage}: Service$serviceString failed after $maxRetries retries!"
+              val exception = new BindException(exceptionMessage)
+              // restore original stack trace
+              exception.setStackTrace(e.getStackTrace)
+              throw exception
+            }
+            logWarning(s"Service$serviceString could not bind on port $tryPort.")
+        }
+      }
+      // Should never happen
+      throw new SparkException(s"Failed to start service$serviceString on port $startPort")
+    }
+
+    def startFromAvailable(rand: Boolean = false): (T, Int) = {
+      val ports = portRangeToList(sys.env.get("AVAILABLE_RAND_PORTS").get)

--- End diff --

I prefer the latter approach you mentioned, where you preassign them, as that only impacts the Mesos-local portion of the code base and we can potentially control it more.

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@sp
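The `portRangeToList` helper quoted in the hunk above parses a space-separated list of ranges into inclusive `(start, end)` pairs. A minimal self-contained sketch of that logic (the object name and the sample range strings here are illustrative, not from the PR):

```scala
// Sketch of the portRangeToList helper discussed above: parses a
// space-separated list of ranges such as "(31000,32000) (33000,34000)".
// Only the first and last character of each token are stripped, so any
// single-character bracket style works; bounds are inclusive.
object PortRanges {
  def portRangeToList(ranges: String): List[(Long, Long)] = {
    if (ranges.isEmpty) {
      List()
    } else {
      ranges.split(" ").map { r =>
        val bounds = r.substring(1, r.length - 1).split(",")
        (bounds(0).toLong, bounds(1).toLong)
      }.toList
    }
  }
}
```

For example, `PortRanges.portRangeToList("(31000,32000) (33000,34000)")` yields `List((31000, 32000), (33000, 34000))`, and the empty string yields the empty list.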
[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/11157#discussion_r60301970

--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala ---

@@ -45,7 +45,8 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
   /**
    * Creates a new MesosSchedulerDriver that communicates to the Mesos master.
-   * @param masterUrl The url to connect to Mesos master
+  *
+  * @param masterUrl The url to connect to Mesos master

--- End diff --

I guess not, I will fix it. Thanks.
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/11157#discussion_r60301895

--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala ---

@@ -333,7 +350,8 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
   /**
    * Return the amount of memory to allocate to each executor, taking into account
    * container overheads.
-   * @param sc SparkContext to use to get `spark.mesos.executor.memoryOverhead` value
+  *
+  * @param sc SparkContext to use to get `spark.mesos.executor.memoryOverhead` value

--- End diff --

ok
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/11157#discussion_r60301880

--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala ---

@@ -171,7 +186,8 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
   /**
    * Partition the existing set of resources into two groups, those remaining to be
    * scheduled and those requested to be used for a new task.
-   * @param resources The full list of available resources
+  *
+  * @param resources The full list of available resources

--- End diff --

ok
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/11157#discussion_r60301818

--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala ---

@@ -353,4 +371,247 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
     sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForUnmetConstraints", "120s")
   }

+  /**
+   * Checks executor ports if they are within some range of the offered list of ports ranges,
+   *
+   * @param sc the Spark Context
+   * @param ports the list of ports to check
+   * @param takenPorts ports already used for that slave
+   * @return true if ports are within range false otherwise
+   */
+  protected def checkPorts(sc: SparkContext, ports: List[(Long, Long)],
+      takenPorts: List[Long] = List()): Boolean = {
+
+    def checkIfInRange(port: Int, ps: List[(Long, Long)]): Boolean = {
+      ps.exists(r => r._1 <= port & r._2 >= port)
+    }
+
+    val portsToCheck = List(sc.conf.getInt("spark.executor.port", 0),
+      sc.conf.getInt("spark.blockManager.port", 0))
+    val nonZeroPorts = portsToCheck.filter(_ != 0)
+
+    // If we require a port that is taken we have to decline the offer since mesos
+    // shares all port ranges on the slave
+    val contained = for { port <- nonZeroPorts } yield {
+      takenPorts.contains(port)
+    }
+
+    if (contained.contains(true)) {
+      return false
+    }
+
+    val withinRange = nonZeroPorts.forall(p => checkIfInRange(p, ports))
+
+    // make sure we have enough ports to allocate per offer
+    ports.map(r => r._2 - r._1 + 1).sum >= portsToCheck.size && withinRange
+  }
+
+  /**
+   * Partitions port resources.
+   *
+   * @param conf the spark config
+   * @param ports the ports offered
+   * @return resources left, port resources to be used and the list of assigned ports
+   */
+  def partitionPorts(
+      conf: SparkConf,
+      ports: List[Resource]): (List[Resource], List[Resource], List[Long]) = {
+
+    val taskPortRanges = getRangeResourceWithRoleInfo(ports.asJava, "ports")
+
+    val portsToCheck = List(conf.getInt("spark.executor.port", 0).toLong,
+      conf.getInt("spark.blockManager.port", 0).toLong)
+
+    val nonZeroPorts = portsToCheck.filter(_ != 0)
+
+    // reserve non zero ports first
+    val nonZeroResources = reservePorts(taskPortRanges, nonZeroPorts)
+
+    // reserve actual port numbers for zero ports - not set by the user
+    val numOfZeroPorts = portsToCheck.count(_ == 0)
+
+    val randPorts = pickRandomPortsFromRanges(nonZeroResources._1, numOfZeroPorts)
+
+    val zeroResources = reservePorts(nonZeroResources._1, randPorts)
+
+    val (resourcesLeft, resourcesToBeUsed) = createResources(nonZeroResources, zeroResources)
+
+    (resourcesLeft, resourcesToBeUsed, nonZeroPorts ++ randPorts)
+  }
+
+  private def createResources(
+      nonZero: (List[PortRangeResourceInfo], List[PortRangeResourceInfo]),
+      zero: (List[PortRangeResourceInfo], List[PortRangeResourceInfo]))
+    : (List[Resource], List[Resource]) = {
+
+    val resources = {
+      if (nonZero._2.isEmpty) { // no user ports were defined
+        (zero._1.flatMap { port => createMesosPortResource(port.value, Some(port.role)) },
+          zero._2.flatMap { port => createMesosPortResource(port.value, Some(port.role)) })
+      } else if (zero._2.isEmpty) { // no random ports were defined
+        (nonZero._1.flatMap { port => createMesosPortResource(port.value, Some(port.role)) },
+          nonZero._2.flatMap { port => createMesosPortResource(port.value, Some(port.role)) })
+      } else { // we have user defined and random ports defined
+        val left = zero._1.flatMap { port => createMesosPortResource(port.value, Some(port.role)) }
+        val used = nonZero._2.flatMap { port =>
+            createMesosPortResource(port.value, Some(port.role)) } ++
+          zero._2.flatMap { port => createMesosPortResource(port.value, Some(port.role)) }
+        (left, used)
+      }
+    }
+    resources
+  }
+
+  private case class PortRangeResourceInfo(role: String, value: List[(Long, Long)])
+
+  private def getRangeResourceWithRoleInfo(res: JList[Resource], name: String)
+    : List[PortRangeResourceInfo] = {
+    // A resource can have multiple values in the offer since it can either be from
+    // a specific role or wildcard.
+    res.asScala.filter(_.getName == name)
+      .map { res => PortRangeResourceInfo(res.getRole, res.getRanges.getRangeList.
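The two predicates at the heart of `checkPorts` above, namely that a required port must fall inside one of the offered ranges and that the offer must hold at least as many ports as are needed, can be sketched standalone. The object and method names below are illustrative, not from the PR; note that idiomatic Scala uses the short-circuiting `&&` where the quoted diff uses the bitwise `&`, which happens to also work on Booleans:

```scala
// Standalone sketch of the range checks used by checkPorts above.
object PortCheck {
  // True when the port lies inside some offered inclusive (start, end) range.
  def inRange(port: Long, ranges: List[(Long, Long)]): Boolean =
    ranges.exists { case (lo, hi) => lo <= port && hi >= port }

  // True when the offered ranges contain at least `needed` ports in total.
  def enoughPorts(ranges: List[(Long, Long)], needed: Int): Boolean =
    ranges.map { case (lo, hi) => hi - lo + 1 }.sum >= needed
}
```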
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/11157#discussion_r60301857

--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala --- (same checkPorts/partitionPorts hunk as quoted above; the comment is on the line `val contained = for {port <- nonZeroPorts}`)

--- End diff --

ok
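The for/yield followed by `contained.contains(true)` quoted above can be collapsed into a single `exists` call; a sketch of the equivalent check (names here are illustrative, not from the PR):

```scala
// Equivalent of the for/yield + contains(true) pattern in checkPorts:
// true when any required non-zero port is already taken on the slave.
object TakenPorts {
  def anyTaken(nonZeroPorts: List[Long], takenPorts: List[Long]): Boolean =
    nonZeroPorts.exists(takenPorts.contains)
}
```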
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/11157#discussion_r60301778

--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala --- (same hunk as quoted above; the comment is on the `def partitionPorts(...)` signature)

--- End diff --

ok
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/11157#discussion_r60301695

--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala --- (same hunk as quoted above)
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/11157#discussion_r60301651

--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala --- (same hunk as quoted above)
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/11157#discussion_r60301627

--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala --- (same hunk as quoted above)
Github user skonto commented on the pull request: https://github.com/apache/spark/pull/11157#issuecomment-212104928

Thanks guys, I will work on the above, add more info in the docs, rebase, and come back.
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/11157#discussion_r60301588

--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala --- (same hunk as quoted above)
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/11157#discussion_r60301605

--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala --- (same hunk as quoted above)
[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/11157#discussion_r60301402 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala --- @@ -353,4 +371,247 @@ private[mesos] trait MesosSchedulerUtils extends Logging { sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForUnmetConstraints", "120s") } + /** + * Checks executor ports if they are within some range of the offered list of ports ranges, + * + * @param sc the Spark Context + * @param ports the list of ports to check + * @param takenPorts ports already used for that slave + * @return true if ports are within range false otherwise + */ + protected def checkPorts(sc: SparkContext, ports: List[(Long, Long)], + takenPorts: List[Long] = List()): Boolean = { + +def checkIfInRange(port: Int, ps: List[(Long, Long)]): Boolean = { + ps.exists(r => r._1 <= port & r._2 >= port) +} + +val portsToCheck = List(sc.conf.getInt("spark.executor.port", 0), + sc.conf.getInt("spark.blockManager.port", 0)) +val nonZeroPorts = portsToCheck.filter(_ != 0) + +// If we require a port that is taken we have to decline the offer since mesos +// shares all port ranges on the slave +val contained = for {port <- nonZeroPorts} + yield { +takenPorts.contains(port) + } + +if (contained.contains(true)) { + return false +} + +val withinRange = nonZeroPorts.forall(p => checkIfInRange(p, ports)) + +// make sure we have enough ports to allocate per offer +ports.map(r => r._2 - r._1 + 1).sum >= portsToCheck.size && withinRange + } + + /** + * Partitions port resources. 
+ * + * @param conf the spark config + * @param ports the ports offered + * @return resources left, port resources to be used and the list of assigned ports + */ + def partitionPorts( + conf: SparkConf, + ports: List[Resource]): (List[Resource], List[Resource], List[Long]) = { + +val taskPortRanges = getRangeResourceWithRoleInfo(ports.asJava, "ports") + --- End diff -- ok --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/11157#discussion_r60301188 --- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala --- @@ -1978,57 +1978,134 @@ private[spark] object Utils extends Logging { } /** - * Attempt to start a service on the given port, or fail after a number of attempts. - * Each subsequent attempt uses 1 + the port used in the previous attempt (unless the port is 0). - * - * @param startPort The initial port to start the service on. - * @param startService Function to start service on a given port. - * This is expected to throw java.net.BindException on port collision. - * @param conf A SparkConf used to get the maximum number of retries when binding to a port. - * @param serviceName Name of the service. - * @return (service: T, port: Int) - */ - def startServiceOnPort[T]( - startPort: Int, +* Attempt to start a service on the given port, or fail after a number of attempts. +* Each subsequent attempt uses 1 + the port used in the previous attempt (unless the port is 0). +* It takes into consideration port restrictions through the env var AVAILABLE_PORTS +* +* @param startPort The initial port to start the service on. +* @param startService Function to start service on a given port. +* This is expected to throw java.net.BindException on port collision. +* @param conf A SparkConf used to get the maximum number of retries when binding to a port. +* @param serviceName Name of the service. +* @return (service: T, port: Int) +*/ + def startServiceOnPort[T](startPort: Int, startService: Int => (T, Int), conf: SparkConf, serviceName: String = ""): (T, Int) = { +val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'" + +// define some helpers, they all share common data, maybe a service abstract class +// for all services could be a good fit here. 
+ +def portRangeToList(ranges: String): List[(Long, Long)] = { + if (ranges == "") { +return List() + } + ranges.split(" ").map { r => val ret = r.substring(1, r.length - 1).split(",") +(ret(0).toLong, ret(1).toLong) + }.toList +} + +def startOnce(tryPort: Int): (Option[T], Int) = { + val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'" + try { +val (service, port) = startService(tryPort) +logInfo(s"Successfully started service$serviceString on port $port.") +(Some(service), port) + } catch { +case e: Exception if isBindCollision(e) => logWarning(s"Service$serviceString " + + s"could not bind on port $tryPort. ") + (None, -1) + } +} + +def retryPort(next: Int => Int, maxRetries: Int): (T, Int) = { + + for (offset <- 0 until maxRetries) { +val tryPort = next(offset) +try { + val (service, port) = startService(tryPort) + logInfo(s"Successfully started service$serviceString on port $port.") + return (service, port) +} catch { + case e: Exception if isBindCollision(e) => +if (offset >= maxRetries) { + val exceptionMessage = +s"${e.getMessage}: Service$serviceString failed after $maxRetries retries!" + val exception = new BindException(exceptionMessage) + // restore original stack trace + exception.setStackTrace(e.getStackTrace) + throw exception +} +logWarning(s"Service$serviceString could not bind on port $tryPort.") +} + } + // Should never happen + throw new SparkException(s"Failed to start service$serviceString on port $startPort") +} + +def startFromAvailable(rand: Boolean = false): (T, Int) = { + val ports = portRangeToList(sys.env.get("AVAILABLE_RAND_PORTS").get) --- End diff -- I will rename the env variable and discuss other approaches if you want. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/11157#discussion_r60299848 --- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala --- @@ -1978,57 +1978,134 @@ private[spark] object Utils extends Logging { } /** - * Attempt to start a service on the given port, or fail after a number of attempts. - * Each subsequent attempt uses 1 + the port used in the previous attempt (unless the port is 0). - * - * @param startPort The initial port to start the service on. - * @param startService Function to start service on a given port. - * This is expected to throw java.net.BindException on port collision. - * @param conf A SparkConf used to get the maximum number of retries when binding to a port. - * @param serviceName Name of the service. - * @return (service: T, port: Int) - */ - def startServiceOnPort[T]( - startPort: Int, +* Attempt to start a service on the given port, or fail after a number of attempts. +* Each subsequent attempt uses 1 + the port used in the previous attempt (unless the port is 0). +* It takes into consideration port restrictions through the env var AVAILABLE_PORTS +* +* @param startPort The initial port to start the service on. +* @param startService Function to start service on a given port. +* This is expected to throw java.net.BindException on port collision. +* @param conf A SparkConf used to get the maximum number of retries when binding to a port. +* @param serviceName Name of the service. +* @return (service: T, port: Int) +*/ + def startServiceOnPort[T](startPort: Int, startService: Int => (T, Int), conf: SparkConf, serviceName: String = ""): (T, Int) = { +val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'" + +// define some helpers, they all share common data, maybe a service abstract class +// for all services could be a good fit here. 
+ +def portRangeToList(ranges: String): List[(Long, Long)] = { + if (ranges == "") { +return List() + } + ranges.split(" ").map { r => val ret = r.substring(1, r.length - 1).split(",") +(ret(0).toLong, ret(1).toLong) + }.toList +} + +def startOnce(tryPort: Int): (Option[T], Int) = { + val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'" + try { +val (service, port) = startService(tryPort) +logInfo(s"Successfully started service$serviceString on port $port.") +(Some(service), port) + } catch { +case e: Exception if isBindCollision(e) => logWarning(s"Service$serviceString " + + s"could not bind on port $tryPort. ") + (None, -1) + } +} + +def retryPort(next: Int => Int, maxRetries: Int): (T, Int) = { + + for (offset <- 0 until maxRetries) { +val tryPort = next(offset) +try { + val (service, port) = startService(tryPort) + logInfo(s"Successfully started service$serviceString on port $port.") + return (service, port) +} catch { + case e: Exception if isBindCollision(e) => +if (offset >= maxRetries) { + val exceptionMessage = +s"${e.getMessage}: Service$serviceString failed after $maxRetries retries!" + val exception = new BindException(exceptionMessage) + // restore original stack trace + exception.setStackTrace(e.getStackTrace) + throw exception +} +logWarning(s"Service$serviceString could not bind on port $tryPort.") +} + } + // Should never happen + throw new SparkException(s"Failed to start service$serviceString on port $startPort") +} + +def startFromAvailable(rand: Boolean = false): (T, Int) = { + val ports = portRangeToList(sys.env.get("AVAILABLE_RAND_PORTS").get) --- End diff -- It is called when AVAILABLE_RAND_PORTS is set. So if spark.executor.port or spark.blockManager.port are set to a specific value then this will be used so no collision... Again... if they are not set.. startFromAvailable(true) -> retryPort(tryPort, maxRetries) will be called . The former shuffles the ports and pass them down to the latter. 
Also, retryPort uses the old logic, where each port is tried in turn until a free one is found. Another approach is to preassign spark.executor.port and spark.blockManager.port with some offered values explicitly and carry those values with AVAILABLE_RAND_PORTS.
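The range encoding discussed above can be sketched as a stand-alone helper. This mirrors the `portRangeToList` function from the quoted diff, which parses a space-separated list of `(lo,hi)` pairs out of the AVAILABLE_RAND_PORTS environment variable; the object name `PortRanges` is just for illustration:

```scala
// Stand-alone sketch of portRangeToList from the quoted diff: parses a
// space-separated list of "(lo,hi)" pairs, e.g. "(31000,31100) (32000,32050)",
// into a List of inclusive (Long, Long) port ranges.
object PortRanges {
  def portRangeToList(ranges: String): List[(Long, Long)] =
    if (ranges.isEmpty) {
      Nil
    } else {
      ranges.split(" ").map { r =>
        // Strip the surrounding parentheses, then split on the comma.
        val parts = r.substring(1, r.length - 1).split(",")
        (parts(0).toLong, parts(1).toLong)
      }.toList
    }
}
```

For example, `PortRanges.portRangeToList("(31000,31100) (32000,32050)")` yields `List((31000L, 31100L), (32000L, 32050L))`, and the empty string yields an empty list.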
[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/11157#discussion_r60296064 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala --- @@ -353,4 +371,247 @@ private[mesos] trait MesosSchedulerUtils extends Logging { sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForUnmetConstraints", "120s") } + /** + * Checks executor ports if they are within some range of the offered list of ports ranges, + * + * @param sc the Spark Context + * @param ports the list of ports to check + * @param takenPorts ports already used for that slave + * @return true if ports are within range false otherwise + */ + protected def checkPorts(sc: SparkContext, ports: List[(Long, Long)], + takenPorts: List[Long] = List()): Boolean = { + +def checkIfInRange(port: Int, ps: List[(Long, Long)]): Boolean = { + ps.exists(r => r._1 <= port & r._2 >= port) +} + +val portsToCheck = List(sc.conf.getInt("spark.executor.port", 0), + sc.conf.getInt("spark.blockManager.port", 0)) +val nonZeroPorts = portsToCheck.filter(_ != 0) + +// If we require a port that is taken we have to decline the offer since mesos +// shares all port ranges on the slave +val contained = for {port <- nonZeroPorts} + yield { +takenPorts.contains(port) + } + +if (contained.contains(true)) { + return false +} + +val withinRange = nonZeroPorts.forall(p => checkIfInRange(p, ports)) + +// make sure we have enough ports to allocate per offer +ports.map(r => r._2 - r._1 + 1).sum >= portsToCheck.size && withinRange + } + + /** + * Partitions port resources. 
+ * + * @param conf the spark config + * @param ports the ports offered + * @return resources left, port resources to be used and the list of assigned ports + */ + def partitionPorts( + conf: SparkConf, + ports: List[Resource]): (List[Resource], List[Resource], List[Long]) = { + +val taskPortRanges = getRangeResourceWithRoleInfo(ports.asJava, "ports") + +val portsToCheck = List(conf.getInt("spark.executor.port", 0).toLong, + conf.getInt("spark.blockManager.port", 0).toLong) + +val nonZeroPorts = portsToCheck.filter(_ != 0) + +// reserve non zero ports first + +val nonZeroResources = reservePorts(taskPortRanges, nonZeroPorts) + +// reserve actual port numbers for zero ports - not set by the user + +val numOfZeroPorts = portsToCheck.count(_ == 0) + +val randPorts = pickRandomPortsFromRanges(nonZeroResources._1, numOfZeroPorts) --- End diff -- Just keep the semantics the same for 0. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
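The offer-acceptance test that `checkPorts` applies can be sketched in isolation. This reproduces the two conditions from the quoted diff (a user-set port must fall inside some offered range, and the offer must hold enough ports overall); the object name `PortCheck` is illustrative only:

```scala
// Minimal sketch of the checks from checkPorts in the quoted diff.
object PortCheck {
  // A requested port is acceptable only if some offered inclusive
  // (start, end) range contains it.
  def checkIfInRange(port: Long, ranges: List[(Long, Long)]): Boolean =
    ranges.exists { case (lo, hi) => lo <= port && hi >= port }

  // The offer must also contain at least as many ports in total as the
  // task needs to allocate (sum of inclusive range sizes).
  def enoughPorts(ranges: List[(Long, Long)], needed: Int): Boolean =
    ranges.map { case (lo, hi) => hi - lo + 1 }.sum >= needed
}
```

Under this sketch, an offer of `List((31000L, 31100L))` accepts port 31005 but rejects 30000, and a single range of two ports satisfies `enoughPorts(_, 2)`.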
[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...
Github user tnachen commented on a diff in the pull request: https://github.com/apache/spark/pull/11157#discussion_r60257784 --- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala --- @@ -1978,57 +1978,134 @@ private[spark] object Utils extends Logging { } /** - * Attempt to start a service on the given port, or fail after a number of attempts. - * Each subsequent attempt uses 1 + the port used in the previous attempt (unless the port is 0). - * - * @param startPort The initial port to start the service on. - * @param startService Function to start service on a given port. - * This is expected to throw java.net.BindException on port collision. - * @param conf A SparkConf used to get the maximum number of retries when binding to a port. - * @param serviceName Name of the service. - * @return (service: T, port: Int) - */ - def startServiceOnPort[T]( - startPort: Int, +* Attempt to start a service on the given port, or fail after a number of attempts. +* Each subsequent attempt uses 1 + the port used in the previous attempt (unless the port is 0). +* It takes into consideration port restrictions through the env var AVAILABLE_PORTS +* +* @param startPort The initial port to start the service on. +* @param startService Function to start service on a given port. +* This is expected to throw java.net.BindException on port collision. +* @param conf A SparkConf used to get the maximum number of retries when binding to a port. +* @param serviceName Name of the service. +* @return (service: T, port: Int) +*/ + def startServiceOnPort[T](startPort: Int, startService: Int => (T, Int), conf: SparkConf, serviceName: String = ""): (T, Int) = { +val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'" + +// define some helpers, they all share common data, maybe a service abstract class +// for all services could be a good fit here. 
+ +def portRangeToList(ranges: String): List[(Long, Long)] = { + if (ranges == "") { +return List() + } + ranges.split(" ").map { r => val ret = r.substring(1, r.length - 1).split(",") +(ret(0).toLong, ret(1).toLong) + }.toList +} + +def startOnce(tryPort: Int): (Option[T], Int) = { + val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'" + try { +val (service, port) = startService(tryPort) +logInfo(s"Successfully started service$serviceString on port $port.") +(Some(service), port) + } catch { +case e: Exception if isBindCollision(e) => logWarning(s"Service$serviceString " + + s"could not bind on port $tryPort. ") + (None, -1) + } +} + +def retryPort(next: Int => Int, maxRetries: Int): (T, Int) = { + + for (offset <- 0 until maxRetries) { +val tryPort = next(offset) +try { + val (service, port) = startService(tryPort) + logInfo(s"Successfully started service$serviceString on port $port.") + return (service, port) +} catch { + case e: Exception if isBindCollision(e) => +if (offset >= maxRetries) { + val exceptionMessage = +s"${e.getMessage}: Service$serviceString failed after $maxRetries retries!" + val exception = new BindException(exceptionMessage) + // restore original stack trace + exception.setStackTrace(e.getStackTrace) + throw exception +} +logWarning(s"Service$serviceString could not bind on port $tryPort.") +} + } + // Should never happen + throw new SparkException(s"Failed to start service$serviceString on port $startPort") +} + +def startFromAvailable(rand: Boolean = false): (T, Int) = { + val ports = portRangeToList(sys.env.get("AVAILABLE_RAND_PORTS").get) --- End diff -- Btw if startFromAvailable is called multiple times we might clash ports right? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...
Github user tnachen commented on a diff in the pull request: https://github.com/apache/spark/pull/11157#discussion_r60257720 --- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala --- @@ -1978,57 +1978,134 @@ private[spark] object Utils extends Logging { } /** - * Attempt to start a service on the given port, or fail after a number of attempts. - * Each subsequent attempt uses 1 + the port used in the previous attempt (unless the port is 0). - * - * @param startPort The initial port to start the service on. - * @param startService Function to start service on a given port. - * This is expected to throw java.net.BindException on port collision. - * @param conf A SparkConf used to get the maximum number of retries when binding to a port. - * @param serviceName Name of the service. - * @return (service: T, port: Int) - */ - def startServiceOnPort[T]( - startPort: Int, +* Attempt to start a service on the given port, or fail after a number of attempts. +* Each subsequent attempt uses 1 + the port used in the previous attempt (unless the port is 0). +* It takes into consideration port restrictions through the env var AVAILABLE_PORTS +* +* @param startPort The initial port to start the service on. +* @param startService Function to start service on a given port. +* This is expected to throw java.net.BindException on port collision. +* @param conf A SparkConf used to get the maximum number of retries when binding to a port. +* @param serviceName Name of the service. +* @return (service: T, port: Int) +*/ + def startServiceOnPort[T](startPort: Int, startService: Int => (T, Int), conf: SparkConf, serviceName: String = ""): (T, Int) = { +val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'" + +// define some helpers, they all share common data, maybe a service abstract class +// for all services could be a good fit here. 
+ +def portRangeToList(ranges: String): List[(Long, Long)] = { + if (ranges == "") { +return List() + } + ranges.split(" ").map { r => val ret = r.substring(1, r.length - 1).split(",") +(ret(0).toLong, ret(1).toLong) + }.toList +} + +def startOnce(tryPort: Int): (Option[T], Int) = { + val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'" + try { +val (service, port) = startService(tryPort) +logInfo(s"Successfully started service$serviceString on port $port.") +(Some(service), port) + } catch { +case e: Exception if isBindCollision(e) => logWarning(s"Service$serviceString " + + s"could not bind on port $tryPort. ") + (None, -1) + } +} + +def retryPort(next: Int => Int, maxRetries: Int): (T, Int) = { + + for (offset <- 0 until maxRetries) { +val tryPort = next(offset) +try { + val (service, port) = startService(tryPort) + logInfo(s"Successfully started service$serviceString on port $port.") + return (service, port) +} catch { + case e: Exception if isBindCollision(e) => +if (offset >= maxRetries) { + val exceptionMessage = +s"${e.getMessage}: Service$serviceString failed after $maxRetries retries!" + val exception = new BindException(exceptionMessage) + // restore original stack trace + exception.setStackTrace(e.getStackTrace) + throw exception +} +logWarning(s"Service$serviceString could not bind on port $tryPort.") +} + } + // Should never happen + throw new SparkException(s"Failed to start service$serviceString on port $startPort") +} + +def startFromAvailable(rand: Boolean = false): (T, Int) = { + val ports = portRangeToList(sys.env.get("AVAILABLE_RAND_PORTS").get) --- End diff -- I'm not sure this is the best approach yet, but regardless we should name spark specific env variable with SPARK_* (SPARK_AVAILABLE_PORTS). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...
Github user tnachen commented on a diff in the pull request: https://github.com/apache/spark/pull/11157#discussion_r60255016 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala --- @@ -353,4 +371,247 @@ private[mesos] trait MesosSchedulerUtils extends Logging { sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForUnmetConstraints", "120s") } + /** + * Checks executor ports if they are within some range of the offered list of ports ranges, + * + * @param sc the Spark Context + * @param ports the list of ports to check + * @param takenPorts ports already used for that slave + * @return true if ports are within range false otherwise + */ + protected def checkPorts(sc: SparkContext, ports: List[(Long, Long)], + takenPorts: List[Long] = List()): Boolean = { + +def checkIfInRange(port: Int, ps: List[(Long, Long)]): Boolean = { + ps.exists(r => r._1 <= port & r._2 >= port) +} + +val portsToCheck = List(sc.conf.getInt("spark.executor.port", 0), + sc.conf.getInt("spark.blockManager.port", 0)) +val nonZeroPorts = portsToCheck.filter(_ != 0) + +// If we require a port that is taken we have to decline the offer since mesos +// shares all port ranges on the slave +val contained = for {port <- nonZeroPorts} + yield { +takenPorts.contains(port) + } + +if (contained.contains(true)) { + return false +} + +val withinRange = nonZeroPorts.forall(p => checkIfInRange(p, ports)) + +// make sure we have enough ports to allocate per offer +ports.map(r => r._2 - r._1 + 1).sum >= portsToCheck.size && withinRange + } + + /** + * Partitions port resources. 
+ * + * @param conf the spark config + * @param ports the ports offered + * @return resources left, port resources to be used and the list of assigned ports + */ + def partitionPorts( + conf: SparkConf, + ports: List[Resource]): (List[Resource], List[Resource], List[Long]) = { + +val taskPortRanges = getRangeResourceWithRoleInfo(ports.asJava, "ports") + +val portsToCheck = List(conf.getInt("spark.executor.port", 0).toLong, + conf.getInt("spark.blockManager.port", 0).toLong) + +val nonZeroPorts = portsToCheck.filter(_ != 0) + +// reserve non zero ports first + +val nonZeroResources = reservePorts(taskPortRanges, nonZeroPorts) + +// reserve actual port numbers for zero ports - not set by the user + +val numOfZeroPorts = portsToCheck.count(_ == 0) + +val randPorts = pickRandomPortsFromRanges(nonZeroResources._1, numOfZeroPorts) --- End diff -- What's the intention to pick random ports? Why not just pick the first few ones? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
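One plausible way to pick random ports from the remaining offered ranges, as `pickRandomPortsFromRanges` is used above, is to flatten the ranges and sample distinct ports. The actual helper's body is not shown in the quoted diff, so this is only an assumed sketch (the `seed` parameter is added here purely for reproducibility); randomizing rather than taking the first few ports reduces the chance that concurrent executors on the same slave race for identical low ports:

```scala
import scala.util.Random

// Hypothetical sketch of pickRandomPortsFromRanges (the real implementation
// is not in the quoted diff): enumerate every port in the offered inclusive
// ranges, shuffle, and take the requested number of distinct ports.
object RandomPorts {
  def pickRandomPorts(
      ranges: List[(Long, Long)],
      count: Int,
      seed: Long = 42L): List[Long] = {
    // Flatten each inclusive (lo, hi) range into its individual ports.
    val allPorts = ranges.flatMap { case (lo, hi) => lo to hi }
    // Shuffle deterministically for the given seed, then take `count` ports.
    new Random(seed).shuffle(allPorts).take(count)
  }
}
```

For instance, sampling two ports from `List((31000L, 31010L))` returns two distinct ports, each within the offered range.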
[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...
Github user tnachen commented on a diff in the pull request: https://github.com/apache/spark/pull/11157#discussion_r60254817 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala --- @@ -353,4 +371,247 @@ private[mesos] trait MesosSchedulerUtils extends Logging { sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForUnmetConstraints", "120s") } + /** + * Checks executor ports if they are within some range of the offered list of ports ranges, + * + * @param sc the Spark Context + * @param ports the list of ports to check + * @param takenPorts ports already used for that slave + * @return true if ports are within range false otherwise + */ + protected def checkPorts(sc: SparkContext, ports: List[(Long, Long)], + takenPorts: List[Long] = List()): Boolean = { + +def checkIfInRange(port: Int, ps: List[(Long, Long)]): Boolean = { + ps.exists(r => r._1 <= port & r._2 >= port) +} + +val portsToCheck = List(sc.conf.getInt("spark.executor.port", 0), + sc.conf.getInt("spark.blockManager.port", 0)) +val nonZeroPorts = portsToCheck.filter(_ != 0) + +// If we require a port that is taken we have to decline the offer since mesos +// shares all port ranges on the slave +val contained = for {port <- nonZeroPorts} + yield { +takenPorts.contains(port) + } + +if (contained.contains(true)) { + return false +} + +val withinRange = nonZeroPorts.forall(p => checkIfInRange(p, ports)) + +// make sure we have enough ports to allocate per offer +ports.map(r => r._2 - r._1 + 1).sum >= portsToCheck.size && withinRange + } + + /** + * Partitions port resources. 
+ * + * @param conf the spark config + * @param ports the ports offered + * @return resources left, port resources to be used and the list of assigned ports + */ + def partitionPorts( + conf: SparkConf, + ports: List[Resource]): (List[Resource], List[Resource], List[Long]) = { + +val taskPortRanges = getRangeResourceWithRoleInfo(ports.asJava, "ports") + +val portsToCheck = List(conf.getInt("spark.executor.port", 0).toLong, + conf.getInt("spark.blockManager.port", 0).toLong) + +val nonZeroPorts = portsToCheck.filter(_ != 0) + +// reserve non zero ports first + +val nonZeroResources = reservePorts(taskPortRanges, nonZeroPorts) + +// reserve actual port numbers for zero ports - not set by the user + +val numOfZeroPorts = portsToCheck.count(_ == 0) + +val randPorts = pickRandomPortsFromRanges(nonZeroResources._1, numOfZeroPorts) + +val zeroResources = reservePorts(nonZeroResources._1, randPorts) + +val (resourcesLeft, resourcesToBeUsed) = createResources(nonZeroResources, zeroResources) + +(resourcesLeft, resourcesToBeUsed, nonZeroPorts ++ randPorts) + } + + private def createResources( + nonZero: (List[PortRangeResourceInfo], List[PortRangeResourceInfo]), + zero: (List[PortRangeResourceInfo], List[PortRangeResourceInfo])) + : (List[Resource], List[Resource]) = { + +val resources = { + if (nonZero._2.isEmpty) { // no user ports were defined +(zero._1.flatMap{port => createMesosPortResource(port.value, Some(port.role))}, + zero._2.flatMap{port => createMesosPortResource(port.value, Some(port.role))}) + + } else if (zero._2.isEmpty) { // no random ports were defined +(nonZero._1.flatMap{port => createMesosPortResource(port.value, Some(port.role))}, + nonZero._2.flatMap{port => createMesosPortResource(port.value, Some(port.role))}) + } + else { // we have user defined and random ports defined +val left = zero._1.flatMap{port => createMesosPortResource(port.value, Some(port.role))} + +val used = nonZero._2.flatMap{port => + createMesosPortResource(port.value, 
Some(port.role))} ++ + zero._2.flatMap{port => createMesosPortResource(port.value, Some(port.role))} + +(left, used) + } +} +resources + } + + private case class PortRangeResourceInfo(role: String, value: List[(Long, Long)]) + + private def getRangeResourceWithRoleInfo(res: JList[Resource], name: String) + : List[PortRangeResourceInfo] = { +// A resource can have multiple values in the offer since it can either be from +// a specific role or wildcard. +res.asScala.filter(_.getName == name) + .map{res => PortRangeResourceInfo(res.getRole, res.getRanges.getRangeList
[GitHub] spark pull request: [SPARK-11714][Mesos] Make Spark on Mesos honor...
Github user tnachen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11157#discussion_r60254758

    --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala ---

    @@ -353,4 +371,247 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
         sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForUnmetConstraints", "120s")
       }

    +  /**
    +   * Checks whether the executor ports fall within one of the offered port ranges.
    +   *
    +   * @param sc the Spark context
    +   * @param ports the list of offered port ranges
    +   * @param takenPorts ports already used on that slave
    +   * @return true if the ports are within range, false otherwise
    +   */
    +  protected def checkPorts(sc: SparkContext, ports: List[(Long, Long)],
    +      takenPorts: List[Long] = List()): Boolean = {
    +
    +    def checkIfInRange(port: Int, ps: List[(Long, Long)]): Boolean = {
    +      ps.exists(r => r._1 <= port && r._2 >= port)
    +    }
    +
    +    val portsToCheck = List(sc.conf.getInt("spark.executor.port", 0),
    +      sc.conf.getInt("spark.blockManager.port", 0))
    +    val nonZeroPorts = portsToCheck.filter(_ != 0)
    +
    +    // If we require a port that is taken we have to decline the offer, since Mesos
    +    // shares all port ranges on the slave.
    +    val contained = for { port <- nonZeroPorts } yield {
    +      takenPorts.contains(port)
    +    }
    +
    +    if (contained.contains(true)) {
    +      return false
    +    }
    +
    +    val withinRange = nonZeroPorts.forall(p => checkIfInRange(p, ports))
    +
    +    // Make sure we have enough ports to allocate per offer.
    +    ports.map(r => r._2 - r._1 + 1).sum >= portsToCheck.size && withinRange
    +  }
    +
    +  /**
    +   * Partitions port resources.
    +   *
    +   * @param conf the Spark config
    +   * @param ports the ports offered
    +   * @return resources left, port resources to be used, and the list of assigned ports
    +   */
    +  def partitionPorts(
    +      conf: SparkConf,
    +      ports: List[Resource]): (List[Resource], List[Resource], List[Long]) = {
    +
    +    val taskPortRanges = getRangeResourceWithRoleInfo(ports.asJava, "ports")
    +
    +    val portsToCheck = List(conf.getInt("spark.executor.port", 0).toLong,
    +      conf.getInt("spark.blockManager.port", 0).toLong)
    +
    +    val nonZeroPorts = portsToCheck.filter(_ != 0)
    +
    +    // Reserve non-zero ports first.
    +    val nonZeroResources = reservePorts(taskPortRanges, nonZeroPorts)
    +
    +    // Reserve actual port numbers for zero ports - not set by the user.
    +    val numOfZeroPorts = portsToCheck.count(_ == 0)
    +
    +    val randPorts = pickRandomPortsFromRanges(nonZeroResources._1, numOfZeroPorts)
    +
    +    val zeroResources = reservePorts(nonZeroResources._1, randPorts)
    +
    +    val (resourcesLeft, resourcesToBeUsed) = createResources(nonZeroResources, zeroResources)
    +
    +    (resourcesLeft, resourcesToBeUsed, nonZeroPorts ++ randPorts)
    +  }
    +
    +  private def createResources(
    +      nonZero: (List[PortRangeResourceInfo], List[PortRangeResourceInfo]),
    +      zero: (List[PortRangeResourceInfo], List[PortRangeResourceInfo]))
    +    : (List[Resource], List[Resource]) = {
    +
    +    val resources = {
    +      if (nonZero._2.isEmpty) { // no user ports were defined
    +        (zero._1.flatMap{port => createMesosPortResource(port.value, Some(port.role))},
    +          zero._2.flatMap{port => createMesosPortResource(port.value, Some(port.role))})
    +      } else if (zero._2.isEmpty) { // no random ports were defined
    +        (nonZero._1.flatMap{port => createMesosPortResource(port.value, Some(port.role))},
    +          nonZero._2.flatMap{port => createMesosPortResource(port.value, Some(port.role))})
    +      } else { // we have both user-defined and random ports
    +        val left = zero._1.flatMap{port => createMesosPortResource(port.value, Some(port.role))}
    +
    +        val used = nonZero._2.flatMap{port =>
    +            createMesosPortResource(port.value, Some(port.role))} ++
    +          zero._2.flatMap{port => createMesosPortResource(port.value, Some(port.role))}
    +
    +        (left, used)
    +      }
    +    }
    +    resources
    +  }
    +
    +  private case class PortRangeResourceInfo(role: String, value: List[(Long, Long)])
    +
    +  private def getRangeResourceWithRoleInfo(res: JList[Resource], name: String)
    +    : List[PortRangeResourceInfo] = {
    +    // A resource can have multiple values in the offer, since it can come either from
    +    // a specific role or from the wildcard role.
    +    res.asScala.filter(_.getName == name)
    +      .map{res => PortRangeResourceInfo(res.getRole, res.getRanges.getRangeList
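The range-containment and capacity logic in `checkPorts` can be exercised on its own. The sketch below is illustrative only (the object name and sample ranges are made up); it uses `&&` for the boolean conjunction and treats ranges as inclusive `(start, end)` pairs, as in a Mesos "ports" resource:

```scala
// Standalone sketch of the port containment and capacity checks.
object PortCheckSketch {
  // True if `port` falls inside any of the inclusive ranges.
  def checkIfInRange(port: Long, ranges: List[(Long, Long)]): Boolean =
    ranges.exists { case (lo, hi) => lo <= port && hi >= port }

  // Total number of ports covered by the offered ranges.
  def capacity(ranges: List[(Long, Long)]): Long =
    ranges.map { case (lo, hi) => hi - lo + 1 }.sum

  def main(args: Array[String]): Unit = {
    val offered = List((31000L, 31005L), (32000L, 32010L))
    assert(checkIfInRange(31003L, offered))   // inside the first range
    assert(!checkIfInRange(31500L, offered))  // falls in the gap between ranges
    assert(capacity(offered) == 17)           // 6 + 11 ports on offer
    println("ok")
  }
}
```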
Github user tnachen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11157#discussion_r60254665

    --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala ---
Github user tnachen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11157#discussion_r60254778

    --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala ---
Github user tnachen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11157#discussion_r60254689

    --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala ---
Github user tnachen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11157#discussion_r60254568

    --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala ---
Github user tnachen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11157#discussion_r60254354

    --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala ---

    +    val taskPortRanges = getRangeResourceWithRoleInfo(ports.asJava, "ports")
    +

    --- End diff --

    Remove all the whitespaces in between
Github user tnachen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11157#discussion_r60254407

    --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala ---

    +  def partitionPorts(
    +      conf: SparkConf,
    +      ports: List[Resource]): (List[Resource], List[Resource], List[Long]) = {

    --- End diff --

    Please specify return type
Github user tnachen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11157#discussion_r60253926

    --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala ---

    +    // If we require a port that is taken we have to decline the offer since mesos
    +    // shares all port ranges on the slave
    +    val contained = for { port <- nonZeroPorts }

    --- End diff --

    How about just:

        if (nonZeroPorts.exists(port => takenPorts.contains(port))) {
          return false
        }
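The `exists` form suggested here short-circuits on the first taken port and avoids materializing the intermediate list of Booleans that the for-comprehension builds. A quick equivalence sketch, with hypothetical inputs:

```scala
// Compare the original for-comprehension with the suggested exists-based form.
object TakenPortsSketch {
  def main(args: Array[String]): Unit = {
    val nonZeroPorts = List(7077L, 51000L)
    val takenPorts = List(51000L)

    // Original: build a List[Boolean], then look for a true.
    val contained = for { port <- nonZeroPorts } yield takenPorts.contains(port)
    val originalResult = contained.contains(true)

    // Suggested: short-circuit as soon as a taken port is found.
    val suggestedResult = nonZeroPorts.exists(port => takenPorts.contains(port))

    assert(originalResult == suggestedResult)
    assert(suggestedResult)  // 51000 is taken, so the offer would be declined
    println("ok")
  }
}
```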
Github user tnachen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11157#discussion_r60253411

    --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala ---

    +  protected def checkPorts(sc: SparkContext, ports: List[(Long, Long)],

    --- End diff --

    Put each input parameter on its own line
Github user tnachen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11157#discussion_r60253302

    --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala ---

    @@ -171,7 +186,8 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
       /**
        * Partition the existing set of resources into two groups, those remaining to be
        * scheduled and those requested to be used for a new task.
    -   * @param resources The full list of available resources
    +   *
    +   * @param resources The full list of available resources

    --- End diff --

    Same
Github user tnachen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11157#discussion_r60253331

    --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala ---

    @@ -333,7 +350,8 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
       /**
        * Return the amount of memory to allocate to each executor, taking into account
        * container overheads.
    -   * @param sc SparkContext to use to get `spark.mesos.executor.memoryOverhead` value
    +   *
    +   * @param sc SparkContext to use to get `spark.mesos.executor.memoryOverhead` value

    --- End diff --

    Same
Github user tnachen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11157#discussion_r60253191

    --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala ---

    @@ -45,7 +45,8 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
       /**
        * Creates a new MesosSchedulerDriver that communicates to the Mesos master.
    -   * @param masterUrl The url to connect to Mesos master
    +   *
    +   * @param masterUrl The url to connect to Mesos master

    --- End diff --

    Is this intentional?
Github user tnachen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11157#discussion_r60253095

    --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala ---

    @@ -154,7 +161,8 @@ private[spark] class CoarseMesosSchedulerBackend(
         startScheduler(driver)
       }

    -  def createCommand(offer: Offer, numCores: Int, taskId: String): CommandInfo = {
    +  def createCommand(offer: Offer, numCores: Int, taskId: String, randPorts: List[Long] = List())

    --- End diff --

    I think according to Spark style we should move each method parameter onto its own line in this case.
Github user skonto commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11157#discussion_r60218121

    --- Diff: core/src/main/scala/org/apache/spark/SparkEnv.scala ---

    @@ -244,6 +244,7 @@ object SparkEnv extends Logging {
         conf.set("spark.driver.port", rpcEnv.address.port.toString)
       } else if (rpcEnv.address != null) {
         conf.set("spark.executor.port", rpcEnv.address.port.toString)
    +    logInfo(s"Starting executor with port: ${rpcEnv.address.port.toString}")

    --- End diff --

    Sure.
Github user skonto commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11157#discussion_r60218050

    --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala ---

    @@ -75,6 +75,13 @@ private[spark] class CoarseMesosSchedulerBackend(
         val coresByTaskId = new HashMap[String, Int]
         var totalCoresAcquired = 0

    +  // Ports acquired so far
    +  // SlaveID -> ports
    +  val takenPortsPerSlave = new mutable.HashMap[String, List[Long]]

    --- End diff --

    For now it is used only by the checkPorts utility method, to cover the case where spark.executor.port or spark.blockManager.port has been defined and already reserved on a slave; as we have discussed, such a port limits the number of executors. So I keep track of what is allocated in order to decline an offer fast. Of course, I could just check whether those values are contained in the offered ports anyway. I guess I could refactor this part and omit the state keeping, if I am not missing anything here.
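The bookkeeping described here (slave ID mapped to ports already handed out) amounts to a mutable map keyed by slave. The sketch below mirrors the names in the PR but is illustrative only; the helper methods are hypothetical, not part of the patch:

```scala
import scala.collection.mutable

// Illustrative sketch of per-slave port bookkeeping.
object TakenPortsPerSlaveSketch {
  // SlaveID -> ports already allocated on that slave
  val takenPortsPerSlave = new mutable.HashMap[String, List[Long]]

  def recordPorts(slaveId: String, ports: List[Long]): Unit =
    takenPortsPerSlave(slaveId) =
      takenPortsPerSlave.getOrElse(slaveId, List()) ++ ports

  def isTaken(slaveId: String, port: Long): Boolean =
    takenPortsPerSlave.getOrElse(slaveId, List()).contains(port)

  def main(args: Array[String]): Unit = {
    recordPorts("slave-1", List(7077L))
    assert(isTaken("slave-1", 7077L))   // fixed port reserved -> decline that offer fast
    assert(!isTaken("slave-2", 7077L))  // other slaves are unaffected
    println("ok")
  }
}
```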
Github user skonto commented on the pull request:

    https://github.com/apache/spark/pull/11157#issuecomment-211870773

    Thanks @mgummelt. Yes, I follow that. Here is the document: https://docs.google.com/document/d/1UUPQBtDCuo5M10REDRJiBFa0Vws0inemxcOP9LjTACw/edit?usp=sharing
Github user skonto commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11157#discussion_r60215292

    --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala ---

    -  def createCommand(offer: Offer, numCores: Int, taskId: String): CommandInfo = {
    +  def createCommand(offer: Offer, numCores: Int, taskId: String, randPorts: List[Long] = List())

    --- End diff --

    Sure