Repository: mesos
Updated Branches:
  refs/heads/master a99de1a98 -> 90415006d


Send status update acknowledgements through the master.

Review: https://reviews.apache.org/r/22592


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/90415006
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/90415006
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/90415006

Branch: refs/heads/master
Commit: 90415006db79f127aa26a41f4bbc6d327f92c3dc
Parents: a99de1a
Author: Benjamin Mahler <bmah...@twitter.com>
Authored: Thu Jun 12 16:34:33 2014 -0700
Committer: Benjamin Mahler <bmah...@twitter.com>
Committed: Tue Jun 17 14:38:20 2014 -0700

----------------------------------------------------------------------
 src/sched/sched.cpp                 | 13 +++--
 src/tests/fault_tolerance_tests.cpp | 16 +++---
 src/tests/master_tests.cpp          | 96 --------------------------------
 src/tests/slave_tests.cpp           | 28 +++-------
 4 files changed, 23 insertions(+), 130 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/90415006/src/sched/sched.cpp
----------------------------------------------------------------------
diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp
index 6e14f1c..aa19735 100644
--- a/src/sched/sched.cpp
+++ b/src/sched/sched.cpp
@@ -634,16 +634,21 @@ protected:
       return;
     }
 
-    // Acknowledge the status update.
-    if (pid != UPID()) {
-      VLOG(2) << "Sending ACK for status update " << update << " to " << pid;
+    // Don't acknowledge updates created by the driver or master.
+    if (from != UPID() && pid != UPID()) {
+      // We drop updates while we're disconnected.
+      CHECK(connected);
+      CHECK_SOME(master);
+
+      VLOG(2) << "Sending ACK for status update " << update
+              << " to " << master.get();
 
       StatusUpdateAcknowledgementMessage message;
       message.mutable_framework_id()->MergeFrom(framework.id());
       message.mutable_slave_id()->MergeFrom(update.slave_id());
       message.mutable_task_id()->MergeFrom(update.status().task_id());
       message.set_uuid(update.uuid());
-      send(pid, message);
+      send(master.get(), message);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/90415006/src/tests/fault_tolerance_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/fault_tolerance_tests.cpp 
b/src/tests/fault_tolerance_tests.cpp
index 4c6a5c4..5469c17 100644
--- a/src/tests/fault_tolerance_tests.cpp
+++ b/src/tests/fault_tolerance_tests.cpp
@@ -1899,11 +1899,18 @@ TEST_F(FaultToleranceTest, 
SlaveReregisterTerminatedExecutor)
   EXPECT_CALL(sched, statusUpdate(&driver, _))
     .WillOnce(FutureArg<1>(&status));
 
+  Future<StatusUpdateAcknowledgementMessage> statusUpdateAcknowledgementMessage
+    = FUTURE_PROTOBUF(
+        StatusUpdateAcknowledgementMessage(), master.get(), slave.get());
+
   driver.start();
 
   AWAIT_READY(status);
   EXPECT_EQ(TASK_RUNNING, status.get().state());
 
+  // Make sure the acknowledgement reaches the slave.
+  AWAIT_READY(statusUpdateAcknowledgementMessage);
+
   // Drop the TASK_FINISHED status update sent to the master.
   Future<StatusUpdateMessage> statusUpdateMessage =
     DROP_PROTOBUF(StatusUpdateMessage(), _, master.get());
@@ -1911,8 +1918,6 @@ TEST_F(FaultToleranceTest, 
SlaveReregisterTerminatedExecutor)
   Future<ExitedExecutorMessage> executorExitedMessage =
     FUTURE_PROTOBUF(ExitedExecutorMessage(), _, _);
 
-  Clock::pause();
-
   TaskStatus finishedStatus;
   finishedStatus = status.get();
   finishedStatus.set_state(TASK_FINISHED);
@@ -1928,15 +1933,8 @@ TEST_F(FaultToleranceTest, 
SlaveReregisterTerminatedExecutor)
   EXPECT_CALL(sched, statusUpdate(&driver, _))
     .WillOnce(FutureArg<1>(&status2));
 
-  Future<SlaveReregisteredMessage> slaveReregisteredMessage =
-    FUTURE_PROTOBUF(SlaveReregisteredMessage(), master.get(), slave.get());
-
   detector.appoint(master.get());
 
-  AWAIT_READY(slaveReregisteredMessage);
-
-  Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
-
   AWAIT_READY(status2);
   EXPECT_EQ(TASK_FINISHED, status2.get().state());
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/90415006/src/tests/master_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp
index edcaa75..a60709f 100644
--- a/src/tests/master_tests.cpp
+++ b/src/tests/master_tests.cpp
@@ -1402,102 +1402,6 @@ TEST_F(MasterTest, LaunchDuplicateOfferTest)
 }
 
 
-// This test runs a task but intercepts the scheduler driver's
-// acknowledgement messages to the slave and instead sends them to
-// the master. This test is necessary to test that the
-// acknowledgement handling in the master is correct, but once the
-// driver sends these messages we should remove/update this test!
-TEST_F(MasterTest, StatusUpdateAcknowledgementsThroughMaster)
-{
-  Try<PID<Master> > master = StartMaster();
-  ASSERT_SOME(master);
-
-  MockExecutor exec(DEFAULT_EXECUTOR_ID);
-
-  Try<PID<Slave> > slave = StartSlave(&exec);
-  ASSERT_SOME(slave);
-
-  MockScheduler sched;
-  MesosSchedulerDriver schedDriver(
-      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
-
-  EXPECT_CALL(sched, registered(&schedDriver, _, _))
-    .Times(1);
-
-  Future<vector<Offer> > offers;
-  EXPECT_CALL(sched, resourceOffers(&schedDriver, _))
-    .WillOnce(FutureArg<1>(&offers))
-    .WillRepeatedly(Return()); // Ignore subsequent offers.
-
-  // We need to grab this message to get the scheduler's pid.
-  Future<process::Message> frameworkRegisteredMessage = FUTURE_MESSAGE(
-      Eq(FrameworkRegisteredMessage().GetTypeName()), master.get(), _);
-
-  schedDriver.start();
-
-  AWAIT_READY(frameworkRegisteredMessage);
-  const process::UPID schedulerPid = frameworkRegisteredMessage.get().to;
-
-  AWAIT_READY(offers);
-  EXPECT_NE(0u, offers.get().size());
-
-  TaskInfo task = createTask(offers.get()[0], "", DEFAULT_EXECUTOR_ID);
-
-  vector<TaskInfo> tasks;
-  tasks.push_back(task);
-
-  Future<ExecutorDriver*> execDriver;
-  EXPECT_CALL(exec, registered(_, _, _, _))
-    .WillOnce(FutureArg<0>(&execDriver));
-
-  EXPECT_CALL(exec, launchTask(_, _))
-    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
-
-  Future<TaskStatus> update;
-  EXPECT_CALL(sched, statusUpdate(&schedDriver, _))
-    .WillOnce(FutureArg<1>(&update));
-
-  // Pause the clock to prevent status update retries on the slave.
-  Clock::pause();
-
-  // Intercept the status update acknowledgement and send it to the
-  // master instead!
-  Future<StatusUpdateAcknowledgementMessage> acknowledgementMessage =
-    DROP_PROTOBUF(StatusUpdateAcknowledgementMessage(),
-                  schedulerPid,
-                  slave.get());
-
-  schedDriver.launchTasks(offers.get()[0].id(), tasks);
-
-  AWAIT_READY(update);
-  EXPECT_EQ(TASK_RUNNING, update.get().state());
-
-  AWAIT_READY(acknowledgementMessage);
-
-  Future<Nothing> _statusUpdateAcknowledgement =
-    FUTURE_DISPATCH(slave.get(), &Slave::_statusUpdateAcknowledgement);
-
-  // Send the acknowledgement to the master.
-  process::post(schedulerPid, master.get(), acknowledgementMessage.get());
-
-  // Ensure the slave receives and properly handles the ACK.
-  // Clock::settle() ensures that the slave successfully
-  // executes Slave::_statusUpdateAcknowledgement().
-  AWAIT_READY(_statusUpdateAcknowledgement);
-  Clock::settle();
-
-  Clock::resume();
-
-  EXPECT_CALL(exec, shutdown(_))
-    .Times(AtMost(1));
-
-  schedDriver.stop();
-  schedDriver.join();
-
-  Shutdown();
-}
-
-
 TEST_F(MasterTest, MetricsInStatsEndpoint)
 {
   Try<PID<Master> > master = StartMaster();

http://git-wip-us.apache.org/repos/asf/mesos/blob/90415006/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index aaf509d..9178e01 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -541,8 +541,6 @@ TEST_F(SlaveTest, 
DISABLED_ROOT_RunTaskWithCommandInfoWithUser)
 
 // This test ensures that a status update acknowledgement from a
 // non-leading master is ignored.
-// TODO(bmahler): This test will need to be updated once all
-// acknowledgements go through the master.
 TEST_F(SlaveTest, IgnoreNonLeaderStatusUpdateAcknowledgement)
 {
   Try<PID<Master> > master = StartMaster();
@@ -596,13 +594,16 @@ TEST_F(SlaveTest, 
IgnoreNonLeaderStatusUpdateAcknowledgement)
   // Pause the clock to prevent status update retries on the slave.
   Clock::pause();
 
-  // Intercept the status update acknowledgement and send it to the
-  // master instead!
+  // Intercept the acknowledgement sent to the slave so that we can
+  // spoof the master's pid.
   Future<StatusUpdateAcknowledgementMessage> acknowledgementMessage =
     DROP_PROTOBUF(StatusUpdateAcknowledgementMessage(),
-                  schedulerPid,
+                  master.get(),
                   slave.get());
 
+  Future<Nothing> _statusUpdateAcknowledgement =
+    FUTURE_DISPATCH(slave.get(), &Slave::_statusUpdateAcknowledgement);
+
   schedDriver.launchTasks(offers.get()[0].id(), tasks);
 
   AWAIT_READY(update);
@@ -610,25 +611,10 @@ TEST_F(SlaveTest, 
IgnoreNonLeaderStatusUpdateAcknowledgement)
 
   AWAIT_READY(acknowledgementMessage);
 
-  // Intercept the status update acknowledgement from the master
-  // to the slave so that we can spoof a non-leading master pid.
-  Future<StatusUpdateAcknowledgementMessage> acknowledgementMessage2 =
-    DROP_PROTOBUF(StatusUpdateAcknowledgementMessage(),
-                  master.get(),
-                  slave.get());
-
-  // Send the acknowledgment to the master.
-  process::post(schedulerPid, master.get(), acknowledgementMessage.get());
-
-  AWAIT_READY(acknowledgementMessage2);
-
-  Future<Nothing> _statusUpdateAcknowledgement =
-    FUTURE_DISPATCH(slave.get(), &Slave::_statusUpdateAcknowledgement);
-
   // Send the acknowledgement to the slave with a non-leading master.
   process::post(
-      schedulerPid,
       process::UPID("master@localhost:1"),
+      slave.get(),
       acknowledgementMessage.get());
 
   // Make sure the acknowledgement was ignored.

Reply via email to