This is an automated email from the ASF dual-hosted git repository.

chetanm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 775b757  Cleanup orphaned failed task launches (#4109)
775b757 is described below

commit 775b757ac7555c8371a5e03497622531984bf4f4
Author: tysonnorris <tysonnor...@gmail.com>
AuthorDate: Tue Nov 13 08:03:13 2018 -0800

    Cleanup orphaned failed task launches (#4109)
    
    update mesos-actor and cleanup orphaned failed task launches
    
    * review feedback
---
 common/scala/build.gradle                          |  2 +-
 .../apache/openwhisk/core/mesos/MesosTask.scala    | 59 +++++++++++-----------
 2 files changed, 31 insertions(+), 30 deletions(-)

diff --git a/common/scala/build.gradle b/common/scala/build.gradle
index 2ae4a29..500e322 100644
--- a/common/scala/build.gradle
+++ b/common/scala/build.gradle
@@ -64,7 +64,7 @@ dependencies {
     compile 'io.kamon:kamon-core_2.12:0.6.7'
     compile 'io.kamon:kamon-statsd_2.12:0.6.7'
     //for mesos
-    compile 'com.adobe.api.platform.runtime:mesos-actor:0.0.13'
+    compile 'com.adobe.api.platform.runtime:mesos-actor:0.0.14'
 
     //tracing support
     compile 'io.opentracing:opentracing-api:0.31.0'
diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/core/mesos/MesosTask.scala 
b/common/scala/src/main/scala/org/apache/openwhisk/core/mesos/MesosTask.scala
index b78e33f..373b123 100644
--- 
a/common/scala/src/main/scala/org/apache/openwhisk/core/mesos/MesosTask.scala
+++ 
b/common/scala/src/main/scala/org/apache/openwhisk/core/mesos/MesosTask.scala
@@ -37,8 +37,6 @@ import com.adobe.api.platform.runtime.mesos.SubmitTask
 import com.adobe.api.platform.runtime.mesos.TaskDef
 import com.adobe.api.platform.runtime.mesos.User
 import java.time.Instant
-import org.apache.mesos.v1.Protos.TaskState
-import org.apache.mesos.v1.Protos.TaskStatus
 import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
 import scala.util.Failure
@@ -142,9 +140,14 @@ object MesosTask {
           transid.finished(this, start, s"launched task ${taskId} at 
${taskDetails.hostname}:${taskDetails
             .hostports(0)}", logLevel = InfoLevel)
         case Failure(ate: AskTimeoutException) =>
-          transid.failed(this, start, ate.getMessage, ErrorLevel)
+          transid.failed(this, start, s"task launch timed out 
${ate.getMessage}", ErrorLevel)
           
MetricEmitter.emitCounterMetric(LoggingMarkers.INVOKER_MESOS_CMD_TIMEOUT(LAUNCH_CMD))
-        case Failure(t) => transid.failed(this, start, t.getMessage, 
ErrorLevel)
+          //kill the task whose launch timed out
+          destroy(mesosClientActor, mesosConfig, taskId)
+        case Failure(t) =>
+          //kill the task whose launch failed (any non-timeout failure)
+          destroy(mesosClientActor, mesosConfig, taskId)
+          transid.failed(this, start, s"task launch failed ${t.getMessage}", 
ErrorLevel)
       }
       .map(taskDetails => {
         val taskHost = taskDetails.hostname
@@ -155,7 +158,29 @@ object MesosTask {
       })
 
   }
+  private def destroy(mesosClientActor: ActorRef, mesosConfig: MesosConfig, 
taskId: String)(
+    implicit transid: TransactionId,
+    logging: Logging,
+    ec: ExecutionContext): Future[Unit] = {
+    val taskDeleteTimeout = Timeout(mesosConfig.timeouts.taskDelete)
 
+    val start = transid.started(
+      this,
+      LoggingMarkers.INVOKER_MESOS_CMD(MesosTask.KILL_CMD),
+      s"killing mesos taskid $taskId (timeout: ${taskDeleteTimeout})",
+      logLevel = InfoLevel)
+
+    mesosClientActor
+      .ask(DeleteTask(taskId))(taskDeleteTimeout)
+      .andThen {
+        case Success(_) => transid.finished(this, start, logLevel = InfoLevel)
+        case Failure(ate: AskTimeoutException) =>
+          transid.failed(this, start, s"task destroy timed out 
${ate.getMessage}", ErrorLevel)
+          
MetricEmitter.emitCounterMetric(LoggingMarkers.INVOKER_MESOS_CMD_TIMEOUT(MesosTask.KILL_CMD))
+        case Failure(t) => transid.failed(this, start, s"task destroy failed 
${t.getMessage}", ErrorLevel)
+      }
+      .map(_ => {})
+  }
 }
 
 object JsonFormatters extends DefaultJsonProtocol {
@@ -171,7 +196,6 @@ class MesosTask(override protected val id: ContainerId,
                 mesosClientActor: ActorRef,
                 mesosConfig: MesosConfig)
     extends Container {
-  val taskDeleteTimeout = Timeout(mesosConfig.timeouts.taskLaunch)
 
   /** Stops the container from consuming CPU cycles. */
   override def suspend()(implicit transid: TransactionId): Future[Unit] = {
@@ -187,30 +211,7 @@ class MesosTask(override protected val id: ContainerId,
 
   /** Completely destroys this instance of the container. */
   override def destroy()(implicit transid: TransactionId): Future[Unit] = {
-    val start = transid.started(
-      this,
-      LoggingMarkers.INVOKER_MESOS_CMD(MesosTask.KILL_CMD),
-      s"killing mesos taskid $taskId (timeout: ${taskDeleteTimeout})",
-      logLevel = InfoLevel)
-
-    mesosClientActor
-      .ask(DeleteTask(taskId))(taskDeleteTimeout)
-      .mapTo[TaskStatus]
-      .andThen {
-        case Success(_) => transid.finished(this, start, logLevel = InfoLevel)
-        case Failure(ate: AskTimeoutException) =>
-          transid.failed(this, start, ate.getMessage, ErrorLevel)
-          
MetricEmitter.emitCounterMetric(LoggingMarkers.INVOKER_MESOS_CMD_TIMEOUT(MesosTask.KILL_CMD))
-        case Failure(t) => transid.failed(this, start, t.getMessage, 
ErrorLevel)
-      }
-      .map(taskStatus => {
-        // verify that task ended in TASK_KILLED state (but don't fail if it 
didn't...)
-        if (taskStatus.getState != TaskState.TASK_KILLED) {
-          logging.error(this, s"task kill resulted in unexpected state 
${taskStatus.getState}")
-        } else {
-          logging.info(this, s"task killed ended with state 
${taskStatus.getState}")
-        }
-      })(ec)
+    MesosTask.destroy(mesosClientActor, mesosConfig, taskId)
   }
 
   /**

Reply via email to