This is an automated email from the ASF dual-hosted git repository. tgraves pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new cbad616 [SPARK-27371][CORE] Support GPU-aware resources scheduling in Standalone
cbad616 is described below

commit cbad616d4cb0c58993a88df14b5e30778c7f7e85
Author: wuyi <ngone_5...@163.com>
AuthorDate: Fri Aug 9 07:49:03 2019 -0500

[SPARK-27371][CORE] Support GPU-aware resources scheduling in Standalone

## What changes were proposed in this pull request?

In this PR, we implement the complete process of GPU-aware resource scheduling in Standalone. The whole process looks like this: a Worker sets up isolated resources when it starts up and registers with the Master along with its resources. The Master then picks usable Workers according to the driver's/executor's resource requirements and launches the driver/executor on them. The Worker launches the driver/executor after preparing a resources file, created under the driver's/executor's working directory, containing the resource addresses specified by the Master. When a driver/executor finishes, its resources are recycled to the Worker. Finally, if a Worker stops, it should always release its resources first.

For the case where Workers and Drivers in **client** mode run on the same host, we introduce a config option named `spark.resources.coordinate.enable` (default true) to indicate whether Spark should coordinate resources for the user. If `spark.resources.coordinate.enable=false`, the user is responsible for configuring different resources for Workers and Drivers when using a resourcesFile or discovery script. If true, Spark assigns different resources to Workers and Drivers on the user's behalf.

The solution for Spark to coordinate resources among Workers and Drivers is as follows. A shared file named *____allocated_resources____.json* is used to sync allocated resources info among Workers and Drivers on the same host. After a Worker or Driver has found all resources using the configured resourcesFile and/or discovery script during launch, it should determine the available resources by excluding those already allocated in *____allocated_resources____.json*, and acquire resources from the available ones according to its own requirements. After that, it should write its allocated resources along with its process id (pid) into *____allocated_resources____.json*. The pid (proposed by tgravescs) is used to check whether the allocated resources are still valid in case of Worker or Driver cras [...] Note that we always acquire a file lock before any access to the file *____allocated_resources____.json* and release the lock afterwards.

Furthermore, we appended resources info to `WorkerSchedulerStateResponse` to work around master change behaviour in HA mode.

## How was this patch tested?

Added unit tests in WorkerSuite, MasterSuite, SparkContextSuite.

Manually tested with client/cluster mode (e.g. multiple workers) in a single node Standalone.

Closes #25047 from Ngone51/SPARK-27371.
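The coordination step described above (exclude addresses that other processes on the host already hold, then take what is needed) can be illustrated with a small sketch. This is a simplified model under stated assumptions, not the code from this commit: `ResourceCoordinationSketch`, `Allocation`, `acquire`, and the `isAlive` callback are hypothetical names, the file lock and JSON file are left out, and liveness is checked eagerly for every pid, whereas the patch only checks pids when the first attempt finds too few free addresses.

```scala
// Hypothetical, simplified model of the coordination step; not the code from this commit.
object ResourceCoordinationSketch {
  // One entry of the shared allocation record: the owning pid and its addresses per resource name.
  final case class Allocation(pid: Int, addresses: Map[String, Seq[String]])

  /**
   * Picks `amount` addresses of `resourceName` that no live process currently holds.
   * Returns Left with a message when too few addresses are free.
   */
  def acquire(
      discovered: Seq[String],
      allocated: Seq[Allocation],
      resourceName: String,
      amount: Int,
      isAlive: Int => Boolean): Either[String, Seq[String]] = {
    // Addresses held by processes that are still running; entries of dead pids are ignored.
    val taken = allocated
      .filter(a => isAlive(a.pid))
      .flatMap(_.addresses.getOrElse(resourceName, Seq.empty))
      .toSet
    val available = discovered.filterNot(taken.contains)
    if (available.length >= amount) Right(available.take(amount))
    else Left(s"Only ${available.length} of $amount requested $resourceName addresses are free")
  }

  def main(args: Array[String]): Unit = {
    val allocated = Seq(
      Allocation(pid = 100, addresses = Map("gpu" -> Seq("0", "1"))),
      Allocation(pid = 200, addresses = Map("gpu" -> Seq("2"))))
    // Pretend pid 200 has crashed, so its address "2" becomes reusable.
    val result = acquire(Seq("0", "1", "2", "3"), allocated, "gpu", 2, pid => pid == 100)
    println(result)
  }
}
```

In the sketch, a request that cannot be satisfied returns a `Left` instead of throwing; the patch itself throws a `SparkException` in that situation.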
Authored-by: wuyi <ngone_5...@163.com> Signed-off-by: Thomas Graves <tgra...@apache.org> --- .gitignore | 1 + .../main/scala/org/apache/spark/SparkContext.scala | 34 +- .../spark/deploy/ApplicationDescription.scala | 5 +- .../scala/org/apache/spark/deploy/Client.scala | 7 +- .../org/apache/spark/deploy/DeployMessage.scala | 28 +- .../apache/spark/deploy/DriverDescription.scala | 5 +- .../apache/spark/deploy/LocalSparkCluster.scala | 3 +- .../spark/deploy/StandaloneResourceUtils.scala | 348 +++++++++++++++++++++ .../spark/deploy/master/ApplicationInfo.scala | 5 +- .../apache/spark/deploy/master/DriverInfo.scala | 8 + .../apache/spark/deploy/master/ExecutorDesc.scala | 6 +- .../org/apache/spark/deploy/master/Master.scala | 108 +++++-- .../apache/spark/deploy/master/WorkerInfo.scala | 58 +++- .../spark/deploy/rest/StandaloneRestServer.scala | 6 +- .../apache/spark/deploy/worker/DriverRunner.scala | 14 +- .../spark/deploy/worker/ExecutorRunner.scala | 11 +- .../org/apache/spark/deploy/worker/Worker.scala | 77 +++-- .../executor/CoarseGrainedExecutorBackend.scala | 4 +- .../org/apache/spark/internal/config/package.scala | 17 + .../ResourceAllocator.scala} | 17 +- .../org/apache/spark/resource/ResourceUtils.scala | 42 ++- .../spark/scheduler/ExecutorResourceInfo.scala | 77 +---- .../apache/spark/scheduler/TaskSchedulerImpl.scala | 7 +- .../cluster/StandaloneSchedulerBackend.scala | 6 +- .../main/scala/org/apache/spark/util/Utils.scala | 42 +++ .../scala/org/apache/spark/SparkConfSuite.scala | 6 +- .../scala/org/apache/spark/SparkContextSuite.scala | 15 +- .../org/apache/spark/deploy/DeployTestUtils.scala | 4 +- .../apache/spark/deploy/master/MasterSuite.scala | 159 +++++++--- .../deploy/master/PersistenceEngineSuite.scala | 3 +- .../apache/spark/deploy/worker/WorkerSuite.scala | 170 +++++++++- .../CoarseGrainedExecutorBackendSuite.scala | 2 +- .../apache/spark/resource/ResourceUtilsSuite.scala | 2 +- .../apache/spark/resource/TestResourceIDs.scala | 4 + 
docs/configuration.md | 23 +- docs/spark-standalone.md | 31 ++ python/pyspark/tests/test_context.py | 3 +- python/pyspark/tests/test_taskcontext.py | 8 +- 38 files changed, 1127 insertions(+), 239 deletions(-) diff --git a/.gitignore b/.gitignore index 4b1ba1c..ae20c85 100644 --- a/.gitignore +++ b/.gitignore @@ -71,6 +71,7 @@ scalastyle-on-compile.generated.xml scalastyle-output.xml scalastyle.txt spark-*-bin-*.tgz +spark-resources/ spark-tests.log src_managed/ streaming-tests.log diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index aa71b21..396d712 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -42,6 +42,7 @@ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFor import org.apache.spark.annotation.DeveloperApi import org.apache.spark.broadcast.Broadcast import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil} +import org.apache.spark.deploy.StandaloneResourceUtils._ import org.apache.spark.executor.ExecutorMetrics import org.apache.spark.input.{FixedLengthBinaryInputFormat, PortableDataStream, StreamInputFormat, WholeTextFileInputFormat} import org.apache.spark.internal.Logging @@ -245,6 +246,15 @@ class SparkContext(config: SparkConf) extends Logging { def isLocal: Boolean = Utils.isLocalMaster(_conf) + private def isClientStandalone: Boolean = { + val isSparkCluster = master match { + case SparkMasterRegex.SPARK_REGEX(_) => true + case SparkMasterRegex.LOCAL_CLUSTER_REGEX(_, _, _) => true + case _ => false + } + deployMode == "client" && isSparkCluster + } + /** * @return true if context is stopped or in the midst of stopping. 
*/ @@ -380,7 +390,18 @@ class SparkContext(config: SparkConf) extends Logging { _driverLogger = DriverLogger(_conf) val resourcesFileOpt = conf.get(DRIVER_RESOURCES_FILE) - _resources = getOrDiscoverAllResources(_conf, SPARK_DRIVER_PREFIX, resourcesFileOpt) + val allResources = getOrDiscoverAllResources(_conf, SPARK_DRIVER_PREFIX, resourcesFileOpt) + _resources = { + // driver submitted in client mode under Standalone may have conflicting resources with + // other drivers/workers on this host. We should sync driver's resources info into + // SPARK_RESOURCES/SPARK_RESOURCES_COORDINATE_DIR/ to avoid collision. + if (isClientStandalone) { + acquireResources(_conf, SPARK_DRIVER_PREFIX, allResources, Utils.getProcessId) + } else { + allResources + } + } + logResourceInfo(SPARK_DRIVER_PREFIX, _resources) // log out spark.app.name in the Spark driver logs logInfo(s"Submitted application: $appName") @@ -1911,8 +1932,10 @@ class SparkContext(config: SparkConf) extends Logging { ShutdownHookManager.removeShutdownHook(_shutdownHookRef) } - Utils.tryLogNonFatalError { - postApplicationEnd() + if (listenerBus != null) { + Utils.tryLogNonFatalError { + postApplicationEnd() + } } Utils.tryLogNonFatalError { _driverLogger.foreach(_.stop()) @@ -1960,6 +1983,9 @@ class SparkContext(config: SparkConf) extends Logging { Utils.tryLogNonFatalError { _progressBar.foreach(_.stop()) } + if (isClientStandalone) { + releaseResources(_conf, SPARK_DRIVER_PREFIX, _resources, Utils.getProcessId) + } _taskScheduler = null // TODO: Cache.stop()? if (_env != null) { @@ -2726,7 +2752,7 @@ object SparkContext extends Logging { // Calculate the max slots each executor can provide based on resources available on each // executor and resources required by each task. 
- val taskResourceRequirements = parseTaskResourceRequirements(sc.conf) + val taskResourceRequirements = parseResourceRequirements(sc.conf, SPARK_TASK_PREFIX) val executorResourcesAndAmounts = parseAllResourceRequests(sc.conf, SPARK_EXECUTOR_PREFIX) .map(request => (request.id.resourceName, request.amount)).toMap diff --git a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala index c5c5c60..e11f497 100644 --- a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala +++ b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala @@ -19,6 +19,8 @@ package org.apache.spark.deploy import java.net.URI +import org.apache.spark.resource.ResourceRequirement + private[spark] case class ApplicationDescription( name: String, maxCores: Option[Int], @@ -32,7 +34,8 @@ private[spark] case class ApplicationDescription( // number of executors this application wants to start with, // only used if dynamic allocation is enabled initialExecutorLimit: Option[Int] = None, - user: String = System.getProperty("user.name", "<unknown>")) { + user: String = System.getProperty("user.name", "<unknown>"), + resourceReqsPerExecutor: Seq[ResourceRequirement] = Seq.empty) { override def toString: String = "ApplicationDescription(" + name + ")" } diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala index ea7c902..648a8b1 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala @@ -29,6 +29,7 @@ import org.apache.spark.deploy.DeployMessages._ import org.apache.spark.deploy.master.{DriverState, Master} import org.apache.spark.internal.{config, Logging} import org.apache.spark.internal.config.Network.RPC_ASK_TIMEOUT +import org.apache.spark.resource.ResourceUtils import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, 
RpcEnv, ThreadSafeRpcEndpoint} import org.apache.spark.util.{SparkExitCode, ThreadUtils, Utils} @@ -92,13 +93,15 @@ private class ClientEndpoint( val command = new Command(mainClass, Seq("{{WORKER_URL}}", "{{USER_JAR}}", driverArgs.mainClass) ++ driverArgs.driverOptions, sys.env, classPathEntries, libraryPathEntries, javaOpts) - + val driverResourceReqs = ResourceUtils.parseResourceRequirements(conf, + config.SPARK_DRIVER_PREFIX) val driverDescription = new DriverDescription( driverArgs.jarUrl, driverArgs.memory, driverArgs.cores, driverArgs.supervise, - command) + command, + driverResourceReqs) asyncSendToMasterAndForwardReply[SubmitDriverResponse]( RequestSubmitDriver(driverDescription)) diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala index 5723b0f..3f1d1ae 100644 --- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala @@ -24,6 +24,7 @@ import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, WorkerInfo} import org.apache.spark.deploy.master.DriverState.DriverState import org.apache.spark.deploy.master.RecoveryState.MasterState import org.apache.spark.deploy.worker.{DriverRunner, ExecutorRunner} +import org.apache.spark.resource.ResourceInformation import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef} import org.apache.spark.util.Utils @@ -31,7 +32,6 @@ private[deploy] sealed trait DeployMessage extends Serializable /** Contains messages sent between Scheduler endpoint nodes. 
*/ private[deploy] object DeployMessages { - // Worker to Master /** @@ -43,6 +43,7 @@ private[deploy] object DeployMessages { * @param memory the memory size of worker * @param workerWebUiUrl the worker Web UI address * @param masterAddress the master address used by the worker to connect + * @param resources the resources of worker */ case class RegisterWorker( id: String, @@ -52,7 +53,8 @@ private[deploy] object DeployMessages { cores: Int, memory: Int, workerWebUiUrl: String, - masterAddress: RpcAddress) + masterAddress: RpcAddress, + resources: Map[String, ResourceInformation] = Map.empty) extends DeployMessage { Utils.checkHost(host) assert (port > 0) @@ -72,8 +74,18 @@ private[deploy] object DeployMessages { exception: Option[Exception]) extends DeployMessage - case class WorkerSchedulerStateResponse(id: String, executors: List[ExecutorDescription], - driverIds: Seq[String]) + case class WorkerExecutorStateResponse( + desc: ExecutorDescription, + resources: Map[String, ResourceInformation]) + + case class WorkerDriverStateResponse( + driverId: String, + resources: Map[String, ResourceInformation]) + + case class WorkerSchedulerStateResponse( + id: String, + execResponses: List[WorkerExecutorStateResponse], + driverResponses: Seq[WorkerDriverStateResponse]) /** * A worker will send this message to the master when it registers with the master. 
Then the @@ -118,10 +130,14 @@ private[deploy] object DeployMessages { execId: Int, appDesc: ApplicationDescription, cores: Int, - memory: Int) + memory: Int, + resources: Map[String, ResourceInformation] = Map.empty) extends DeployMessage - case class LaunchDriver(driverId: String, driverDesc: DriverDescription) extends DeployMessage + case class LaunchDriver( + driverId: String, + driverDesc: DriverDescription, + resources: Map[String, ResourceInformation] = Map.empty) extends DeployMessage case class KillDriver(driverId: String) extends DeployMessage diff --git a/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala b/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala index 1f5626a..02c166b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala +++ b/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala @@ -17,12 +17,15 @@ package org.apache.spark.deploy +import org.apache.spark.resource.ResourceRequirement + private[deploy] case class DriverDescription( jarUrl: String, mem: Int, cores: Int, supervise: Boolean, - command: Command) { + command: Command, + resourceReqs: Seq[ResourceRequirement] = Seq.empty) { override def toString: String = s"DriverDescription (${command.mainClass})" } diff --git a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala index c1866b4..f1b58eb 100644 --- a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala +++ b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala @@ -64,7 +64,8 @@ class LocalSparkCluster( /* Start the Workers */ for (workerNum <- 1 to numWorkers) { val workerEnv = Worker.startRpcEnvAndEndpoint(localHostname, 0, 0, coresPerWorker, - memoryPerWorker, masters, null, Some(workerNum), _conf) + memoryPerWorker, masters, null, Some(workerNum), _conf, + conf.get(config.Worker.SPARK_WORKER_RESOURCE_FILE)) workerRpcEnvs += workerEnv } diff 
--git a/core/src/main/scala/org/apache/spark/deploy/StandaloneResourceUtils.scala b/core/src/main/scala/org/apache/spark/deploy/StandaloneResourceUtils.scala new file mode 100644 index 0000000..b64a36f --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/StandaloneResourceUtils.scala @@ -0,0 +1,348 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy + +import java.io.{File, RandomAccessFile} +import java.nio.channels.{FileLock, OverlappingFileLockException} +import java.nio.file.Files + +import scala.collection.mutable +import scala.util.Random +import scala.util.control.NonFatal + +import org.json4s.{DefaultFormats, Extraction} +import org.json4s.jackson.JsonMethods.{compact, parse, render} + +import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.{SPARK_RESOURCES_COORDINATE, SPARK_RESOURCES_DIR} +import org.apache.spark.resource.{ResourceAllocation, ResourceID, ResourceInformation, ResourceRequirement} +import org.apache.spark.resource.ResourceUtils.{parseResourceRequirements, withResourcesJson} +import org.apache.spark.util.Utils + +private[spark] object StandaloneResourceUtils extends Logging { + // These directory/files are used to coordinate the resources between + // the drivers/workers on the host in Spark Standalone. + val SPARK_RESOURCES_COORDINATE_DIR = "spark-resources" + val ALLOCATED_RESOURCES_FILE = "__allocated_resources__.json" + val RESOURCES_LOCK_FILE = "__allocated_resources__.lock" + + /** + * Resource allocation used in Standalone only, which tracks assignments with + * worker/driver(client only) pid. + */ + case class StandaloneResourceAllocation(pid: Int, allocations: Seq[ResourceAllocation]) { + // convert allocations to a resource information map + def toResourceInformationMap: Map[String, ResourceInformation] = { + allocations.map { allocation => + allocation.id.resourceName -> allocation.toResourceInformation + }.toMap + } + } + + /** + * Assigns (if coordinate needed) resources to workers/drivers from the same host to avoid + * address conflict. + * + * This function works in three steps. First, acquiring the lock on RESOURCES_LOCK_FILE + * to achieve synchronization among workers and drivers. 
Second, getting all allocated + * resources from ALLOCATED_RESOURCES_FILE and assigning isolated resources to the worker + * or driver after differentiating available resources in discovered resources from + * allocated resources. If available resources don't meet worker's or driver's requirement, + * try to update allocated resources by excluding the resource allocation if its related + * process has already terminated and do the assignment again. If still don't meet requirement, + * exception should be thrown. Third, updating ALLOCATED_RESOURCES_FILE with new allocated + * resources along with pid for the worker or driver. Then, return allocated resources + * information after releasing the lock. + * + * @param conf SparkConf + * @param componentName spark.driver / spark.worker + * @param resources the resources found by worker/driver on the host + * @param pid the process id of worker/driver to acquire resources. + * @return allocated resources for the worker/driver or throws exception if can't + * meet worker/driver's requirement + */ + def acquireResources( + conf: SparkConf, + componentName: String, + resources: Map[String, ResourceInformation], + pid: Int) + : Map[String, ResourceInformation] = { + if (!needCoordinate(conf)) { + return resources + } + val resourceRequirements = parseResourceRequirements(conf, componentName) + if (resourceRequirements.isEmpty) { + return Map.empty + } + val lock = acquireLock(conf) + try { + val resourcesFile = new File(getOrCreateResourcesDir(conf), ALLOCATED_RESOURCES_FILE) + // all allocated resources in ALLOCATED_RESOURCES_FILE, can be updated if any allocations' + // related processes detected to be terminated while checking pids below. 
+ var origAllocation = Seq.empty[StandaloneResourceAllocation] + // Map[pid -> Map[resourceName -> Addresses[]]] + var allocated = { + if (resourcesFile.exists()) { + origAllocation = allocatedStandaloneResources(resourcesFile.getPath) + val allocations = origAllocation.map { resource => + val resourceMap = { + resource.allocations.map { allocation => + allocation.id.resourceName -> allocation.addresses.toArray + }.toMap + } + resource.pid -> resourceMap + }.toMap + allocations + } else { + Map.empty[Int, Map[String, Array[String]]] + } + } + + // new allocated resources for worker or driver, + // map from resource name to its allocated addresses. + var newAssignments: Map[String, Array[String]] = null + // Whether we've checked process status and we'll only do the check at most once. + // Do the check iff the available resources can't meet the requirements at the first time. + var checked = false + // Whether we need to keep allocating for the worker/driver and we'll only go through + // the loop at most twice. + var keepAllocating = true + while (keepAllocating) { + keepAllocating = false + // store the pid whose related allocated resources conflict with + // discovered resources passed in. + val pidsToCheck = mutable.Set[Int]() + newAssignments = resourceRequirements.map { req => + val rName = req.resourceName + val amount = req.amount + // initially, we must have available.length >= amount as we've done pre-check previously + var available = resources(rName).addresses + // gets available resource addresses by excluding all + // allocated resource addresses from discovered resources + allocated.foreach { a => + val thePid = a._1 + val resourceMap = a._2 + val assigned = resourceMap.getOrElse(rName, Array.empty) + val retained = available.diff(assigned) + // if len(retained) < len(available) after differ to assigned, then, there must be + // some conflicting resources addresses between available and assigned. 
So, we should + // store its pid here to check whether it's alive in case we don't find enough + // resources after traversal all allocated resources. + if (retained.length < available.length && !checked) { + pidsToCheck += thePid + } + if (retained.length >= amount) { + available = retained + } else if (checked) { + keepAllocating = false + throw new SparkException(s"No more resources available since they've already" + + s" assigned to other workers/drivers.") + } else { + keepAllocating = true + } + } + val assigned = { + if (keepAllocating) { // can't meet the requirement + // excludes the allocation whose related process has already been terminated. + val (invalid, valid) = allocated.partition { a => + pidsToCheck(a._1) && !(Utils.isTesting || Utils.isProcessRunning(a._1))} + allocated = valid + origAllocation = origAllocation.filter( + allocation => !invalid.contains(allocation.pid)) + checked = true + // note this is a meaningless return value, just to avoid creating any new object + available + } else { + available.take(amount) + } + } + rName -> assigned + }.toMap + } + val newAllocation = { + val allocations = newAssignments.map { case (rName, addresses) => + ResourceAllocation(ResourceID(componentName, rName), addresses) + }.toSeq + StandaloneResourceAllocation(pid, allocations) + } + writeResourceAllocationJson( + componentName, origAllocation ++ Seq(newAllocation), resourcesFile) + newAllocation.toResourceInformationMap + } finally { + releaseLock(lock) + } + } + + /** + * Frees (if coordinate needed) all the resources a worker/driver (pid) has in one shot + * to make those resources be available for other workers/drivers on the same host. + * @param conf SparkConf + * @param componentName spark.driver / spark.worker + * @param toRelease the resources expected to release + * @param pid the process id of worker/driver to release resources. 
+ */ + def releaseResources( + conf: SparkConf, + componentName: String, + toRelease: Map[String, ResourceInformation], + pid: Int) + : Unit = { + if (!needCoordinate(conf)) { + return + } + if (toRelease != null && toRelease.nonEmpty) { + val lock = acquireLock(conf) + try { + val resourcesFile = new File(getOrCreateResourcesDir(conf), ALLOCATED_RESOURCES_FILE) + if (resourcesFile.exists()) { + val (target, others) = + allocatedStandaloneResources(resourcesFile.getPath).partition(_.pid == pid) + if (target.nonEmpty) { + if (others.isEmpty) { + if (!resourcesFile.delete()) { + logError(s"Failed to delete $ALLOCATED_RESOURCES_FILE.") + } + } else { + writeResourceAllocationJson(componentName, others, resourcesFile) + } + logDebug(s"$componentName(pid=$pid) released resources: ${toRelease.mkString("\n")}") + } else { + logWarning(s"$componentName(pid=$pid) has already released its resources.") + } + } + } finally { + releaseLock(lock) + } + } + } + + private def acquireLock(conf: SparkConf): FileLock = { + val resourcesDir = getOrCreateResourcesDir(conf) + val lockFile = new File(resourcesDir, RESOURCES_LOCK_FILE) + val lockFileChannel = new RandomAccessFile(lockFile, "rw").getChannel + var keepTry = true + var lock: FileLock = null + while (keepTry) { + try { + lock = lockFileChannel.lock() + logInfo(s"Acquired lock on $RESOURCES_LOCK_FILE.") + keepTry = false + } catch { + case e: OverlappingFileLockException => + // This exception throws when we're in LocalSparkCluster mode. FileLock is designed + // to be used across JVMs, but our LocalSparkCluster is designed to launch multiple + // workers in the same JVM. As a result, when an worker in LocalSparkCluster try to + // acquire the lock on `resources.lock` which already locked by other worker, we'll + // hit this exception. So, we should manually control it. 
+ keepTry = true + // there may be multiple workers race for the lock, + // so, sleep for a random time to avoid possible conflict + val duration = Random.nextInt(1000) + 1000 + Thread.sleep(duration) + } + } + assert(lock != null, s"Acquired null lock on $RESOURCES_LOCK_FILE.") + lock + } + + private def releaseLock(lock: FileLock): Unit = { + try { + lock.release() + lock.channel().close() + logInfo(s"Released lock on $RESOURCES_LOCK_FILE.") + } catch { + case e: Exception => + logError(s"Error while releasing lock on $RESOURCES_LOCK_FILE.", e) + } + } + + private def getOrCreateResourcesDir(conf: SparkConf): File = { + val coordinateDir = new File(conf.get(SPARK_RESOURCES_DIR).getOrElse { + val sparkHome = if (Utils.isTesting) { + assert(sys.props.contains("spark.test.home") || + sys.env.contains("SPARK_HOME"), "spark.test.home or SPARK_HOME is not set.") + sys.props.getOrElse("spark.test.home", sys.env("SPARK_HOME")) + } else { + sys.env.getOrElse("SPARK_HOME", ".") + } + sparkHome + }) + val resourceDir = new File(coordinateDir, SPARK_RESOURCES_COORDINATE_DIR) + if (!resourceDir.exists()) { + Utils.createDirectory(resourceDir) + } + resourceDir + } + + private def allocatedStandaloneResources(resourcesFile: String) + : Seq[StandaloneResourceAllocation] = { + withResourcesJson[StandaloneResourceAllocation](resourcesFile) { json => + implicit val formats = DefaultFormats + parse(json).extract[Seq[StandaloneResourceAllocation]] + } + } + + /** + * Save the allocated resources of driver(cluster only) or executor into a JSON formatted + * resources file. Used in Standalone only. 
+ * @param componentName spark.driver / spark.executor + * @param resources allocated resources for driver(cluster only) or executor + * @param dir the target directory used to place the resources file + * @return None if resources is empty or Some(file) which represents the resources file + */ + def prepareResourcesFile( + componentName: String, + resources: Map[String, ResourceInformation], + dir: File): Option[File] = { + if (resources.isEmpty) { + return None + } + + val compShortName = componentName.substring(componentName.lastIndexOf(".") + 1) + val tmpFile = Utils.tempFileWith(dir) + val allocations = resources.map { case (rName, rInfo) => + ResourceAllocation(ResourceID(componentName, rName), rInfo.addresses) + }.toSeq + try { + writeResourceAllocationJson(componentName, allocations, tmpFile) + } catch { + case NonFatal(e) => + val errMsg = s"Exception threw while preparing resource file for $compShortName" + logError(errMsg, e) + throw new SparkException(errMsg, e) + } + val resourcesFile = File.createTempFile(s"resource-$compShortName-", ".json", dir) + tmpFile.renameTo(resourcesFile) + Some(resourcesFile) + } + + private def writeResourceAllocationJson[T]( + componentName: String, + allocations: Seq[T], + jsonFile: File): Unit = { + implicit val formats = DefaultFormats + val allocationJson = Extraction.decompose(allocations) + Files.write(jsonFile.toPath, compact(render(allocationJson)).getBytes()) + } + + /** Whether needs to coordinate resources among workers and drivers for user */ + def needCoordinate(conf: SparkConf): Boolean = { + conf.get(SPARK_RESOURCES_COORDINATE) + } +} diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala index 53564d0..6c56807 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala @@ -23,6 +23,7 @@ import 
scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 import org.apache.spark.deploy.ApplicationDescription
+import org.apache.spark.resource.ResourceInformation
 import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.util.Utils
@@ -82,8 +83,10 @@ private[spark] class ApplicationInfo(
   private[master] def addExecutor(
       worker: WorkerInfo,
       cores: Int,
+      resources: Map[String, ResourceInformation],
       useID: Option[Int] = None): ExecutorDesc = {
-    val exec = new ExecutorDesc(newExecutorId(useID), this, worker, cores, desc.memoryPerExecutorMB)
+    val exec = new ExecutorDesc(newExecutorId(useID), this, worker, cores,
+      desc.memoryPerExecutorMB, resources)
     executors(exec.id) = exec
     coresGranted += cores
     exec
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala
index 8d5edae..bf68ba8 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala
@@ -20,6 +20,7 @@ package org.apache.spark.deploy.master
 import java.util.Date
 import org.apache.spark.deploy.DriverDescription
+import org.apache.spark.resource.ResourceInformation
 import org.apache.spark.util.Utils
 private[deploy] class DriverInfo(
@@ -34,6 +35,9 @@ private[deploy] class DriverInfo(
   @transient var exception: Option[Exception] = None
   /* Most recent worker assigned to this driver */
   @transient var worker: Option[WorkerInfo] = None
+  // resources(e.f. gpu/fpga) allocated to this driver
+  // map from resource name to ResourceInformation
+  private var _resources: Map[String, ResourceInformation] = _
   init()
@@ -47,4 +51,8 @@ private[deploy] class DriverInfo(
     worker = None
     exception = None
   }
+
+  def withResources(r: Map[String, ResourceInformation]): Unit = _resources = r
+
+  def resources: Map[String, ResourceInformation] = _resources
 }
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ExecutorDesc.scala b/core/src/main/scala/org/apache/spark/deploy/master/ExecutorDesc.scala
index fc62b09..a8f8492 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ExecutorDesc.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ExecutorDesc.scala
@@ -18,13 +18,17 @@
 package org.apache.spark.deploy.master
 import org.apache.spark.deploy.{ExecutorDescription, ExecutorState}
+import org.apache.spark.resource.ResourceInformation
 private[master] class ExecutorDesc(
     val id: Int,
     val application: ApplicationInfo,
     val worker: WorkerInfo,
     val cores: Int,
-    val memory: Int) {
+    val memory: Int,
+    // resources(e.f. gpu/fpga) allocated to this executor
+    // map from resource name to ResourceInformation
+    val resources: Map[String, ResourceInformation]) {
   var state = ExecutorState.LAUNCHING
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 3c0a49e..6765519 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -25,8 +25,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 import scala.util.Random
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
-import org.apache.spark.deploy.{ApplicationDescription, DriverDescription,
-  ExecutorState, SparkHadoopUtil}
+import org.apache.spark.deploy.{ApplicationDescription, DriverDescription, ExecutorState, SparkHadoopUtil}
 import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.master.DriverState.DriverState
 import org.apache.spark.deploy.master.MasterMessages._
@@ -38,6 +37,7 @@ import org.apache.spark.internal.config.Deploy._
 import org.apache.spark.internal.config.UI._
 import org.apache.spark.internal.config.Worker._
 import org.apache.spark.metrics.{MetricsSystem, MetricsSystemInstances}
+import org.apache.spark.resource.{ResourceRequirement, ResourceUtils}
 import org.apache.spark.rpc._
 import org.apache.spark.serializer.{JavaSerializer, Serializer}
 import org.apache.spark.util.{SparkUncaughtExceptionHandler, ThreadUtils, Utils}
@@ -244,7 +244,8 @@ private[deploy] class Master(
       System.exit(0)
     case RegisterWorker(
-      id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl, masterAddress) =>
+      id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl,
+      masterAddress, resources) =>
       logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
         workerHost, workerPort, cores, Utils.megabytesToString(memory)))
       if (state == RecoveryState.STANDBY) {
@@ -252,8 +253,9 @@ private[deploy] class Master(
       } else if (idToWorker.contains(id)) {
         workerRef.send(RegisteredWorker(self, masterWebUiUrl, masterAddress, true))
       } else {
+        val workerResources = resources.map(r => r._1 -> WorkerResourceInfo(r._1, r._2.addresses))
         val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
-          workerRef, workerWebUiUrl)
+          workerRef, workerWebUiUrl, workerResources)
         if (registerWorker(worker)) {
           persistenceEngine.addWorker(worker)
           workerRef.send(RegisteredWorker(self, masterWebUiUrl, masterAddress, false))
@@ -361,24 +363,31 @@ private[deploy] class Master(
       if (canCompleteRecovery) { completeRecovery() }
-    case WorkerSchedulerStateResponse(workerId, executors, driverIds) =>
+    case WorkerSchedulerStateResponse(workerId, execResponses, driverResponses) =>
       idToWorker.get(workerId) match {
         case Some(worker) =>
           logInfo("Worker has been re-registered: " + workerId)
           worker.state = WorkerState.ALIVE
-          val validExecutors = executors.filter(exec => idToApp.get(exec.appId).isDefined)
+          val validExecutors = execResponses.filter(
+            exec => idToApp.get(exec.desc.appId).isDefined)
           for (exec <- validExecutors) {
-            val app = idToApp(exec.appId)
-            val execInfo = app.addExecutor(worker, exec.cores, Some(exec.execId))
+            val (execDesc, execResources) = (exec.desc, exec.resources)
+            val app = idToApp(execDesc.appId)
+            val execInfo = app.addExecutor(
+              worker, execDesc.cores, execResources, Some(execDesc.execId))
             worker.addExecutor(execInfo)
-            execInfo.copyState(exec)
+            worker.recoverResources(execResources)
+            execInfo.copyState(execDesc)
           }
-          for (driverId <- driverIds) {
+          for (driver <- driverResponses) {
+            val (driverId, driverResource) = (driver.driverId, driver.resources)
             drivers.find(_.id == driverId).foreach { driver =>
               driver.worker = Some(worker)
               driver.state = DriverState.RUNNING
+              driver.withResources(driverResource)
+              worker.recoverResources(driverResource)
               worker.addDriver(driver)
             }
           }
@@ -614,24 +623,34 @@ private[deploy] class Master(
     val minCoresPerExecutor = coresPerExecutor.getOrElse(1)
     val oneExecutorPerWorker = coresPerExecutor.isEmpty
     val memoryPerExecutor = app.desc.memoryPerExecutorMB
+    val resourceReqsPerExecutor = app.desc.resourceReqsPerExecutor
     val numUsable = usableWorkers.length
     val assignedCores = new Array[Int](numUsable) // Number of cores to give to each worker
     val assignedExecutors = new Array[Int](numUsable) // Number of new executors on each worker
     var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
     /** Return whether the specified worker can launch an executor for this app. */
-    def canLaunchExecutor(pos: Int): Boolean = {
+    def canLaunchExecutorForApp(pos: Int): Boolean = {
       val keepScheduling = coresToAssign >= minCoresPerExecutor
       val enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor
+      val assignedExecutorNum = assignedExecutors(pos)
       // If we allow multiple executors per worker, then we can always launch new executors.
       // Otherwise, if there is already an executor on this worker, just give it more cores.
-      val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutors(pos) == 0
+      val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutorNum == 0
       if (launchingNewExecutor) {
-        val assignedMemory = assignedExecutors(pos) * memoryPerExecutor
+        val assignedMemory = assignedExecutorNum * memoryPerExecutor
         val enoughMemory = usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor
+        val assignedResources = resourceReqsPerExecutor.map {
+          req => req.resourceName -> req.amount * assignedExecutorNum
+        }.toMap
+        val resourcesFree = usableWorkers(pos).resourcesFree.map {
+          case (rName, free) => rName -> (free - assignedResources.getOrElse(rName, 0))
+        }
+        val enoughResources = ResourceUtils.resourcesMeetRequirements(
+          resourcesFree, resourceReqsPerExecutor)
         val underLimit = assignedExecutors.sum + app.executors.size < app.executorLimit
-        keepScheduling && enoughCores && enoughMemory && underLimit
+        keepScheduling && enoughCores && enoughMemory && enoughResources && underLimit
       } else {
         // We're adding cores to an existing executor, so no need
         // to check memory and executor limits
@@ -641,11 +660,11 @@ private[deploy] class Master(
     // Keep launching executors until no more workers can accommodate any
     // more executors, or if we have reached this application's limits
-    var freeWorkers = (0 until numUsable).filter(canLaunchExecutor)
+    var freeWorkers = (0 until numUsable).filter(canLaunchExecutorForApp)
     while (freeWorkers.nonEmpty) {
       freeWorkers.foreach { pos =>
         var keepScheduling = true
-        while (keepScheduling && canLaunchExecutor(pos)) {
+        while (keepScheduling && canLaunchExecutorForApp(pos)) {
           coresToAssign -= minCoresPerExecutor
           assignedCores(pos) += minCoresPerExecutor
@@ -666,7 +685,7 @@ private[deploy] class Master(
           }
         }
       }
-      freeWorkers = freeWorkers.filter(canLaunchExecutor)
+      freeWorkers = freeWorkers.filter(canLaunchExecutorForApp)
     }
     assignedCores
   }
@@ -683,9 +702,11 @@ private[deploy] class Master(
       if (app.coresLeft >= coresPerExecutor) {
         // Filter out workers that don't have enough resources to launch an executor
         val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
-          .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
-            worker.coresFree >= coresPerExecutor)
+          .filter(canLaunchExecutor(_, app.desc))
           .sortBy(_.coresFree).reverse
+        if (waitingApps.length == 1 && usableWorkers.isEmpty) {
+          logWarning(s"App ${app.id} requires more resource than any of Workers could have.")
+        }
         val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)
         // Now that we've decided how many cores to allocate on each worker, let's allocate them
@@ -715,12 +736,44 @@ private[deploy] class Master(
     val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
     val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
     for (i <- 1 to numExecutors) {
-      val exec = app.addExecutor(worker, coresToAssign)
+      val allocated = worker.acquireResources(app.desc.resourceReqsPerExecutor)
+      val exec = app.addExecutor(worker, coresToAssign, allocated)
       launchExecutor(worker, exec)
       app.state = ApplicationState.RUNNING
     }
   }
+  private def canLaunch(
+      worker: WorkerInfo,
+      memoryReq: Int,
+      coresReq: Int,
+      resourceRequirements: Seq[ResourceRequirement])
+    : Boolean = {
+    val enoughMem = worker.memoryFree >= memoryReq
+    val enoughCores = worker.coresFree >= coresReq
+    val enoughResources = ResourceUtils.resourcesMeetRequirements(
+      worker.resourcesFree, resourceRequirements)
+    enoughMem && enoughCores && enoughResources
+  }
+
+  /**
+   * @return whether the worker could launch the driver represented by DriverDescription
+   */
+  private def canLaunchDriver(worker: WorkerInfo, desc: DriverDescription): Boolean = {
+    canLaunch(worker, desc.mem, desc.cores, desc.resourceReqs)
+  }
+
+  /**
+   * @return whether the worker could launch the executor according to application's requirement
+   */
+  private def canLaunchExecutor(worker: WorkerInfo, desc: ApplicationDescription): Boolean = {
+    canLaunch(
+      worker,
+      desc.memoryPerExecutorMB,
+      desc.coresPerExecutor.getOrElse(1),
+      desc.resourceReqsPerExecutor)
+  }
+
   /**
    * Schedule the currently available resources among waiting apps. This method will be called
    * every time a new app joins or resource availability changes.
@@ -738,17 +791,24 @@ private[deploy] class Master(
       // start from the last worker that was assigned a driver, and continue onwards until we have
       // explored all alive workers.
       var launched = false
+      var isClusterIdle = true
       var numWorkersVisited = 0
       while (numWorkersVisited < numWorkersAlive && !launched) {
         val worker = shuffledAliveWorkers(curPos)
+        isClusterIdle = worker.drivers.isEmpty && worker.executors.isEmpty
         numWorkersVisited += 1
-        if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
+        if (canLaunchDriver(worker, driver.desc)) {
+          val allocated = worker.acquireResources(driver.desc.resourceReqs)
+          driver.withResources(allocated)
           launchDriver(worker, driver)
           waitingDrivers -= driver
           launched = true
         }
         curPos = (curPos + 1) % numWorkersAlive
       }
+      if (!launched && isClusterIdle) {
+        logWarning(s"Driver ${driver.id} requires more resource than any of Workers could have.")
+      }
     }
     startExecutorsOnWorkers()
   }
@@ -756,8 +816,8 @@ private[deploy] class Master(
   private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
     logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
     worker.addExecutor(exec)
-    worker.endpoint.send(LaunchExecutor(masterUrl,
-      exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
+    worker.endpoint.send(LaunchExecutor(masterUrl, exec.application.id, exec.id,
+      exec.application.desc, exec.cores, exec.memory, exec.resources))
     exec.application.driver.send(
       ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
   }
@@ -1021,7 +1081,7 @@ private[deploy] class Master(
     logInfo("Launching driver " + driver.id + " on worker " + worker.id)
     worker.addDriver(driver)
     driver.worker = Some(worker)
-    worker.endpoint.send(LaunchDriver(driver.id, driver.desc))
+    worker.endpoint.send(LaunchDriver(driver.id, driver.desc, driver.resources))
     driver.state = DriverState.RUNNING
   }
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
index c87d6e2..d485db4 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
@@ -19,9 +19,24 @@ package org.apache.spark.deploy.master
 import scala.collection.mutable
+import org.apache.spark.resource.{ResourceAllocator, ResourceInformation, ResourceRequirement}
 import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.util.Utils
+private[spark] case class WorkerResourceInfo(name: String, addresses: Seq[String])
+  extends ResourceAllocator(name, addresses) {
+
+  def toResourceInformation(): ResourceInformation = {
+    new ResourceInformation(name, addresses.toArray)
+  }
+
+  def acquire(amount: Int): ResourceInformation = {
+    val allocated = availableAddrs.take(amount)
+    acquire(allocated)
+    new ResourceInformation(name, allocated.toArray)
+  }
+}
+
 private[spark] class WorkerInfo(
     val id: String,
     val host: String,
@@ -29,7 +44,9 @@ private[spark] class WorkerInfo(
     val cores: Int,
     val memory: Int,
     val endpoint: RpcEndpointRef,
-    val webUiAddress: String)
+    val webUiAddress: String,
+    val resources: Map[String, WorkerResourceInfo],
+    val pid: Int = 0)
   extends Serializable {
   Utils.checkHost(host)
@@ -47,6 +64,11 @@ private[spark] class WorkerInfo(
   def coresFree: Int = cores - coresUsed
   def memoryFree: Int = memory - memoryUsed
+  def resourcesFree: Map[String, Int] = {
+    resources.map { case (rName, rInfo) =>
+      rName -> rInfo.availableAddrs.length
+    }
+  }
   private def readObject(in: java.io.ObjectInputStream): Unit = Utils.tryOrIOException {
     in.defaultReadObject()
@@ -78,6 +100,7 @@ private[spark] class WorkerInfo(
       executors -= exec.fullId
       coresUsed -= exec.cores
       memoryUsed -= exec.memory
+      releaseResources(exec.resources)
     }
   }
@@ -95,6 +118,7 @@ private[spark] class WorkerInfo(
     drivers -= driver.id
     memoryUsed -= driver.desc.mem
     coresUsed -= driver.desc.cores
+    releaseResources(driver.resources)
   }
   def setState(state: WorkerState.Value): Unit = {
@@ -102,4 +126,36 @@ private[spark] class WorkerInfo(
   }
   def isAlive(): Boolean = this.state == WorkerState.ALIVE
+
+  /**
+   * acquire specified amount resources for driver/executor from the worker
+   * @param resourceReqs the resources requirement from driver/executor
+   */
+  def acquireResources(resourceReqs: Seq[ResourceRequirement])
+    : Map[String, ResourceInformation] = {
+    resourceReqs.map { req =>
+      val rName = req.resourceName
+      val amount = req.amount
+      rName -> resources(rName).acquire(amount)
+    }.toMap
+  }
+
+  /**
+   * used during master recovery
+   */
+  def recoverResources(expected: Map[String, ResourceInformation]): Unit = {
+    expected.foreach { case (rName, rInfo) =>
+      resources(rName).acquire(rInfo.addresses)
+    }
+  }
+
+  /**
+   * release resources to worker from the driver/executor
+   * @param allocated the resources which allocated to driver/executor previously
+   */
+  private def releaseResources(allocated: Map[String, ResourceInformation]): Unit = {
+    allocated.foreach { case (rName, rInfo) =>
+      resources(rName).release(rInfo.addresses)
+    }
+  }
 }
diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
index f912ed6..c060ef9 100644
--- a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
@@ -25,6 +25,7 @@ import org.apache.spark.deploy.{Command, DeployMessages, DriverDescription}
 import org.apache.spark.deploy.ClientArguments._
 import org.apache.spark.internal.config
 import org.apache.spark.launcher.SparkLauncher
+import org.apache.spark.resource.ResourceUtils
 import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.util.Utils
@@ -174,8 +175,11 @@ private[rest] class StandaloneSubmitRequestServlet(
     val actualDriverMemory = driverMemory.map(Utils.memoryStringToMb).getOrElse(DEFAULT_MEMORY)
     val actualDriverCores = driverCores.map(_.toInt).getOrElse(DEFAULT_CORES)
     val actualSuperviseDriver = superviseDriver.map(_.toBoolean).getOrElse(DEFAULT_SUPERVISE)
+    val driverResourceReqs = ResourceUtils.parseResourceRequirements(conf,
+      config.SPARK_DRIVER_PREFIX)
     new DriverDescription(
-      appResource, actualDriverMemory, actualDriverCores, actualSuperviseDriver, command)
+      appResource, actualDriverMemory, actualDriverCores, actualSuperviseDriver, command,
+      driverResourceReqs)
   }
   /**
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
index 0c88119..4934722 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -28,10 +28,13 @@ import com.google.common.io.Files
 import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.deploy.{DriverDescription, SparkHadoopUtil}
 import org.apache.spark.deploy.DeployMessages.DriverStateChanged
+import org.apache.spark.deploy.StandaloneResourceUtils.prepareResourcesFile
 import org.apache.spark.deploy.master.DriverState
 import org.apache.spark.deploy.master.DriverState.DriverState
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.{DRIVER_RESOURCES_FILE, SPARK_DRIVER_PREFIX}
 import org.apache.spark.internal.config.Worker.WORKER_DRIVER_TERMINATE_TIMEOUT
+import org.apache.spark.resource.ResourceInformation
 import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.util.{Clock, ShutdownHookManager, SystemClock, Utils}
@@ -47,7 +50,8 @@ private[deploy] class DriverRunner(
     val driverDesc: DriverDescription,
     val worker: RpcEndpointRef,
     val workerUrl: String,
-    val securityManager: SecurityManager)
+    val securityManager: SecurityManager,
+    val resources: Map[String, ResourceInformation] = Map.empty)
   extends Logging {
   @volatile private var process: Option[Process] = None
@@ -171,6 +175,7 @@ private[deploy] class DriverRunner(
   private[worker] def prepareAndRunDriver(): Int = {
     val driverDir = createWorkingDirectory()
     val localJarFilename = downloadUserJar(driverDir)
+    val resourceFileOpt = prepareResourcesFile(SPARK_DRIVER_PREFIX, resources, driverDir)
     def substituteVariables(argument: String): String = argument match {
       case "{{WORKER_URL}}" => workerUrl
@@ -178,9 +183,12 @@ private[deploy] class DriverRunner(
       case other => other
     }
+    // config resource file for driver, which would be used to load resources when driver starts up
+    val javaOpts = driverDesc.command.javaOpts ++ resourceFileOpt.map(f =>
+      Seq(s"-D${DRIVER_RESOURCES_FILE.key}=${f.getAbsolutePath}")).getOrElse(Seq.empty)
     // TODO: If we add ability to submit multiple jars they should also be added here
-    val builder = CommandUtils.buildProcessBuilder(driverDesc.command, securityManager,
-      driverDesc.mem, sparkHome.getAbsolutePath, substituteVariables)
+    val builder = CommandUtils.buildProcessBuilder(driverDesc.command.copy(javaOpts = javaOpts),
+      securityManager, driverDesc.mem, sparkHome.getAbsolutePath, substituteVariables)
     runDriver(builder, driverDir, driverDesc.supervise)
   }
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index 6f1484c..9793910 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -27,8 +27,11 @@ import com.google.common.io.Files
 import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
 import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
+import org.apache.spark.deploy.StandaloneResourceUtils.prepareResourcesFile
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.SPARK_EXECUTOR_PREFIX
 import org.apache.spark.internal.config.UI._
+import org.apache.spark.resource.{ResourceInformation, ResourceUtils}
 import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.util.{ShutdownHookManager, Utils}
 import org.apache.spark.util.logging.FileAppender
@@ -54,7 +57,8 @@ private[deploy] class ExecutorRunner(
     val workerUrl: String,
     conf: SparkConf,
     val appLocalDirs: Seq[String],
-    @volatile var state: ExecutorState.Value)
+    @volatile var state: ExecutorState.Value,
+    val resources: Map[String, ResourceInformation] = Map.empty)
   extends Logging {
   private val fullId = appId + "/" + execId
@@ -143,11 +147,14 @@ private[deploy] class ExecutorRunner(
    */
   private def fetchAndRunExecutor() {
     try {
+      val resourceFileOpt = prepareResourcesFile(SPARK_EXECUTOR_PREFIX, resources, executorDir)
       // Launch the process
+      val arguments = appDesc.command.arguments ++ resourceFileOpt.map(f =>
+        Seq("--resourcesFile", f.getAbsolutePath)).getOrElse(Seq.empty)
       val subsOpts = appDesc.command.javaOpts.map {
         Utils.substituteAppNExecIds(_, appId, execId.toString)
       }
-      val subsCommand = appDesc.command.copy(javaOpts = subsOpts)
+      val subsCommand = appDesc.command.copy(arguments = arguments, javaOpts = subsOpts)
       val builder = CommandUtils.buildProcessBuilder(subsCommand, new SecurityManager(conf),
         memory, sparkHome.getAbsolutePath, substituteVariables)
       val command = builder.command()
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index ac7a1b9..899593d 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -17,8 +17,7 @@
 package org.apache.spark.deploy.worker
-import java.io.File
-import java.io.IOException
+import java.io.{File, IOException}
 import java.text.SimpleDateFormat
 import java.util.{Date, Locale, UUID}
 import java.util.concurrent._
@@ -34,6 +33,7 @@ import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.deploy.{Command, ExecutorDescription, ExecutorState}
 import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.ExternalShuffleService
+import org.apache.spark.deploy.StandaloneResourceUtils._
 import org.apache.spark.deploy.master.{DriverState, Master}
 import org.apache.spark.deploy.worker.ui.WorkerWebUI
 import org.apache.spark.internal.{config, Logging}
@@ -44,7 +44,7 @@ import org.apache.spark.metrics.{MetricsSystem, MetricsSystemInstances}
 import org.apache.spark.resource.ResourceInformation
 import org.apache.spark.resource.ResourceUtils._
 import org.apache.spark.rpc._
-import org.apache.spark.util.{SparkUncaughtExceptionHandler, ThreadUtils, Utils}
+import org.apache.spark.util.{SignalUtils, SparkUncaughtExceptionHandler, ThreadUtils, Utils}
 private[deploy] class Worker(
     override val rpcEnv: RpcEnv,
@@ -57,7 +57,8 @@ private[deploy] class Worker(
     val conf: SparkConf,
     val securityMgr: SecurityManager,
     resourceFileOpt: Option[String] = None,
-    externalShuffleServiceSupplier: Supplier[ExternalShuffleService] = null)
+    externalShuffleServiceSupplier: Supplier[ExternalShuffleService] = null,
+    pid: Int = Utils.getProcessId)
   extends ThreadSafeRpcEndpoint with Logging {
   private val host = rpcEnv.address.host
@@ -180,7 +181,7 @@ private[deploy] class Worker(
   )
   // visible for tests
-  private[deploy] var resources: Map[String, ResourceInformation] = _
+  private[deploy] var resources: Map[String, ResourceInformation] = Map.empty
   var coresUsed = 0
   var memoryUsed = 0
@@ -190,19 +191,8 @@ private[deploy] class Worker(
   private def createWorkDir() {
     workDir = Option(workDirPath).map(new File(_)).getOrElse(new File(sparkHome, "work"))
-    try {
-      // This sporadically fails - not sure why ... !workDir.exists() && !workDir.mkdirs()
-      // So attempting to create and then check if directory was created or not.
-      workDir.mkdirs()
-      if ( !workDir.exists() || !workDir.isDirectory) {
-        logError("Failed to create work directory " + workDir)
-        System.exit(1)
-      }
-      assert (workDir.isDirectory)
-    } catch {
-      case e: Exception =>
-        logError("Failed to create work directory " + workDir, e)
-        System.exit(1)
+    if (!Utils.createDirectory(workDir)) {
+      System.exit(1)
     }
   }
@@ -214,6 +204,7 @@ private[deploy] class Worker(
     logInfo("Spark home: " + sparkHome)
     createWorkDir()
     startExternalShuffleService()
+    releaseResourcesOnInterrupt()
     setupWorkerResources()
     webUi = new WorkerWebUI(this, workDir, webUiPort)
     webUi.bind()
@@ -227,13 +218,29 @@ private[deploy] class Worker(
     metricsSystem.getServletHandlers.foreach(webUi.attachHandler)
   }
+  /**
+   * Used to catch the TERM signal from sbin/stop-slave.sh and
+   * release resources before Worker exits
+   */
+  private def releaseResourcesOnInterrupt(): Unit = {
+    SignalUtils.register("TERM") {
+      releaseResources(conf, SPARK_WORKER_PREFIX, resources, pid)
+      false
+    }
+  }
+
   private def setupWorkerResources(): Unit = {
     try {
-      resources = getOrDiscoverAllResources(conf, SPARK_WORKER_PREFIX, resourceFileOpt)
+      val allResources = getOrDiscoverAllResources(conf, SPARK_WORKER_PREFIX, resourceFileOpt)
+      resources = acquireResources(conf, SPARK_WORKER_PREFIX, allResources, pid)
+      logResourceInfo(SPARK_WORKER_PREFIX, resources)
     } catch {
       case e: Exception =>
         logError("Failed to setup worker resources: ", e)
-        System.exit(1)
+        releaseResources(conf, SPARK_WORKER_PREFIX, resources, pid)
+        if (!Utils.isTesting) {
+          System.exit(1)
+        }
     }
   }
@@ -349,6 +356,7 @@ private[deploy] class Worker(
             TimeUnit.SECONDS))
         }
       } else {
+        releaseResources(conf, SPARK_WORKER_PREFIX, resources, pid)
         logError("All masters are unresponsive! Giving up.")
         System.exit(1)
       }
@@ -405,7 +413,8 @@ private[deploy] class Worker(
       cores,
       memory,
       workerWebUiUrl,
-      masterEndpoint.address))
+      masterEndpoint.address,
+      resources))
   }
   private def handleRegisterResponse(msg: RegisterWorkerResponse): Unit = synchronized {
@@ -446,6 +455,7 @@ private[deploy] class Worker(
       case RegisterWorkerFailed(message) =>
         if (!registered) {
           logError("Worker registration failed: " + message)
+          releaseResources(conf, SPARK_WORKER_PREFIX, resources, pid)
           System.exit(1)
         }
@@ -506,15 +516,20 @@ private[deploy] class Worker(
       logInfo("Master has changed, new master is at " + masterRef.address.toSparkURL)
       changeMaster(masterRef, masterWebUiUrl, masterRef.address)
-      val execs = executors.values.
-        map(e => new ExecutorDescription(e.appId, e.execId, e.cores, e.state))
-      masterRef.send(WorkerSchedulerStateResponse(workerId, execs.toList, drivers.keys.toSeq))
+      val executorResponses = executors.values.map { e =>
+        WorkerExecutorStateResponse(new ExecutorDescription(
+          e.appId, e.execId, e.cores, e.state), e.resources)
+      }
+      val driverResponses = drivers.keys.map { id =>
+        WorkerDriverStateResponse(id, drivers(id).resources)}
+      masterRef.send(WorkerSchedulerStateResponse(
+        workerId, executorResponses.toList, driverResponses.toSeq))
     case ReconnectWorker(masterUrl) =>
       logInfo(s"Master with url $masterUrl requested this worker to reconnect.")
       registerWithMaster()
-    case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
+    case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_, resources_) =>
       if (masterUrl != activeMasterUrl) {
         logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
       } else {
@@ -567,7 +582,8 @@ private[deploy] class Worker(
             workerUri,
             conf,
             appLocalDirs,
-            ExecutorState.LAUNCHING)
+            ExecutorState.LAUNCHING,
+            resources_)
           executors(appId + "/" + execId) = manager
           manager.start()
           coresUsed += cores_
@@ -601,7 +617,7 @@ private[deploy] class Worker(
         }
       }
-    case LaunchDriver(driverId, driverDesc) =>
+    case LaunchDriver(driverId, driverDesc, resources_) =>
       logInfo(s"Asked to launch driver $driverId")
       val driver = new DriverRunner(
         conf,
@@ -611,7 +627,8 @@ private[deploy] class Worker(
         driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
         self,
         workerUri,
-        securityMgr)
+        securityMgr,
+        resources_)
       drivers(driverId) = driver
       driver.start()
@@ -701,6 +718,7 @@ private[deploy] class Worker(
   }
   override def onStop() {
+    releaseResources(conf, SPARK_WORKER_PREFIX, resources, pid)
     cleanupThreadExecutor.shutdownNow()
     metricsSystem.report()
     cancelLastRegistrationRetry()
@@ -835,8 +853,9 @@ private[deploy] object Worker extends Logging {
     val securityMgr = new SecurityManager(conf)
     val rpcEnv = RpcEnv.create(systemName, host, port, conf, securityMgr)
     val masterAddresses = masterUrls.map(RpcAddress.fromSparkURL)
+    val pid = if (Utils.isTesting) workerNumber.get else Utils.getProcessId
     rpcEnv.setupEndpoint(ENDPOINT_NAME, new Worker(rpcEnv, webUiPort, cores, memory,
-      masterAddresses, ENDPOINT_NAME, workDir, conf, securityMgr, resourceFileOpt))
+      masterAddresses, ENDPOINT_NAME, workDir, conf, securityMgr, resourceFileOpt, pid = pid))
     rpcEnv
   }
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 98e5aa6..a42a928 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -90,11 +90,13 @@ private[spark] class CoarseGrainedExecutorBackend(
   // visible for testing
   def parseOrFindResources(resourcesFileOpt: Option[String]): Map[String, ResourceInformation] = {
     // only parse the resources if a task requires them
-    val resourceInfo = if (parseTaskResourceRequirements(env.conf).nonEmpty) {
+    val resourceInfo = if (parseResourceRequirements(env.conf, SPARK_TASK_PREFIX).nonEmpty) {
       val resources = getOrDiscoverAllResources(env.conf, SPARK_EXECUTOR_PREFIX, resourcesFileOpt)
       if (resources.isEmpty) {
         throw new SparkException("User specified resources per task via: " +
           s"$SPARK_TASK_PREFIX, but can't find any resources available on the executor.")
+      } else {
+        logResourceInfo(SPARK_EXECUTOR_PREFIX, resources)
       }
       resources
     } else {
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index e014721..214675b 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -36,6 +36,23 @@ package object config {
   private[spark] val SPARK_EXECUTOR_PREFIX = "spark.executor"
   private[spark] val SPARK_TASK_PREFIX = "spark.task"
+  private[spark] val SPARK_RESOURCES_COORDINATE =
+    ConfigBuilder("spark.resources.coordinate.enable")
+      .doc("Whether to coordinate resources automatically among workers/drivers(client only) " +
+        "in Standalone. If false, the user is responsible for configuring different resources " +
+        "for workers/drivers that run on the same host.")
+      .booleanConf
+      .createWithDefault(true)
+
+  private[spark] val SPARK_RESOURCES_DIR =
+    ConfigBuilder("spark.resources.dir")
+      .doc("Directory used to coordinate resources among workers/drivers(client only) in " +
+        "Standalone. Default is SPARK_HOME. Make sure to use the same directory for worker " +
+        "and drivers in client mode that run on the same host. Don't clean up this directory " +
+        "while workers/drivers are still alive to avoid the most likely resources conflict. ")
+      .stringConf
+      .createOptional
+
   private[spark] val DRIVER_RESOURCES_FILE =
     ConfigBuilder("spark.driver.resourcesFile")
       .internal()
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ExecutorResourceInfo.scala b/core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala
similarity index 86%
copy from core/src/main/scala/org/apache/spark/scheduler/ExecutorResourceInfo.scala
copy to core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala
index c75931d..719f34db 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ExecutorResourceInfo.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
-package org.apache.spark.scheduler
+package org.apache.spark.resource
 import scala.collection.mutable
@@ -23,17 +23,12 @@ import org.apache.spark.SparkException
 import org.apache.spark.util.collection.OpenHashMap
 /**
- * Class to hold information about a type of Resource on an Executor. This information is managed
- * by SchedulerBackend, and TaskScheduler shall schedule tasks on idle Executors based on the
- * information.
+ * Class used to help executor/worker allocate resources
  * Please note that this class is intended to be used in a single thread.
- * @param name Resource name
- * @param addresses Resource addresses provided by the executor
+ * @param name Resource name, e.g. gpu/fpga
+ * @param addresses Resource addresses provided by the executor/worker
  */
-private[spark] class ExecutorResourceInfo(
-    val name: String,
-    addresses: Seq[String]) extends Serializable {
-
+class ResourceAllocator(name: String, addresses: Seq[String]) extends Serializable {
   /**
    * Map from an address to its availability, the value `true` means the address is available,
    * while value `false` means the address is assigned.
@@ -52,7 +47,7 @@ private[spark] class ExecutorResourceInfo(
    * Sequence of currently assigned resource addresses.
    * Exposed for testing only.
*/ - private[scheduler] def assignedAddrs: Seq[String] = addressAvailabilityMap + private[spark] def assignedAddrs: Seq[String] = addressAvailabilityMap .flatMap { case (addr, available) => if (!available) Some(addr) else None }.toSeq diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala b/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala index 6926586..150ba09 100644 --- a/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala +++ b/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala @@ -27,7 +27,6 @@ import org.json4s.jackson.JsonMethods._ import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.internal.Logging -import org.apache.spark.internal.config._ import org.apache.spark.util.Utils.executeAndGetOutput /** @@ -48,7 +47,7 @@ private[spark] case class ResourceRequest( discoveryScript: Option[String], vendor: Option[String]) -private[spark] case class TaskResourceRequirement(resourceName: String, amount: Int) +private[spark] case class ResourceRequirement(resourceName: String, amount: Int) /** * Case class representing allocated resource addresses for a specific resource. 
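Editorial note, not part of the commit: the ResourceAllocator class that this diff splits out of ExecutorResourceInfo keeps a map from each resource address to an availability flag, and derives the available/assigned views and the acquire/release operations from it. A minimal sketch of that bookkeeping is given here for orientation. It assumes a hypothetical standalone class `SketchAllocator` and uses `require` (which throws `IllegalArgumentException`) where the Spark code throws `SparkException`; it is an illustration, not the Spark implementation.

```scala
import scala.collection.mutable

// Hypothetical stand-in for ResourceAllocator; the class name and the use of
// `require` are assumptions made for this sketch, not Spark API.
class SketchAllocator(name: String, addresses: Seq[String]) {
  // address -> availability: true means free, false means assigned
  private val availability = mutable.HashMap(addresses.map(_ -> true): _*)

  // Currently free addresses (sorted only to make the result deterministic)
  def availableAddrs: Seq[String] =
    availability.collect { case (addr, true) => addr }.toSeq.sorted

  // Currently assigned addresses
  def assignedAddrs: Seq[String] =
    availability.collect { case (addr, false) => addr }.toSeq.sorted

  // Mark each address as assigned; it must exist and be free
  def acquire(addrs: Seq[String]): Unit = addrs.foreach { addr =>
    require(availability.contains(addr), s"$name address $addr doesn't exist.")
    require(availability(addr), s"$name address $addr is not available.")
    availability(addr) = false
  }

  // Mark each address as free again; it must exist and be assigned
  def release(addrs: Seq[String]): Unit = addrs.foreach { addr =>
    require(availability.contains(addr), s"$name address $addr doesn't exist.")
    require(!availability(addr), s"$name address $addr is not assigned.")
    availability(addr) = true
  }
}

object SketchAllocatorDemo extends App {
  val gpus = new SketchAllocator("gpu", Seq("0", "1", "2"))
  gpus.acquire(Seq("0", "2"))
  println(s"free: ${gpus.availableAddrs}, assigned: ${gpus.assignedAddrs}")
  gpus.release(Seq("0"))
  println(s"free: ${gpus.availableAddrs}, assigned: ${gpus.assignedAddrs}")
}
```

Like the code in the commit, the sketch checks addresses one at a time, so a failure partway through a batch leaves the earlier addresses of that batch already marked.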
@@ -62,7 +61,6 @@ private[spark] case class ResourceAllocation(id: ResourceID, addresses: Seq[Stri
 }

 private[spark] object ResourceUtils extends Logging {
-
   // config suffixes
   val DISCOVERY_SCRIPT = "discoveryScript"
   val VENDOR = "vendor"
@@ -94,23 +92,39 @@ private[spark] object ResourceUtils extends Logging {
     }
   }

-  def parseTaskResourceRequirements(sparkConf: SparkConf): Seq[TaskResourceRequirement] = {
-    parseAllResourceRequests(sparkConf, SPARK_TASK_PREFIX).map { request =>
-      TaskResourceRequirement(request.id.resourceName, request.amount)
+  def parseResourceRequirements(sparkConf: SparkConf, componentName: String)
+    : Seq[ResourceRequirement] = {
+    parseAllResourceRequests(sparkConf, componentName).map { request =>
+      ResourceRequirement(request.id.resourceName, request.amount)
+    }
+  }
+
+  def resourcesMeetRequirements(
+      resourcesFree: Map[String, Int],
+      resourceRequirements: Seq[ResourceRequirement])
+    : Boolean = {
+    resourceRequirements.forall { req =>
+      resourcesFree.getOrElse(req.resourceName, 0) >= req.amount
     }
   }

-  private def parseAllocatedFromJsonFile(resourcesFile: String): Seq[ResourceAllocation] = {
-    implicit val formats = DefaultFormats
+  def withResourcesJson[T](resourcesFile: String)(extract: String => Seq[T]): Seq[T] = {
     val json = new String(Files.readAllBytes(Paths.get(resourcesFile)))
     try {
-      parse(json).extract[Seq[ResourceAllocation]]
+      extract(json)
     } catch {
       case NonFatal(e) =>
         throw new SparkException(s"Error parsing resources file $resourcesFile", e)
     }
   }

+  def parseAllocatedFromJsonFile(resourcesFile: String): Seq[ResourceAllocation] = {
+    withResourcesJson[ResourceAllocation](resourcesFile) { json =>
+      implicit val formats = DefaultFormats
+      parse(json).extract[Seq[ResourceAllocation]]
+    }
+  }
+
   private def parseAllocatedOrDiscoverResources(
       sparkConf: SparkConf,
       componentName: String,
@@ -154,10 +168,14 @@ private[spark] object ResourceUtils extends Logging {
     val allocations = parseAllocatedOrDiscoverResources(sparkConf, componentName, resourcesFileOpt)
     assertAllResourceAllocationsMeetRequests(allocations, requests)
     val resourceInfoMap = allocations.map(a => (a.id.resourceName, a.toResourceInformation)).toMap
+    resourceInfoMap
+  }
+
+  def logResourceInfo(componentName: String, resources: Map[String, ResourceInformation])
+    : Unit = {
     logInfo("==============================================================")
-    logInfo(s"Resources for $componentName:\n${resourceInfoMap.mkString("\n")}")
+    logInfo(s"Resources for $componentName:\n${resources.mkString("\n")}")
     logInfo("==============================================================")
-    resourceInfoMap
   }

   // visible for test
@@ -175,7 +193,7 @@ private[spark] object ResourceUtils extends Logging {
           "doesn't exist!")
       }
     } else {
-      throw new SparkException(s"User is expecting to use resource: $resourceName but " +
+      throw new SparkException(s"User is expecting to use resource: $resourceName, but " +
         "didn't specify a discovery script!")
     }
     if (!result.name.equals(resourceName)) {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ExecutorResourceInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/ExecutorResourceInfo.scala
index c75931d..f05281e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ExecutorResourceInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ExecutorResourceInfo.scala
@@ -17,85 +17,14 @@

 package org.apache.spark.scheduler

-import scala.collection.mutable
-
-import org.apache.spark.SparkException
-import org.apache.spark.util.collection.OpenHashMap
+import org.apache.spark.resource.ResourceAllocator

 /**
  * Class to hold information about a type of Resource on an Executor. This information is managed
  * by SchedulerBackend, and TaskScheduler shall schedule tasks on idle Executors based on the
  * information.
- * Please note that this class is intended to be used in a single thread.
  * @param name Resource name
  * @param addresses Resource addresses provided by the executor
  */
-private[spark] class ExecutorResourceInfo(
-    val name: String,
-    addresses: Seq[String]) extends Serializable {
-
-  /**
-   * Map from an address to its availability, the value `true` means the address is available,
-   * while value `false` means the address is assigned.
-   * TODO Use [[OpenHashMap]] instead to gain better performance.
-   */
-  private val addressAvailabilityMap = mutable.HashMap(addresses.map(_ -> true): _*)
-
-  /**
-   * Sequence of currently available resource addresses.
-   */
-  def availableAddrs: Seq[String] = addressAvailabilityMap.flatMap { case (addr, available) =>
-    if (available) Some(addr) else None
-  }.toSeq
-
-  /**
-   * Sequence of currently assigned resource addresses.
-   * Exposed for testing only.
-   */
-  private[scheduler] def assignedAddrs: Seq[String] = addressAvailabilityMap
-    .flatMap { case (addr, available) =>
-      if (!available) Some(addr) else None
-    }.toSeq
-
-  /**
-   * Acquire a sequence of resource addresses (to a launched task), these addresses must be
-   * available. When the task finishes, it will return the acquired resource addresses.
-   * Throw an Exception if an address is not available or doesn't exist.
-   */
-  def acquire(addrs: Seq[String]): Unit = {
-    addrs.foreach { address =>
-      if (!addressAvailabilityMap.contains(address)) {
-        throw new SparkException(s"Try to acquire an address that doesn't exist. $name address " +
-          s"$address doesn't exist.")
-      }
-      val isAvailable = addressAvailabilityMap(address)
-      if (isAvailable) {
-        addressAvailabilityMap(address) = false
-      } else {
-        throw new SparkException(s"Try to acquire an address that is not available. $name " +
-          s"address $address is not available.")
-      }
-    }
-  }
-
-  /**
-   * Release a sequence of resource addresses, these addresses must have been assigned. Resource
-   * addresses are released when a task has finished.
-   * Throw an Exception if an address is not assigned or doesn't exist.
-   */
-  def release(addrs: Seq[String]): Unit = {
-    addrs.foreach { address =>
-      if (!addressAvailabilityMap.contains(address)) {
-        throw new SparkException(s"Try to release an address that doesn't exist. $name address " +
-          s"$address doesn't exist.")
-      }
-      val isAvailable = addressAvailabilityMap(address)
-      if (!isAvailable) {
-        addressAvailabilityMap(address) = true
-      } else {
-        throw new SparkException(s"Try to release an address that is not assigned. $name " +
-          s"address $address is not assigned.")
-      }
-    }
-  }
-}
+private[spark] class ExecutorResourceInfo(name: String, addresses: Seq[String])
+  extends ResourceAllocator(name, addresses)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 2e3e0a2..1496dff 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -94,7 +94,7 @@ private[spark] class TaskSchedulerImpl(
   val CPUS_PER_TASK = conf.get(config.CPUS_PER_TASK)

   // Resources to request per task
-  val resourcesReqsPerTask = ResourceUtils.parseTaskResourceRequirements(sc.conf)
+  val resourcesReqsPerTask = ResourceUtils.parseResourceRequirements(sc.conf, SPARK_TASK_PREFIX)

   // TaskSetManagers are not thread safe, so any access to one should be synchronized
   // on this class. Protected by `this`
@@ -383,9 +383,8 @@ private[spark] class TaskSchedulerImpl(
    * Check whether the resources from the WorkerOffer are enough to run at least one task.
    */
   private def resourcesMeetTaskRequirements(resources: Map[String, Buffer[String]]): Boolean = {
-    resourcesReqsPerTask.forall { req =>
-      resources.contains(req.resourceName) && resources(req.resourceName).size >= req.amount
-    }
+    val resourcesFree = resources.map(r => r._1 -> r._2.length)
+    ResourceUtils.resourcesMeetRequirements(resourcesFree, resourcesReqsPerTask)
   }

   /**
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index e0605fe..2025a7d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -28,6 +28,7 @@ import org.apache.spark.deploy.client.{StandaloneAppClient, StandaloneAppClientL
 import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.internal.config.Tests.IS_TESTING
 import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle}
+import org.apache.spark.resource.ResourceUtils
 import org.apache.spark.rpc.RpcEndpointAddress
 import org.apache.spark.scheduler._
 import org.apache.spark.util.Utils
@@ -112,8 +113,11 @@ private[spark] class StandaloneSchedulerBackend(
       } else {
         None
       }
+    val executorResourceReqs = ResourceUtils.parseResourceRequirements(conf,
+      config.SPARK_EXECUTOR_PREFIX)
     val appDesc = ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
-      webUrl, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit)
+      webUrl, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit,
+      resourceReqsPerExecutor = executorResourceReqs)
     client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
     client.start()
     launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 3ad67f4..9c1f21f 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -270,6 +270,26 @@ private[spark] object Utils extends Logging {
   }

   /**
+   * Create a directory given the abstract pathname
+   * @return true, if the directory is successfully created; otherwise, return false.
+   */
+  def createDirectory(dir: File): Boolean = {
+    try {
+      // This sporadically fails - not sure why ... !dir.exists() && !dir.mkdirs()
+      // So attempting to create and then check if directory was created or not.
+      dir.mkdirs()
+      if ( !dir.exists() || !dir.isDirectory) {
+        logError(s"Failed to create directory " + dir)
+      }
+      dir.isDirectory
+    } catch {
+      case e: Exception =>
+        logError(s"Failed to create directory " + dir, e)
+        false
+    }
+  }
+
+  /**
    * Create a directory inside the given parent directory. The directory is guaranteed to be
    * newly created, and is not marked for automatic deletion.
    */
@@ -2555,6 +2575,28 @@ private[spark] object Utils extends Logging {
   }

   /**
+   * Given a process id, return true if the process is still running.
+   */
+  def isProcessRunning(pid: Int): Boolean = {
+    val process = executeCommand(Seq("kill", "-0", pid.toString))
+    process.waitFor(10, TimeUnit.SECONDS)
+    process.exitValue() == 0
+  }
+
+  /**
+   * Returns the pid of this JVM process.
+   */
+  def getProcessId: Int = {
+    val PROCESS = "(\\d+)@(.*)".r
+    val name = getProcessName()
+    name match {
+      case PROCESS(pid, _) => pid.toInt
+      case _ =>
+        throw new SparkException(s"Unexpected process name: $name, expected to be PID@hostname.")
+    }
+  }
+
+  /**
    * Returns the name of this JVM process. This is OS dependent but typically (OSX, Linux, Windows),
    * this is formatted as PID@hostname.
    */
diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
index 202b85d..9f00131 100644
--- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -440,7 +440,8 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst
     conf.set(TASK_GPU_ID.amountConf, "2")
     conf.set(TASK_FPGA_ID.amountConf, "1")
     var taskResourceRequirement =
-      parseTaskResourceRequirements(conf).map(req => (req.resourceName, req.amount)).toMap
+      parseResourceRequirements(conf, SPARK_TASK_PREFIX)
+        .map(req => (req.resourceName, req.amount)).toMap

     assert(taskResourceRequirement.size == 2)
     assert(taskResourceRequirement(GPU) == 2)
@@ -450,7 +451,8 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst
     // Ignore invalid prefix
     conf.set(ResourceID("spark.invalid.prefix", FPGA).amountConf, "1")
     taskResourceRequirement =
-      parseTaskResourceRequirements(conf).map(req => (req.resourceName, req.amount)).toMap
+      parseResourceRequirements(conf, SPARK_TASK_PREFIX)
+        .map(req => (req.resourceName, req.amount)).toMap
     assert(taskResourceRequirement.size == 1)
     assert(taskResourceRequirement.get(FPGA).isEmpty)
   }
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index fed3ae3..c1402bd 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -756,7 +756,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
       val conf = new SparkConf()
         .setMaster("local-cluster[1, 1, 1024]")
         .setAppName("test-cluster")
-      conf.set(DRIVER_GPU_ID.amountConf, "1")
+      conf.set(DRIVER_GPU_ID.amountConf, "2")
       conf.set(DRIVER_GPU_ID.discoveryScriptConf, scriptPath)
       sc = new SparkContext(conf)

@@ -783,7 +783,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
         .set(DRIVER_RESOURCES_FILE, resourcesFile)
         .setMaster("local-cluster[1, 1, 1024]")
         .setAppName("test-cluster")
-      conf.set(DRIVER_GPU_ID.amountConf, "1")
+      conf.set(DRIVER_GPU_ID.amountConf, "3")
       conf.set(DRIVER_GPU_ID.discoveryScriptConf, scriptPath)
       sc = new SparkContext(conf)

@@ -850,26 +850,27 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
     assume(!(Utils.isWindows))
     withTempDir { dir =>
       val discoveryScript = createTempScriptWithExpectedOutput(dir, "resourceDiscoveryScript",
-        """{"name": "gpu","addresses":["0", "1", "2"]}""")
+        """{"name": "gpu","addresses":["0", "1", "2", "3", "4", "5", "6", "7", "8"]}""")

       val conf = new SparkConf()
         .setMaster("local-cluster[3, 3, 1024]")
         .setAppName("test-cluster")
-      conf.set(TASK_GPU_ID.amountConf, "1")
+      conf.set(WORKER_GPU_ID.amountConf, "3")
+      conf.set(WORKER_GPU_ID.discoveryScriptConf, discoveryScript)
+      conf.set(TASK_GPU_ID.amountConf, "3")
       conf.set(EXECUTOR_GPU_ID.amountConf, "3")
-      conf.set(EXECUTOR_GPU_ID.discoveryScriptConf, discoveryScript)

       sc = new SparkContext(conf)

       // Ensure all executors has started
       TestUtils.waitUntilExecutorsUp(sc, 3, 60000)

-      val rdd = sc.makeRDD(1 to 10, 9).mapPartitions { it =>
+      val rdd = sc.makeRDD(1 to 10, 3).mapPartitions { it =>
         val context = TaskContext.get()
         context.resources().get(GPU).get.addresses.iterator
       }
       val gpus = rdd.collect()
-      assert(gpus.sorted === Seq("0", "0", "0", "1", "1", "1", "2", "2", "2"))
+      assert(gpus.sorted === Seq("0", "1", "2", "3", "4", "5", "6", "7", "8"))

       eventually(timeout(10.seconds)) {
         assert(sc.statusTracker.getExecutorInfos.map(_.numRunningTasks()).sum == 0)
diff --git a/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala b/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala
index 784981e..a2c4669 100644
--- a/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.deploy

 import java.io.File
-import java.util.Date

 import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, WorkerInfo}
@@ -50,7 +49,8 @@ private[deploy] object DeployTestUtils {
     createDriverDesc(), JsonConstants.submitDate)

   def createWorkerInfo(): WorkerInfo = {
-    val workerInfo = new WorkerInfo("id", "host", 8080, 4, 1234, null, "http://publicAddress:80")
+    val workerInfo = new WorkerInfo("id", "host", 8080, 4, 1234, null,
+      "http://publicAddress:80", Map.empty)
     workerInfo.lastHeartbeat = JsonConstants.currTimeInMillis
     workerInfo
   }
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
index f19e998..9ce046a 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
@@ -42,6 +42,8 @@ import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.Deploy._
 import org.apache.spark.internal.config.UI._
 import org.apache.spark.internal.config.Worker._
+import org.apache.spark.resource.{ResourceInformation, ResourceRequirement}
+import org.apache.spark.resource.ResourceUtils.{FPGA, GPU}
 import org.apache.spark.rpc.{RpcAddress, RpcEndpoint, RpcEndpointRef, RpcEnv}
 import org.apache.spark.serializer

@@ -68,17 +70,23 @@ class MockWorker(master: RpcEndpointRef, conf: SparkConf = new SparkConf) extend
     })
   }

-  val appDesc = DeployTestUtils.createAppDesc()
+  var appDesc = DeployTestUtils.createAppDesc()
   val drivers = mutable.HashSet[String]()
+  val driverResources = new mutable.HashMap[String, Map[String, Set[String]]]
+  val execResources = new mutable.HashMap[String, Map[String, Set[String]]]
   override def receive: PartialFunction[Any, Unit] = {
     case RegisteredWorker(masterRef, _, _, _) =>
       masterRef.send(WorkerLatestState(id, Nil, drivers.toSeq))
-    case LaunchDriver(driverId, desc) =>
+    case LaunchExecutor(_, appId, execId, _, _, _, resources_) =>
+      execResources(appId + "/" + execId) = resources_.map(r => (r._1, r._2.addresses.toSet))
+    case LaunchDriver(driverId, desc, resources_) =>
       drivers += driverId
+      driverResources(driverId) = resources_.map(r => (r._1, r._2.addresses.toSet))
       master.send(RegisterApplication(appDesc, newDriver(driverId)))
     case KillDriver(driverId) =>
       master.send(DriverStateChanged(driverId, DriverState.KILLED, None))
       drivers -= driverId
+      driverResources.remove(driverId)
       driverIdToAppId.get(driverId) match {
         case Some(appId) =>
           apps.remove(appId)
@@ -93,7 +101,7 @@ class MockExecutorLaunchFailWorker(master: RpcEndpointRef, conf: SparkConf = new
   extends MockWorker(master, conf) {
   var failedCnt = 0
   override def receive: PartialFunction[Any, Unit] = {
-    case LaunchExecutor(_, appId, execId, _, _, _) =>
+    case LaunchExecutor(_, appId, execId, _, _, _, _) =>
       failedCnt += 1
       master.send(ExecutorStateChanged(appId, execId, ExecutorState.FAILED, None, None))
     case otherMsg => super.receive(otherMsg)
@@ -167,7 +175,8 @@ class MasterSuite extends SparkFunSuite
       cores = 0,
       memory = 0,
       endpoint = null,
-      webUiAddress = "http://localhost:80"
+      webUiAddress = "http://localhost:80",
+      Map.empty
     )

     val (rpcEnv, _, _) =
@@ -248,9 +257,12 @@ class MasterSuite extends SparkFunSuite
         // Application state should be WAITING when "MasterChangeAcknowledged" event executed.
         fakeAppInfo.state should be(ApplicationState.WAITING)
       }
-
-      master.self.send(
-        WorkerSchedulerStateResponse(fakeWorkerInfo.id, fakeExecutors, Seq(fakeDriverInfo.id)))
+      val execResponse = fakeExecutors.map(exec =>
+        WorkerExecutorStateResponse(exec, Map.empty[String, ResourceInformation]))
+      val driverResponse = WorkerDriverStateResponse(
+        fakeDriverInfo.id, Map.empty[String, ResourceInformation])
+      master.self.send(WorkerSchedulerStateResponse(
+        fakeWorkerInfo.id, execResponse, Seq(driverResponse)))

       eventually(timeout(5.seconds), interval(100.milliseconds)) {
         getState(master) should be(RecoveryState.ALIVE)
@@ -545,6 +557,16 @@ class MasterSuite extends SparkFunSuite
     _master
   }

+  def makeAliveMaster(conf: SparkConf = new SparkConf): Master = {
+    val master = makeMaster(conf)
+    master.rpcEnv.setupEndpoint(Master.ENDPOINT_NAME, master)
+    eventually(timeout(10.seconds)) {
+      val masterState = master.self.askSync[MasterStateResponse](RequestMasterState)
+      assert(masterState.status === RecoveryState.ALIVE, "Master is not alive")
+    }
+    master
+  }
+
   private def makeAppInfo(
       memoryPerExecutorMb: Int,
       coresPerExecutor: Option[Int] = None,
@@ -563,7 +585,8 @@ class MasterSuite extends SparkFunSuite
     val endpointRef = mock(classOf[RpcEndpointRef])
     val mockAddress = mock(classOf[RpcAddress])
     when(endpointRef.address).thenReturn(mockAddress)
-    new WorkerInfo(workerId, "host", 100, cores, memoryMb, endpointRef, "http://localhost:80")
+    new WorkerInfo(workerId, "host", 100, cores, memoryMb,
+      endpointRef, "http://localhost:80", Map.empty)
   }

   private def scheduleExecutorsOnWorkers(
@@ -575,13 +598,7 @@ class MasterSuite extends SparkFunSuite
   }

   test("SPARK-13604: Master should ask Worker kill unknown executors and drivers") {
-    val master = makeMaster()
-    master.rpcEnv.setupEndpoint(Master.ENDPOINT_NAME, master)
-    eventually(timeout(10.seconds)) {
-      val masterState = master.self.askSync[MasterStateResponse](RequestMasterState)
-      assert(masterState.status === RecoveryState.ALIVE, "Master is not alive")
-    }
-
+    val master = makeAliveMaster()
     val killedExecutors = new ConcurrentLinkedQueue[(String, Int)]()
     val killedDrivers = new ConcurrentLinkedQueue[String]()
     val fakeWorker = master.rpcEnv.setupEndpoint("worker", new RpcEndpoint {
@@ -614,13 +631,7 @@ class MasterSuite extends SparkFunSuite
   }

   test("SPARK-20529: Master should reply the address received from worker") {
-    val master = makeMaster()
-    master.rpcEnv.setupEndpoint(Master.ENDPOINT_NAME, master)
-    eventually(timeout(10.seconds)) {
-      val masterState = master.self.askSync[MasterStateResponse](RequestMasterState)
-      assert(masterState.status === RecoveryState.ALIVE, "Master is not alive")
-    }
-
+    val master = makeAliveMaster()
     @volatile var receivedMasterAddress: RpcAddress = null
     val fakeWorker = master.rpcEnv.setupEndpoint("worker", new RpcEndpoint {
       override val rpcEnv: RpcEnv = master.rpcEnv
@@ -647,13 +658,7 @@ class MasterSuite extends SparkFunSuite
   }

   test("SPARK-27510: Master should avoid dead loop while launching executor failed in Worker") {
-    val master = makeMaster()
-    master.rpcEnv.setupEndpoint(Master.ENDPOINT_NAME, master)
-    eventually(timeout(10.seconds)) {
-      val masterState = master.self.askSync[MasterStateResponse](RequestMasterState)
-      assert(masterState.status === RecoveryState.ALIVE, "Master is not alive")
-    }
-
+    val master = makeAliveMaster()
     var worker: MockExecutorLaunchFailWorker = null
     try {
       worker = new MockExecutorLaunchFailWorker(master.self)
@@ -697,12 +702,7 @@ class MasterSuite extends SparkFunSuite

   test("SPARK-19900: there should be a corresponding driver for the app after relaunching driver") {
     val conf = new SparkConf().set(WORKER_TIMEOUT, 1L)
-    val master = makeMaster(conf)
-    master.rpcEnv.setupEndpoint(Master.ENDPOINT_NAME, master)
-    eventually(timeout(10.seconds)) {
-      val masterState = master.self.askSync[MasterStateResponse](RequestMasterState)
-      assert(masterState.status === RecoveryState.ALIVE, "Master is not alive")
-    }
+    val master = makeAliveMaster(conf)
     var worker1: MockWorker = null
     var worker2: MockWorker = null
     try {
@@ -770,6 +770,95 @@ class MasterSuite extends SparkFunSuite
     }
   }

+  test("assign/recycle resources to/from driver") {
+    val master = makeAliveMaster()
+    val masterRef = master.self
+    val resourceReqs = Seq(ResourceRequirement(GPU, 3), ResourceRequirement(FPGA, 3))
+    val driver = DeployTestUtils.createDriverDesc().copy(resourceReqs = resourceReqs)
+    val driverId = masterRef.askSync[SubmitDriverResponse](
+      RequestSubmitDriver(driver)).driverId.get
+    var status = masterRef.askSync[DriverStatusResponse](RequestDriverStatus(driverId))
+    assert(status.state === Some(DriverState.SUBMITTED))
+    val worker = new MockWorker(masterRef)
+    worker.rpcEnv.setupEndpoint(s"worker", worker)
+    val resources = Map(GPU -> new ResourceInformation(GPU, Array("0", "1", "2")),
+      FPGA -> new ResourceInformation(FPGA, Array("f1", "f2", "f3")))
+    val regMsg = RegisterWorker(worker.id, "localhost", 7077, worker.self, 10, 1024,
+      "http://localhost:8080", RpcAddress("localhost", 10000), resources)
+    masterRef.send(regMsg)
+    eventually(timeout(10.seconds)) {
+      status = masterRef.askSync[DriverStatusResponse](RequestDriverStatus(driverId))
+      assert(status.state === Some(DriverState.RUNNING))
+      assert(worker.drivers.head === driverId)
+      assert(worker.driverResources(driverId) === Map(GPU -> Set("0", "1", "2"),
+        FPGA -> Set("f1", "f2", "f3")))
+      val workerResources = master.workers.head.resources
+      assert(workerResources(GPU).availableAddrs.length === 0)
+      assert(workerResources(GPU).assignedAddrs.toSet === Set("0", "1", "2"))
+      assert(workerResources(FPGA).availableAddrs.length === 0)
+      assert(workerResources(FPGA).assignedAddrs.toSet === Set("f1", "f2", "f3"))
+    }
+    val driverFinished = DriverStateChanged(driverId, DriverState.FINISHED, None)
+    masterRef.send(driverFinished)
+    eventually(timeout(10.seconds)) {
+      val workerResources = master.workers.head.resources
+      assert(workerResources(GPU).availableAddrs.length === 3)
+      assert(workerResources(GPU).assignedAddrs.toSet === Set())
+      assert(workerResources(FPGA).availableAddrs.length === 3)
+      assert(workerResources(FPGA).assignedAddrs.toSet === Set())
+    }
+  }
+
+  test("assign/recycle resources to/from executor") {
+
+    def makeWorkerAndRegister(
+        master: RpcEndpointRef,
+        workerResourceReqs: Map[String, Int] = Map.empty)
+      : MockWorker = {
+      val worker = new MockWorker(master)
+      worker.rpcEnv.setupEndpoint(s"worker", worker)
+      val resources = workerResourceReqs.map { case (rName, amount) =>
+        val shortName = rName.charAt(0)
+        val addresses = (0 until amount).map(i => s"$shortName$i").toArray
+        rName -> new ResourceInformation(rName, addresses)
+      }
+      val reg = RegisterWorker(worker.id, "localhost", 8077, worker.self, 10, 2048,
+        "http://localhost:8080", RpcAddress("localhost", 10000), resources)
+      master.send(reg)
+      worker
+    }
+
+    val master = makeAliveMaster()
+    val masterRef = master.self
+    val resourceReqs = Seq(ResourceRequirement(GPU, 3), ResourceRequirement(FPGA, 3))
+    val worker = makeWorkerAndRegister(masterRef, Map(GPU -> 6, FPGA -> 6))
+    worker.appDesc = worker.appDesc.copy(resourceReqsPerExecutor = resourceReqs)
+    val driver = DeployTestUtils.createDriverDesc().copy(resourceReqs = resourceReqs)
+    val driverId = masterRef.askSync[SubmitDriverResponse](RequestSubmitDriver(driver)).driverId
+    val status = masterRef.askSync[DriverStatusResponse](RequestDriverStatus(driverId.get))
+    assert(status.state === Some(DriverState.RUNNING))
+    val workerResources = master.workers.head.resources
+    eventually(timeout(10.seconds)) {
+      assert(workerResources(GPU).availableAddrs.length === 0)
+      assert(workerResources(FPGA).availableAddrs.length === 0)
+      assert(worker.driverResources.size === 1)
+      assert(worker.execResources.size === 1)
+      val driverResources = worker.driverResources.head._2
+      val execResources = worker.execResources.head._2
+      val gpuAddrs = driverResources(GPU).union(execResources(GPU))
+      val fpgaAddrs = driverResources(FPGA).union(execResources(FPGA))
+      assert(gpuAddrs === Set("g0", "g1", "g2", "g3", "g4", "g5"))
+      assert(fpgaAddrs === Set("f0", "f1", "f2", "f3", "f4", "f5"))
+    }
+    val appId = worker.apps.head._1
+    masterRef.send(UnregisterApplication(appId))
+    masterRef.send(DriverStateChanged(driverId.get, DriverState.FINISHED, None))
+    eventually(timeout(10.seconds)) {
+      assert(workerResources(GPU).availableAddrs.length === 6)
+      assert(workerResources(FPGA).availableAddrs.length === 6)
+    }
+  }
+
   private def getDrivers(master: Master): HashSet[DriverInfo] = {
     master.invokePrivate(_drivers())
   }
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala
index 3d8a46b..3960762 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala
@@ -86,7 +86,8 @@ class PersistenceEngineSuite extends SparkFunSuite {
       cores = 0,
       memory = 0,
       endpoint = workerEndpoint,
-      webUiAddress = "http://localhost:80")
+      webUiAddress = "http://localhost:80",
+      Map.empty)

     persistenceEngine.addWorker(workerToPersist)

diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
index 37e5fbc..bb541b4 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
@@ -23,6 +23,7 @@ import java.util.function.Supplier

 import scala.concurrent.duration._

+import org.json4s.{DefaultFormats, Extraction}
 import org.mockito.{Mock, MockitoAnnotations}
 import org.mockito.Answers.RETURNS_SMART_NULLS
 import org.mockito.ArgumentMatchers.any
@@ -32,11 +33,16 @@ import org.scalatest.{BeforeAndAfter, Matchers}
 import org.scalatest.concurrent.Eventually.{eventually, interval, timeout}

 import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
+import org.apache.spark.TestUtils.{createTempJsonFile, createTempScriptWithExpectedOutput}
 import org.apache.spark.deploy.{Command, ExecutorState, ExternalShuffleService}
 import org.apache.spark.deploy.DeployMessages.{DriverStateChanged, ExecutorStateChanged, WorkDirCleanup}
+import org.apache.spark.deploy.StandaloneResourceUtils.{ALLOCATED_RESOURCES_FILE, SPARK_RESOURCES_COORDINATE_DIR}
 import org.apache.spark.deploy.master.DriverState
 import org.apache.spark.internal.config
 import org.apache.spark.internal.config.Worker._
+import org.apache.spark.resource.{ResourceAllocation, ResourceInformation}
+import org.apache.spark.resource.ResourceUtils._
+import org.apache.spark.resource.TestResourceIDs.{WORKER_FPGA_ID, WORKER_GPU_ID}
 import org.apache.spark.rpc.{RpcAddress, RpcEnv}
 import org.apache.spark.util.Utils

@@ -51,17 +57,36 @@ class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter {
   }
   def conf(opts: (String, String)*): SparkConf = new SparkConf(loadDefaults = false).setAll(opts)

+  implicit val formats = DefaultFormats
+
   private var _worker: Worker = _

   private def makeWorker(
-      conf: SparkConf,
-      shuffleServiceSupplier: Supplier[ExternalShuffleService] = null): Worker = {
+      conf: SparkConf = new SparkConf(),
+      shuffleServiceSupplier: Supplier[ExternalShuffleService] = null,
+      pid: Int = Utils.getProcessId,
+      local: Boolean = false): Worker = {
     assert(_worker === null, "Some Worker's RpcEnv is leaked in tests")
     val securityMgr = new SecurityManager(conf)
     val rpcEnv = RpcEnv.create("test", "localhost", 12345, conf, securityMgr)
-    _worker = new Worker(rpcEnv, 50000, 20, 1234 * 5, Array.fill(1)(RpcAddress("1.2.3.4", 1234)),
-      "Worker", "/tmp", conf, securityMgr, None, shuffleServiceSupplier)
-    _worker
+    val resourcesFile = conf.get(SPARK_WORKER_RESOURCE_FILE)
+    val localWorker = new Worker(rpcEnv, 50000, 20, 1234 * 5,
+      Array.fill(1)(RpcAddress("1.2.3.4", 1234)), "Worker", "/tmp",
+      conf, securityMgr, resourcesFile, shuffleServiceSupplier, pid)
+    if (local) {
+      localWorker
+    } else {
+      _worker = localWorker
+      _worker
+    }
+  }
+
+  private def assertResourcesFileDeleted(): Unit = {
+    assert(sys.props.contains("spark.test.home"))
+    val sparkHome = sys.props.get("spark.test.home")
+    val resourceFile = new File(sparkHome + "/" + SPARK_RESOURCES_COORDINATE_DIR,
+      ALLOCATED_RESOURCES_FILE)
+    assert(!resourceFile.exists())
   }

   before {
@@ -218,6 +243,141 @@ class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter {
     }
   }

+  test("worker could be launched without any resources") {
+    val worker = makeWorker()
+    worker.rpcEnv.setupEndpoint("worker", worker)
+    eventually(timeout(10.seconds)) {
+      assert(worker.resources === Map.empty)
+      worker.rpcEnv.shutdown()
+      worker.rpcEnv.awaitTermination()
+    }
+    assertResourcesFileDeleted()
+  }
+
+  test("worker could load resources from resources file while launching") {
+    val conf = new SparkConf()
+    withTempDir { dir =>
+      val gpuArgs = ResourceAllocation(WORKER_GPU_ID, Seq("0", "1"))
+      val fpgaArgs =
+        ResourceAllocation(WORKER_FPGA_ID, Seq("f1", "f2", "f3"))
+      val ja = Extraction.decompose(Seq(gpuArgs, fpgaArgs))
+      val f1 = createTempJsonFile(dir, "resources", ja)
+      conf.set(SPARK_WORKER_RESOURCE_FILE.key, f1)
+      conf.set(WORKER_GPU_ID.amountConf, "2")
+      conf.set(WORKER_FPGA_ID.amountConf, "3")
+      val worker = makeWorker(conf)
+      worker.rpcEnv.setupEndpoint("worker", worker)
+      eventually(timeout(10.seconds)) {
+        assert(worker.resources === Map(GPU -> gpuArgs.toResourceInformation,
+          FPGA -> fpgaArgs.toResourceInformation))
+        worker.rpcEnv.shutdown()
+        worker.rpcEnv.awaitTermination()
+      }
+      assertResourcesFileDeleted()
+    }
+  }
+
+  test("worker could load resources from discovery script while launching") {
+    val conf = new SparkConf()
+    withTempDir { dir =>
+      val scriptPath = createTempScriptWithExpectedOutput(dir, "fpgaDiscoverScript",
+        """{"name": "fpga","addresses":["f1", "f2", "f3"]}""")
+      conf.set(WORKER_FPGA_ID.discoveryScriptConf, scriptPath)
+      conf.set(WORKER_FPGA_ID.amountConf, "3")
+      val worker = makeWorker(conf)
+      worker.rpcEnv.setupEndpoint("worker", worker)
+      eventually(timeout(10.seconds)) {
+        assert(worker.resources === Map(FPGA ->
+          new ResourceInformation(FPGA, Array("f1", "f2", "f3"))))
+        worker.rpcEnv.shutdown()
+        worker.rpcEnv.awaitTermination()
+      }
+      assertResourcesFileDeleted()
+    }
+  }
+
+  test("worker could load resources from resources file and discovery script while launching") {
+    val conf = new SparkConf()
+    withTempDir { dir =>
+      val gpuArgs = ResourceAllocation(WORKER_GPU_ID, Seq("0", "1"))
+      val ja = Extraction.decompose(Seq(gpuArgs))
+      val resourcesPath = createTempJsonFile(dir, "resources", ja)
+      val scriptPath = createTempScriptWithExpectedOutput(dir, "fpgaDiscoverScript",
+        """{"name": "fpga","addresses":["f1", "f2", "f3"]}""")
+      conf.set(SPARK_WORKER_RESOURCE_FILE.key, resourcesPath)
+      conf.set(WORKER_FPGA_ID.discoveryScriptConf, scriptPath)
+      conf.set(WORKER_FPGA_ID.amountConf, "3")
+      conf.set(WORKER_GPU_ID.amountConf, "2")
+      val worker = makeWorker(conf)
+      worker.rpcEnv.setupEndpoint("worker", worker)
+      eventually(timeout(10.seconds)) {
+        assert(worker.resources === Map(GPU -> gpuArgs.toResourceInformation,
+          FPGA -> new ResourceInformation(FPGA, Array("f1", "f2", "f3"))))
+        worker.rpcEnv.shutdown()
+        worker.rpcEnv.awaitTermination()
+      }
+      assertResourcesFileDeleted()
+    }
+  }
+
+  test("Workers run on the same host should avoid resources conflict when coordinate is on") {
+    val conf = new SparkConf()
+    withTempDir { dir =>
+      val scriptPath = createTempScriptWithExpectedOutput(dir, "fpgaDiscoverScript",
+        """{"name": "fpga","addresses":["f1", "f2", "f3", "f4", "f5"]}""")
+      conf.set(WORKER_FPGA_ID.discoveryScriptConf, scriptPath)
+      conf.set(WORKER_FPGA_ID.amountConf, "2")
+      val workers = (0 until 3).map(id => makeWorker(conf, pid = id, local = true))
+      workers.zipWithIndex.foreach{case (w, i) => w.rpcEnv.setupEndpoint(s"worker$i", w)}
+      eventually(timeout(20.seconds)) {
+        val (empty, nonEmpty) = workers.partition(_.resources.isEmpty)
+        assert(empty.length === 1)
+        assert(nonEmpty.length === 2)
+        val totalResources = nonEmpty.flatMap(_.resources(FPGA).addresses).toSet.toSeq.sorted
+        assert(totalResources === Seq("f1", "f2", "f3", "f4"))
+        workers.foreach(_.rpcEnv.shutdown())
+        workers.foreach(_.rpcEnv.awaitTermination())
+      }
+      assertResourcesFileDeleted()
+    }
+  }
+
+  test("Workers run on the same host should load resources naively when coordinate is off") {
+    val conf = new SparkConf()
+    // disable coordination
+    conf.set(config.SPARK_RESOURCES_COORDINATE, false)
+    withTempDir { dir =>
+      val gpuArgs = ResourceAllocation(WORKER_GPU_ID, Seq("g0", "g1"))
+      val ja = Extraction.decompose(Seq(gpuArgs))
+      val resourcesPath = createTempJsonFile(dir, "resources", ja)
+      val scriptPath = createTempScriptWithExpectedOutput(dir, "fpgaDiscoverScript",
+        """{"name": "fpga","addresses":["f1", "f2", "f3", "f4", "f5"]}""")
+      conf.set(SPARK_WORKER_RESOURCE_FILE.key, resourcesPath)
+      conf.set(WORKER_GPU_ID.amountConf, "2")
+      conf.set(WORKER_FPGA_ID.discoveryScriptConf, scriptPath)
+      conf.set(WORKER_FPGA_ID.amountConf, "2")
+      val workers = (0 until 3).map(id => makeWorker(conf, pid = id, local = true))
+      workers.zipWithIndex.foreach{case (w, i) => w.rpcEnv.setupEndpoint(s"worker$i", w)}
+      eventually(timeout(20.seconds)) {
+        val (empty, nonEmpty) = workers.partition(_.resources.isEmpty)
+        assert(empty.length === 0)
+        assert(nonEmpty.length === 3)
+        // Each Worker should get the same resources from resources file and discovery script
+        // without coordination. Note that, normally, we must config different resources
+        // for workers run on the same host when coordinate config is off.
Test here is used + // to validate the different behaviour comparing to the above test when coordinate config + // is on, so we admit the resources collision here. + nonEmpty.foreach { worker => + assert(worker.resources === Map(GPU -> gpuArgs.toResourceInformation, + FPGA -> new ResourceInformation(FPGA, Array("f1", "f2", "f3", "f4", "f5")))) + } + workers.foreach(_.rpcEnv.shutdown()) + workers.foreach(_.rpcEnv.awaitTermination()) + } + assertResourcesFileDeleted() + } + } + test("cleanup non-shuffle files after executor exits when config " + "spark.storage.cleanupFilesAfterExecutorExit=true") { testCleanupFilesWithConfig(true) diff --git a/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala index 693b0ee..64d99a5 100644 --- a/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala @@ -157,7 +157,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite val parsedResources = backend.parseOrFindResources(Some(f1)) }.getMessage() - assert(error.contains("User is expecting to use resource: gpu but didn't specify a " + + assert(error.contains("User is expecting to use resource: gpu, but didn't specify a " + "discovery script!")) } } diff --git a/core/src/test/scala/org/apache/spark/resource/ResourceUtilsSuite.scala b/core/src/test/scala/org/apache/spark/resource/ResourceUtilsSuite.scala index 51a92e0..c2ecc96 100644 --- a/core/src/test/scala/org/apache/spark/resource/ResourceUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/resource/ResourceUtilsSuite.scala @@ -253,7 +253,7 @@ class ResourceUtilsSuite extends SparkFunSuite discoverResource(request) }.getMessage() - assert(error.contains("User is expecting to use resource: gpu but " + + assert(error.contains("User is expecting to use resource: gpu, but " + 
"didn't specify a discovery script!")) } } diff --git a/core/src/test/scala/org/apache/spark/resource/TestResourceIDs.scala b/core/src/test/scala/org/apache/spark/resource/TestResourceIDs.scala index 6d2c07d..c4509e9 100644 --- a/core/src/test/scala/org/apache/spark/resource/TestResourceIDs.scala +++ b/core/src/test/scala/org/apache/spark/resource/TestResourceIDs.scala @@ -18,14 +18,18 @@ package org.apache.spark.resource import org.apache.spark.internal.config.{SPARK_DRIVER_PREFIX, SPARK_EXECUTOR_PREFIX, SPARK_TASK_PREFIX} +import org.apache.spark.internal.config.Worker.SPARK_WORKER_PREFIX import org.apache.spark.resource.ResourceUtils.{FPGA, GPU} object TestResourceIDs { val DRIVER_GPU_ID = ResourceID(SPARK_DRIVER_PREFIX, GPU) val EXECUTOR_GPU_ID = ResourceID(SPARK_EXECUTOR_PREFIX, GPU) val TASK_GPU_ID = ResourceID(SPARK_TASK_PREFIX, GPU) + val WORKER_GPU_ID = ResourceID(SPARK_WORKER_PREFIX, GPU) val DRIVER_FPGA_ID = ResourceID(SPARK_DRIVER_PREFIX, FPGA) val EXECUTOR_FPGA_ID = ResourceID(SPARK_EXECUTOR_PREFIX, FPGA) val TASK_FPGA_ID = ResourceID(SPARK_TASK_PREFIX, FPGA) + val WORKER_FPGA_ID = ResourceID(SPARK_WORKER_PREFIX, FPGA) + } diff --git a/docs/configuration.md b/docs/configuration.md index 57a5321..8454547 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -194,6 +194,25 @@ of the most common options to set are: </td> </tr> <tr> + <td><code>spark.resources.coordinate.enable</code></td> + <td>true</td> + <td> + Whether to coordinate resources automatically among workers/drivers(client only) + in Standalone. If false, the user is responsible for configuring different resources + for workers/drivers that run on the same host. + </td> +</tr> +<tr> + <td><code>spark.resources.dir</code></td> + <td>SPARK_HOME</td> + <td> + Directory used to coordinate resources among workers/drivers(client only) in Standalone. + Default is SPARK_HOME. Make sure to use the same directory for worker and drivers in + client mode that run on the same host. 
Don't clean up this directory while workers/drivers + are still alive to avoid the most likely resources conflict. + </td> +</tr> +<tr> <td><code>spark.driver.resource.{resourceName}.amount</code></td> <td>0</td> <td> @@ -209,7 +228,9 @@ of the most common options to set are: <td> A script for the driver to run to discover a particular resource type. This should write to STDOUT a JSON string in the format of the ResourceInformation class. This has a - name and an array of addresses. + name and an array of addresses. For a client-submitted driver in Standalone, discovery + script must assign different resource addresses to this driver comparing to workers' and + other dirvers' when <code>spark.resources.coordinate.enable</code> is off. </td> </tr> <tr> diff --git a/docs/spark-standalone.md b/docs/spark-standalone.md index 2ca3ee6..bc77469 100644 --- a/docs/spark-standalone.md +++ b/docs/spark-standalone.md @@ -243,6 +243,37 @@ SPARK_MASTER_OPTS supports the following system properties: receives no heartbeats. </td> </tr> +<tr> + <td><code>spark.worker.resource.{resourceName}.amount</code></td> + <td>(none)</td> + <td> + Amount of a particular resource to use on the worker. + </td> +</tr> +<tr> + <td><code>spark.worker.resource.{resourceName}.discoveryScript</code></td> + <td>(none)</td> + <td> + Path to resource discovery script, which is used to find a particular resource while worker starting up. + And the output of the script should be formatted like the <code>ResourceInformation</code> class. + When <code>spark.resources.coordinate.enable</code> is off, the discovery script must assign different + resources for workers and drivers in client mode that run on the same host to avoid resource conflict. + </td> +</tr> +<tr> + <td><code>spark.worker.resourcesFile</code></td> + <td>(none)</td> + <td> + Path to resources file which is used to find various resources while worker starting up. 
+ The content of resources file should be formatted like <code> + [[{"id":{"componentName": "spark.worker","resourceName":"gpu"},"addresses":["0","1","2"]}]]</code>. + When <code>spark.resources.coordinate.enable</code> is off, resources file must assign different + resources for workers and drivers in client mode that run on the same host to avoid resource conflict. + If a particular resource is not found in the resources file, the discovery script would be used to + find that resource. If the discovery script also does not find the resources, the worker will fail + to start up. + </td> +</tr> </table> SPARK_WORKER_OPTS supports the following system properties: diff --git a/python/pyspark/tests/test_context.py b/python/pyspark/tests/test_context.py index bcd5d06..3f3150b 100644 --- a/python/pyspark/tests/test_context.py +++ b/python/pyspark/tests/test_context.py @@ -273,7 +273,8 @@ class ContextTestsWithResources(unittest.TestCase): self.tempFile.close() os.chmod(self.tempFile.name, stat.S_IRWXU | stat.S_IXGRP | stat.S_IRGRP | stat.S_IROTH | stat.S_IXOTH) - conf = SparkConf().set("spark.driver.resource.gpu.amount", "1") + conf = SparkConf().set("spark.test.home", SPARK_HOME) + conf = conf.set("spark.driver.resource.gpu.amount", "1") conf = conf.set("spark.driver.resource.gpu.discoveryScript", self.tempFile.name) self.sc = SparkContext('local-cluster[2,1,1024]', class_name, conf=conf) diff --git a/python/pyspark/tests/test_taskcontext.py b/python/pyspark/tests/test_taskcontext.py index 66357b6..66c5f9f 100644 --- a/python/pyspark/tests/test_taskcontext.py +++ b/python/pyspark/tests/test_taskcontext.py @@ -23,7 +23,7 @@ import time import unittest from pyspark import SparkConf, SparkContext, TaskContext, BarrierTaskContext -from pyspark.testing.utils import PySparkTestCase +from pyspark.testing.utils import PySparkTestCase, SPARK_HOME class TaskContextTests(PySparkTestCase): @@ -194,9 +194,11 @@ class TaskContextTestsWithResources(unittest.TestCase): 
         self.tempFile.close()
         os.chmod(self.tempFile.name, stat.S_IRWXU | stat.S_IXGRP | stat.S_IRGRP |
                  stat.S_IROTH | stat.S_IXOTH)
-        conf = SparkConf().set("spark.task.resource.gpu.amount", "1")
+        conf = SparkConf().set("spark.test.home", SPARK_HOME)
+        conf = conf.set("spark.worker.resource.gpu.discoveryScript", self.tempFile.name)
+        conf = conf.set("spark.worker.resource.gpu.amount", 1)
+        conf = conf.set("spark.task.resource.gpu.amount", "1")
         conf = conf.set("spark.executor.resource.gpu.amount", "1")
-        conf = conf.set("spark.executor.resource.gpu.discoveryScript", self.tempFile.name)
         self.sc = SparkContext('local-cluster[2,1,1024]', class_name, conf=conf)

     def test_resources(self):
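For readers trying the new worker options, the two JSON shapes referenced in the docs/spark-standalone.md hunk can be sketched in plain Python. This is an illustration only, not code from the patch: the helper names `resource_allocation` and `discovery_script_output` and the file path are hypothetical, and the resources file is assumed to be a flat JSON array, which is what `Extraction.decompose(Seq(gpuArgs, fpgaArgs))` in WorkerSuite appears to produce (the docs hunk shows the same entry wrapped in double brackets).

```python
import json


def resource_allocation(component, resource, addresses):
    """Build one entry shaped like the resources file example in the docs hunk.

    Hypothetical helper for illustration; not part of Spark.
    """
    return {
        "id": {"componentName": component, "resourceName": resource},
        "addresses": list(addresses),
    }


def discovery_script_output(name, addresses):
    """Return the JSON a discovery script would write to STDOUT:
    a name plus an array of addresses, as in the WorkerSuite tests."""
    return json.dumps({"name": name, "addresses": list(addresses)})


# Assumed flat array of allocations, mirroring Seq(gpuArgs, fpgaArgs) in the test.
resources_file_text = json.dumps([
    resource_allocation("spark.worker", "gpu", ["0", "1"]),
    resource_allocation("spark.worker", "fpga", ["f1", "f2", "f3"]),
])

# Worker-side settings named in the docs hunks; the path is a placeholder.
worker_conf = {
    "spark.worker.resourcesFile": "/path/to/resources.json",
    "spark.worker.resource.gpu.amount": "2",
    "spark.worker.resource.fpga.amount": "3",
    "spark.resources.coordinate.enable": "true",
}
```

The amounts match the number of addresses listed for each resource, as in the "worker could load resources from resources file while launching" test.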