This is an automated email from the ASF dual-hosted git repository.

tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 3cb18d9  [SPARK-29151][CORE] Support fractional resources for task resource scheduling
3cb18d9 is described below

commit 3cb18d90c441bbaa64c693e276793b670213e599
Author: Alessandro Bellina <abell...@nvidia.com>
AuthorDate: Tue Nov 5 08:57:43 2019 -0600

    [SPARK-29151][CORE] Support fractional resources for task resource scheduling
    
    ### What changes were proposed in this pull request?
    This PR adds the ability for tasks to request fractional resources, so that
    more than one task can run per resource. For example, if an executor has 1 GPU
    and the task configuration is 0.5 GPU/task, the executor can schedule two tasks
    to run concurrently on that 1 GPU.
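    
    As a minimal sketch of the intended usage (the resource name and the amounts
    below are illustrative, not taken from this patch):
    
        import org.apache.spark.SparkConf
    
        // The executor advertises 1 GPU; each task requests half of it, so the
        // scheduler can place two tasks on the same GPU address at the same time.
        val conf = new SparkConf()
          .set("spark.executor.resource.gpu.amount", "1")
          .set("spark.task.resource.gpu.amount", "0.5")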
    
    ### Why are the changes needed?
    Currently there is no good way to share a resource such that multiple tasks 
can run on a single unit. This allows multiple tasks to share an executor 
resource.
    
    ### Does this PR introduce any user-facing change?
    Yes: There is a configuration change where
    `spark.task.resource.[resource type].amount` can now be fractional.
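    
    For illustration, the parsing rule for the task amount reduces to roughly the
    following (a sketch only; the helper name is invented, see
    ResourceUtils.parseResourceRequirements in the diff below):
    
        // amount <= 0.5        -> one resource is split floor(1/amount) ways
        // whole-number amounts -> a task consumes that many whole resources
        // other fractional amounts are rejected
        def taskSlotsPerResourceAddress(amountPerTask: Double): Int =
          if (amountPerTask <= 0.5) math.floor(1.0 / amountPerTask).toInt
          else if (amountPerTask % 1 == 0) 1
          else throw new IllegalArgumentException(
            s"amount $amountPerTask must be <= 0.5 or a whole number")
    
        taskSlotsPerResourceAddress(0.25) // 4 tasks can share one GPU
        taskSlotsPerResourceAddress(2.0)  // each task needs 2 whole GPUs, 1 slot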
    
    ### How was this patch tested?
    Unit tests, plus manual testing in standalone mode and on YARN.
    
    Closes #26078 from abellina/SPARK-29151.
    
    Authored-by: Alessandro Bellina <abell...@nvidia.com>
    Signed-off-by: Thomas Graves <tgra...@apache.org>
---
 .../main/scala/org/apache/spark/SparkContext.scala | 21 ++++++--
 .../apache/spark/deploy/master/WorkerInfo.scala    |  1 +
 .../apache/spark/resource/ResourceAllocator.scala  | 39 +++++++++++----
 .../org/apache/spark/resource/ResourceUtils.scala  | 58 ++++++++++++++++++++--
 .../spark/scheduler/ExecutorResourceInfo.scala     |  7 ++-
 .../cluster/CoarseGrainedSchedulerBackend.scala    | 15 +++++-
 .../org/apache/spark/HeartbeatReceiverSuite.scala  |  1 +
 .../scala/org/apache/spark/SparkConfSuite.scala    | 51 +++++++++++++++++++
 .../scala/org/apache/spark/SparkContextSuite.scala |  3 +-
 .../deploy/StandaloneDynamicAllocationSuite.scala  |  1 +
 .../CoarseGrainedSchedulerBackendSuite.scala       |  1 +
 .../scheduler/ExecutorResourceInfoSuite.scala      | 34 +++++++++++--
 docs/configuration.md                              | 12 +++--
 13 files changed, 214 insertions(+), 30 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index cad88ad..3cea2ef 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -2799,7 +2799,10 @@ object SparkContext extends Logging {
             s" = ${taskReq.amount}")
         }
         // Compare and update the max slots each executor can provide.
-        val resourceNumSlots = execAmount / taskReq.amount
+        // If the configured amount per task was < 1.0, a task is subdividing
+        // executor resources. If the amount per task was > 1.0, the task wants
+        // multiple executor resources.
+        val resourceNumSlots = Math.floor(execAmount * taskReq.numParts / taskReq.amount).toInt
         if (resourceNumSlots < numSlots) {
           numSlots = resourceNumSlots
           limitingResourceName = taskReq.resourceName
@@ -2809,11 +2812,19 @@ object SparkContext extends Logging {
       // large enough if any task resources were specified.
       taskResourceRequirements.foreach { taskReq =>
         val execAmount = executorResourcesAndAmounts(taskReq.resourceName)
-        if (taskReq.amount * numSlots < execAmount) {
+        if ((numSlots * taskReq.amount / taskReq.numParts) < execAmount) {
+          val taskReqStr = if (taskReq.numParts > 1) {
+            s"${taskReq.amount}/${taskReq.numParts}"
+          } else {
+            s"${taskReq.amount}"
+          }
+          val resourceNumSlots = Math.floor(execAmount * taskReq.numParts/taskReq.amount).toInt
           val message = s"The configuration of resource: ${taskReq.resourceName} " +
-            s"(exec = ${execAmount}, task = ${taskReq.amount}) will result in wasted " +
-            s"resources due to resource ${limitingResourceName} limiting the number of " +
-            s"runnable tasks per executor to: ${numSlots}. Please adjust your configuration."
+            s"(exec = ${execAmount}, task = ${taskReqStr}, " +
+            s"runnable tasks = ${resourceNumSlots}) will " +
+            s"result in wasted resources due to resource ${limitingResourceName} limiting the " +
+            s"number of runnable tasks per executor to: ${numSlots}. Please adjust " +
+            s"your configuration."
           if (Utils.isTesting) {
             throw new SparkException(message)
           } else {
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
index 4845881..0137e2b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
@@ -28,6 +28,7 @@ private[spark] case class WorkerResourceInfo(name: String, addresses: Seq[String
 
   override protected def resourceName = this.name
   override protected def resourceAddresses = this.addresses
+  override protected def slotsPerAddress: Int = 1
 
   def acquire(amount: Int): ResourceInformation = {
     val allocated = availableAddrs.take(amount)
diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala b/core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala
index e64fadc..22272a0 100644
--- a/core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala
@@ -30,27 +30,44 @@ trait ResourceAllocator {
 
   protected def resourceName: String
   protected def resourceAddresses: Seq[String]
+  protected def slotsPerAddress: Int
 
   /**
-   * Map from an address to its availability, the value `true` means the address is available,
-   * while value `false` means the address is assigned.
+   * Map from an address to its availability, a value > 0 means the address is available,
+   * while value of 0 means the address is fully assigned.
+   *
+   * For task resources ([[org.apache.spark.scheduler.ExecutorResourceInfo]]), this value
+   * can be a multiple, such that each address can be allocated up to [[slotsPerAddress]]
+   * times.
+   *
    * TODO Use [[OpenHashMap]] instead to gain better performance.
    */
-  private lazy val addressAvailabilityMap = mutable.HashMap(resourceAddresses.map(_ -> true): _*)
+  private lazy val addressAvailabilityMap = {
+    mutable.HashMap(resourceAddresses.map(_ -> slotsPerAddress): _*)
+  }
 
   /**
    * Sequence of currently available resource addresses.
+   *
+   * With [[slotsPerAddress]] greater than 1, [[availableAddrs]] can contain duplicate addresses
+   * e.g. with [[slotsPerAddress]] == 2, availableAddrs for addresses 0 and 1 can look like
+   * Seq("0", "0", "1"), where address 0 has two assignments available, and 1 has one.
    */
-  def availableAddrs: Seq[String] = addressAvailabilityMap.flatMap { case (addr, available) =>
-    if (available) Some(addr) else None
-  }.toSeq
+  def availableAddrs: Seq[String] = addressAvailabilityMap
+    .flatMap { case (addr, available) =>
+      (0 until available).map(_ => addr)
+    }.toSeq
 
   /**
    * Sequence of currently assigned resource addresses.
+   *
+   * With [[slotsPerAddress]] greater than 1, [[assignedAddrs]] can contain duplicate addresses
+   * e.g. with [[slotsPerAddress]] == 2, assignedAddrs for addresses 0 and 1 can look like
+   * Seq("0", "1", "1"), where address 0 was assigned once, and 1 was assigned twice.
    */
   private[spark] def assignedAddrs: Seq[String] = addressAvailabilityMap
     .flatMap { case (addr, available) =>
-      if (!available) Some(addr) else None
+      (0 until slotsPerAddress - available).map(_ => addr)
     }.toSeq
 
   /**
@@ -65,8 +82,8 @@ trait ResourceAllocator {
           s"address $address doesn't exist.")
       }
       val isAvailable = addressAvailabilityMap(address)
-      if (isAvailable) {
-        addressAvailabilityMap(address) = false
+      if (isAvailable > 0) {
+        addressAvailabilityMap(address) = addressAvailabilityMap(address) - 1
       } else {
         throw new SparkException("Try to acquire an address that is not 
available. " +
           s"$resourceName address $address is not available.")
@@ -86,8 +103,8 @@ trait ResourceAllocator {
           s"address $address doesn't exist.")
       }
       val isAvailable = addressAvailabilityMap(address)
-      if (!isAvailable) {
-        addressAvailabilityMap(address) = true
+      if (isAvailable < slotsPerAddress) {
+        addressAvailabilityMap(address) = addressAvailabilityMap(address) + 1
       } else {
         throw new SparkException(s"Try to release an address that is not 
assigned. $resourceName " +
           s"address $address is not assigned.")
diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala b/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala
index 150ba09..e5ae7a9 100644
--- a/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala
@@ -27,6 +27,7 @@ import org.json4s.jackson.JsonMethods._
 
 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.SPARK_TASK_PREFIX
 import org.apache.spark.util.Utils.executeAndGetOutput
 
 /**
@@ -41,13 +42,44 @@ private[spark] case class ResourceID(componentName: String, resourceName: String
   def vendorConf: String = s"$confPrefix${ResourceUtils.VENDOR}"
 }
 
+/**
+ * Case class that represents a resource request at the executor level.
+ *
+ * The class used when discovering resources (using the discovery script),
+ * or via the context as it is parsing configuration, for SPARK_EXECUTOR_PREFIX.
+ *
+ * @param id object identifying the resource
+ * @param amount integer amount for the resource. Note that for a request (executor level),
+ *               fractional resources does not make sense, so amount is an integer.
+ * @param discoveryScript optional discovery script file name
+ * @param vendor optional vendor name
+ */
 private[spark] case class ResourceRequest(
     id: ResourceID,
     amount: Int,
     discoveryScript: Option[String],
     vendor: Option[String])
 
-private[spark] case class ResourceRequirement(resourceName: String, amount: Int)
+/**
+ * Case class that represents resource requirements for a component in
+ * an application (components are driver, executor or task).
+ *
+ * A configuration of spark.task.resource.[resourceName].amount = 4, equates to:
+ * amount = 4, and numParts = 1.
+ *
+ * A configuration of spark.task.resource.[resourceName].amount = 0.25, equates to:
+ * amount = 1, and numParts = 4.
+ *
+ * @param resourceName gpu, fpga, etc.
+ * @param amount whole units of the resource we expect (e.g. 1 gpus, 2 fpgas)
+ * @param numParts if not 1, the number of ways a whole resource is subdivided.
+ *                 This is always an integer greater than or equal to 1,
+ *                 where 1 is whole resource, 2 is divide a resource in two, and so on.
+ */
+private[spark] case class ResourceRequirement(
+    resourceName: String,
+    amount: Int,
+    numParts: Int = 1)
 
 /**
  * Case class representing allocated resource addresses for a specific resource.
@@ -94,8 +126,28 @@ private[spark] object ResourceUtils extends Logging {
 
   def parseResourceRequirements(sparkConf: SparkConf, componentName: String)
     : Seq[ResourceRequirement] = {
-    parseAllResourceRequests(sparkConf, componentName).map { request =>
-      ResourceRequirement(request.id.resourceName, request.amount)
+    listResourceIds(sparkConf, componentName).map { resourceId =>
+      val settings = sparkConf.getAllWithPrefix(resourceId.confPrefix).toMap
+      val amountDouble = settings.getOrElse(AMOUNT,
+        throw new SparkException(s"You must specify an amount for 
${resourceId.resourceName}")
+      ).toDouble
+      val (amount, parts) = if 
(componentName.equalsIgnoreCase(SPARK_TASK_PREFIX)) {
+        val parts = if (amountDouble <= 0.5) {
+          Math.floor(1.0 / amountDouble).toInt
+        } else if (amountDouble % 1 != 0) {
+          throw new SparkException(
+            s"The resource amount ${amountDouble} must be either <= 0.5, or a 
whole number.")
+        } else {
+          1
+        }
+        (Math.ceil(amountDouble).toInt, parts)
+      } else if (amountDouble % 1 != 0) {
+        throw new SparkException(
+          s"Only tasks support fractional resources, please check your 
$componentName settings")
+      } else {
+        (amountDouble.toInt, 1)
+      }
+      ResourceRequirement(resourceId.resourceName, amount, parts)
     }
   }
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ExecutorResourceInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/ExecutorResourceInfo.scala
index 0204760..fd04db8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ExecutorResourceInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ExecutorResourceInfo.scala
@@ -25,10 +25,15 @@ import org.apache.spark.resource.{ResourceAllocator, ResourceInformation}
  * information.
  * @param name Resource name
  * @param addresses Resource addresses provided by the executor
+ * @param numParts Number of ways each resource is subdivided when scheduling tasks
  */
-private[spark] class ExecutorResourceInfo(name: String, addresses: Seq[String])
+private[spark] class ExecutorResourceInfo(
+    name: String,
+    addresses: Seq[String],
+    numParts: Int)
   extends ResourceInformation(name, addresses.toArray) with ResourceAllocator {
 
   override protected def resourceName = this.name
   override protected def resourceAddresses = this.addresses
+  override protected def slotsPerAddress: Int = numParts
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 6e990d1..ea045e6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -33,6 +33,7 @@ import org.apache.spark.executor.ExecutorLogUrlHandler
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.Network._
+import org.apache.spark.resource.ResourceRequirement
 import org.apache.spark.rpc._
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
@@ -68,6 +69,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     conf.get(SCHEDULER_MAX_REGISTERED_RESOURCE_WAITING_TIME))
   private val createTimeNs = System.nanoTime()
 
+  private val taskResourceNumParts: Map[String, Int] =
+    if (scheduler.resourcesReqsPerTask != null) {
+      scheduler.resourcesReqsPerTask.map(req => req.resourceName -> req.numParts).toMap
+    } else {
+      Map.empty
+    }
+
  // Accessing `executorDataMap` in the inherited methods from ThreadSafeRpcEndpoint doesn't need
  // any protection. But accessing `executorDataMap` out of the inherited methods must be
  // protected by `CoarseGrainedSchedulerBackend.this`. Besides, `executorDataMap` should only
@@ -215,7 +223,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
           totalCoreCount.addAndGet(cores)
           totalRegisteredExecutors.addAndGet(1)
           val resourcesInfo = resources.map{ case (k, v) =>
-            (v.name, new ExecutorResourceInfo(v.name, v.addresses))}
+            (v.name,
+             new ExecutorResourceInfo(v.name, v.addresses,
+               // tell the executor it can schedule resources up to numParts times,
+               // as configured by the user, or set to 1 as that is the default (1 task/resource)
+               taskResourceNumParts.getOrElse(v.name, 1)))
+          }
           val data = new ExecutorData(executorRef, executorAddress, hostname,
            cores, cores, logUrlHandler.applyPattern(logUrls, attributes), attributes,
             resourcesInfo)
diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
index 2a25171..595fc73 100644
--- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
@@ -74,6 +74,7 @@ class HeartbeatReceiverSuite
     scheduler = mock(classOf[TaskSchedulerImpl])
     when(sc.taskScheduler).thenReturn(scheduler)
     when(scheduler.nodeBlacklist).thenReturn(Predef.Set[String]())
+    when(scheduler.resourcesReqsPerTask).thenReturn(Seq.empty)
     when(scheduler.sc).thenReturn(sc)
     heartbeatReceiverClock = new ManualClock
     heartbeatReceiver = new HeartbeatReceiver(sc, heartbeatReceiverClock)
diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
index 0ac6ba2..b91759c 100644
--- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -456,6 +456,57 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst
     assert(taskResourceRequirement.size == 1)
     assert(taskResourceRequirement.get(FPGA).isEmpty)
   }
+
+  test("Ensure that we can configure fractional resources for a task") {
+    val ratioSlots = Seq(
+      (0.10, 10), (0.11, 9), (0.125, 8), (0.14, 7), (0.16, 6),
+      (0.20, 5), (0.25, 4), (0.33, 3), (0.5, 2), (1.0, 1),
+      // if the amount is fractional greater than 0.5 and less than 1.0 we throw
+      (0.51, 1), (0.9, 1),
+      // if the amount is greater than one and is not whole, we throw
+      (1.5, 0), (2.5, 0),
+      // it's ok if the amount is whole, and greater than 1
+      // parts are 1 because we get a whole part of a resource
+      (2.0, 1), (3.0, 1), (4.0, 1))
+    ratioSlots.foreach {
+      case (ratio, slots) =>
+        val conf = new SparkConf()
+        conf.set(TASK_GPU_ID.amountConf, ratio.toString)
+        if (ratio > 0.5 && ratio % 1 != 0) {
+          assertThrows[SparkException] {
+            parseResourceRequirements(conf, SPARK_TASK_PREFIX)
+          }
+        } else {
+          val reqs = parseResourceRequirements(conf, SPARK_TASK_PREFIX)
+          assert(reqs.size == 1)
+          assert(reqs.head.amount == Math.ceil(ratio).toInt)
+          assert(reqs.head.numParts == slots)
+        }
+    }
+  }
+
+  test("Non-task resources are never fractional") {
+    val ratioSlots = Seq(
+      // if the amount provided is not a whole number, we throw
+      (0.25, 0), (0.5, 0), (1.5, 0),
+      // otherwise we are successful at parsing resources
+      (1.0, 1), (2.0, 2), (3.0, 3))
+    ratioSlots.foreach {
+      case (ratio, slots) =>
+        val conf = new SparkConf()
+        conf.set(EXECUTOR_GPU_ID.amountConf, ratio.toString)
+        if (ratio % 1 != 0) {
+          assertThrows[SparkException] {
+            parseResourceRequirements(conf, SPARK_EXECUTOR_PREFIX)
+          }
+        } else {
+          val reqs = parseResourceRequirements(conf, SPARK_EXECUTOR_PREFIX)
+          assert(reqs.size == 1)
+          assert(reqs.head.amount == slots)
+          assert(reqs.head.numParts == 1)
+        }
+    }
+  }
 }
 
 class Class1 {}
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 4fd8628..712ed9b 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -842,7 +842,8 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
       sc = new SparkContext(conf)
     }.getMessage()
 
-    assert(error.contains("The configuration of resource: gpu (exec = 4, task 
= 2) will result " +
+    assert(error.contains(
+      "The configuration of resource: gpu (exec = 4, task = 2, runnable tasks 
= 2) will result " +
       "in wasted resources due to resource CPU limiting the number of runnable 
tasks per " +
       "executor to: 1. Please adjust your configuration."))
   }
diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
index 29c210f..1775878 100644
--- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
@@ -506,6 +506,7 @@ class StandaloneDynamicAllocationSuite
     val taskScheduler = mock(classOf[TaskSchedulerImpl])
     when(taskScheduler.nodeBlacklist()).thenReturn(Set("blacklisted-host"))
     when(taskScheduler.resourceOffers(any())).thenReturn(Nil)
+    when(taskScheduler.resourcesReqsPerTask).thenReturn(Seq.empty)
     when(taskScheduler.sc).thenReturn(sc)
 
     val rpcEnv = RpcEnv.create("test-rpcenv", "localhost", 0, conf, securityManager)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
index 6152214..8a16ae6 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
@@ -282,6 +282,7 @@ private class CSMockExternalClusterManager extends ExternalClusterManager {
     when(ts.applicationAttemptId()).thenReturn(Some("attempt1"))
     when(ts.schedulingMode).thenReturn(SchedulingMode.FIFO)
     when(ts.nodeBlacklist()).thenReturn(Set.empty[String])
+    when(ts.resourcesReqsPerTask).thenReturn(Seq.empty)
     ts
   }
 
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ExecutorResourceInfoSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ExecutorResourceInfoSuite.scala
index 0109d1f..388d4e2 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ExecutorResourceInfoSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ExecutorResourceInfoSuite.scala
@@ -26,7 +26,7 @@ class ExecutorResourceInfoSuite extends SparkFunSuite {
 
   test("Track Executor Resource information") {
     // Init Executor Resource.
-    val info = new ExecutorResourceInfo(GPU, ArrayBuffer("0", "1", "2", "3"))
+    val info = new ExecutorResourceInfo(GPU, ArrayBuffer("0", "1", "2", "3"), 1)
     assert(info.availableAddrs.sorted sameElements Seq("0", "1", "2", "3"))
     assert(info.assignedAddrs.isEmpty)
 
@@ -43,7 +43,7 @@ class ExecutorResourceInfoSuite extends SparkFunSuite {
 
   test("Don't allow acquire address that is not available") {
     // Init Executor Resource.
-    val info = new ExecutorResourceInfo(GPU, ArrayBuffer("0", "1", "2", "3"))
+    val info = new ExecutorResourceInfo(GPU, ArrayBuffer("0", "1", "2", "3"), 1)
     // Acquire some addresses.
     info.acquire(Seq("0", "1"))
     assert(!info.availableAddrs.contains("1"))
@@ -56,7 +56,7 @@ class ExecutorResourceInfoSuite extends SparkFunSuite {
 
   test("Don't allow acquire address that doesn't exist") {
     // Init Executor Resource.
-    val info = new ExecutorResourceInfo(GPU, ArrayBuffer("0", "1", "2", "3"))
+    val info = new ExecutorResourceInfo(GPU, ArrayBuffer("0", "1", "2", "3"), 1)
     assert(!info.availableAddrs.contains("4"))
     // Acquire an address that doesn't exist
     val e = intercept[SparkException] {
@@ -67,7 +67,7 @@ class ExecutorResourceInfoSuite extends SparkFunSuite {
 
   test("Don't allow release address that is not assigned") {
     // Init Executor Resource.
-    val info = new ExecutorResourceInfo(GPU, ArrayBuffer("0", "1", "2", "3"))
+    val info = new ExecutorResourceInfo(GPU, ArrayBuffer("0", "1", "2", "3"), 1)
     // Acquire addresses
     info.acquire(Array("0", "1"))
     assert(!info.assignedAddrs.contains("2"))
@@ -80,7 +80,7 @@ class ExecutorResourceInfoSuite extends SparkFunSuite {
 
   test("Don't allow release address that doesn't exist") {
     // Init Executor Resource.
-    val info = new ExecutorResourceInfo(GPU, ArrayBuffer("0", "1", "2", "3"))
+    val info = new ExecutorResourceInfo(GPU, ArrayBuffer("0", "1", "2", "3"), 1)
     assert(!info.assignedAddrs.contains("4"))
     // Release an address that doesn't exist
     val e = intercept[SparkException] {
@@ -88,4 +88,28 @@ class ExecutorResourceInfoSuite extends SparkFunSuite {
     }
     assert(e.getMessage.contains("Try to release an address that doesn't 
exist."))
   }
+
+  test("Ensure that we can acquire the same fractions of a resource from an 
executor") {
+    val slotSeq = Seq(10, 9, 8, 7, 6, 5, 4, 3, 2, 1)
+    val addresses = ArrayBuffer("0", "1", "2", "3")
+    slotSeq.foreach { slots =>
+      val info = new ExecutorResourceInfo(GPU, addresses, slots)
+      for (_ <- 0 until slots) {
+        addresses.foreach(addr => info.acquire(Seq(addr)))
+      }
+
+      // assert that each address was assigned `slots` times
+      info.assignedAddrs
+        .groupBy(identity)
+        .mapValues(_.size)
+        .foreach(x => assert(x._2 == slots))
+
+      addresses.foreach { addr =>
+        assertThrows[SparkException] {
+          info.acquire(Seq(addr))
+        }
+        assert(!info.availableAddrs.contains(addr))
+      }
+    }
+  }
 }
diff --git a/docs/configuration.md b/docs/configuration.md
index 7fdcf4a..97ea1fb 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1997,9 +1997,15 @@ Apart from these, the following properties are also available, and may be useful
   <td><code>spark.task.resource.{resourceName}.amount</code></td>
   <td>1</td>
   <td>
-    Amount of a particular resource type to allocate for each task. If this is specified
-    you must also provide the executor config <code>spark.executor.resource.{resourceName}.amount</code>
-    and any corresponding discovery configs so that your executors are created with that resource type.
+    Amount of a particular resource type to allocate for each task, note that this can be a double.
+    If this is specified you must also provide the executor config
+    <code>spark.executor.resource.{resourceName}.amount</code> and any corresponding discovery configs
+    so that your executors are created with that resource type. In addition to whole amounts,
+    a fractional amount (for example, 0.25, which means 1/4th of a resource) may be specified.
+    Fractional amounts must be less than or equal to 0.5, or in other words, the minimum amount of
+    resource sharing is 2 tasks per resource. Additionally, fractional amounts are floored
+    in order to assign resource slots (e.g. a 0.2222 configuration, or 1/0.2222 slots will become
+    4 tasks/resource, not 5).
   </td>
 </tr>
 <tr>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
