This is an automated email from the ASF dual-hosted git repository. chetanm pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push: new 775b757 Cleanup orphaned failed task launches (#4109) 775b757 is described below commit 775b757ac7555c8371a5e03497622531984bf4f4 Author: tysonnorris <tysonnor...@gmail.com> AuthorDate: Tue Nov 13 08:03:13 2018 -0800 Cleanup orphaned failed task launches (#4109) update mesos-actor and cleanup orphaned failed task launches * review feedback --- common/scala/build.gradle | 2 +- .../apache/openwhisk/core/mesos/MesosTask.scala | 59 +++++++++++----------- 2 files changed, 31 insertions(+), 30 deletions(-) diff --git a/common/scala/build.gradle b/common/scala/build.gradle index 2ae4a29..500e322 100644 --- a/common/scala/build.gradle +++ b/common/scala/build.gradle @@ -64,7 +64,7 @@ dependencies { compile 'io.kamon:kamon-core_2.12:0.6.7' compile 'io.kamon:kamon-statsd_2.12:0.6.7' //for mesos - compile 'com.adobe.api.platform.runtime:mesos-actor:0.0.13' + compile 'com.adobe.api.platform.runtime:mesos-actor:0.0.14' //tracing support compile 'io.opentracing:opentracing-api:0.31.0' diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/mesos/MesosTask.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/mesos/MesosTask.scala index b78e33f..373b123 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/mesos/MesosTask.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/mesos/MesosTask.scala @@ -37,8 +37,6 @@ import com.adobe.api.platform.runtime.mesos.SubmitTask import com.adobe.api.platform.runtime.mesos.TaskDef import com.adobe.api.platform.runtime.mesos.User import java.time.Instant -import org.apache.mesos.v1.Protos.TaskState -import org.apache.mesos.v1.Protos.TaskStatus import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.util.Failure @@ -142,9 +140,14 @@ object MesosTask { transid.finished(this, start, s"launched task ${taskId} at ${taskDetails.hostname}:${taskDetails .hostports(0)}", logLevel = InfoLevel) case Failure(ate: AskTimeoutException) => - transid.failed(this, start, ate.getMessage, ErrorLevel) + transid.failed(this, start, s"task launch timed out ${ate.getMessage}", ErrorLevel) MetricEmitter.emitCounterMetric(LoggingMarkers.INVOKER_MESOS_CMD_TIMEOUT(LAUNCH_CMD)) - case Failure(t) => transid.failed(this, start, t.getMessage, ErrorLevel) + //kill the task whose launch timed out + destroy(mesosClientActor, mesosConfig, taskId) + case Failure(t) => + //kill the task whose launch timed out + destroy(mesosClientActor, mesosConfig, taskId) + transid.failed(this, start, s"task launch failed ${t.getMessage}", ErrorLevel) } .map(taskDetails => { val taskHost = taskDetails.hostname @@ -155,7 +158,29 @@ object MesosTask { }) } + private def destroy(mesosClientActor: ActorRef, mesosConfig: MesosConfig, taskId: String)( + implicit transid: TransactionId, + logging: Logging, + ec: ExecutionContext): Future[Unit] = { + val taskDeleteTimeout = Timeout(mesosConfig.timeouts.taskDelete) + val start = transid.started( + this, + LoggingMarkers.INVOKER_MESOS_CMD(MesosTask.KILL_CMD), + s"killing mesos taskid $taskId (timeout: ${taskDeleteTimeout})", + logLevel = InfoLevel) + + mesosClientActor + .ask(DeleteTask(taskId))(taskDeleteTimeout) + .andThen { + case Success(_) => transid.finished(this, start, logLevel = InfoLevel) + case Failure(ate: AskTimeoutException) => + transid.failed(this, start, s"task destroy timed out ${ate.getMessage}", ErrorLevel) + MetricEmitter.emitCounterMetric(LoggingMarkers.INVOKER_MESOS_CMD_TIMEOUT(MesosTask.KILL_CMD)) + case Failure(t) => transid.failed(this, start, s"task destroy failed ${t.getMessage}", ErrorLevel) + } + .map(_ => {}) + } } object JsonFormatters extends DefaultJsonProtocol { @@ -171,7 +196,6 @@ class MesosTask(override protected val id: ContainerId, mesosClientActor: ActorRef, mesosConfig: MesosConfig) extends Container { - val taskDeleteTimeout = Timeout(mesosConfig.timeouts.taskLaunch) /** Stops the container from consuming CPU cycles. */ override def suspend()(implicit transid: TransactionId): Future[Unit] = { @@ -187,30 +211,7 @@ class MesosTask(override protected val id: ContainerId, /** Completely destroys this instance of the container. */ override def destroy()(implicit transid: TransactionId): Future[Unit] = { - val start = transid.started( - this, - LoggingMarkers.INVOKER_MESOS_CMD(MesosTask.KILL_CMD), - s"killing mesos taskid $taskId (timeout: ${taskDeleteTimeout})", - logLevel = InfoLevel) - - mesosClientActor - .ask(DeleteTask(taskId))(taskDeleteTimeout) - .mapTo[TaskStatus] - .andThen { - case Success(_) => transid.finished(this, start, logLevel = InfoLevel) - case Failure(ate: AskTimeoutException) => - transid.failed(this, start, ate.getMessage, ErrorLevel) - MetricEmitter.emitCounterMetric(LoggingMarkers.INVOKER_MESOS_CMD_TIMEOUT(MesosTask.KILL_CMD)) - case Failure(t) => transid.failed(this, start, t.getMessage, ErrorLevel) - } - .map(taskStatus => { - // verify that task ended in TASK_KILLED state (but don't fail if it didn't...) - if (taskStatus.getState != TaskState.TASK_KILLED) { - logging.error(this, s"task kill resulted in unexpected state ${taskStatus.getState}") - } else { - logging.info(this, s"task killed ended with state ${taskStatus.getState}") - } - })(ec) + MesosTask.destroy(mesosClientActor, mesosConfig, taskId) } /**