tgravescs commented on code in PR #43494: URL: https://github.com/apache/spark/pull/43494#discussion_r1430385324
########## core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala: ########## @@ -20,6 +20,61 @@ package org.apache.spark.resource import scala.collection.mutable import org.apache.spark.SparkException +import org.apache.spark.resource.ResourceAmountUtils.ONE_ENTIRE_RESOURCE + +private[spark] object ResourceAmountUtils { + /** + * Using "double" to do the resource calculation may encounter a problem of precision loss. Eg + * + * scala> val taskAmount = 1.0 / 9 + * taskAmount: Double = 0.1111111111111111 + * + * scala> var total = 1.0 + * total: Double = 1.0 + * + * scala> for (i <- 1 to 9 ) { + * | if (total >= taskAmount) { + * | total -= taskAmount + * | println(s"assign $taskAmount for task $i, total left: $total") + * | } else { + * | println(s"ERROR Can't assign $taskAmount for task $i, total left: $total") + * | } + * | } + * assign 0.1111111111111111 for task 1, total left: 0.8888888888888888 + * assign 0.1111111111111111 for task 2, total left: 0.7777777777777777 + * assign 0.1111111111111111 for task 3, total left: 0.6666666666666665 + * assign 0.1111111111111111 for task 4, total left: 0.5555555555555554 + * assign 0.1111111111111111 for task 5, total left: 0.44444444444444425 + * assign 0.1111111111111111 for task 6, total left: 0.33333333333333315 + * assign 0.1111111111111111 for task 7, total left: 0.22222222222222204 + * assign 0.1111111111111111 for task 8, total left: 0.11111111111111094 + * ERROR Can't assign 0.1111111111111111 for task 9, total left: 0.11111111111111094 + * + * So we multiply ONE_ENTIRE_RESOURCE to convert the double to long to avoid this limitation. + * Double can display up to 16 decimal places, so we set the factor to + * 10, 000, 000, 000, 000, 000L. 
+ */ + final val ONE_ENTIRE_RESOURCE: Long = 10000000000000000L + + def isOneEntireResource(amount: Long): Boolean = amount == ONE_ENTIRE_RESOURCE + + def toInternalResource(amount: Double): Long = (amount * ONE_ENTIRE_RESOURCE).toLong + + private[spark] def toInternalResource(resources: Map[String, Double]): Map[String, Long] = { + resources.map { case (k, v) => k -> toInternalResource(v) } + } + + def toFractionalResource(amount: Long): Double = amount.toDouble / ONE_ENTIRE_RESOURCE + + private[spark] def toFractionalResource(resources: Map[String, Long]): Map[String, Double] = { + resources.map { case (k, v) => k -> toFractionalResource(v) } + } + + private[spark] def toInternalResourceMapMap(resources: Map[String, Map[String, Double]]): Review Comment: This seems to be used only for testing? If so, can we just move it to a utility function in that suite, since these are private to Spark? ########## core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala: ########## @@ -192,7 +181,9 @@ private[spark] class CoarseGrainedExecutorBackend( } else { val taskDesc = TaskDescription.decode(data.value) logInfo("Got assigned task " + taskDesc.taskId) - taskResources.put(taskDesc.taskId, taskDesc.resources) + // Convert resources amounts into ResourceInformation + val resources = taskDesc.resources.map { case (rName, addressesAmounts) => Review Comment: This is not used; please remove it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org