Ngone51 commented on a change in pull request #29015:
URL: https://github.com/apache/spark/pull/29015#discussion_r453521642



##########
File path: 
core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
##########
@@ -49,6 +56,23 @@ class MasterWebUI(
       "/app/kill", "/", masterPage.handleAppKillRequest, httpMethods = 
Set("POST")))
     attachHandler(createRedirectHandler(
       "/driver/kill", "/", masterPage.handleDriverKillRequest, httpMethods = 
Set("POST")))
+    attachHandler(createServletHandler("/workers/kill", new HttpServlet {
+      override def doPost(req: HttpServletRequest, resp: HttpServletResponse): 
Unit = {
+        val hostPorts: Seq[String] = Option(req.getParameterValues("host"))
+          .getOrElse(Array[String]()).map(_.toLowerCase(Locale.ENGLISH)).toSeq
+        logInfo(s"Received request to decommission hostPorts $hostPorts from 
${req.getRemoteAddr}")

Review comment:
       nit: `decommission hostPorts` -> `decommission workers`?

##########
File path: core/src/main/scala/org/apache/spark/deploy/master/Master.scala
##########
@@ -863,7 +872,36 @@ private[deploy] class Master(
     true
   }
 
-  private def decommissionWorker(worker: WorkerInfo): Unit = {
+  private def handleDecommissionWorkers(hostPorts: Seq[String]): Integer = {
+    val hostsWithoutPorts = new HashSet[String]
+    val addressToRemove = new HashSet[RpcAddress]
+    hostPorts.foreach(hp => {

Review comment:
       we can first filter the valid workers and then apply 
`decommissionWorker` for each worker, e.g.
   
   `hostPorsts.filter(...).foreach(...)`

##########
File path: core/src/main/scala/org/apache/spark/deploy/master/Master.scala
##########
@@ -863,7 +872,36 @@ private[deploy] class Master(
     true
   }
 
-  private def decommissionWorker(worker: WorkerInfo): Unit = {
+  private def handleDecommissionWorkers(hostPorts: Seq[String]): Integer = {
+    val hostsWithoutPorts = new HashSet[String]
+    val addressToRemove = new HashSet[RpcAddress]
+    hostPorts.foreach(hp => {
+      val (host, port) = Utils.parseHostPort(hp)
+      if (port == 0) {
+        hostsWithoutPorts.add(host)
+      } else {
+        val rpcAddress = new RpcAddress(host, port)
+        if (addressToWorker.contains(rpcAddress)) {
+          addressToRemove.add(rpcAddress)
+        }
+      }
+    })
+
+    if (hostsWithoutPorts.nonEmpty) {
+      addressToRemove ++= addressToWorker.keys.filter(addr => 
hostsWithoutPorts.contains(addr.host))
+    }

Review comment:
       It seems that comparing only the `host` should be enough?

##########
File path: 
core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
##########
@@ -49,6 +56,23 @@ class MasterWebUI(
       "/app/kill", "/", masterPage.handleAppKillRequest, httpMethods = 
Set("POST")))
     attachHandler(createRedirectHandler(
       "/driver/kill", "/", masterPage.handleDriverKillRequest, httpMethods = 
Set("POST")))
+    attachHandler(createServletHandler("/workers/kill", new HttpServlet {
+      override def doPost(req: HttpServletRequest, resp: HttpServletResponse): 
Unit = {
+        val hostPorts: Seq[String] = Option(req.getParameterValues("host"))
+          .getOrElse(Array[String]()).map(_.toLowerCase(Locale.ENGLISH)).toSeq

Review comment:
       Why use `Locale.ENGLISH`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to