Github user skonto commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11157#discussion_r73215886
  
    --- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
 ---
    @@ -357,4 +375,191 @@ private[mesos] trait MesosSchedulerUtils extends 
Logging {
         
sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForReachedMaxCores", 
"120s")
       }
     
    +  /**
    +   * Checks executor ports if they are within some range of the offered 
list of ports ranges,
    +   *
    +   * @param conf the Spark Config
    +   * @param ports the list of ports to check
    +   * @return true if ports are within range false otherwise
    +   */
    +  protected def checkPorts(conf: SparkConf, ports: List[(Long, Long)]): 
Boolean = {
    +
    +    def checkIfInRange(port: Long, ps: List[(Long, Long)]): Boolean = {
    +      ps.exists(r => r._1 <= port & r._2 >= port)
    +    }
    +
    +    val portsToCheck = nonZeroPortValuesFromConfig(conf)
    +    val withinRange = portsToCheck.forall(p => checkIfInRange(p, ports))
    +    // make sure we have enough ports to allocate per offer
    +    ports.map(r => r._2 - r._1 + 1).sum >= portsToCheck.size && withinRange
    +  }
    +
    +  /**
    +   * Partitions port resources.
    +   *
    +   * @param portsToAssign non-zero ports to assign
    +   * @param offeredResources the ports offered
    +   * @return resources left, port resources to be used.
    +   */
    +  def partitionPortResources(portsToAssign: List[Long], offeredResources: 
List[Resource])
    +    : (List[Resource], List[Resource]) = {
    +    if (portsToAssign.isEmpty) {
    +      (offeredResources, List[Resource]())
    +    }
    +    // partition port offers
    +    val (resourcesWithoutPorts, portResources) = 
filterPortResources(offeredResources)
    +    val offeredPortRanges = 
getRangeResourceWithRoleInfo(portResources.asJava, "ports")
    +    // reserve non-zero ports
    +    val nonZeroResources = reservePorts(offeredPortRanges, portsToAssign)
    +
    +    createResourcesFromAssignedPorts(nonZeroResources)
    +  }
    +
    +  /**
    +   * Returns known port name used by the executor process.
    +   * @return the port name
    +   */
    +  def managedPortNames() : List[String] = List("spark.executor.port", 
"spark.blockManager.port")
    +
    +  /**
    +   * The values of the non-zero ports to be used by the executor process.
    +   * @param conf the spark config to use
    +   * @return the ono-zero values of the ports
    +   */
    +  def nonZeroPortValuesFromConfig(conf: SparkConf): List[Long] = {
    +    managedPortNames().map(conf.getLong(_, 0)).filter( _ != 0)
    +  }
    +
    +  /**
    +   * It gets a tuple for the non-zero port assigned resources.
    +   * First member of the tuple represents resources left while the second
    +   * resources used. A tuple is returned with final resources left (fist 
member)
    +   * and the resources used (second member).
    +   */
    +  private def createResourcesFromAssignedPorts(
    +      nonZero: (List[PortRangeResourceInfo], List[PortRangeResourceInfo]))
    +    : (List[Resource], List[Resource]) = {
    +        (nonZero._1.flatMap{port => createMesosPortResource(port.range, 
Some(port.role))},
    +          nonZero._2.flatMap{port => createMesosPortResource(port.range, 
Some(port.role))})
    +  }
    +
    +  private case class PortRangeResourceInfo(role: String, range: 
List[(Long, Long)])
    +
    +  /**
    +   * A resource can have multiple values in the offer since it can either 
be from
    +   * a specific role or wildcard.
    +   * Extract role info and port range for every port resource in the offer
    +   */
    +  private def getRangeResourceWithRoleInfo(resources: JList[Resource], 
name: String)
    +    : List[PortRangeResourceInfo] = {
    +    resources.asScala.filter(_.getName == name).
    +      map{resource =>
    +        PortRangeResourceInfo(resource.getRole, 
resource.getRanges.getRangeList.asScala
    +        .map(r => (r.getBegin, r.getEnd)).toList)
    +      }.toList
    +  }
    +
    +  /** Helper method to get a pair of assigned and remaining ports along 
with role info */
    +  private def reservePorts(
    +      offeredPortRanges: List[PortRangeResourceInfo],
    +      requestedPorts: List[Long])
    +    : (List[PortRangeResourceInfo], List[PortRangeResourceInfo]) = {
    +    var tmpLeft = offeredPortRanges
    +    val tmpRanges = for {port <- requestedPorts}
    +      yield {
    +        val ret = findPortAndSplitRange(port, tmpLeft)
    +        val rangeToRemove = ret._1
    +        val diffRanges = tmpLeft.filterNot{r => r == rangeToRemove}
    +        val newRangesLeft = diffRanges ++ List(ret._2).flatMap(p => p)
    +        tmpLeft = newRangesLeft
    +        ret
    +      }
    +    val rangesToRemove = tmpRanges.map(x => x._1)
    +    val newRangesLeft = (offeredPortRanges ++ tmpRanges.flatMap{x => x._2})
    +      .flatMap{r => removeRanges(r, rangesToRemove)}
    +    val newRanges = tmpRanges.map{r => PortRangeResourceInfo(r._1.role, 
List((r._3, r._3)))}
    +    // return a list of the resources left and one with the newly assigned 
ones
    +    // we need to carry the role info as well
    +    (newRangesLeft, newRanges)
    +  }
    +
    +  private def removeRanges(
    +      rangeA: PortRangeResourceInfo,
    +      rangesToRemove: List[PortRangeResourceInfo]): 
Option[PortRangeResourceInfo] = {
    +    val ranges = 
rangeA.range.filterNot(rangesToRemove.flatMap{_.range}.toSet)
    +    if (ranges.isEmpty) {
    +      None
    +    } else {
    +      Some(PortRangeResourceInfo(rangeA.role, ranges))
    +    }
    +  }
    +
    +  private def createMesosPortResource(
    +      ranges: List[(Long, Long)],
    +      role: Option[String] = None): List[Resource] = {
    +    ranges.map { range =>
    +      val rangeValue = Value.Range.newBuilder()
    +      rangeValue.setBegin(range._1)
    +      rangeValue.setEnd(range._2)
    +      val builder = Resource.newBuilder()
    +        .setName("ports")
    +        .setType(Value.Type.RANGES)
    +        .setRanges(Value.Ranges.newBuilder().addRange(rangeValue))
    +      role.foreach { r => builder.setRole(r) }
    +      builder.build()
    +    }
    +  }
    +
    +  private def findPortAndSplitRange(port: Long, ranges: 
List[PortRangeResourceInfo])
    +    : (PortRangeResourceInfo, Option[PortRangeResourceInfo], Long) = {
    +    val rangePortInfo = ranges
    +      .map{p => val tmpList = List(p.range.filter(r => r._1 <= port & r._2 
>= port))
    +        PortRangeResourceInfo(p.role, tmpList.head)}.filterNot(p => 
p.range.isEmpty)
    +      .head
    +    val range = rangePortInfo.range.head
    +    val ret = {
    +      if (port == range._1 && port == range._2) {
    +        None
    +      }
    +      else if (port == range._1 && port != range._2) {
    +        Some(PortRangeResourceInfo(rangePortInfo.role, List((port + 1, 
range._2))))
    +      }
    +      else if (port == range._2 && port != range._2) {
    --- End diff --
    
    correct


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to