Github user tnachen commented on a diff in the pull request: https://github.com/apache/spark/pull/11157#discussion_r60255016 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala --- @@ -353,4 +371,247 @@ private[mesos] trait MesosSchedulerUtils extends Logging { sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForUnmetConstraints", "120s") } + /** + * Checks executor ports if they are within some range of the offered list of ports ranges, + * + * @param sc the Spark Context + * @param ports the list of ports to check + * @param takenPorts ports already used for that slave + * @return true if ports are within range false otherwise + */ + protected def checkPorts(sc: SparkContext, ports: List[(Long, Long)], + takenPorts: List[Long] = List()): Boolean = { + + def checkIfInRange(port: Int, ps: List[(Long, Long)]): Boolean = { + ps.exists(r => r._1 <= port & r._2 >= port) + } + + val portsToCheck = List(sc.conf.getInt("spark.executor.port", 0), + sc.conf.getInt("spark.blockManager.port", 0)) + val nonZeroPorts = portsToCheck.filter(_ != 0) + + // If we require a port that is taken we have to decline the offer since mesos + // shares all port ranges on the slave + val contained = for {port <- nonZeroPorts} + yield { + takenPorts.contains(port) + } + + if (contained.contains(true)) { + return false + } + + val withinRange = nonZeroPorts.forall(p => checkIfInRange(p, ports)) + + // make sure we have enough ports to allocate per offer + ports.map(r => r._2 - r._1 + 1).sum >= portsToCheck.size && withinRange + } + + /** + * Partitions port resources. + * + * @param conf the spark config + * @param ports the ports offered + * @return resources left, port resources to be used and the list of assigned ports + */ + def partitionPorts( + conf: SparkConf, + ports: List[Resource]): (List[Resource], List[Resource], List[Long]) = { + + val taskPortRanges = getRangeResourceWithRoleInfo(ports.asJava, "ports") + + val portsToCheck = List(conf.getInt("spark.executor.port", 0).toLong, + conf.getInt("spark.blockManager.port", 0).toLong) + + val nonZeroPorts = portsToCheck.filter(_ != 0) + + // reserve non zero ports first + + val nonZeroResources = reservePorts(taskPortRanges, nonZeroPorts) + + // reserve actual port numbers for zero ports - not set by the user + + val numOfZeroPorts = portsToCheck.count(_ == 0) + + val randPorts = pickRandomPortsFromRanges(nonZeroResources._1, numOfZeroPorts) --- End diff -- What's the intention to pick random ports? Why not just pick the first few ones?
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org