Repository: spark
Updated Branches:
  refs/heads/master 284836862 -> 193555f79
[SPARK-18935][MESOS] Fix dynamic reservations on mesos

## What changes were proposed in this pull request?

- Solves the issue described in the ticket by preserving reservation and allocation info in all cases (port handling included).
- Upgrades the Mesos dependency to 1.4.
- Adds extra debug-level logging to make future debugging easier; for example, reservation info is now logged when applicable:

```
17/09/29 14:53:07 DEBUG MesosCoarseGrainedSchedulerBackend: Accepting offer: f20de49b-dee3-45dd-a3c1-73418b7de891-O32 with attributes: Map() allocation info: role: "spark-prive"
 reservation info: name: "ports"
type: RANGES
ranges {
  range {
    begin: 31000
    end: 32000
  }
}
role: "spark-prive"
reservation {
  principal: "test"
}
allocation_info {
  role: "spark-prive"
}
```

- Some style cleanup.

## How was this patch tested?

Manually, by running the example in the ticket with and without a principal. Specifically, I tested it on a DC/OS 1.10 cluster with 7 nodes and experimented with reservations. To reserve resources, I ran the following from the master node:

```
for i in 0 1 2 3 4 5 6
do
  curl -i \
    -d slaveId=90ec65ea-1f7b-479f-a824-35d2527d6d26-S$i \
    -d resources='[
      {
        "name": "cpus",
        "type": "SCALAR",
        "scalar": { "value": 2 },
        "role": "spark-role",
        "reservation": {
          "principal": ""
        }
      },
      {
        "name": "mem",
        "type": "SCALAR",
        "scalar": { "value": 8026 },
        "role": "spark-role",
        "reservation": {
          "principal": ""
        }
      }
    ]' \
    -X POST http://master.mesos:5050/master/reserve
done
```

Nodes had 4 CPUs (m3.xlarge instances), and I reserved either 2 or 4 CPUs, all for a single role. I verified that tasks are launched on nodes with resources reserved for the `spark-role` role only if either:
a) there are remaining resources for the default (`*`) role and the Spark driver has no role assigned to it, or
b) the Spark driver has a role assigned to it and it is the same role used in the reservations.

I also tested this locally on my machine.

Author: Stavros Kontopoulos <st.kontopou...@gmail.com>

Closes #19390 from skonto/fix_dynamic_reservation.
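A minimal sketch of the property described in the first bullet, assuming simplified stand-in types. `Reservation`, `ScalarResource` and `ReservationAwareSplit` are hypothetical case classes and objects, not the Mesos `Protos.Resource` API. When an offered scalar resource is split into a requested piece and a remaining piece, both pieces keep the role and reservation of the resource they were carved from. Mirroring `setReservationInfo` in the diff, the reservation is attached only when the role is not the default `*` role.

```scala
// Hypothetical stand-ins for Mesos Resource / ReservationInfo (not the real protobufs).
final case class Reservation(principal: String)

final case class ScalarResource(
    name: String,
    amount: Double,
    role: Option[String],
    reservation: Option[Reservation])

object ReservationAwareSplit {
  private val AnyRole = "*"

  // Like setReservationInfo in the diff: keep the reservation unless the role is "*".
  def create(
      name: String,
      amount: Double,
      role: Option[String],
      reservation: Option[Reservation]): ScalarResource = {
    val kept = if (role.contains(AnyRole)) None else reservation
    ScalarResource(name, amount, role, kept)
  }

  // Takes up to `amount` of `resourceName` from `resources` and returns
  // (remaining, requested). Every piece copies the role and reservation of the
  // resource it was carved from; zero-amount resources are dropped from `remaining`.
  def partition(
      resources: List[ScalarResource],
      resourceName: String,
      amount: Double): (List[ScalarResource], List[ScalarResource]) = {
    var remain = amount
    val requested = List.newBuilder[ScalarResource]
    val remaining = resources.map { r =>
      if (remain > 0 && r.amount > 0.0 && r.name == resourceName) {
        val usage = math.min(remain, r.amount)
        requested += create(resourceName, usage, r.role, r.reservation)
        remain -= usage
        create(resourceName, r.amount - usage, r.role, r.reservation)
      } else {
        r
      }
    }
    (remaining.filter(_.amount > 0.0), requested.result())
  }
}
```

Consider an offer with 2 reserved `spark-role` CPUs and 2 default-role CPUs. Asking for 3 CPUs yields a requested list containing a 2-CPU piece that still carries its reservation and a 1-CPU `*` piece. The remaining list holds 1 `*` CPU.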
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/193555f7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/193555f7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/193555f7

Branch: refs/heads/master
Commit: 193555f79cc73873613674a09a7c371688b6dbc7
Parents: 2848368
Author: Stavros Kontopoulos <st.kontopou...@gmail.com>
Authored: Wed Nov 29 14:15:35 2017 -0800
Committer: Marcelo Vanzin <van...@cloudera.com>
Committed: Wed Nov 29 14:15:35 2017 -0800

----------------------------------------------------------------------
 dev/deps/spark-deps-hadoop-2.6                 |  2 +-
 dev/deps/spark-deps-hadoop-2.7                 |  2 +-
 resource-managers/mesos/pom.xml                |  2 +-
 .../cluster/mesos/MesosClusterScheduler.scala  |  1 -
 .../MesosCoarseGrainedSchedulerBackend.scala   | 17 +++-
 .../cluster/mesos/MesosSchedulerUtils.scala    | 99 +++++++++++++-------
 6 files changed, 80 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/193555f7/dev/deps/spark-deps-hadoop-2.6
----------------------------------------------------------------------
diff --git a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6
index 21c8a75..50ac6d1 100644
--- a/dev/deps/spark-deps-hadoop-2.6
+++ b/dev/deps/spark-deps-hadoop-2.6
@@ -138,7 +138,7 @@ lz4-java-1.4.0.jar
 machinist_2.11-0.6.1.jar
 macro-compat_2.11-1.1.1.jar
 mail-1.4.7.jar
-mesos-1.3.0-shaded-protobuf.jar
+mesos-1.4.0-shaded-protobuf.jar
 metrics-core-3.1.5.jar
 metrics-graphite-3.1.5.jar
 metrics-json-3.1.5.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/193555f7/dev/deps/spark-deps-hadoop-2.7
----------------------------------------------------------------------
diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7
index 7173426..1b1e316 100644
--- a/dev/deps/spark-deps-hadoop-2.7
+++ b/dev/deps/spark-deps-hadoop-2.7
@@ -139,7 +139,7 @@ lz4-java-1.4.0.jar
 machinist_2.11-0.6.1.jar
 macro-compat_2.11-1.1.1.jar
 mail-1.4.7.jar
-mesos-1.3.0-shaded-protobuf.jar
+mesos-1.4.0-shaded-protobuf.jar
 metrics-core-3.1.5.jar
 metrics-graphite-3.1.5.jar
 metrics-json-3.1.5.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/193555f7/resource-managers/mesos/pom.xml
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/pom.xml b/resource-managers/mesos/pom.xml
index de8f1c9..70d0c17 100644
--- a/resource-managers/mesos/pom.xml
+++ b/resource-managers/mesos/pom.xml
@@ -29,7 +29,7 @@
   <name>Spark Project Mesos</name>
   <properties>
     <sbt.project.name>mesos</sbt.project.name>
-    <mesos.version>1.3.0</mesos.version>
+    <mesos.version>1.4.0</mesos.version>
     <mesos.classifier>shaded-protobuf</mesos.classifier>
   </properties>


http://git-wip-us.apache.org/repos/asf/spark/blob/193555f7/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index c41283e..d224a73 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -36,7 +36,6 @@ import org.apache.spark.deploy.rest.{CreateSubmissionResponse, KillSubmissionRes
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.util.Utils

-
 /**
  * Tracks the current state of a Mesos Task that runs a Spark driver.
  * @param driverDescription Submitted driver description from

http://git-wip-us.apache.org/repos/asf/spark/blob/193555f7/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index c392061..191415a 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -400,13 +400,20 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
       val offerMem = getResource(offer.getResourcesList, "mem")
       val offerCpus = getResource(offer.getResourcesList, "cpus")
       val offerPorts = getRangeResource(offer.getResourcesList, "ports")
+      val offerReservationInfo = offer
+        .getResourcesList
+        .asScala
+        .find { r => r.getReservation != null }
       val id = offer.getId.getValue

       if (tasks.contains(offer.getId)) { // accept
         val offerTasks = tasks(offer.getId)

         logDebug(s"Accepting offer: $id with attributes: $offerAttributes " +
-          s"mem: $offerMem cpu: $offerCpus ports: $offerPorts." +
+          offerReservationInfo.map(resInfo =>
+            s"reservation info: ${resInfo.getReservation.toString}").getOrElse("") +
+          s"mem: $offerMem cpu: $offerCpus ports: $offerPorts " +
+          s"resources: ${offer.getResourcesList.asScala.mkString(",")}." +
           s" Launching ${offerTasks.size} Mesos tasks.")

         for (task <- offerTasks) {
@@ -416,7 +423,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
           val ports = getRangeResource(task.getResourcesList, "ports").mkString(",")

           logDebug(s"Launching Mesos task: ${taskId.getValue} with mem: $mem cpu: $cpus" +
-            s" ports: $ports")
+            s" ports: $ports" + s" on slave with slave id: ${task.getSlaveId.getValue} ")
         }

         driver.launchTasks(
@@ -431,7 +438,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
       } else {
         declineOffer(
           driver,
-          offer)
+          offer,
+          Some("Offer was declined due to unmet task launch constraints."))
       }
     }
   }
@@ -513,6 +521,9 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
             totalGpusAcquired += taskGPUs
             gpusByTaskId(taskId) = taskGPUs
           }
+        } else {
+          logDebug(s"Cannot launch a task for offer with id: $offerId on slave " +
+            s"with id: $slaveId. Requirements were not met for this offer.")
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/193555f7/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
index 6fcb30a..e754503 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -28,7 +28,8 @@ import com.google.common.base.Splitter
 import org.apache.mesos.{MesosSchedulerDriver, Protos, Scheduler, SchedulerDriver}
 import org.apache.mesos.Protos.{TaskState => MesosTaskState, _}
 import org.apache.mesos.Protos.FrameworkInfo.Capability
-import org.apache.mesos.protobuf.{ByteString, GeneratedMessage}
+import org.apache.mesos.Protos.Resource.ReservationInfo
+import org.apache.mesos.protobuf.{ByteString, GeneratedMessageV3}

 import org.apache.spark.{SparkConf, SparkContext, SparkException}
 import org.apache.spark.TaskState
@@ -36,8 +37,6 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.util.Utils

-
-
 /**
  * Shared trait for implementing a Mesos Scheduler. This holds common state and helper
  * methods and Mesos scheduler will use.
@@ -46,6 +45,8 @@ trait MesosSchedulerUtils extends Logging {
   // Lock used to wait for scheduler to be registered
   private final val registerLatch = new CountDownLatch(1)

+  private final val ANY_ROLE = "*"
+
   /**
    * Creates a new MesosSchedulerDriver that communicates to the Mesos master.
    *
@@ -175,17 +176,36 @@ trait MesosSchedulerUtils extends Logging {
     registerLatch.countDown()
   }

-  def createResource(name: String, amount: Double, role: Option[String] = None): Resource = {
+  private def setReservationInfo(
+      reservationInfo: Option[ReservationInfo],
+      role: Option[String],
+      builder: Resource.Builder): Unit = {
+    if (!role.contains(ANY_ROLE)) {
+      reservationInfo.foreach { res => builder.setReservation(res) }
+    }
+  }
+
+  def createResource(
+      name: String,
+      amount: Double,
+      role: Option[String] = None,
+      reservationInfo: Option[ReservationInfo] = None): Resource = {
     val builder = Resource.newBuilder()
       .setName(name)
       .setType(Value.Type.SCALAR)
       .setScalar(Value.Scalar.newBuilder().setValue(amount).build())
     role.foreach { r => builder.setRole(r) }
-
+    setReservationInfo(reservationInfo, role, builder)
     builder.build()
   }

+  private def getReservation(resource: Resource): Option[ReservationInfo] = {
+    if (resource.hasReservation) {
+      Some(resource.getReservation)
+    } else {
+      None
+    }
+  }
   /**
    * Partition the existing set of resources into two groups, those remaining to be
    * scheduled and those requested to be used for a new task.
@@ -203,14 +223,17 @@ trait MesosSchedulerUtils extends Logging {
     var requestedResources = new ArrayBuffer[Resource]
     val remainingResources = resources.asScala.map {
       case r =>
+        val reservation = getReservation(r)
         if (remain > 0 &&
           r.getType == Value.Type.SCALAR &&
           r.getScalar.getValue > 0.0 &&
           r.getName == resourceName) {
           val usage = Math.min(remain, r.getScalar.getValue)
-          requestedResources += createResource(resourceName, usage, Some(r.getRole))
+          requestedResources += createResource(resourceName, usage,
+            Option(r.getRole), reservation)
           remain -= usage
-          createResource(resourceName, r.getScalar.getValue - usage, Some(r.getRole))
+          createResource(resourceName, r.getScalar.getValue - usage,
+            Option(r.getRole), reservation)
         } else {
           r
         }
@@ -228,16 +251,6 @@ trait MesosSchedulerUtils extends Logging {
       (attr.getName, attr.getText.getValue.split(',').toSet)
     }

-
-  /** Build a Mesos resource protobuf object */
-  protected def createResource(resourceName: String, quantity: Double): Protos.Resource = {
-    Resource.newBuilder()
-      .setName(resourceName)
-      .setType(Value.Type.SCALAR)
-      .setScalar(Value.Scalar.newBuilder().setValue(quantity).build())
-      .build()
-  }
-
   /**
    * Converts the attributes from the resource offer into a Map of name to Attribute Value
    * The attribute values are the mesos attribute types and they are
@@ -245,7 +258,8 @@ trait MesosSchedulerUtils extends Logging {
    * @param offerAttributes the attributes offered
    * @return
    */
-  protected def toAttributeMap(offerAttributes: JList[Attribute]): Map[String, GeneratedMessage] = {
+  protected def toAttributeMap(offerAttributes: JList[Attribute])
+    : Map[String, GeneratedMessageV3] = {
     offerAttributes.asScala.map { attr =>
       val attrValue = attr.getType match {
         case Value.Type.SCALAR => attr.getScalar
@@ -266,7 +280,7 @@ trait MesosSchedulerUtils extends Logging {
    */
   def matchesAttributeRequirements(
       slaveOfferConstraints: Map[String, Set[String]],
-      offerAttributes: Map[String, GeneratedMessage]): Boolean = {
+      offerAttributes: Map[String, GeneratedMessageV3]): Boolean = {
     slaveOfferConstraints.forall {
       // offer has the required attribute and subsumes the required values for that attribute
       case (name, requiredValues) =>
@@ -427,10 +441,10 @@ trait MesosSchedulerUtils extends Logging {
     // partition port offers
     val (resourcesWithoutPorts, portResources) = filterPortResources(offeredResources)

-    val portsAndRoles = requestedPorts.
-      map(x => (x, findPortAndGetAssignedRangeRole(x, portResources)))
+    val portsAndResourceInfo = requestedPorts.
+      map { x => (x, findPortAndGetAssignedResourceInfo(x, portResources)) }

-    val assignedPortResources = createResourcesFromPorts(portsAndRoles)
+    val assignedPortResources = createResourcesFromPorts(portsAndResourceInfo)

     // ignore non-assigned port resources, they will be declined implicitly by mesos
     // no need for splitting port resources.
@@ -450,16 +464,25 @@ trait MesosSchedulerUtils extends Logging {
     managedPortNames.map(conf.getLong(_, 0)).filter( _ != 0)
   }

+  private case class RoleResourceInfo(
+      role: String,
+      resInfo: Option[ReservationInfo])
+
   /** Creates a mesos resource for a specific port number. */
-  private def createResourcesFromPorts(portsAndRoles: List[(Long, String)]) : List[Resource] = {
-    portsAndRoles.flatMap{ case (port, role) =>
-      createMesosPortResource(List((port, port)), Some(role))}
+  private def createResourcesFromPorts(
+      portsAndResourcesInfo: List[(Long, RoleResourceInfo)])
+    : List[Resource] = {
+    portsAndResourcesInfo.flatMap { case (port, rInfo) =>
+      createMesosPortResource(List((port, port)), Option(rInfo.role), rInfo.resInfo)}
   }

   /** Helper to create mesos resources for specific port ranges. */
   private def createMesosPortResource(
       ranges: List[(Long, Long)],
-      role: Option[String] = None): List[Resource] = {
+      role: Option[String] = None,
+      reservationInfo: Option[ReservationInfo] = None): List[Resource] = {
+    // for ranges we are going to use (user defined ports fall in there) create mesos resources
+    // for each range there is a role associated with it.
     ranges.map { case (rangeStart, rangeEnd) =>
       val rangeValue = Value.Range.newBuilder()
         .setBegin(rangeStart)
@@ -468,7 +491,8 @@ trait MesosSchedulerUtils extends Logging {
         .setName("ports")
         .setType(Value.Type.RANGES)
         .setRanges(Value.Ranges.newBuilder().addRange(rangeValue))
-      role.foreach(r => builder.setRole(r))
+      role.foreach { r => builder.setRole(r) }
+      setReservationInfo(reservationInfo, role, builder)
       builder.build()
     }
   }
@@ -477,19 +501,21 @@ trait MesosSchedulerUtils extends Logging {
    * Helper to assign a port to an offered range and get the latter's role
    * info to use it later on.
    */
-  private def findPortAndGetAssignedRangeRole(port: Long, portResources: List[Resource])
-    : String = {
+  private def findPortAndGetAssignedResourceInfo(port: Long, portResources: List[Resource])
+    : RoleResourceInfo = {
     val ranges = portResources.
-      map(resource =>
-        (resource.getRole, resource.getRanges.getRangeList.asScala
-          .map(r => (r.getBegin, r.getEnd)).toList))
+      map { resource =>
+        val reservation = getReservation(resource)
+        (RoleResourceInfo(resource.getRole, reservation),
+          resource.getRanges.getRangeList.asScala.map(r => (r.getBegin, r.getEnd)).toList)
+      }

-    val rangePortRole = ranges
-      .find { case (role, rangeList) => rangeList
+    val rangePortResourceInfo = ranges
+      .find { case (resourceInfo, rangeList) => rangeList
         .exists{ case (rangeStart, rangeEnd) => rangeStart <= port & rangeEnd >= port}}
     // this is safe since we have previously checked about the ranges (see checkPorts method)
-    rangePortRole.map{ case (role, rangeList) => role}.get
+    rangePortResourceInfo.map{ case (resourceInfo, rangeList) => resourceInfo}.get
   }

   /** Retrieves the port resources from a list of mesos offered resources */
@@ -564,3 +590,4 @@ trait MesosSchedulerUtils extends Logging {
     }
   }
 }
+
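The port handling changed in `findPortAndGetAssignedResourceInfo` and `createResourcesFromPorts` can be sketched in the same simplified way. `PortReservation`, `RoleAndReservation`, `PortRangeResource` and `PortAssignment` are hypothetical stand-ins for Mesos "ports" resources, not Spark or Mesos APIs. Each requested port is matched to the offered range that contains it. The single-port resource built for it then inherits that range's role and reservation, instead of only its role.

```scala
// Hypothetical stand-ins for Mesos "ports" resources (not the real protobufs).
final case class PortReservation(principal: String)

final case class RoleAndReservation(role: String, reservation: Option[PortReservation])

// One offered (or assigned) ports resource: its owner info plus inclusive ranges.
final case class PortRangeResource(info: RoleAndReservation, ranges: List[(Long, Long)])

object PortAssignment {
  // Returns the role/reservation of the first offered resource whose ranges
  // contain `port` (bounds inclusive), or None if no range covers it.
  def findResourceInfo(
      port: Long,
      portResources: List[PortRangeResource]): Option[RoleAndReservation] =
    portResources
      .find(_.ranges.exists { case (lo, hi) => lo <= port && port <= hi })
      .map(_.info)

  // Builds one single-port resource per requested port, inheriting the role and
  // reservation of the range it was taken from; uncovered ports are skipped.
  def assign(
      requestedPorts: List[Long],
      portResources: List[PortRangeResource]): List[PortRangeResource] =
    requestedPorts.flatMap { port =>
      findResourceInfo(port, portResources)
        .map(info => PortRangeResource(info, List((port, port))))
    }
}
```

The diff relies on an earlier `checkPorts` validation and calls `.get`. This sketch instead returns `Option` and skips ports that no offered range covers. That is a choice made for the sketch only.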