chetanmeh closed pull request #4109: update mesos-actor; cleanup orphaned failed task launches
URL: https://github.com/apache/incubator-openwhisk/pull/4109
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/common/scala/build.gradle b/common/scala/build.gradle index 2ae4a2963d..500e3227c7 100644 --- a/common/scala/build.gradle +++ b/common/scala/build.gradle @@ -64,7 +64,7 @@ dependencies { compile 'io.kamon:kamon-core_2.12:0.6.7' compile 'io.kamon:kamon-statsd_2.12:0.6.7' //for mesos - compile 'com.adobe.api.platform.runtime:mesos-actor:0.0.13' + compile 'com.adobe.api.platform.runtime:mesos-actor:0.0.14' //tracing support compile 'io.opentracing:opentracing-api:0.31.0' diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/mesos/MesosTask.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/mesos/MesosTask.scala index b78e33fc49..373b123f97 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/mesos/MesosTask.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/mesos/MesosTask.scala @@ -37,8 +37,6 @@ import com.adobe.api.platform.runtime.mesos.SubmitTask import com.adobe.api.platform.runtime.mesos.TaskDef import com.adobe.api.platform.runtime.mesos.User import java.time.Instant -import org.apache.mesos.v1.Protos.TaskState -import org.apache.mesos.v1.Protos.TaskStatus import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.util.Failure @@ -142,9 +140,14 @@ object MesosTask { transid.finished(this, start, s"launched task ${taskId} at ${taskDetails.hostname}:${taskDetails .hostports(0)}", logLevel = InfoLevel) case Failure(ate: AskTimeoutException) => - transid.failed(this, start, ate.getMessage, ErrorLevel) + transid.failed(this, start, s"task launch timed out ${ate.getMessage}", ErrorLevel) MetricEmitter.emitCounterMetric(LoggingMarkers.INVOKER_MESOS_CMD_TIMEOUT(LAUNCH_CMD)) - case Failure(t) => transid.failed(this, 
start, t.getMessage, ErrorLevel) + //kill the task whose launch timed out + destroy(mesosClientActor, mesosConfig, taskId) + case Failure(t) => + //kill the task whose launch timed out + destroy(mesosClientActor, mesosConfig, taskId) + transid.failed(this, start, s"task launch failed ${t.getMessage}", ErrorLevel) } .map(taskDetails => { val taskHost = taskDetails.hostname @@ -155,7 +158,29 @@ object MesosTask { }) } + private def destroy(mesosClientActor: ActorRef, mesosConfig: MesosConfig, taskId: String)( + implicit transid: TransactionId, + logging: Logging, + ec: ExecutionContext): Future[Unit] = { + val taskDeleteTimeout = Timeout(mesosConfig.timeouts.taskDelete) + val start = transid.started( + this, + LoggingMarkers.INVOKER_MESOS_CMD(MesosTask.KILL_CMD), + s"killing mesos taskid $taskId (timeout: ${taskDeleteTimeout})", + logLevel = InfoLevel) + + mesosClientActor + .ask(DeleteTask(taskId))(taskDeleteTimeout) + .andThen { + case Success(_) => transid.finished(this, start, logLevel = InfoLevel) + case Failure(ate: AskTimeoutException) => + transid.failed(this, start, s"task destroy timed out ${ate.getMessage}", ErrorLevel) + MetricEmitter.emitCounterMetric(LoggingMarkers.INVOKER_MESOS_CMD_TIMEOUT(MesosTask.KILL_CMD)) + case Failure(t) => transid.failed(this, start, s"task destroy failed ${t.getMessage}", ErrorLevel) + } + .map(_ => {}) + } } object JsonFormatters extends DefaultJsonProtocol { @@ -171,7 +196,6 @@ class MesosTask(override protected val id: ContainerId, mesosClientActor: ActorRef, mesosConfig: MesosConfig) extends Container { - val taskDeleteTimeout = Timeout(mesosConfig.timeouts.taskLaunch) /** Stops the container from consuming CPU cycles. */ override def suspend()(implicit transid: TransactionId): Future[Unit] = { @@ -187,30 +211,7 @@ class MesosTask(override protected val id: ContainerId, /** Completely destroys this instance of the container. 
*/ override def destroy()(implicit transid: TransactionId): Future[Unit] = { - val start = transid.started( - this, - LoggingMarkers.INVOKER_MESOS_CMD(MesosTask.KILL_CMD), - s"killing mesos taskid $taskId (timeout: ${taskDeleteTimeout})", - logLevel = InfoLevel) - - mesosClientActor - .ask(DeleteTask(taskId))(taskDeleteTimeout) - .mapTo[TaskStatus] - .andThen { - case Success(_) => transid.finished(this, start, logLevel = InfoLevel) - case Failure(ate: AskTimeoutException) => - transid.failed(this, start, ate.getMessage, ErrorLevel) - MetricEmitter.emitCounterMetric(LoggingMarkers.INVOKER_MESOS_CMD_TIMEOUT(MesosTask.KILL_CMD)) - case Failure(t) => transid.failed(this, start, t.getMessage, ErrorLevel) - } - .map(taskStatus => { - // verify that task ended in TASK_KILLED state (but don't fail if it didn't...) - if (taskStatus.getState != TaskState.TASK_KILLED) { - logging.error(this, s"task kill resulted in unexpected state ${taskStatus.getState}") - } else { - logging.info(this, s"task killed ended with state ${taskStatus.getState}") - } - })(ec) + MesosTask.destroy(mesosClientActor, mesosConfig, taskId) } /** ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services