Repository: spark
Updated Branches:
  refs/heads/master bf5987cbe -> ca3864d6e
[SPARK-19373][MESOS] Base spark.scheduler.minRegisteredResourceRatio on registered cores rather than accepted cores

## What changes were proposed in this pull request?

See JIRA

## How was this patch tested?

Unit tests, Mesos/Spark integration tests

cc skonto susanxhuynh

Author: Michael Gummelt <mgumm...@mesosphere.io>

Closes #17045 from mgummelt/SPARK-19373-registered-resources.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ca3864d6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ca3864d6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ca3864d6

Branch: refs/heads/master
Commit: ca3864d6e090ca3e68a2ef0cf527e6e00c8c4f64
Parents: bf5987c
Author: Michael Gummelt <mgumm...@mesosphere.io>
Authored: Wed Mar 1 00:10:55 2017 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Wed Mar 1 00:10:55 2017 +0100

----------------------------------------------------------------------
 .../MesosCoarseGrainedSchedulerBackend.scala    |  27 +++--
 ...esosCoarseGrainedSchedulerBackendSuite.scala | 111 +++++++++----------
 2 files changed, 70 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ca3864d6/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index f555072..f69c223 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -54,14 +54,17 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   with org.apache.mesos.Scheduler
   with MesosSchedulerUtils {
 
-  val MAX_SLAVE_FAILURES = 2     // Blacklist a slave after this many failures
+  // Blacklist a slave after this many failures
+  private val MAX_SLAVE_FAILURES = 2
 
-  // Maximum number of cores to acquire (TODO: we'll need more flexible controls here)
-  val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt
+  private val maxCoresOption = conf.getOption("spark.cores.max").map(_.toInt)
 
-  val useFetcherCache = conf.getBoolean("spark.mesos.fetcherCache.enable", false)
+  // Maximum number of cores to acquire
+  private val maxCores = maxCoresOption.getOrElse(Int.MaxValue)
 
-  val maxGpus = conf.getInt("spark.mesos.gpus.max", 0)
+  private val useFetcherCache = conf.getBoolean("spark.mesos.fetcherCache.enable", false)
+
+  private val maxGpus = conf.getInt("spark.mesos.gpus.max", 0)
 
   private[this] val shutdownTimeoutMS =
     conf.getTimeAsMs("spark.mesos.coarse.shutdownTimeout", "10s")
@@ -75,10 +78,10 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   private val shuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)
 
   // Cores we have acquired with each Mesos task ID
-  val coresByTaskId = new mutable.HashMap[String, Int]
-  val gpusByTaskId = new mutable.HashMap[String, Int]
-  var totalCoresAcquired = 0
-  var totalGpusAcquired = 0
+  private val coresByTaskId = new mutable.HashMap[String, Int]
+  private val gpusByTaskId = new mutable.HashMap[String, Int]
+  private var totalCoresAcquired = 0
+  private var totalGpusAcquired = 0
 
   // SlaveID -> Slave
   // This map accumulates entries for the duration of the job. Slaves are never deleted, because
@@ -108,7 +111,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   // may lead to deadlocks since the superclass might also try to lock
   private val stateLock = new ReentrantLock
 
-  val extraCoresPerExecutor = conf.getInt("spark.mesos.extra.cores", 0)
+  private val extraCoresPerExecutor = conf.getInt("spark.mesos.extra.cores", 0)
 
   // Offer constraints
   private val slaveOfferConstraints =
@@ -139,7 +142,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
       securityManager.isAuthenticationEnabled())
   }
 
-  var nextMesosTaskId = 0
+  private var nextMesosTaskId = 0
 
   @volatile var appId: String = _
 
@@ -256,7 +259,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   }
 
   override def sufficientResourcesRegistered(): Boolean = {
-    totalCoresAcquired >= maxCores * minRegisteredRatio
+    totalCoreCount.get >= maxCoresOption.getOrElse(0) * minRegisteredRatio
   }
 
   override def disconnected(d: org.apache.mesos.SchedulerDriver) {}
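The behavioral change is confined to sufficientResourcesRegistered(): readiness
was previously based on cores from accepted Mesos offers (totalCoresAcquired),
and is now based on cores of executors that have actually registered with the
driver (totalCoreCount, inherited from CoarseGrainedSchedulerBackend). A
simplified, self-contained Scala sketch of the before/after semantics follows;
the values are illustrative and the object is a stand-in for the real backend
class:

  import java.util.concurrent.atomic.AtomicInteger

  // Simplified model of the readiness check, not the actual backend class.
  object ReadinessCheckSketch {
    val minRegisteredRatio = 1.0               // spark.scheduler.minRegisteredResourcesRatio
    val maxCoresOption: Option[Int] = Some(8)  // spark.cores.max, when set
    var totalCoresAcquired = 0                 // cores on accepted offers (old basis)
    val totalCoreCount = new AtomicInteger(0)  // cores of registered executors (new basis)

    // Before: ready once enough offers were accepted, even if no executor
    // had come up yet (or ever would).
    def oldCheck: Boolean =
      totalCoresAcquired >= maxCoresOption.getOrElse(Int.MaxValue) * minRegisteredRatio

    // After: ready only once enough executors have registered back with the
    // driver. With spark.cores.max unset, getOrElse(0) passes immediately.
    def newCheck: Boolean =
      totalCoreCount.get >= maxCoresOption.getOrElse(0) * minRegisteredRatio
  }

Accepting an offer only means a task was launched; the executor may still fail
to start, so counting registered cores is the more faithful readiness signal.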
http://git-wip-us.apache.org/repos/asf/spark/blob/ca3864d6/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index cdb3b68..78346e9 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -20,9 +20,7 @@ package org.apache.spark.scheduler.cluster.mesos
 import java.util.concurrent.TimeUnit
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.duration._
-import scala.concurrent.Promise
 import scala.reflect.ClassTag
 
 import org.apache.mesos.{Protos, Scheduler, SchedulerDriver}
@@ -37,8 +35,8 @@ import org.scalatest.BeforeAndAfter
 import org.apache.spark.{LocalSparkContext, SecurityManager, SparkConf, SparkContext, SparkFunSuite}
 import org.apache.spark.internal.config._
 import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
-import org.apache.spark.rpc.RpcEndpointRef
-import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
+import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef}
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{RegisterExecutor, RemoveExecutor}
 import org.apache.spark.scheduler.TaskSchedulerImpl
 import org.apache.spark.scheduler.cluster.mesos.Utils._
 
@@ -304,25 +302,29 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
   }
 
   test("weburi is set in created scheduler driver") {
-    setBackend()
+    initializeSparkConf()
+    sc = new SparkContext(sparkConf)
+
     val taskScheduler = mock[TaskSchedulerImpl]
     when(taskScheduler.sc).thenReturn(sc)
+
     val driver = mock[SchedulerDriver]
     when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING)
+
     val securityManager = mock[SecurityManager]
 
     val backend = new MesosCoarseGrainedSchedulerBackend(
-      taskScheduler, sc, "master", securityManager) {
+        taskScheduler, sc, "master", securityManager) {
       override protected def createSchedulerDriver(
-        masterUrl: String,
-        scheduler: Scheduler,
-        sparkUser: String,
-        appName: String,
-        conf: SparkConf,
-        webuiUrl: Option[String] = None,
-        checkpoint: Option[Boolean] = None,
-        failoverTimeout: Option[Double] = None,
-        frameworkId: Option[String] = None): SchedulerDriver = {
+          masterUrl: String,
+          scheduler: Scheduler,
+          sparkUser: String,
+          appName: String,
+          conf: SparkConf,
+          webuiUrl: Option[String] = None,
+          checkpoint: Option[Boolean] = None,
+          failoverTimeout: Option[Double] = None,
+          frameworkId: Option[String] = None): SchedulerDriver = {
         markRegistered()
         assert(webuiUrl.isDefined)
         assert(webuiUrl.get.equals("http://webui"))
@@ -419,37 +421,11 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     assert(!dockerInfo.getForcePullImage)
   }
 
-  test("Do not call removeExecutor() after backend is stopped") {
-    setBackend()
-
-    // launches a task on a valid offer
-    val offers = List(Resources(backend.executorMemory(sc), 1))
-    offerResources(offers)
-    verifyTaskLaunched(driver, "o1")
-
-    // launches a thread simulating status update
-    val statusUpdateThread = new Thread {
-      override def run(): Unit = {
-        while (!stopCalled) {
-          Thread.sleep(100)
-        }
-
-        val status = createTaskStatus("0", "s1", TaskState.TASK_FINISHED)
-        backend.statusUpdate(driver, status)
-      }
-    }.start
-
-    backend.stop()
-    // Any method of the backend involving sending messages to the driver endpoint should not
-    // be called after the backend is stopped.
-    verify(driverEndpoint, never()).askSync(isA(classOf[RemoveExecutor]))(any[ClassTag[_]])
-  }
-
   test("mesos supports spark.executor.uri") {
     val url = "spark.spark.spark.com"
     setBackend(Map(
       "spark.executor.uri" -> url
-    ), false)
+    ), null)
 
     val (mem, cpu) = (backend.executorMemory(sc), 4)
 
@@ -465,7 +441,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     setBackend(Map(
       "spark.mesos.fetcherCache.enable" -> "true",
       "spark.executor.uri" -> url
-    ), false)
+    ), null)
     val offers = List(Resources(backend.executorMemory(sc), 1))
     offerResources(offers)
     val launchedTasks = verifyTaskLaunched(driver, "o1")
@@ -479,7 +455,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     setBackend(Map(
       "spark.mesos.fetcherCache.enable" -> "false",
       "spark.executor.uri" -> url
-    ), false)
+    ), null)
     val offers = List(Resources(backend.executorMemory(sc), 1))
     offerResources(offers)
     val launchedTasks = verifyTaskLaunched(driver, "o1")
@@ -504,8 +480,31 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     assert(networkInfos.get(0).getName == "test-network-name")
   }
 
+  test("supports spark.scheduler.minRegisteredResourcesRatio") {
+    val expectedCores = 1
+    setBackend(Map(
+      "spark.cores.max" -> expectedCores.toString,
+      "spark.scheduler.minRegisteredResourcesRatio" -> "1.0"))
+
+    val offers = List(Resources(backend.executorMemory(sc), expectedCores))
+    offerResources(offers)
+    val launchedTasks = verifyTaskLaunched(driver, "o1")
+    assert(!backend.isReady)
+
+    registerMockExecutor(launchedTasks(0).getTaskId.getValue, "s1", expectedCores)
+    assert(backend.isReady)
+  }
+
   private case class Resources(mem: Int, cpus: Int, gpus: Int = 0)
 
+  private def registerMockExecutor(executorId: String, slaveId: String, cores: Integer) = {
+    val mockEndpointRef = mock[RpcEndpointRef]
+    val mockAddress = mock[RpcAddress]
+    val message = RegisterExecutor(executorId, mockEndpointRef, slaveId, cores, Map.empty)
+
+    backend.driverEndpoint.askSync[Boolean](message)
+  }
+
   private def verifyDeclinedOffer(driver: SchedulerDriver,
       offerId: OfferID,
       filter: Boolean = false): Unit = {
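The new test above drives the ratio end to end: it accepts an offer, asserts
the backend is not yet ready, then registers a mock executor via a
RegisterExecutor message and asserts readiness. In user-facing terms the
scenario corresponds to a configuration along these lines (the master URL and
values are illustrative, not taken from the commit):

  import org.apache.spark.SparkConf

  // Illustrative configuration only: hold back task scheduling until all of
  // spark.cores.max worth of executor cores have registered.
  val conf = new SparkConf()
    .setMaster("mesos://zk://master.mesos:2181/mesos")  // hypothetical master URL
    .set("spark.cores.max", "8")
    .set("spark.scheduler.minRegisteredResourcesRatio", "1.0")
    .set("spark.scheduler.maxRegisteredResourcesWaitingTime", "30s")

spark.scheduler.maxRegisteredResourcesWaitingTime (30s by default) still bounds
how long scheduling is held back if executors never register.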
"master", securityManager) { override protected def createSchedulerDriver( - masterUrl: String, - scheduler: Scheduler, - sparkUser: String, - appName: String, - conf: SparkConf, - webuiUrl: Option[String] = None, - checkpoint: Option[Boolean] = None, - failoverTimeout: Option[Double] = None, - frameworkId: Option[String] = None): SchedulerDriver = { + masterUrl: String, + scheduler: Scheduler, + sparkUser: String, + appName: String, + conf: SparkConf, + webuiUrl: Option[String] = None, + checkpoint: Option[Boolean] = None, + failoverTimeout: Option[Double] = None, + frameworkId: Option[String] = None): SchedulerDriver = { markRegistered() assert(webuiUrl.isDefined) assert(webuiUrl.get.equals("http://webui")) @@ -419,37 +421,11 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite assert(!dockerInfo.getForcePullImage) } - test("Do not call removeExecutor() after backend is stopped") { - setBackend() - - // launches a task on a valid offer - val offers = List(Resources(backend.executorMemory(sc), 1)) - offerResources(offers) - verifyTaskLaunched(driver, "o1") - - // launches a thread simulating status update - val statusUpdateThread = new Thread { - override def run(): Unit = { - while (!stopCalled) { - Thread.sleep(100) - } - - val status = createTaskStatus("0", "s1", TaskState.TASK_FINISHED) - backend.statusUpdate(driver, status) - } - }.start - - backend.stop() - // Any method of the backend involving sending messages to the driver endpoint should not - // be called after the backend is stopped. - verify(driverEndpoint, never()).askSync(isA(classOf[RemoveExecutor]))(any[ClassTag[_]]) - } - test("mesos supports spark.executor.uri") { val url = "spark.spark.spark.com" setBackend(Map( "spark.executor.uri" -> url - ), false) + ), null) val (mem, cpu) = (backend.executorMemory(sc), 4) @@ -465,7 +441,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite setBackend(Map( "spark.mesos.fetcherCache.enable" -> "true", "spark.executor.uri" -> url - ), false) + ), null) val offers = List(Resources(backend.executorMemory(sc), 1)) offerResources(offers) val launchedTasks = verifyTaskLaunched(driver, "o1") @@ -479,7 +455,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite setBackend(Map( "spark.mesos.fetcherCache.enable" -> "false", "spark.executor.uri" -> url - ), false) + ), null) val offers = List(Resources(backend.executorMemory(sc), 1)) offerResources(offers) val launchedTasks = verifyTaskLaunched(driver, "o1") @@ -504,8 +480,31 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite assert(networkInfos.get(0).getName == "test-network-name") } + test("supports spark.scheduler.minRegisteredResourcesRatio") { + val expectedCores = 1 + setBackend(Map( + "spark.cores.max" -> expectedCores.toString, + "spark.scheduler.minRegisteredResourcesRatio" -> "1.0")) + + val offers = List(Resources(backend.executorMemory(sc), expectedCores)) + offerResources(offers) + val launchedTasks = verifyTaskLaunched(driver, "o1") + assert(!backend.isReady) + + registerMockExecutor(launchedTasks(0).getTaskId.getValue, "s1", expectedCores) + assert(backend.isReady) + } + private case class Resources(mem: Int, cpus: Int, gpus: Int = 0) + private def registerMockExecutor(executorId: String, slaveId: String, cores: Integer) = { + val mockEndpointRef = mock[RpcEndpointRef] + val mockAddress = mock[RpcAddress] + val message = RegisterExecutor(executorId, mockEndpointRef, slaveId, cores, Map.empty) + + backend.driverEndpoint.askSync[Boolean](message) + } + 
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org