Repository: mesos
Updated Branches:
  refs/heads/master a99de1a98 -> 90415006d
Send status update acknowledgements through the master. Review: https://reviews.apache.org/r/22592 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/90415006 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/90415006 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/90415006 Branch: refs/heads/master Commit: 90415006db79f127aa26a41f4bbc6d327f92c3dc Parents: a99de1a Author: Benjamin Mahler <bmah...@twitter.com> Authored: Thu Jun 12 16:34:33 2014 -0700 Committer: Benjamin Mahler <bmah...@twitter.com> Committed: Tue Jun 17 14:38:20 2014 -0700 ---------------------------------------------------------------------- src/sched/sched.cpp | 13 +++-- src/tests/fault_tolerance_tests.cpp | 16 +++--- src/tests/master_tests.cpp | 96 -------------------------------- src/tests/slave_tests.cpp | 28 +++------- 4 files changed, 23 insertions(+), 130 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/90415006/src/sched/sched.cpp ---------------------------------------------------------------------- diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp index 6e14f1c..aa19735 100644 --- a/src/sched/sched.cpp +++ b/src/sched/sched.cpp @@ -634,16 +634,21 @@ protected: return; } - // Acknowledge the status update. - if (pid != UPID()) { - VLOG(2) << "Sending ACK for status update " << update << " to " << pid; + // Don't acknowledge updates created by the driver or master. + if (from != UPID() && pid != UPID()) { + // We drop updates while we're disconnected. 
+ CHECK(connected); + CHECK_SOME(master); + + VLOG(2) << "Sending ACK for status update " << update + << " to " << master.get(); StatusUpdateAcknowledgementMessage message; message.mutable_framework_id()->MergeFrom(framework.id()); message.mutable_slave_id()->MergeFrom(update.slave_id()); message.mutable_task_id()->MergeFrom(update.status().task_id()); message.set_uuid(update.uuid()); - send(pid, message); + send(master.get(), message); } } http://git-wip-us.apache.org/repos/asf/mesos/blob/90415006/src/tests/fault_tolerance_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/fault_tolerance_tests.cpp b/src/tests/fault_tolerance_tests.cpp index 4c6a5c4..5469c17 100644 --- a/src/tests/fault_tolerance_tests.cpp +++ b/src/tests/fault_tolerance_tests.cpp @@ -1899,11 +1899,18 @@ TEST_F(FaultToleranceTest, SlaveReregisterTerminatedExecutor) EXPECT_CALL(sched, statusUpdate(&driver, _)) .WillOnce(FutureArg<1>(&status)); + Future<StatusUpdateAcknowledgementMessage> statusUpdateAcknowledgementMessage + = FUTURE_PROTOBUF( + StatusUpdateAcknowledgementMessage(), master.get(), slave.get()); + driver.start(); AWAIT_READY(status); EXPECT_EQ(TASK_RUNNING, status.get().state()); + // Make sure the acknowledgement reaches the slave. + AWAIT_READY(statusUpdateAcknowledgementMessage); + // Drop the TASK_FINISHED status update sent to the master. 
Future<StatusUpdateMessage> statusUpdateMessage = DROP_PROTOBUF(StatusUpdateMessage(), _, master.get()); @@ -1911,8 +1918,6 @@ TEST_F(FaultToleranceTest, SlaveReregisterTerminatedExecutor) Future<ExitedExecutorMessage> executorExitedMessage = FUTURE_PROTOBUF(ExitedExecutorMessage(), _, _); - Clock::pause(); - TaskStatus finishedStatus; finishedStatus = status.get(); finishedStatus.set_state(TASK_FINISHED); @@ -1928,15 +1933,8 @@ TEST_F(FaultToleranceTest, SlaveReregisterTerminatedExecutor) EXPECT_CALL(sched, statusUpdate(&driver, _)) .WillOnce(FutureArg<1>(&status2)); - Future<SlaveReregisteredMessage> slaveReregisteredMessage = - FUTURE_PROTOBUF(SlaveReregisteredMessage(), master.get(), slave.get()); - detector.appoint(master.get()); - AWAIT_READY(slaveReregisteredMessage); - - Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN); - AWAIT_READY(status2); EXPECT_EQ(TASK_FINISHED, status2.get().state()); http://git-wip-us.apache.org/repos/asf/mesos/blob/90415006/src/tests/master_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp index edcaa75..a60709f 100644 --- a/src/tests/master_tests.cpp +++ b/src/tests/master_tests.cpp @@ -1402,102 +1402,6 @@ TEST_F(MasterTest, LaunchDuplicateOfferTest) } -// This test runs a task but intercepts the scheduler driver's -// acknowledgement messages to the slave and instead sends them to -// the master. This test is necessary to test that the -// acknowledgement handling in the master is correct, but once the -// driver sends these messages we should remove/update this test! 
-TEST_F(MasterTest, StatusUpdateAcknowledgementsThroughMaster) -{ - Try<PID<Master> > master = StartMaster(); - ASSERT_SOME(master); - - MockExecutor exec(DEFAULT_EXECUTOR_ID); - - Try<PID<Slave> > slave = StartSlave(&exec); - ASSERT_SOME(slave); - - MockScheduler sched; - MesosSchedulerDriver schedDriver( - &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); - - EXPECT_CALL(sched, registered(&schedDriver, _, _)) - .Times(1); - - Future<vector<Offer> > offers; - EXPECT_CALL(sched, resourceOffers(&schedDriver, _)) - .WillOnce(FutureArg<1>(&offers)) - .WillRepeatedly(Return()); // Ignore subsequent offers. - - // We need to grab this message to get the scheduler's pid. - Future<process::Message> frameworkRegisteredMessage = FUTURE_MESSAGE( - Eq(FrameworkRegisteredMessage().GetTypeName()), master.get(), _); - - schedDriver.start(); - - AWAIT_READY(frameworkRegisteredMessage); - const process::UPID schedulerPid = frameworkRegisteredMessage.get().to; - - AWAIT_READY(offers); - EXPECT_NE(0u, offers.get().size()); - - TaskInfo task = createTask(offers.get()[0], "", DEFAULT_EXECUTOR_ID); - - vector<TaskInfo> tasks; - tasks.push_back(task); - - Future<ExecutorDriver*> execDriver; - EXPECT_CALL(exec, registered(_, _, _, _)) - .WillOnce(FutureArg<0>(&execDriver)); - - EXPECT_CALL(exec, launchTask(_, _)) - .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING)); - - Future<TaskStatus> update; - EXPECT_CALL(sched, statusUpdate(&schedDriver, _)) - .WillOnce(FutureArg<1>(&update)); - - // Pause the clock to prevent status update retries on the slave. - Clock::pause(); - - // Intercept the status update acknowledgement and send it to the - // master instead! 
- Future<StatusUpdateAcknowledgementMessage> acknowledgementMessage = - DROP_PROTOBUF(StatusUpdateAcknowledgementMessage(), - schedulerPid, - slave.get()); - - schedDriver.launchTasks(offers.get()[0].id(), tasks); - - AWAIT_READY(update); - EXPECT_EQ(TASK_RUNNING, update.get().state()); - - AWAIT_READY(acknowledgementMessage); - - Future<Nothing> _statusUpdateAcknowledgement = - FUTURE_DISPATCH(slave.get(), &Slave::_statusUpdateAcknowledgement); - - // Send the acknowledgement to the master. - process::post(schedulerPid, master.get(), acknowledgementMessage.get()); - - // Ensure the slave receives and properly handles the ACK. - // Clock::settle() ensures that the slave successfully - // executes Slave::_statusUpdateAcknowledgement(). - AWAIT_READY(_statusUpdateAcknowledgement); - Clock::settle(); - - Clock::resume(); - - EXPECT_CALL(exec, shutdown(_)) - .Times(AtMost(1)); - - schedDriver.stop(); - schedDriver.join(); - - Shutdown(); -} - - TEST_F(MasterTest, MetricsInStatsEndpoint) { Try<PID<Master> > master = StartMaster(); http://git-wip-us.apache.org/repos/asf/mesos/blob/90415006/src/tests/slave_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp index aaf509d..9178e01 100644 --- a/src/tests/slave_tests.cpp +++ b/src/tests/slave_tests.cpp @@ -541,8 +541,6 @@ TEST_F(SlaveTest, DISABLED_ROOT_RunTaskWithCommandInfoWithUser) // This test ensures that a status update acknowledgement from a // non-leading master is ignored. -// TODO(bmahler): This test will need to be updated once all -// acknowledgements go through the master. TEST_F(SlaveTest, IgnoreNonLeaderStatusUpdateAcknowledgement) { Try<PID<Master> > master = StartMaster(); @@ -596,13 +594,16 @@ TEST_F(SlaveTest, IgnoreNonLeaderStatusUpdateAcknowledgement) // Pause the clock to prevent status update retries on the slave. 
Clock::pause(); - // Intercept the status update acknowledgement and send it to the - // master instead! + // Intercept the acknowledgement sent to the slave so that we can + // spoof the master's pid. Future<StatusUpdateAcknowledgementMessage> acknowledgementMessage = DROP_PROTOBUF(StatusUpdateAcknowledgementMessage(), - schedulerPid, + master.get(), slave.get()); + Future<Nothing> _statusUpdateAcknowledgement = + FUTURE_DISPATCH(slave.get(), &Slave::_statusUpdateAcknowledgement); + schedDriver.launchTasks(offers.get()[0].id(), tasks); AWAIT_READY(update); @@ -610,25 +611,10 @@ TEST_F(SlaveTest, IgnoreNonLeaderStatusUpdateAcknowledgement) AWAIT_READY(acknowledgementMessage); - // Intercept the status update acknowledgement from the master - // to the slave so that we can spoof a non-leading master pid. - Future<StatusUpdateAcknowledgementMessage> acknowledgementMessage2 = - DROP_PROTOBUF(StatusUpdateAcknowledgementMessage(), - master.get(), - slave.get()); - - // Send the acknowledgment to the master. - process::post(schedulerPid, master.get(), acknowledgementMessage.get()); - - AWAIT_READY(acknowledgementMessage2); - - Future<Nothing> _statusUpdateAcknowledgement = - FUTURE_DISPATCH(slave.get(), &Slave::_statusUpdateAcknowledgement); - // Send the acknowledgement to the slave with a non-leading master. process::post( - schedulerPid, process::UPID("master@localhost:1"), + slave.get(), acknowledgementMessage.get()); // Make sure the acknowledgement was ignored.