Fixed a bug relating to lingering executors [2/2].

A task-less v1 executor could linger if the agent restarted
before any of the executor's initial tasks were delivered.
This was because we checked whether the executor had any
tasks to run before removing the dropped tasks.

This patch fixes the issue by checking whether the
executor should be shut down *after* removing the tasks
that were dropped during the agent restart.
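
For illustration, a minimal sketch of the ordering change. The
names below (`Executor`, `removeDroppedTasks`, `shutdownExecutor`,
`sendSubscribed`) are hypothetical stand-ins for the real logic in
`Slave::subscribe()`, not the actual Mesos code:

  #include <deque>

  struct Executor {
    std::deque<int> queuedTasks;  // Stand-in for queued TaskInfos.
    bool sentTask = false;
    bool everSentTask() const { return sentTask; }
  };

  void removeDroppedTasks(Executor* executor);  // Hypothetical helper.
  void shutdownExecutor(Executor* executor);    // Hypothetical helper.
  void sendSubscribed(Executor* executor);      // Hypothetical helper.

  // Before the fix: the shutdown check ran before the dropped tasks
  // were removed, so tasks killed during the agent restart still
  // looked "queued" and kept the executor alive.
  void subscribeBefore(Executor* executor) {
    if (!executor->everSentTask() && executor->queuedTasks.empty()) {
      shutdownExecutor(executor);
      return;
    }
    sendSubscribed(executor);
    removeDroppedTasks(executor);  // Too late to trigger the shutdown.
  }

  // After the fix: dropped tasks are removed first, so an executor
  // whose initial tasks were all killed before delivery is correctly
  // shut down instead of lingering.
  void subscribeAfter(Executor* executor) {
    removeDroppedTasks(executor);
    if (!executor->everSentTask() && executor->queuedTasks.empty()) {
      shutdownExecutor(executor);
      return;
    }
    sendSubscribed(executor);
  }

The diff below implements this reordering in `Slave::subscribe()` by
moving the shutdown check and the SUBSCRIBED-event block to after the
reconciliation of unacknowledged tasks.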


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a01e6602
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a01e6602
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a01e6602

Branch: refs/heads/1.4.x
Commit: a01e66024de8336c7cf6c70f7d689eccd29ebabd
Parents: 778ed21
Author: Meng Zhu <m...@mesosphere.io>
Authored: Wed Feb 7 17:46:15 2018 -0800
Committer: Benjamin Mahler <bmah...@apache.org>
Committed: Mon Feb 12 21:44:26 2018 -0800

----------------------------------------------------------------------
 src/slave/slave.cpp | 118 +++++++++++++++++++++++------------------------
 1 file changed, 59 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/a01e6602/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 5ba160e..4b3d1fc 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -3808,34 +3808,6 @@ void Slave::subscribe(
         CHECK_SOME(os::touch(path));
       }
 
-      // Here, we kill the executor if it no longer has any task or task group
-      // to run (e.g., framework sent a `killTask()`). This is a workaround for
-      // those executors (e.g., command executor, default executor) that do not
-      // have a proper self terminating logic when they haven't received the
-      // task or task group within a timeout.
-      if (!executor->everSentTask() && executor->queuedTasks.empty()) {
-        LOG(WARNING) << "Shutting down subscribing executor " << *executor
-                     << " because it was never sent a task and"
-                     << " has no tasks to run";
-
-        _shutdownExecutor(framework, executor);
-
-        return;
-      }
-
-      // Tell executor it's registered and give it any queued tasks
-      // or task groups.
-      executor::Event event;
-      event.set_type(executor::Event::SUBSCRIBED);
-
-      executor::Event::Subscribed* subscribed = event.mutable_subscribed();
-      subscribed->mutable_executor_info()->CopyFrom(executor->info);
-      subscribed->mutable_framework_info()->MergeFrom(framework->info);
-      subscribed->mutable_slave_info()->CopyFrom(info);
-      subscribed->mutable_container_id()->CopyFrom(executor->containerId);
-
-      executor->send(event);
-
       // Handle all the pending updates.
       // The status update manager might have already checkpointed some
       // of these pending updates (for example, if the slave died right
@@ -3851,37 +3823,6 @@ void Slave::subscribe(
             None());
       }
 
-      // We maintain a copy of the tasks in `queuedTaskGroups` also in
-      // `queuedTasks`. Hence, we need to ensure that we don't send the same
-      // tasks to the executor twice.
-      LinkedHashMap<TaskID, TaskInfo> queuedTasks;
-      foreachpair (const TaskID& taskId,
-                   const TaskInfo& taskInfo,
-                   executor->queuedTasks) {
-        queuedTasks[taskId] = taskInfo;
-      }
-
-      foreach (const TaskGroupInfo& taskGroup, executor->queuedTaskGroups) {
-        foreach (const TaskInfo& task, taskGroup.tasks()) {
-          const TaskID& taskId = task.task_id();
-          if (queuedTasks.contains(taskId)) {
-            queuedTasks.erase(taskId);
-          }
-        }
-      }
-
-      containerizer->update(
-          executor->containerId,
-          executor->allocatedResources())
-        .onAny(defer(self(),
-                     &Self::___run,
-                     lambda::_1,
-                     framework->id(),
-                     executor->id,
-                     executor->containerId,
-                     queuedTasks.values(),
-                     executor->queuedTaskGroups));
-
       hashmap<TaskID, TaskInfo> unackedTasks;
       foreach (const TaskInfo& task, subscribe.unacknowledged_tasks()) {
         unackedTasks[task.task_id()] = task;
@@ -3928,6 +3869,65 @@ void Slave::subscribe(
         }
       }
 
+      // Shutdown the executor if all of its initial tasks are killed.
+      // See MESOS-8411. This is a workaround for those executors (e.g.,
+      // command executor, default executor) that do not have a proper
+      // self terminating logic when they haven't received the task or
+      // task group within a timeout.
+      if (!executor->everSentTask() && executor->queuedTasks.empty()) {
+        LOG(WARNING) << "Shutting down executor " << *executor
+                     << " because it has never been sent a task and all of"
+                     << " its queued tasks have been killed before delivery";
+
+        _shutdownExecutor(framework, executor);
+
+        return;
+      }
+
+      // Tell executor it's registered and give it any queued tasks
+      // or task groups.
+      executor::Event event;
+      event.set_type(executor::Event::SUBSCRIBED);
+
+      executor::Event::Subscribed* subscribed = event.mutable_subscribed();
+      subscribed->mutable_executor_info()->CopyFrom(executor->info);
+      subscribed->mutable_framework_info()->MergeFrom(framework->info);
+      subscribed->mutable_slave_info()->CopyFrom(info);
+      subscribed->mutable_container_id()->CopyFrom(executor->containerId);
+
+      executor->send(event);
+
+      // We maintain a copy of the tasks in `queuedTaskGroups` also in
+      // `queuedTasks`. Hence, we need to ensure that we don't send the same
+      // tasks to the executor twice.
+      LinkedHashMap<TaskID, TaskInfo> queuedTasks;
+      foreachpair (const TaskID& taskId,
+                   const TaskInfo& taskInfo,
+                   executor->queuedTasks) {
+        queuedTasks[taskId] = taskInfo;
+      }
+
+      foreach (const TaskGroupInfo& taskGroup, executor->queuedTaskGroups) {
+        foreach (const TaskInfo& task, taskGroup.tasks()) {
+          const TaskID& taskId = task.task_id();
+          if (queuedTasks.contains(taskId)) {
+            queuedTasks.erase(taskId);
+          }
+        }
+      }
+
+      containerizer->update(
+          executor->containerId,
+          executor->allocatedResources())
+        .onAny(defer(self(),
+                     &Self::___run,
+                     lambda::_1,
+                     framework->id(),
+                     executor->id,
+                     executor->containerId,
+                     queuedTasks.values(),
+                     executor->queuedTaskGroups));
+
       break;
     }
     default:
