This is an automated email from the ASF dual-hosted git repository.

grag pushed a commit to branch master
in repository

commit c076c8ce286abfb34de3d962a7ca1601d9494919
Author: Benjamin Bannier <>
AuthorDate: Mon Jul 15 10:26:28 2019 -0700

    Added test for agent to leave draining state on its own.
    This patch adds a test which confirms that the agent leaves a draining
    state on its own once all frameworks on the agent have no more pending
    tasks and all their executors have neither launched nor queued tasks.
    The test uses the fact that the agent rejects task launches while draining.
 src/tests/slave_tests.cpp | 188 ++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 188 insertions(+)

diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 95f7780..1ed59ca 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -12223,6 +12223,194 @@ TEST_F(SlaveTest, DrainAgentKillsPendingTask)
+// This test validates that a draining agent fails further task launch
+// attempts to protect its internal draining invariants, and that the
+// agent leaves the draining state on its own once all tasks have
+// terminated and their status updates have been acknowledged.
+TEST_F(SlaveTest, DrainingAgentRejectLaunch)
+  Clock::pause();
+  master::Flags masterFlags = CreateMasterFlags();
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+  Future<UpdateSlaveMessage> updateSlaveMessage =
+    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+  StandaloneMasterDetector detector(master.get()->pid);
+  slave::Flags slaveFlags = CreateSlaveFlags();
+  Try<Owned<cluster::Slave>> slave = StartSlave(&detector, slaveFlags);
+  ASSERT_SOME(slave);
+  Clock::advance(slaveFlags.registration_backoff_factor);
+  AWAIT_READY(updateSlaveMessage);
+  // Register a scheduler to launch tasks.
+  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO));
+  Future<v1::scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+  Future<v1::scheduler::Event::Offers> offers1;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers1))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+  v1::scheduler::TestMesos mesos(
+      master.get()->pid, ContentType::PROTOBUF, scheduler);
+  AWAIT_READY(subscribed);
+  v1::FrameworkID frameworkId(subscribed->framework_id());
+  AWAIT_READY(offers1);
+  ASSERT_FALSE(offers1->offers().empty());
+  v1::Offer offer = offers1->offers(0);
+  v1::AgentID agentId = offer.agent_id();
+  // Launch a task. When the agent is put into draining state this task will be
+  // killed, but we keep the agent in the draining state even after the task is
+  // killed by not acknowledging the terminal task status update.
+  v1::Resources resources =
+    v1::Resources::parse("cpus:0.1;mem:32;disk:32").get();
+  v1::TaskInfo taskInfo1 =
+    v1::createTask(agentId, resources, SLEEP_COMMAND(1000), None());
+  // We do not acknowledge the KILLED update to control
+  // when the agent finishes draining.
+  Future<v1::scheduler::Event::Update> runningUpdate1;
+  Future<v1::scheduler::Event::Update> killedUpdate1;
+  EXPECT_CALL(*scheduler, update(_, _))
+    .WillOnce(v1::scheduler::SendAcknowledge(frameworkId, agentId)) // 
+    .WillOnce(DoAll(
+        v1::scheduler::SendAcknowledge(frameworkId, agentId),
+        FutureArg<1>(&runningUpdate1)))
+    .WillOnce(FutureArg<1>(&killedUpdate1));
+  mesos.send(
+      v1::createCallAccept(frameworkId, offer, {v1::LAUNCH({taskInfo1})}));
+  AWAIT_READY(runningUpdate1);
+  // Simulate the master sending a `DrainSlaveMessage` to the agent.
+  DurationInfo maxGracePeriod;
+  maxGracePeriod.set_nanoseconds(0);
+  DrainConfig drainConfig;
+  drainConfig.set_mark_gone(false);
+  drainConfig.mutable_max_grace_period()->CopyFrom(maxGracePeriod);
+  DrainSlaveMessage drainSlaveMessage;
+  drainSlaveMessage.mutable_config()->CopyFrom(drainConfig);
+  // Explicitly wait for the executor to be terminated.
+  Future<Nothing> executorTerminated =
+    FUTURE_DISPATCH(_, &Slave::executorTerminated);
+  process::post(master.get()->pid, slave.get()->pid, drainSlaveMessage);
+  // Wait until we have received the terminal task status update
+  // (which we did not acknowledge) before continuing. The agent will
+  // subsequently be left in a draining state.
+  AWAIT_READY(killedUpdate1);
+  ASSERT_EQ(v1::TASK_KILLED, killedUpdate1->status().state());
+  Future<v1::scheduler::Event::Offers> offers2;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers2))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+  // Resume the clock so the containerizer can detect the terminated executor.
+  Clock::resume();
+  AWAIT_READY(executorTerminated);
+  Clock::pause();
+  Clock::settle();
+  Clock::advance(masterFlags.allocation_interval);
+  AWAIT_READY(offers2);
+  ASSERT_FALSE(offers2->offers().empty());
+  offer = offers2->offers(0);
+  agentId = offer.agent_id();
+  // Launch another task. Since the agent is in draining
+  // state the task will be rejected by the agent.
+  Future<v1::scheduler::Event::Update> lostUpdate;
+  EXPECT_CALL(*scheduler, update(_, _))
+    .WillOnce(DoAll(
+        v1::scheduler::SendAcknowledge(frameworkId, agentId),
+        FutureArg<1>(&lostUpdate)));
+  v1::TaskInfo taskInfo2 =
+    v1::createTask(agentId, resources, SLEEP_COMMAND(1000), None());
+  mesos.send(
+      v1::createCallAccept(frameworkId, offer, {v1::LAUNCH({taskInfo2})}));
+  AWAIT_READY(lostUpdate);
+  ASSERT_EQ(taskInfo2.task_id(), lostUpdate->status().task_id());
+  ASSERT_EQ(v1::TASK_LOST, lostUpdate->status().state());
+      v1::TaskStatus::REASON_AGENT_DRAINING, lostUpdate->status().reason());
+  // Acknowledge the pending task status update. Once the acknowledgement has
+  // been processed the agent will leave its draining state and accept task
+  // launches again.
+  {
+    v1::scheduler::Call call;
+    call.set_type(v1::scheduler::Call::ACKNOWLEDGE);
+    call.mutable_framework_id()->CopyFrom(frameworkId);
+    v1::scheduler::Call::Acknowledge* acknowledge = call.mutable_acknowledge();
+    acknowledge->mutable_agent_id()->CopyFrom(agentId);
+    acknowledge->set_uuid(killedUpdate1->status().uuid());
+    mesos.send(call);
+  }
+  Future<v1::scheduler::Event::Offers> offers3;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers3))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+  // Trigger another allocation.
+  Clock::advance(masterFlags.allocation_interval);
+  AWAIT_READY(offers3);
+  ASSERT_FALSE(offers3->offers().empty());
+  offer = offers3->offers(0);
+  agentId = offer.agent_id();
+  // The agent should have left its draining state and now accept task launches.
+  Future<v1::scheduler::Event::Update> runningUpdate2;
+  EXPECT_CALL(*scheduler, update(_, _))
+    .WillOnce(v1::scheduler::SendAcknowledge(frameworkId, agentId)) // 
+    .WillOnce(FutureArg<1>(&runningUpdate2));
+  mesos.send(
+      v1::createCallAccept(frameworkId, offer, {v1::LAUNCH({taskInfo2})}));
+  AWAIT_READY(runningUpdate2);
+  EXPECT_EQ(taskInfo2.task_id(), runningUpdate2->status().task_id());
+  EXPECT_EQ(v1::TASK_RUNNING, runningUpdate2->status().state());
 // This test verifies that if the agent recovers that it is in
 // draining state any tasks after the restart are killed.
 TEST_F(SlaveTest, CheckpointedDrainInfo)

Reply via email to