Github user dragos commented on a diff in the pull request: https://github.com/apache/spark/pull/8671#discussion_r39605193 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala --- @@ -202,55 +207,86 @@ private[spark] class MesosSchedulerBackend( } /** - * Method called by Mesos to offer resources on slaves. We respond by asking our active task sets - * for tasks in order of priority. We fill each node with tasks in a round-robin manner so that - * tasks are balanced across the cluster. + * Return the usable Mesos offers and corresponding WorkerOffers. + * + * This method declines Mesos offers that don't meet minimum cpu, memory or attribute + * requirements. + * + * @param d Mesos SchedulerDriver to decline offers + * @param offers Mesos offers to be considered + * @return a pair of Mesos offers and corresponding WorkerOffer that can be used by the + * fine-grained scheduler. */ - override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) { - inClassLoader() { - // Fail-fast on offers we know will be rejected - val (usableOffers, unUsableOffers) = offers.asScala.partition { o => - val mem = getResource(o.getResourcesList, "mem") - val cpus = getResource(o.getResourcesList, "cpus") - val slaveId = o.getSlaveId.getValue - val offerAttributes = toAttributeMap(o.getAttributesList) - - // check if all constraints are satisfied - // 1. Attribute constraints - // 2. Memory requirements - // 3. 
CPU requirements - need at least 1 for executor, 1 for task - val meetsConstraints = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes) - val meetsMemoryRequirements = mem >= calculateTotalMemory(sc) - val meetsCPURequirements = cpus >= (mesosExecutorCores + scheduler.CPUS_PER_TASK) - - val meetsRequirements = - (meetsConstraints && meetsMemoryRequirements && meetsCPURequirements) || + private[spark] def usableWorkerOffers(d: SchedulerDriver, + offers: JList[Offer]): (Seq[Protos.Offer], Seq[WorkerOffer]) = { + // Fail-fast on offers we know will be rejected + val (usableOffers, unUsableOffers) = offers.asScala.partition { o => + val mem = getResource(o.getResourcesList, "mem") + val cpus = getResource(o.getResourcesList, "cpus") + val slaveId = o.getSlaveId.getValue + val offerAttributes = toAttributeMap(o.getAttributesList) + + // check if all constraints are satisfied + // 1. Attribute constraints + // 2. Memory requirements + // 3. CPU requirements - need at least 1 for executor, 1 for task + val meetsConstraints = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes) + val meetsMemoryRequirements = mem >= calculateTotalMemory(sc) + val meetsCPURequirements = cpus >= (mesosExecutorCores + scheduler.CPUS_PER_TASK) + + val meetsRequirements = + (meetsConstraints && meetsMemoryRequirements && meetsCPURequirements) || (slaveIdToExecutorInfo.contains(slaveId) && cpus >= scheduler.CPUS_PER_TASK) - // add some debug messaging - val debugstr = if (meetsRequirements) "Accepting" else "Declining" - val id = o.getId.getValue - logDebug(s"$debugstr offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus") + // add some debug messaging + val debugstr = if (meetsRequirements) "Accepting" else "Declining" + val id = o.getId.getValue + logDebug(s"$debugstr offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus") + + meetsRequirements + } + + // Decline offers we ruled out immediately + unUsableOffers.foreach(o => 
d.declineOffer(o.getId)) + + var availableCores = Math.max(0, maxCores - totalCoresAcquired) - meetsRequirements + val workerOffers = (for (o <- usableOffers) yield { + val coresInOffer = getResource(o.getResourcesList, "cpus") + val cores = if (slaveIdToExecutorInfo.contains(o.getSlaveId.getValue)) { + coresInOffer.toInt + } else { + // If the Mesos executor has not been started on this slave yet, set aside a few + // cores for the Mesos executor by offering fewer cores to the Spark executor + availableCores -= mesosExecutorCores + (coresInOffer - mesosExecutorCores).toInt } - // Decline offers we ruled out immediately - unUsableOffers.foreach(o => d.declineOffer(o.getId)) + // check that we can still acquire cpus + val actualCores = Math.min(availableCores, cores).toInt --- End diff -- Ok, I'll refactor this condition.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org