Repository: mesos
Updated Branches:
  refs/heads/master ee3f1c74c -> 26f892b52
Send an unregister message when the slave is shutting itself down. Review: https://reviews.apache.org/r/22367 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/26f892b5 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/26f892b5 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/26f892b5 Branch: refs/heads/master Commit: 26f892b5274b30783f19803558a84483327c3b83 Parents: ee3f1c7 Author: Alexandra Sava <alexandrasav...@gmail.com> Authored: Mon Jul 14 17:59:13 2014 -0700 Committer: Benjamin Mahler <bmah...@twitter.com> Committed: Tue Jul 15 11:07:52 2014 -0700 ---------------------------------------------------------------------- src/master/master.cpp | 15 ++++++++++----- src/master/master.hpp | 1 + src/slave/slave.cpp | 6 +++++- src/tests/slave_recovery_tests.cpp | 26 +++++++++++++++++++++----- 4 files changed, 37 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/26f892b5/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index 86b147f..3ba8c33 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -3040,20 +3040,25 @@ void Master::__reregisterSlave(Slave* slave, const vector<Task>& tasks) } -void Master::unregisterSlave(const SlaveID& slaveId) +void Master::unregisterSlave(const UPID& from, const SlaveID& slaveId) { ++metrics.messages_unregister_slave; LOG(INFO) << "Asked to unregister slave " << slaveId; - // TODO(benh): Check that only the slave is asking to unregister? 
- if (slaves.activated.contains(slaveId)) { - removeSlave(slaves.activated[slaveId]); + Slave* slave = getSlave(slaveId); + + if (slave != NULL) { + if (slave->pid != from) { + LOG(WARNING) << "Ignoring unregister slave message from " << from + << " because it is not the slave " << slave->pid; + return; + } + removeSlave(slave); } } - // NOTE: We cannot use 'from' here to identify the slave as this is // now sent by the StatusUpdateManagerProcess and master itself when // it generates TASK_LOST messages. Only 'pid' can be used to identify http://git-wip-us.apache.org/repos/asf/mesos/blob/26f892b5/src/master/master.hpp ---------------------------------------------------------------------- diff --git a/src/master/master.hpp b/src/master/master.hpp index 8641f2d..10bd95a 100644 --- a/src/master/master.hpp +++ b/src/master/master.hpp @@ -166,6 +166,7 @@ public: const std::vector<Archive::Framework>& completedFrameworks); void unregisterSlave( + const process::UPID& from, const SlaveID& slaveId); void statusUpdate( http://git-wip-us.apache.org/repos/asf/mesos/blob/26f892b5/src/slave/slave.cpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp index e81abb2..d6cfe4d 100644 --- a/src/slave/slave.cpp +++ b/src/slave/slave.cpp @@ -529,7 +529,11 @@ void Slave::shutdown(const UPID& from, const string& message) LOG(INFO) << "Slave asked to shut down by " << from << (message.empty() ? 
"" : " because '" + message + "'"); } else { - LOG(INFO) << message << "; shutting down"; + LOG(INFO) << message << "; unregistering and shutting down"; + + UnregisterSlaveMessage message_; + message_.mutable_slave_id()->MergeFrom(info.id()); + send(master.get(), message_); } state = TERMINATING; http://git-wip-us.apache.org/repos/asf/mesos/blob/26f892b5/src/tests/slave_recovery_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp index 582f52d..6c27437 100644 --- a/src/tests/slave_recovery_tests.cpp +++ b/src/tests/slave_recovery_tests.cpp @@ -1818,15 +1818,22 @@ TYPED_TEST(SlaveRecoveryTest, ShutdownSlaveSIGUSR1) vector<TaskInfo> tasks; tasks.push_back(task); // Long-running task. - Future<Nothing> statusUpdate; + Future<TaskStatus> status; EXPECT_CALL(sched, statusUpdate(_, _)) - .WillOnce(FutureSatisfy(&statusUpdate)) - .WillRepeatedly(Return()); + .WillOnce(FutureArg<1>(&status)); driver.launchTasks(offers.get()[0].id(), tasks); - // Wait for TASK_RUNNING update to be acknowledged. - AWAIT_READY(statusUpdate); + AWAIT_READY(status); + ASSERT_EQ(TASK_RUNNING, status.get().state()); + + Future<TaskStatus> status2; + EXPECT_CALL(sched, statusUpdate(_, _)) + .WillOnce(FutureArg<1>(&status2)); + + Future<Nothing> slaveLost; + EXPECT_CALL(sched, slaveLost(_, _)) + .WillOnce(FutureSatisfy(&slaveLost)); Future<Nothing> executorTerminated = FUTURE_DISPATCH(_, &Slave::executorTerminated); @@ -1834,12 +1841,21 @@ TYPED_TEST(SlaveRecoveryTest, ShutdownSlaveSIGUSR1) Future<Nothing> signaled = FUTURE_DISPATCH(_, &Slave::signaled); + Future<UnregisterSlaveMessage> unregisterSlaveMessage = + FUTURE_PROTOBUF(UnregisterSlaveMessage(), slave.get(), master.get()); + // Send SIGUSR1 signal to the slave. 
kill(getpid(), SIGUSR1); AWAIT_READY(signaled); + AWAIT_READY(unregisterSlaveMessage); AWAIT_READY(executorTerminated); + // The master should send a TASK_LOST and slaveLost. + AWAIT_READY(status2); + ASSERT_EQ(TASK_LOST, status2.get().state()); + AWAIT_READY(slaveLost); + // Make sure the slave terminates. ASSERT_TRUE(process::wait(slave.get(), Seconds(10)));