Changed agent to send TASK_GONE.

The agent previously sent TASK_LOST updates for tasks that are killed
for various reasons, such as containerizer errors or QoS preemption.
The agent now sends TASK_GONE to partition-aware frameworks instead.
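
For context on how a scheduler opts in to the new behavior: the agent only
sends TASK_GONE to frameworks that register with the PARTITION_AWARE
capability in their FrameworkInfo (the updated tests below do exactly this);
frameworks without the capability keep receiving TASK_LOST. A minimal sketch
of that opt-in using the C++ scheduler API follows; the helper and framework
names are illustrative and not part of this change:

    #include <mesos/mesos.hpp>

    // Build a FrameworkInfo that opts in to partition-aware task states.
    // A scheduler registering with this capability should be prepared to
    // handle TASK_GONE (instead of TASK_LOST) in its statusUpdate()
    // callback for tasks the agent terminates.
    mesos::FrameworkInfo makePartitionAwareFrameworkInfo()
    {
      mesos::FrameworkInfo framework;
      framework.set_user("");                   // Empty: Mesos fills in the current user.
      framework.set_name("example-framework");  // Illustrative name.

      framework.add_capabilities()->set_type(
          mesos::FrameworkInfo::Capability::PARTITION_AWARE);

      return framework;
    }

The resulting FrameworkInfo can be passed to MesosSchedulerDriver in place of
a plain DEFAULT_FRAMEWORK_INFO, which is how the new tests in this diff
exercise the TASK_GONE code paths.
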
Review: https://reviews.apache.org/r/52803/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/01aa3ba4
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/01aa3ba4
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/01aa3ba4

Branch: refs/heads/master
Commit: 01aa3ba4a043e3aff04a632008f65a6ba33f8dcb
Parents: b46df16
Author: Neil Conway <neil.con...@gmail.com>
Authored: Fri Oct 21 14:13:45 2016 -0700
Committer: Vinod Kone <vinodk...@gmail.com>
Committed: Fri Oct 21 14:13:45 2016 -0700

----------------------------------------------------------------------
 src/slave/slave.cpp                              |  70 +++++++++--
 .../docker_containerizer_tests.cpp               |  16 ++-
 src/tests/oversubscription_tests.cpp             | 124 ++++++++++++++++++-
 src/tests/slave_tests.cpp                        | 123 +++++++++++++++++-
 4 files changed, 312 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/01aa3ba4/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index e13485c..881c10a 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -2111,8 +2111,20 @@ void Slave::__run(
 
   Executor* executor = getExecutor(frameworkId, executorId);
   if (executor != nullptr) {
+    Framework* framework = getFramework(frameworkId);
+    CHECK_NOTNULL(framework);
+
+    // Send TASK_GONE because the task was started but has now
+    // been terminated. If the framework is not partition-aware,
+    // we send TASK_LOST instead for backward compatibility.
+    mesos::TaskState taskState = TASK_GONE;
+    if (!protobuf::frameworkHasCapability(
+            framework->info, FrameworkInfo::Capability::PARTITION_AWARE)) {
+      taskState = TASK_LOST;
+    }
+
     ContainerTermination termination;
-    termination.set_state(TASK_LOST);
+    termination.set_state(taskState);
     termination.add_reasons(TaskStatus::REASON_CONTAINER_UPDATE_FAILED);
     termination.set_message(
         "Failed to update resources for container: " +
@@ -3624,8 +3636,20 @@ void Slave::_reregisterExecutor(
 
   Executor* executor = getExecutor(frameworkId, executorId);
   if (executor != nullptr) {
+    Framework* framework = getFramework(frameworkId);
+    CHECK_NOTNULL(framework);
+
+    // Send TASK_GONE because the task was started but has now
+    // been terminated. If the framework is not partition-aware,
+    // we send TASK_LOST instead for backward compatibility.
+    mesos::TaskState taskState = TASK_GONE;
+    if (!protobuf::frameworkHasCapability(
+            framework->info, FrameworkInfo::Capability::PARTITION_AWARE)) {
+      taskState = TASK_LOST;
+    }
+
     ContainerTermination termination;
-    termination.set_state(TASK_LOST);
+    termination.set_state(taskState);
     termination.add_reasons(TaskStatus::REASON_CONTAINER_UPDATE_FAILED);
     termination.set_message(
         "Failed to update resources for container: " +
@@ -3667,8 +3691,18 @@ void Slave::reregisterExecutorTimeout()
 
       executor->state = Executor::TERMINATING;
 
+      // Send TASK_GONE because the task was started but has now
+      // been terminated. If the framework is not partition-aware,
+      // we send TASK_LOST instead for backward compatibility.
+      mesos::TaskState taskState = TASK_GONE;
+      if (!protobuf::frameworkHasCapability(
+              framework->info,
+              FrameworkInfo::Capability::PARTITION_AWARE)) {
+        taskState = TASK_LOST;
+      }
+
       ContainerTermination termination;
-      termination.set_state(TASK_LOST);
+      termination.set_state(taskState);
       termination.add_reasons(
           TaskStatus::REASON_EXECUTOR_REREGISTRATION_TIMEOUT);
       termination.set_message(
@@ -3982,8 +4016,20 @@ void Slave::__statusUpdate(
 
   Executor* executor = getExecutor(update.framework_id(), executorId);
   if (executor != nullptr) {
+    Framework* framework = getFramework(update.framework_id());
+    CHECK_NOTNULL(framework);
+
+    // Send TASK_GONE because the task was started but has now
+    // been terminated. If the framework is not partition-aware,
+    // we send TASK_LOST instead for backward compatibility.
+    mesos::TaskState taskState = TASK_GONE;
+    if (!protobuf::frameworkHasCapability(
+            framework->info, FrameworkInfo::Capability::PARTITION_AWARE)) {
+      taskState = TASK_LOST;
+    }
+
     ContainerTermination termination;
-    termination.set_state(TASK_LOST);
+    termination.set_state(taskState);
     termination.add_reasons(TaskStatus::REASON_CONTAINER_UPDATE_FAILED);
     termination.set_message(
         "Failed to update resources for container: " +
@@ -4644,10 +4690,10 @@ void Slave::executorTerminated(
 
   executor->state = Executor::TERMINATED;
 
-  // Transition all live tasks to TASK_LOST/TASK_FAILED.
+  // Transition all live tasks to TASK_GONE/TASK_FAILED.
   // If the containerizer killed the executor (e.g., due to OOM event)
   // or if this is a command executor, we send TASK_FAILED status updates
-  // instead of TASK_LOST.
+  // instead of TASK_GONE.
   // NOTE: We don't send updates if the framework is terminating
   // because we don't want the status update manager to keep retrying
   // these updates since it won't receive ACKs from the scheduler. Also,
@@ -5698,8 +5744,18 @@ void Slave::_qosCorrections(const Future<list<QoSCorrection>>& future)
 
       // (MESOS-2875).
      executor->state = Executor::TERMINATING;
 
+      // Send TASK_GONE because the task was started but has now
+      // been terminated. If the framework is not partition-aware,
+      // we send TASK_LOST instead for backward compatibility.
+      mesos::TaskState taskState = TASK_GONE;
+      if (!protobuf::frameworkHasCapability(
+              framework->info,
+              FrameworkInfo::Capability::PARTITION_AWARE)) {
+        taskState = TASK_LOST;
+      }
+
       ContainerTermination termination;
-      termination.set_state(TASK_LOST);
+      termination.set_state(taskState);
       termination.add_reasons(TaskStatus::REASON_CONTAINER_PREEMPTED);
       termination.set_message("Container preempted by QoS correction");


http://git-wip-us.apache.org/repos/asf/mesos/blob/01aa3ba4/src/tests/containerizer/docker_containerizer_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/containerizer/docker_containerizer_tests.cpp b/src/tests/containerizer/docker_containerizer_tests.cpp
index 6d26797..73ae390 100644
--- a/src/tests/containerizer/docker_containerizer_tests.cpp
+++ b/src/tests/containerizer/docker_containerizer_tests.cpp
@@ -3382,9 +3382,13 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_ExecutorCleanupWhenLaunchFailed)
     StartSlave(detector.get(), &dockerContainerizer);
   ASSERT_SOME(slave);
 
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.add_capabilities()->set_type(
+      FrameworkInfo::Capability::PARTITION_AWARE);
+
   MockScheduler sched;
   MesosSchedulerDriver driver(
-      &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
+      &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
 
   Future<FrameworkID> frameworkId;
   EXPECT_CALL(sched, registered(&driver, _, _))
@@ -3423,9 +3427,9 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_ExecutorCleanupWhenLaunchFailed)
   task.mutable_command()->CopyFrom(command);
   task.mutable_container()->CopyFrom(containerInfo);
 
-  Future<TaskStatus> statusLost;
+  Future<TaskStatus> statusGone;
   EXPECT_CALL(sched, statusUpdate(&driver, _))
-    .WillOnce(FutureArg<1>(&statusLost));
+    .WillOnce(FutureArg<1>(&statusGone));
 
   Future<ContainerID> containerId;
   EXPECT_CALL(dockerContainerizer, launch(_, _, _, _, _, _, _, _))
@@ -3441,10 +3445,10 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_ExecutorCleanupWhenLaunchFailed)
 
   AWAIT_READY_FOR(containerId, Seconds(60));
 
-  AWAIT_READY(statusLost);
-  EXPECT_EQ(TASK_LOST, statusLost.get().state());
+  AWAIT_READY(statusGone);
+  EXPECT_EQ(TASK_GONE, statusGone.get().state());
   EXPECT_EQ(TaskStatus::REASON_CONTAINER_UPDATE_FAILED,
-            statusLost.get().reason());
+            statusGone.get().reason());
 
   driver.stop();
   driver.join();


http://git-wip-us.apache.org/repos/asf/mesos/blob/01aa3ba4/src/tests/oversubscription_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/oversubscription_tests.cpp b/src/tests/oversubscription_tests.cpp
index b356fb6..027f549 100644
--- a/src/tests/oversubscription_tests.cpp
+++ b/src/tests/oversubscription_tests.cpp
@@ -943,9 +943,9 @@ TEST_F(OversubscriptionTest, ReceiveQoSCorrection)
 }
 
 
-// This test verifies that a QoS controller can kill a running task
-// and that a TASK_LOST with REASON_EXECUTOR_PREEMPTED is sent to the
-// framework.
+// This test verifies that a QoS controller can kill a running task,
+// and that this results in sending a TASK_LOST status update with
+// REASON_EXECUTOR_PREEMPTED if the framework is not partition-aware.
 TEST_F(OversubscriptionTest, QoSCorrectionKill)
 {
   Try<Owned<cluster::Master>> master = StartMaster();
@@ -1033,12 +1033,124 @@ TEST_F(OversubscriptionTest, QoSCorrectionKill)
 
   // Verify task status is TASK_LOST.
   AWAIT_READY(status2);
-  ASSERT_EQ(TASK_LOST, status2->state());
-  ASSERT_EQ(TaskStatus::REASON_CONTAINER_PREEMPTED, status2->reason());
+  EXPECT_EQ(TASK_LOST, status2->state());
+  EXPECT_EQ(TaskStatus::REASON_CONTAINER_PREEMPTED, status2->reason());
+  EXPECT_EQ(TaskStatus::SOURCE_SLAVE, status2->source());
 
-  // Verify that slave incremented counter for preempted executors.
+  // Verify that slave incremented metrics appropriately.
   snapshot = Metrics();
   EXPECT_EQ(1u, snapshot.values["slave/executors_preempted"]);
+  EXPECT_EQ(1u, snapshot.values["slave/tasks_lost"]);
+  EXPECT_EQ(0u, snapshot.values["slave/tasks_gone"]);
+
+  driver.stop();
+  driver.join();
+}
+
+
+// This test verifies that a QoS controller can kill a running task,
+// and that this results in sending a TASK_GONE status update with
+// REASON_EXECUTOR_PREEMPTED if the framework is partition-aware.
+TEST_F(OversubscriptionTest, QoSCorrectionKillPartitionAware)
+{
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockQoSController controller;
+
+  Queue<list<mesos::slave::QoSCorrection>> corrections;
+
+  EXPECT_CALL(controller, corrections())
+    .WillRepeatedly(InvokeWithoutArgs(
+        &corrections,
+        &Queue<list<mesos::slave::QoSCorrection>>::get));
+
+  Future<lambda::function<Future<ResourceUsage>()>> usageCallback;
+
+  // Catching callback which is passed to the QoS Controller.
+  EXPECT_CALL(controller, initialize(_))
+    .WillOnce(DoAll(FutureArg<0>(&usageCallback), Return(Nothing())));
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  Try<Owned<cluster::Slave>> slave =
+    StartSlave(detector.get(), &controller, CreateSlaveFlags());
+  ASSERT_SOME(slave);
+
+  // Verify presence and initial value of counter for preempted
+  // executors.
+  JSON::Object snapshot = Metrics();
+  EXPECT_EQ(1u, snapshot.values.count("slave/executors_preempted"));
+  EXPECT_EQ(0u, snapshot.values["slave/executors_preempted"]);
+
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.add_capabilities()->set_type(
+      FrameworkInfo::Capability::PARTITION_AWARE);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  Future<FrameworkID> frameworkId;
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .WillOnce(FutureArg<1>(&frameworkId));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(frameworkId);
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
+
+  TaskInfo task = createTask(offers.get()[0], "sleep 10");
+
+  Future<TaskStatus> status1;
+  Future<TaskStatus> status2;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status1))
+    .WillOnce(FutureArg<1>(&status2))
+    .WillRepeatedly(Return()); // Ignore subsequent updates.
+
+  driver.launchTasks(offers.get()[0].id(), {task});
+
+  AWAIT_READY(status1);
+  ASSERT_EQ(TASK_RUNNING, status1.get().state());
+
+  AWAIT_READY(usageCallback);
+
+  Future<ResourceUsage> usage = usageCallback.get()();
+  AWAIT_READY(usage);
+
+  // Expecting the same statistics as these returned by mocked containerizer.
+  ASSERT_EQ(1, usage.get().executors_size());
+
+  const ResourceUsage::Executor& executor = usage.get().executors(0);
+  // Carry out kill correction.
+  QoSCorrection killCorrection;
+
+  QoSCorrection::Kill* kill = killCorrection.mutable_kill();
+  kill->mutable_framework_id()->CopyFrom(frameworkId.get());
+  kill->mutable_executor_id()->CopyFrom(executor.executor_info().executor_id());
+  kill->mutable_container_id()->CopyFrom(executor.container_id());
+
+  corrections.put({killCorrection});
+
+  // Verify task status is TASK_GONE.
+  AWAIT_READY(status2);
+  EXPECT_EQ(TASK_GONE, status2->state());
+  EXPECT_EQ(TaskStatus::REASON_CONTAINER_PREEMPTED, status2->reason());
+  EXPECT_EQ(TaskStatus::SOURCE_SLAVE, status2->source());
+
+  // Verify that slave incremented metrics appropriately.
+  snapshot = Metrics();
+  EXPECT_EQ(1u, snapshot.values["slave/executors_preempted"]);
+  EXPECT_EQ(1u, snapshot.values["slave/tasks_gone"]);
+  EXPECT_EQ(0u, snapshot.values["slave/tasks_lost"]);
 
   driver.stop();
   driver.join();


http://git-wip-us.apache.org/repos/asf/mesos/blob/01aa3ba4/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 4395a67..8717ed1 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -2316,8 +2316,9 @@ TEST_F(SlaveTest, DISABLED_TerminatingSlaveDoesNotReregister)
 
 // This test verifies the slave will destroy a container if, when
 // receiving a terminal status task update, updating the container's
-// resources fails.
-TEST_F(SlaveTest, TerminalTaskContainerizerUpdateFails)
+// resources fails. A non-partition-aware framework should receive
+// TASK_LOST in this situation.
+TEST_F(SlaveTest, TerminalTaskContainerizerUpdateFailsWithLost)
 {
   // Start a master.
   Try<Owned<cluster::Master>> master = StartMaster();
@@ -2333,6 +2334,7 @@ TEST_F(SlaveTest, TerminalTaskContainerizerUpdateFails)
   Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), &containerizer);
   ASSERT_SOME(slave);
 
+  // Connect a non-partition-aware scheduler.
   MockScheduler sched;
   MesosSchedulerDriver driver(
       &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
@@ -2412,6 +2414,123 @@ TEST_F(SlaveTest, TerminalTaskContainerizerUpdateFails)
 
   AWAIT_READY(executorLost);
 
+  JSON::Object stats = Metrics();
+  EXPECT_EQ(0, stats.values["slave/tasks_gone"]);
+  EXPECT_EQ(1, stats.values["slave/tasks_lost"]);
+
+  driver.stop();
+  driver.join();
+}
+
+
+// This test verifies the slave will destroy a container if, when
+// receiving a terminal status task update, updating the container's
+// resources fails. A partition-aware framework should receive
+// TASK_GONE in this situation.
+TEST_F(SlaveTest, TerminalTaskContainerizerUpdateFailsWithGone)
+{
+  // Start a master.
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+  TestContainerizer containerizer(&exec);
+  EXPECT_CALL(exec, registered(_, _, _, _));
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  // Start a slave.
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), &containerizer);
+  ASSERT_SOME(slave);
+
+  // Connect a partition-aware scheduler.
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.add_capabilities()->set_type(
+      FrameworkInfo::Capability::PARTITION_AWARE);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(_, _, _));
+
+  Future<vector<Offer>> offers;
+
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
+  Offer offer = offers.get()[0];
+
+  // Start two tasks.
+  vector<TaskInfo> tasks;
+
+  tasks.push_back(createTask(
+      offer.slave_id(),
+      Resources::parse("cpus:0.1;mem:32").get(),
+      "sleep 1000",
+      exec.id));
+
+  tasks.push_back(createTask(
+      offer.slave_id(),
+      Resources::parse("cpus:0.1;mem:32").get(),
+      "sleep 1000",
+      exec.id));
+
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING))
+    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+  Future<TaskStatus> status1, status2, status3, status4;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status1))
+    .WillOnce(FutureArg<1>(&status2))
+    .WillOnce(FutureArg<1>(&status3))
+    .WillOnce(FutureArg<1>(&status4));
+
+  driver.launchTasks(offer.id(), tasks);
+
+  AWAIT_READY(status1);
+  EXPECT_EQ(TASK_RUNNING, status1.get().state());
+
+  AWAIT_READY(status2);
+  EXPECT_EQ(TASK_RUNNING, status2.get().state());
+
+  // Set up the containerizer so the next update() will fail.
+  EXPECT_CALL(containerizer, update(_, _))
+    .WillOnce(Return(Failure("update() failed")))
+    .WillRepeatedly(Return(Nothing()));
+
+  EXPECT_CALL(exec, killTask(_, _))
+    .WillOnce(SendStatusUpdateFromTaskID(TASK_KILLED));
+
+  Future<Nothing> executorLost;
+  EXPECT_CALL(sched, executorLost(&driver, DEFAULT_EXECUTOR_ID, _, _))
+    .WillOnce(FutureSatisfy(&executorLost));
+
+  // Kill one of the tasks. The failed update should result in the
+  // second task going lost when the container is destroyed.
+  driver.killTask(tasks[0].task_id());
+
+  AWAIT_READY(status3);
+  EXPECT_EQ(TASK_KILLED, status3.get().state());
+  EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR, status3.get().source());
+
+  AWAIT_READY(status4);
+  EXPECT_EQ(TASK_GONE, status4->state());
+  EXPECT_EQ(TaskStatus::SOURCE_SLAVE, status4->source());
+  EXPECT_EQ(TaskStatus::REASON_CONTAINER_UPDATE_FAILED, status4->reason());
+
+  AWAIT_READY(executorLost);
+
+  JSON::Object stats = Metrics();
+  EXPECT_EQ(1, stats.values["slave/tasks_gone"]);
+  EXPECT_EQ(0, stats.values["slave/tasks_lost"]);
+
   driver.stop();
   driver.join();
 }