Github user mgummelt commented on a diff in the pull request: https://github.com/apache/spark/pull/11157#discussion_r62731739 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala --- @@ -356,4 +374,229 @@ private[mesos] trait MesosSchedulerUtils extends Logging { sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForReachedMaxCores", "120s") } + /** + * Checks executor ports if they are within some range of the offered list of ports ranges, + * + * @param sc the Spark Context + * @param ports the list of ports to check + * @return true if ports are within range false otherwise + */ + protected def checkPorts(sc: SparkContext, ports: List[(Long, Long)]): Boolean = { + + def checkIfInRange(port: Int, ps: List[(Long, Long)]): Boolean = { + ps.exists(r => r._1 <= port & r._2 >= port) + } + + val portsToCheck = List(sc.conf.getInt("spark.executor.port", 0), + sc.conf.getInt("spark.blockManager.port", 0)) + val nonZeroPorts = portsToCheck.filter(_ != 0) + val withinRange = nonZeroPorts.forall(p => checkIfInRange(p, ports)) + // make sure we have enough ports to allocate per offer + ports.map(r => r._2 - r._1 + 1).sum >= portsToCheck.size && withinRange + } + + /** + * Partitions port resources. 
+ * + * @param conf the spark config + * @param ports the ports offered + * @return resources left, port resources to be used and the list of assigned ports + */ + def partitionPorts( + conf: SparkConf, + ports: List[Resource]) + : (List[Resource], List[Resource], List[Long]) = { + val taskPortRanges = getRangeResourceWithRoleInfo(ports.asJava, "ports") + val portsToCheck = List(conf.getInt("spark.executor.port", 0).toLong, + conf.getInt("spark.blockManager.port", 0).toLong) + val nonZeroPorts = portsToCheck.filter(_ != 0) + // reserve non zero ports first + val nonZeroResources = reservePorts(taskPortRanges, nonZeroPorts) + // reserve actual port numbers for zero ports - not set by the user + val numOfZeroPorts = portsToCheck.count(_ == 0) + val randPorts = pickRandomPortsFromRanges(nonZeroResources._1, numOfZeroPorts) + val zeroResources = reservePorts(nonZeroResources._1, randPorts) + val (resourcesLeft, resourcesToBeUsed) = createResources(nonZeroResources, zeroResources) + (resourcesLeft, resourcesToBeUsed, nonZeroPorts ++ randPorts) + } + + private def createResources( + nonZero: (List[PortRangeResourceInfo], List[PortRangeResourceInfo]), + zero: (List[PortRangeResourceInfo], List[PortRangeResourceInfo])) + : (List[Resource], List[Resource]) = { + val resources = { + if (nonZero._2.isEmpty) { // no user ports were defined + (zero._1.flatMap{port => createMesosPortResource(port.value, Some(port.role))}, + zero._2.flatMap{port => createMesosPortResource(port.value, Some(port.role))}) + + } else if (zero._2.isEmpty) { // no random ports were defined + (nonZero._1.flatMap{port => createMesosPortResource(port.value, Some(port.role))}, + nonZero._2.flatMap{port => createMesosPortResource(port.value, Some(port.role))}) + } + else { // we have user defined and random ports defined + val left = zero._1.flatMap{port => createMesosPortResource(port.value, Some(port.role))} + val used = nonZero._2.flatMap{port => + createMesosPortResource(port.value, Some(port.role))} ++ 
+ zero._2.flatMap{port => createMesosPortResource(port.value, Some(port.role))} + (left, used) + } + } + resources + } + + private case class PortRangeResourceInfo(role: String, value: List[(Long, Long)]) + + private def getRangeResourceWithRoleInfo(res: JList[Resource], name: String) + : List[PortRangeResourceInfo] = { + // A resource can have multiple values in the offer since it can either be from + // a specific role or wildcard. + // Extract role info and port range for every port resource in the offer + res.asScala.filter(_.getName == name) + .map{res => PortRangeResourceInfo(res.getRole, res.getRanges.getRangeList.asScala + .map(r => (r.getBegin, r.getEnd)).toList) }.toList + } + + /** Helper method to get a pair of assigned and remaining ports along with role info */ + private def reservePorts( + availablePortRanges: List[PortRangeResourceInfo], + wantedPorts: List[Long]) + : (List[PortRangeResourceInfo], List[PortRangeResourceInfo]) = { + if (wantedPorts.isEmpty) { // port list is empty we didnt consume any resources + return (availablePortRanges, List()) + } + var tmpLeft = availablePortRanges + val tmpRanges = for {port <- wantedPorts} + yield { + val ret = findPortAndSplitRange(port, tmpLeft) + val rangeToRemove = ret._1 + val diffRanges = tmpLeft.filterNot{r => r == rangeToRemove} + val newRangesLeft = diffRanges ++ List(ret._2).flatMap(p => p) + tmpLeft = newRangesLeft + ret + } + val rangesToRemove = tmpRanges.map(x => x._1) + val newRangesLeft = (availablePortRanges ++ tmpRanges.flatMap{x => x._2}) + .flatMap{r => removeRanges(r, rangesToRemove)} + val newRanges = tmpRanges.map{r => PortRangeResourceInfo(r._1.role, List((r._3, r._3)))} + // return a list of the resources left and one with the newly assigned ones + // we need to carry the role info as well + (newRangesLeft, newRanges) + } + + private def removeRanges( + rangeA: PortRangeResourceInfo, + rangesToRemove: List[PortRangeResourceInfo]): Option[PortRangeResourceInfo] = { + val ranges = 
rangeA.value.filterNot(rangesToRemove.flatMap{_.value}.toSet) + if (ranges.isEmpty) { + None + } else { + Some(PortRangeResourceInfo(rangeA.role, ranges)) + } + } + + private def createMesosPortResource( + ranges: List[(Long, Long)], + role: Option[String] = None): List[Resource] = { + ranges.map { range => + val rangeValue = Value.Range.newBuilder() + rangeValue.setBegin(range._1) + rangeValue.setEnd(range._2) + val builder = Resource.newBuilder() + .setName("ports") + .setType(Value.Type.RANGES) + .setRanges(Value.Ranges.newBuilder().addRange(rangeValue)) + role.foreach { r => builder.setRole(r) } + builder.build() + } + } + + private def pickRandomPortsFromRanges( + ranges: List[PortRangeResourceInfo], + numToPick: Int): List[Long] = { + if (numToPick == 0) { + return List() + } + val ports = scala.util.Random. + shuffle(ranges.flatMap(p => p.value.flatMap(r => (r._1 to r._2).toList)) + .distinct + ) + require(ports.size >= numToPick) + ports.take(numToPick) + } + + private def findPortAndSplitRange(port: Long, ranges: List[PortRangeResourceInfo]) + : (PortRangeResourceInfo, Option[PortRangeResourceInfo], Long) = { + val rangePortInfo = ranges + .map{p => val tmpList = List(p.value.filter(r => r._1 <= port & r._2 >= port)) + PortRangeResourceInfo(p.role, tmpList.head)}.filterNot(p => p.value.isEmpty) + .head + val range = rangePortInfo.value.head + val ret = { + if (port == range._1 && port == range._2) { + None + } + else if (port == range._1 && port != range._2) { + Some(PortRangeResourceInfo(rangePortInfo.role, List((port + 1, range._2)))) + } + else if (port == range._2 && port != range._2) { + Some(PortRangeResourceInfo(rangePortInfo.role, List((range._1, port - 1)))) + } + else { + // split range + val splitList = List((range._1, port - 1), (port + 1, range._2)) + Some(PortRangeResourceInfo(rangePortInfo.role, splitList)) + } + } + (rangePortInfo, ret, port) + } + + /** + * Retrieves the port resources from a list of mesos offered resources + * + * @param 
resources the mesos resources to parse + * @return the port resources only + */ + def getPortResources(resources: List[Resource]): (List[Resource], List[Resource]) = { + resources.partition {r => !(r.getType == Value.Type.RANGES & r.getName == "ports")} --- End diff -- Use a double ampersand (`&&`) here instead of a single `&`, for consistency.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and would like it enabled, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org