[GitHub] [spark] Ngone51 commented on a change in pull request #24856: [SPARK-27823] [CORE] Refactor resource handling code

2019-06-14 Thread GitBox
Ngone51 commented on a change in pull request #24856: [SPARK-27823] [CORE] 
Refactor resource handling code
URL: https://github.com/apache/spark/pull/24856#discussion_r293850281
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/SparkContext.scala
 ##
 @@ -2723,44 +2689,50 @@ object SparkContext extends Logging {
 
   // Calculate the max slots each executor can provide based on resources available on each
   // executor and resources required by each task.
-  val taskResourcesAndCount = sc.conf.getTaskResourceRequirements()
-  val executorResourcesAndCounts = sc.conf.getAllWithPrefixAndSuffix(
-    SPARK_EXECUTOR_RESOURCE_PREFIX, SPARK_RESOURCE_AMOUNT_SUFFIX).toMap
+  val taskResourceRequirements = parseTaskResourceRequirements(sc.conf)
+  val executorResourcesAndCounts =
+    parseAllResourceRequests(sc.conf, SPARK_EXECUTOR_PREFIX)
+      .map(request => (request.id.resourceName, request.amount)).toMap
   var numSlots = execCores / taskCores
   var limitingResourceName = "CPU"
-  taskResourcesAndCount.foreach { case (rName, taskCount) =>
+
+  taskResourceRequirements.foreach { taskReq =>
     // Make sure the executor resources were specified through config.
-    val execCount = executorResourcesAndCounts.getOrElse(rName,
-      throw new SparkException(
-        s"The executor resource config: " +
-          s"${SPARK_EXECUTOR_RESOURCE_PREFIX + rName + SPARK_RESOURCE_AMOUNT_SUFFIX} " +
-          "needs to be specified since a task requirement config: " +
-          s"${SPARK_TASK_RESOURCE_PREFIX + rName + SPARK_RESOURCE_AMOUNT_SUFFIX} was specified")
+    val execCount = executorResourcesAndCounts.getOrElse(taskReq.resourceName,
+      throw new SparkException("The executor resource config: " +
+        ResourceID(SPARK_EXECUTOR_PREFIX, taskReq.resourceName).amountConf +
+        " needs to be specified since a task requirement config: " +
+        ResourceID(SPARK_TASK_PREFIX, taskReq.resourceName).amountConf +
+        " was specified")
     )
     // Make sure the executor resources are large enough to launch at least one task.
-    if (execCount.toLong < taskCount.toLong) {
-      throw new SparkException(
-        s"The executor resource config: " +
-          s"${SPARK_EXECUTOR_RESOURCE_PREFIX + rName + SPARK_RESOURCE_AMOUNT_SUFFIX} " +
-          s"= $execCount has to be >= the task config: " +
-          s"${SPARK_TASK_RESOURCE_PREFIX + rName + SPARK_RESOURCE_AMOUNT_SUFFIX} = $taskCount")
+    if (execCount < taskReq.amount) {
+      throw new SparkException("The executor resource config: " +
+        ResourceID(SPARK_EXECUTOR_PREFIX, taskReq.resourceName).amountConf +
+        s" = $execCount has to be >= the task config: " +
+        ResourceID(SPARK_TASK_PREFIX, taskReq.resourceName).amountConf +
+        s" = ${taskReq.amount}")
     }
     // Compare and update the max slots each executor can provide.
-    val resourceNumSlots = execCount.toInt / taskCount
+    val resourceNumSlots = execCount / taskReq.amount
     if (resourceNumSlots < numSlots) {
+      logWarning(s"The configuration of resource: ${taskReq.resourceName} " +
+        s"(limits tasks to $resourceNumSlots) will result in wasted resources of resource " +
+        s"${limitingResourceName} (would allow for $numSlots tasks). " +
+        "Please adjust your configuration.")
 
 Review comment:
  I know it's the previous logic, but isn't this warning a little redundant compared to the warning below?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] Ngone51 commented on a change in pull request #24856: [SPARK-27823] [CORE] Refactor resource handling code

2019-06-14 Thread GitBox
Ngone51 commented on a change in pull request #24856: [SPARK-27823] [CORE] 
Refactor resource handling code
URL: https://github.com/apache/spark/pull/24856#discussion_r293867298
 
 

 ##
 File path: core/src/test/scala/org/apache/spark/resource/ResourceUtilsSuite.scala
 ##
 @@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.resource
+
+import java.io.File
+import java.nio.file.{Files => JavaFiles}
+
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkException, SparkFunSuite}
+import org.apache.spark.TestUtils._
+import org.apache.spark.internal.config._
+import org.apache.spark.resource.ResourceUtils._
+import org.apache.spark.resource.TestResourceIDs._
+import org.apache.spark.util.Utils
+
+class ResourceUtilsSuite extends SparkFunSuite
 
 Review comment:
  Shall we add a test case for the combination of a resources file and a discovery script?
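  
  A rough sketch of what such a test could look like, reusing the helpers already used in this
  suite; the test name, file name and allocation values are made up, and it assumes the
  resources file carries the json4s-serialized Seq[ResourceAllocation] layout that
  ResourceUtils.parseAllocatedFromJsonFile reads:
  
    test("Resource discoverer with resources file and discovery script") {
      val conf = new SparkConf
      assume(!(Utils.isWindows))
      withTempDir { dir =>
        // gpu addresses come from a resources file, as a cluster manager would provide them
        val resourcesFile = new File(dir, "resources.json")
        val gpuAllocation =
          """[{"id": {"componentName": "spark.executor", "resourceName": "gpu"},
            |  "addresses": ["0", "1"]}]""".stripMargin
        JavaFiles.write(resourcesFile.toPath, gpuAllocation.getBytes())

        // fpga addresses are discovered through a script instead
        val fpgaDiscovery = createTempScriptWithExpectedOutput(dir, "fpgaDiscoverScript",
          """{"name": "fpga", "addresses": ["f1", "f2"]}""")
        conf.set(EXECUTOR_GPU_ID.amountConf, "2")
        conf.set(EXECUTOR_FPGA_ID.amountConf, "2")
        conf.set(EXECUTOR_FPGA_ID.discoveryScriptConf, fpgaDiscovery)

        val resources =
          getOrDiscoverAllResources(conf, SPARK_EXECUTOR_PREFIX, Some(resourcesFile.getPath))
        assert(resources(GPU).addresses.sameElements(Array("0", "1")))
        assert(resources(FPGA).addresses.sameElements(Array("f1", "f2")))
      }
    }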





[GitHub] [spark] Ngone51 commented on a change in pull request #24856: [SPARK-27823] [CORE] Refactor resource handling code

2019-06-14 Thread GitBox
Ngone51 commented on a change in pull request #24856: [SPARK-27823] [CORE] 
Refactor resource handling code
URL: https://github.com/apache/spark/pull/24856#discussion_r293853139
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/SparkContext.scala
 ##
 @@ -2723,44 +2689,50 @@ object SparkContext extends Logging {
 
   // Calculate the max slots each executor can provide based on resources available on each
   // executor and resources required by each task.
-  val taskResourcesAndCount = sc.conf.getTaskResourceRequirements()
-  val executorResourcesAndCounts = sc.conf.getAllWithPrefixAndSuffix(
-    SPARK_EXECUTOR_RESOURCE_PREFIX, SPARK_RESOURCE_AMOUNT_SUFFIX).toMap
+  val taskResourceRequirements = parseTaskResourceRequirements(sc.conf)
+  val executorResourcesAndCounts =
+    parseAllResourceRequests(sc.conf, SPARK_EXECUTOR_PREFIX)
+      .map(request => (request.id.resourceName, request.amount)).toMap
   var numSlots = execCores / taskCores
   var limitingResourceName = "CPU"
-  taskResourcesAndCount.foreach { case (rName, taskCount) =>
+
+  taskResourceRequirements.foreach { taskReq =>
     // Make sure the executor resources were specified through config.
-    val execCount = executorResourcesAndCounts.getOrElse(rName,
-      throw new SparkException(
-        s"The executor resource config: " +
-          s"${SPARK_EXECUTOR_RESOURCE_PREFIX + rName + SPARK_RESOURCE_AMOUNT_SUFFIX} " +
-          "needs to be specified since a task requirement config: " +
-          s"${SPARK_TASK_RESOURCE_PREFIX + rName + SPARK_RESOURCE_AMOUNT_SUFFIX} was specified")
+    val execCount = executorResourcesAndCounts.getOrElse(taskReq.resourceName,
+      throw new SparkException("The executor resource config: " +
+        ResourceID(SPARK_EXECUTOR_PREFIX, taskReq.resourceName).amountConf +
+        " needs to be specified since a task requirement config: " +
+        ResourceID(SPARK_TASK_PREFIX, taskReq.resourceName).amountConf +
+        " was specified")
     )
     // Make sure the executor resources are large enough to launch at least one task.
-    if (execCount.toLong < taskCount.toLong) {
-      throw new SparkException(
-        s"The executor resource config: " +
-          s"${SPARK_EXECUTOR_RESOURCE_PREFIX + rName + SPARK_RESOURCE_AMOUNT_SUFFIX} " +
-          s"= $execCount has to be >= the task config: " +
-          s"${SPARK_TASK_RESOURCE_PREFIX + rName + SPARK_RESOURCE_AMOUNT_SUFFIX} = $taskCount")
+    if (execCount < taskReq.amount) {
+      throw new SparkException("The executor resource config: " +
+        ResourceID(SPARK_EXECUTOR_PREFIX, taskReq.resourceName).amountConf +
+        s" = $execCount has to be >= the task config: " +
+        ResourceID(SPARK_TASK_PREFIX, taskReq.resourceName).amountConf +
+        s" = ${taskReq.amount}")
     }
     // Compare and update the max slots each executor can provide.
-    val resourceNumSlots = execCount.toInt / taskCount
+    val resourceNumSlots = execCount / taskReq.amount
     if (resourceNumSlots < numSlots) {
+      logWarning(s"The configuration of resource: ${taskReq.resourceName} " +
+        s"(limits tasks to $resourceNumSlots) will result in wasted resources of resource " +
+        s"${limitingResourceName} (would allow for $numSlots tasks). " +
+        "Please adjust your configuration.")
       numSlots = resourceNumSlots
-      limitingResourceName = rName
+      limitingResourceName = taskReq.resourceName
     }
   }
   // There have been checks above to make sure the executor resources were specified and are
   // large enough if any task resources were specified.
-  taskResourcesAndCount.foreach { case (rName, taskCount) =>
-    val execCount = executorResourcesAndCounts(rName)
-    if (taskCount.toInt * numSlots < execCount.toInt) {
-      val message = s"The configuration of resource: $rName (exec = ${execCount.toInt}, " +
-        s"task = ${taskCount}) will result in wasted resources due to resource " +
-        s"${limitingResourceName} limiting the number of runnable tasks per executor to: " +
-        s"${numSlots}. Please adjust your configuration."
+  taskResourceRequirements.foreach { taskReq =>
+    val execCount = executorResourcesAndCounts(taskReq.resourceName)
+    if (taskReq.amount * numSlots < execCount) {
+      val message = s"The configuration of resource: ${taskReq.resourceName} " +
+        s"(exec = ${execCount}, task = ${taskReq.amount}) will result in wasted " +
+        s"resources due to resource ${limitingResourceName} limiting the number of " +
+        s"runnable tasks per executor to: ${numSlots}. Please adjust your configuration."
 
 Review comment:
   I'd prefer to log warn the limited `

[GitHub] [spark] Ngone51 commented on a change in pull request #24856: [SPARK-27823] [CORE] Refactor resource handling code

2019-06-14 Thread GitBox
Ngone51 commented on a change in pull request #24856: [SPARK-27823] [CORE] 
Refactor resource handling code
URL: https://github.com/apache/spark/pull/24856#discussion_r293865557
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/resource/ResourceInformation.scala
 ##
 @@ -35,3 +41,25 @@ class ResourceInformation(
 
   override def toString: String = s"[name: ${name}, addresses: ${addresses.mkString(",")}]"
 }
+
+private[spark] object ResourceInformation {
+  /**
+   * Parses a JSON string into a [[ResourceInformation]] instance.
+   */
+  def parseJson(json: String): ResourceInformation = {
+    implicit val formats = DefaultFormats
+    try {
+      parse(json).extract[ResourceInformationJson].toResourceInformation
+    } catch {
+      case NonFatal(e) =>
+        throw new SparkException(s"Error parsing JSON into ResourceInformation:\n$json\n", e)
 
 Review comment:
  Maybe give the user a tip about what the correct JSON format looks like?
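  
  For example (just a sketch of the idea, reusing the fields the JSON already carries), the
  message could embed a correctly formatted sample:
  
      case NonFatal(e) =>
        throw new SparkException(s"Error parsing JSON into ResourceInformation:\n$json\n" +
          """Expected example format: {"name": "gpu", "addresses": ["0", "1"]}""", e)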





[GitHub] [spark] Ngone51 commented on a change in pull request #24856: [SPARK-27823] [CORE] Refactor resource handling code

2019-06-14 Thread GitBox
Ngone51 commented on a change in pull request #24856: [SPARK-27823] [CORE] 
Refactor resource handling code
URL: https://github.com/apache/spark/pull/24856#discussion_r293843677
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/SparkContext.scala
 ##
 @@ -2723,44 +2689,50 @@ object SparkContext extends Logging {
 
   // Calculate the max slots each executor can provide based on resources available on each
   // executor and resources required by each task.
-  val taskResourcesAndCount = sc.conf.getTaskResourceRequirements()
-  val executorResourcesAndCounts = sc.conf.getAllWithPrefixAndSuffix(
-    SPARK_EXECUTOR_RESOURCE_PREFIX, SPARK_RESOURCE_AMOUNT_SUFFIX).toMap
+  val taskResourceRequirements = parseTaskResourceRequirements(sc.conf)
+  val executorResourcesAndCounts =
+    parseAllResourceRequests(sc.conf, SPARK_EXECUTOR_PREFIX)
+      .map(request => (request.id.resourceName, request.amount)).toMap
   var numSlots = execCores / taskCores
   var limitingResourceName = "CPU"
-  taskResourcesAndCount.foreach { case (rName, taskCount) =>
+
+  taskResourceRequirements.foreach { taskReq =>
     // Make sure the executor resources were specified through config.
-    val execCount = executorResourcesAndCounts.getOrElse(rName,
-      throw new SparkException(
-        s"The executor resource config: " +
-          s"${SPARK_EXECUTOR_RESOURCE_PREFIX + rName + SPARK_RESOURCE_AMOUNT_SUFFIX} " +
-          "needs to be specified since a task requirement config: " +
-          s"${SPARK_TASK_RESOURCE_PREFIX + rName + SPARK_RESOURCE_AMOUNT_SUFFIX} was specified")
+    val execCount = executorResourcesAndCounts.getOrElse(taskReq.resourceName,
+      throw new SparkException("The executor resource config: " +
+        ResourceID(SPARK_EXECUTOR_PREFIX, taskReq.resourceName).amountConf +
+        " needs to be specified since a task requirement config: " +
+        ResourceID(SPARK_TASK_PREFIX, taskReq.resourceName).amountConf +
+        " was specified")
     )
     // Make sure the executor resources are large enough to launch at least one task.
-    if (execCount.toLong < taskCount.toLong) {
-      throw new SparkException(
-        s"The executor resource config: " +
-          s"${SPARK_EXECUTOR_RESOURCE_PREFIX + rName + SPARK_RESOURCE_AMOUNT_SUFFIX} " +
-          s"= $execCount has to be >= the task config: " +
-          s"${SPARK_TASK_RESOURCE_PREFIX + rName + SPARK_RESOURCE_AMOUNT_SUFFIX} = $taskCount")
+    if (execCount < taskReq.amount) {
+      throw new SparkException("The executor resource config: " +
+        ResourceID(SPARK_EXECUTOR_PREFIX, taskReq.resourceName).amountConf +
+        s" = $execCount has to be >= the task config: " +
 
 Review comment:
   Just for symmetry: executor resource config <-> task resource config
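  
  i.e. something like this for the quoted message text (wording only, no logic change):
  
        s" = $execCount has to be >= the task resource config: " +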





[GitHub] [spark] Ngone51 commented on a change in pull request #24856: [SPARK-27823] [CORE] Refactor resource handling code

2019-06-14 Thread GitBox
Ngone51 commented on a change in pull request #24856: [SPARK-27823] [CORE] 
Refactor resource handling code
URL: https://github.com/apache/spark/pull/24856#discussion_r293834805
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala
 ##
 @@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.resource
+
+import java.io.File
+import java.nio.file.{Files, Paths}
+
+import scala.util.control.NonFatal
+
+import org.json4s.DefaultFormats
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.util.Utils.executeAndGetOutput
+
+/**
+ * Resource identifier.
+ * @param componentName spark.driver / spark.executor / spark.task
+ * @param resourceName  gpu, fpga, etc
+ */
+private[spark] case class ResourceID(componentName: String, resourceName: String) {
+  def confPrefix: String = s"$componentName.resource.$resourceName." // with ending dot
+  def amountConf: String = s"$confPrefix${ResourceUtils.AMOUNT}"
+  def discoveryScriptConf: String = s"$confPrefix${ResourceUtils.DISCOVERY_SCRIPT}"
+  def vendorConf: String = s"$confPrefix${ResourceUtils.VENDOR}"
+}
+
+private[spark] case class ResourceRequest(
+    id: ResourceID,
+    amount: Int,
+    discoveryScript: Option[String],
+    vendor: Option[String])
+
+private[spark] case class TaskResourceRequirement(resourceName: String, amount: Int)
+
+/**
+ * Case class representing allocated resource addresses for a specific resource.
+ * Cluster manager uses the JSON serialization of this case class to pass allocated resource info to
+ * driver and executors. See the ``--resourcesFile`` option there.
+ */
+private[spark] case class ResourceAllocation(id: ResourceID, addresses: Seq[String]) {
+  def toResourceInformation: ResourceInformation = {
+    new ResourceInformation(id.resourceName, addresses.toArray)
+  }
+}
+
+private[spark] object ResourceUtils extends Logging {
+
+  // config suffixes
+  val DISCOVERY_SCRIPT = "discoveryScript"
+  val VENDOR = "vendor"
+  // user facing configs use .amount to allow to extend in the future,
+  // internally we currently only support addresses, so its just an integer count
+  val AMOUNT = "amount"
+
+  def parseResourceRequest(sparkConf: SparkConf, resourceId: ResourceID): ResourceRequest = {
+    val settings = sparkConf.getAllWithPrefix(resourceId.confPrefix).toMap
+    val amount = settings.getOrElse(AMOUNT,
+      throw new SparkException(s"You must specify an amount for ${resourceId.resourceName}")
+    ).toInt
+    val discoveryScript = settings.get(DISCOVERY_SCRIPT)
+    val vendor = settings.get(VENDOR)
+    ResourceRequest(resourceId, amount, discoveryScript, vendor)
+  }
+
+  def listResourceIds(sparkConf: SparkConf, componentName: String): Seq[ResourceID] = {
+    sparkConf.getAllWithPrefix(s"$componentName.resource.").map { case (key, _) =>
+      key.substring(0, key.indexOf('.'))
+    }.toSet.toSeq.map(name => ResourceID(componentName, name))
+  }
+
+  def parseAllResourceRequests(
+      sparkConf: SparkConf,
+      componentName: String): Seq[ResourceRequest] = {
+    listResourceIds(sparkConf, componentName).map { id =>
+      parseResourceRequest(sparkConf, id)
+    }
+  }
+
+  def parseTaskResourceRequirements(sparkConf: SparkConf): Seq[TaskResourceRequirement] = {
+    parseAllResourceRequests(sparkConf, SPARK_TASK_PREFIX).map { request =>
+      TaskResourceRequirement(request.id.resourceName, request.amount)
+    }
+  }
+
+  private def parseAllocatedFromJsonFile(resourcesFile: String): Seq[ResourceAllocation] = {
+    implicit val formats = DefaultFormats
+    val json = new String(Files.readAllBytes(Paths.get(resourcesFile)))
+    try {
+      parse(json).extract[Seq[ResourceAllocation]]
+    } catch {
+      case NonFatal(e) =>
+        throw new SparkException(s"Error parsing resources file $resourcesFile", e)
+    }
+  }
+
+  private def parseAllocatedOrDiscoverResources(
+      sparkConf: SparkConf,
+      componentName: String,
+      resourcesFileOpt: Option[String]): Seq[ResourceAllocation] = {

[GitHub] [spark] Ngone51 commented on a change in pull request #24856: [SPARK-27823] [CORE] Refactor resource handling code

2019-06-14 Thread GitBox
Ngone51 commented on a change in pull request #24856: [SPARK-27823] [CORE] 
Refactor resource handling code
URL: https://github.com/apache/spark/pull/24856#discussion_r293835085
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala
 ##
 @@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.resource
+
+import java.io.File
+import java.nio.file.{Files, Paths}
+
+import scala.util.control.NonFatal
+
+import org.json4s.DefaultFormats
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.util.Utils.executeAndGetOutput
+
+/**
+ * Resource identifier.
+ * @param componentName spark.driver / spark.executor / spark.task
+ * @param resourceName  gpu, fpga, etc
+ */
+private[spark] case class ResourceID(componentName: String, resourceName: String) {
+  def confPrefix: String = s"$componentName.resource.$resourceName." // with ending dot
+  def amountConf: String = s"$confPrefix${ResourceUtils.AMOUNT}"
+  def discoveryScriptConf: String = s"$confPrefix${ResourceUtils.DISCOVERY_SCRIPT}"
+  def vendorConf: String = s"$confPrefix${ResourceUtils.VENDOR}"
+}
+
+private[spark] case class ResourceRequest(
+    id: ResourceID,
+    amount: Int,
+    discoveryScript: Option[String],
+    vendor: Option[String])
+
+private[spark] case class TaskResourceRequirement(resourceName: String, amount: Int)
+
+/**
+ * Case class representing allocated resource addresses for a specific resource.
+ * Cluster manager uses the JSON serialization of this case class to pass allocated resource info to
+ * driver and executors. See the ``--resourcesFile`` option there.
+ */
+private[spark] case class ResourceAllocation(id: ResourceID, addresses: Seq[String]) {
+  def toResourceInformation: ResourceInformation = {
+    new ResourceInformation(id.resourceName, addresses.toArray)
+  }
+}
+
+private[spark] object ResourceUtils extends Logging {
+
+  // config suffixes
+  val DISCOVERY_SCRIPT = "discoveryScript"
+  val VENDOR = "vendor"
+  // user facing configs use .amount to allow to extend in the future,
+  // internally we currently only support addresses, so its just an integer count
+  val AMOUNT = "amount"
+
+  def parseResourceRequest(sparkConf: SparkConf, resourceId: ResourceID): ResourceRequest = {
+    val settings = sparkConf.getAllWithPrefix(resourceId.confPrefix).toMap
+    val amount = settings.getOrElse(AMOUNT,
+      throw new SparkException(s"You must specify an amount for ${resourceId.resourceName}")
+    ).toInt
+    val discoveryScript = settings.get(DISCOVERY_SCRIPT)
+    val vendor = settings.get(VENDOR)
+    ResourceRequest(resourceId, amount, discoveryScript, vendor)
+  }
+
+  def listResourceIds(sparkConf: SparkConf, componentName: String): Seq[ResourceID] = {
+    sparkConf.getAllWithPrefix(s"$componentName.resource.").map { case (key, _) =>
+      key.substring(0, key.indexOf('.'))
+    }.toSet.toSeq.map(name => ResourceID(componentName, name))
+  }
+
+  def parseAllResourceRequests(
+      sparkConf: SparkConf,
+      componentName: String): Seq[ResourceRequest] = {
+    listResourceIds(sparkConf, componentName).map { id =>
+      parseResourceRequest(sparkConf, id)
+    }
+  }
+
+  def parseTaskResourceRequirements(sparkConf: SparkConf): Seq[TaskResourceRequirement] = {
+    parseAllResourceRequests(sparkConf, SPARK_TASK_PREFIX).map { request =>
+      TaskResourceRequirement(request.id.resourceName, request.amount)
+    }
+  }
+
+  private def parseAllocatedFromJsonFile(resourcesFile: String): Seq[ResourceAllocation] = {
+    implicit val formats = DefaultFormats
+    val json = new String(Files.readAllBytes(Paths.get(resourcesFile)))
+    try {
+      parse(json).extract[Seq[ResourceAllocation]]
+    } catch {
+      case NonFatal(e) =>
+        throw new SparkException(s"Error parsing resources file $resourcesFile", e)
+    }
+  }
+
+  private def parseAllocatedOrDiscoverResources(
+      sparkConf: SparkConf,
+      componentName: String,
+      resourcesFileOpt: Option[String]): Seq[ResourceAllocation] = {

[GitHub] [spark] Ngone51 commented on a change in pull request #24856: [SPARK-27823] [CORE] Refactor resource handling code

2019-06-14 Thread GitBox
Ngone51 commented on a change in pull request #24856: [SPARK-27823] [CORE] 
Refactor resource handling code
URL: https://github.com/apache/spark/pull/24856#discussion_r293863256
 
 

 ##
 File path: core/src/test/scala/org/apache/spark/resource/ResourceUtilsSuite.scala
 ##
 @@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.resource
+
+import java.io.File
+import java.nio.file.{Files => JavaFiles}
+
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkException, SparkFunSuite}
+import org.apache.spark.TestUtils._
+import org.apache.spark.internal.config._
+import org.apache.spark.resource.ResourceUtils._
+import org.apache.spark.resource.TestResourceIDs._
+import org.apache.spark.util.Utils
+
+class ResourceUtilsSuite extends SparkFunSuite
+with LocalSparkContext {
+
+  test("ResourceID") {
+val componentName = "spark.test"
+val resourceName = "p100"
+val id = ResourceID(componentName, resourceName)
+val confPrefix = s"$componentName.resource.$resourceName."
+assert(id.confPrefix === confPrefix)
+assert(id.amountConf === s"${confPrefix}amount")
+assert(id.discoveryScriptConf === s"${confPrefix}discoveryScript")
+assert(id.vendorConf === s"${confPrefix}vendor")
+  }
+
+  test("Resource discoverer no addresses errors") {
+val conf = new SparkConf
+assume(!(Utils.isWindows))
+withTempDir { dir =>
+  val scriptPath = createTempScriptWithExpectedOutput(dir, "gpuDiscoverScript",
+"""{"name": "gpu"}""")
+  conf.set(EXECUTOR_GPU_ID.amountConf, "2")
+  conf.set(EXECUTOR_GPU_ID.discoveryScriptConf, scriptPath)
+
+  val error = intercept[IllegalArgumentException] {
+getOrDiscoverAllResources(conf, SPARK_EXECUTOR_PREFIX, None)
+  }.getMessage()
+  assert(error.contains("Resource: gpu, with " +
+"addresses:  is less than what the user requested: 2"))
+}
+  }
+
+  test("Resource discoverer multiple resource types") {
+val conf = new SparkConf
+assume(!(Utils.isWindows))
+withTempDir { dir =>
+  val gpuDiscovery = createTempScriptWithExpectedOutput(dir, "gpuDiscoveryScript",
+"""{"name": "gpu", "addresses": ["0", "1"]}""")
+  conf.set(EXECUTOR_GPU_ID.amountConf, "2")
+  conf.set(EXECUTOR_GPU_ID.discoveryScriptConf, gpuDiscovery)
+
+  val fpgaDiscovery = createTempScriptWithExpectedOutput(dir, "fpgDiscoverScript",
+"""{"name": "fpga", "addresses": ["f1", "f2", "f3"]}""")
+  conf.set(EXECUTOR_FPGA_ID.amountConf, "2")
+  conf.set(EXECUTOR_FPGA_ID.discoveryScriptConf, fpgaDiscovery)
+
+  val resources = getOrDiscoverAllResources(conf, SPARK_EXECUTOR_PREFIX, None)
+  assert(resources.size === 2)
+  val gpuValue = resources.get(GPU)
+  assert(gpuValue.nonEmpty, "Should have a gpu entry")
+  assert(gpuValue.get.name == "gpu", "name should be gpu")
+  assert(gpuValue.get.addresses.size == 2, "Should have 2 indexes")
+  assert(gpuValue.get.addresses.deep == Array("0", "1").deep, "should have 0,1 entries")
+
+  val fpgaValue = resources.get(FPGA)
+  assert(fpgaValue.nonEmpty, "Should have a gpu entry")
+  assert(fpgaValue.get.name == "fpga", "name should be fpga")
+  assert(fpgaValue.get.addresses.size == 3, "Should have 3 indexes")
+  assert(fpgaValue.get.addresses.deep == Array("f1", "f2", "f3").deep,
+"should have f1,f2,f3 entries")
+}
+  }
+
+  test("list resource ids") {
+val conf = new SparkConf
+conf.set(DRIVER_GPU_ID.amountConf, "2")
+var resources = listResourceIds(conf, SPARK_DRIVER_PREFIX)
+assert(resources.size === 1, "should only have GPU for resource")
+assert(resources(0).resourceName == GPU, "name should be gpu")
+
+conf.set(DRIVER_FPGA_ID.amountConf, "2")
+val resourcesMap = listResourceIds(conf, SPARK_DRIVER_PREFIX)
+  .map{ rId => (rId.resourceName, 1)}.toMap
+assert(resourcesMap.size === 2, "should only have GPU for resource")
+assert(resourcesMap.get(GPU).nonEmpty, "should have GPU")
+assert(resourcesMap.get(FPGA).nonEmpty, "should have FPGA")
+  }
+
+  test("parse resource request") {
+val conf = new SparkConf
+conf.set(DRI

[GitHub] [spark] Ngone51 commented on a change in pull request #24856: [SPARK-27823] [CORE] Refactor resource handling code

2019-06-14 Thread GitBox
Ngone51 commented on a change in pull request #24856: [SPARK-27823] [CORE] 
Refactor resource handling code
URL: https://github.com/apache/spark/pull/24856#discussion_r293868313
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/resource/ResourceInformation.scala
 ##
 @@ -35,3 +41,25 @@ class ResourceInformation(
 
   override def toString: String = s"[name: ${name}, addresses: ${addresses.mkString(",")}]"
 }
+
+private[spark] object ResourceInformation {
+  /**
+   * Parses a JSON string into a [[ResourceInformation]] instance.
+   */
+  def parseJson(json: String): ResourceInformation = {
+    implicit val formats = DefaultFormats
+    try {
+      parse(json).extract[ResourceInformationJson].toResourceInformation
 
 Review comment:
  Shall we check for duplicate addresses on behalf of the user?
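  
  A minimal sketch of such a check, assuming ResourceInformationJson exposes name and
  addresses and that we want to fail fast rather than silently keep duplicates:
  
    def parseJson(json: String): ResourceInformation = {
      implicit val formats = DefaultFormats
      val parsed = try {
        parse(json).extract[ResourceInformationJson]
      } catch {
        case NonFatal(e) =>
          throw new SparkException(s"Error parsing JSON into ResourceInformation:\n$json\n", e)
      }
      // hypothetical extra validation: surface duplicate addresses to the user
      val duplicates = parsed.addresses.diff(parsed.addresses.distinct).distinct
      if (duplicates.nonEmpty) {
        throw new SparkException(
          s"Duplicate resource addresses (${duplicates.mkString(", ")}) in:\n$json")
      }
      parsed.toResourceInformation
    }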





[GitHub] [spark] Ngone51 commented on a change in pull request #24856: [SPARK-27823] [CORE] Refactor resource handling code

2019-06-14 Thread GitBox
Ngone51 commented on a change in pull request #24856: [SPARK-27823] [CORE] 
Refactor resource handling code
URL: https://github.com/apache/spark/pull/24856#discussion_r293863096
 
 

 ##
 File path: core/src/test/scala/org/apache/spark/resource/ResourceUtilsSuite.scala
 ##
 @@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.resource
+
+import java.io.File
+import java.nio.file.{Files => JavaFiles}
+
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkException, SparkFunSuite}
+import org.apache.spark.TestUtils._
+import org.apache.spark.internal.config._
+import org.apache.spark.resource.ResourceUtils._
+import org.apache.spark.resource.TestResourceIDs._
+import org.apache.spark.util.Utils
+
+class ResourceUtilsSuite extends SparkFunSuite
+with LocalSparkContext {
+
+  test("ResourceID") {
+val componentName = "spark.test"
+val resourceName = "p100"
+val id = ResourceID(componentName, resourceName)
+val confPrefix = s"$componentName.resource.$resourceName."
+assert(id.confPrefix === confPrefix)
+assert(id.amountConf === s"${confPrefix}amount")
+assert(id.discoveryScriptConf === s"${confPrefix}discoveryScript")
+assert(id.vendorConf === s"${confPrefix}vendor")
+  }
+
+  test("Resource discoverer no addresses errors") {
+val conf = new SparkConf
+assume(!(Utils.isWindows))
+withTempDir { dir =>
+  val scriptPath = createTempScriptWithExpectedOutput(dir, "gpuDiscoverScript",
+"""{"name": "gpu"}""")
+  conf.set(EXECUTOR_GPU_ID.amountConf, "2")
+  conf.set(EXECUTOR_GPU_ID.discoveryScriptConf, scriptPath)
+
+  val error = intercept[IllegalArgumentException] {
+getOrDiscoverAllResources(conf, SPARK_EXECUTOR_PREFIX, None)
+  }.getMessage()
+  assert(error.contains("Resource: gpu, with " +
+"addresses:  is less than what the user requested: 2"))
+}
+  }
+
+  test("Resource discoverer multiple resource types") {
+val conf = new SparkConf
+assume(!(Utils.isWindows))
+withTempDir { dir =>
+  val gpuDiscovery = createTempScriptWithExpectedOutput(dir, "gpuDiscoveryScript",
+"""{"name": "gpu", "addresses": ["0", "1"]}""")
+  conf.set(EXECUTOR_GPU_ID.amountConf, "2")
+  conf.set(EXECUTOR_GPU_ID.discoveryScriptConf, gpuDiscovery)
+
+  val fpgaDiscovery = createTempScriptWithExpectedOutput(dir, "fpgDiscoverScript",
+"""{"name": "fpga", "addresses": ["f1", "f2", "f3"]}""")
+  conf.set(EXECUTOR_FPGA_ID.amountConf, "2")
+  conf.set(EXECUTOR_FPGA_ID.discoveryScriptConf, fpgaDiscovery)
+
+  val resources = getOrDiscoverAllResources(conf, SPARK_EXECUTOR_PREFIX, None)
+  assert(resources.size === 2)
+  val gpuValue = resources.get(GPU)
+  assert(gpuValue.nonEmpty, "Should have a gpu entry")
+  assert(gpuValue.get.name == "gpu", "name should be gpu")
+  assert(gpuValue.get.addresses.size == 2, "Should have 2 indexes")
+  assert(gpuValue.get.addresses.deep == Array("0", "1").deep, "should have 0,1 entries")
+
+  val fpgaValue = resources.get(FPGA)
+  assert(fpgaValue.nonEmpty, "Should have a gpu entry")
+  assert(fpgaValue.get.name == "fpga", "name should be fpga")
+  assert(fpgaValue.get.addresses.size == 3, "Should have 3 indexes")
+  assert(fpgaValue.get.addresses.deep == Array("f1", "f2", "f3").deep,
+"should have f1,f2,f3 entries")
+}
+  }
+
+  test("list resource ids") {
+val conf = new SparkConf
+conf.set(DRIVER_GPU_ID.amountConf, "2")
+var resources = listResourceIds(conf, SPARK_DRIVER_PREFIX)
+assert(resources.size === 1, "should only have GPU for resource")
+assert(resources(0).resourceName == GPU, "name should be gpu")
+
+conf.set(DRIVER_FPGA_ID.amountConf, "2")
+val resourcesMap = listResourceIds(conf, SPARK_DRIVER_PREFIX)
+  .map{ rId => (rId.resourceName, 1)}.toMap
+assert(resourcesMap.size === 2, "should only have GPU for resource")
+assert(resourcesMap.get(GPU).nonEmpty, "should have GPU")
+assert(resourcesMap.get(FPGA).nonEmpty, "should have FPGA")
+  }
+
+  test("parse resource request") {
+val conf = new SparkConf
+conf.set(DRI

[GitHub] [spark] Ngone51 commented on a change in pull request #24856: [SPARK-27823] [CORE] Refactor resource handling code

2019-06-14 Thread GitBox
Ngone51 commented on a change in pull request #24856: [SPARK-27823] [CORE] 
Refactor resource handling code
URL: https://github.com/apache/spark/pull/24856#discussion_r293841356
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/SparkContext.scala
 ##
 @@ -2723,44 +2689,50 @@ object SparkContext extends Logging {
 
   // Calculate the max slots each executor can provide based on resources available on each
   // executor and resources required by each task.
-  val taskResourcesAndCount = sc.conf.getTaskResourceRequirements()
-  val executorResourcesAndCounts = sc.conf.getAllWithPrefixAndSuffix(
-    SPARK_EXECUTOR_RESOURCE_PREFIX, SPARK_RESOURCE_AMOUNT_SUFFIX).toMap
+  val taskResourceRequirements = parseTaskResourceRequirements(sc.conf)
+  val executorResourcesAndCounts =
 
 Review comment:
  Shall we unify the executor's `count` naming to `amount`?
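  
  e.g. a naming-only sketch for the val quoted above (no behavior change intended):
  
    val executorResourceAmounts =
      parseAllResourceRequests(sc.conf, SPARK_EXECUTOR_PREFIX)
        .map(request => (request.id.resourceName, request.amount)).toMap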

