This is an automated email from the ASF dual-hosted git repository. josephwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 1e3661982eba6da71a5ca8178472ef762d9fc780 Author: Joseph Wu <josep...@apache.org> AuthorDate: Wed Aug 7 09:02:01 2019 -0700 Added draining tests for empty agents. This splits the existing agent draining tests into two variants: 1) where the agent has nothing running, and 2) where the agent has one task running. Review: https://reviews.apache.org/r/71316 --- src/tests/master_draining_tests.cpp | 294 ++++++++++++++++++++++++++++++------ 1 file changed, 250 insertions(+), 44 deletions(-) diff --git a/src/tests/master_draining_tests.cpp b/src/tests/master_draining_tests.cpp index eae809f..674f5b5 100644 --- a/src/tests/master_draining_tests.cpp +++ b/src/tests/master_draining_tests.cpp @@ -42,6 +42,8 @@ #include "common/protobuf_utils.hpp" #include "common/resources_utils.hpp" +#include "master/registry_operations.hpp" + #include "messages/messages.hpp" #include "tests/cluster.hpp" @@ -69,12 +71,12 @@ namespace mesos { namespace internal { namespace tests { -class MasterDrainingTest +class MasterAlreadyDrainedTest : public MesosTest, public WithParamInterface<ContentType> { public: - // Creates a master, agent, framework, and launches one sleep task. + // Creates a master and agent. void SetUp() override { MesosTest::SetUp(); @@ -99,6 +101,251 @@ public: Clock::advance(agentFlags.registration_backoff_factor); AWAIT_READY(slaveRegisteredMessage); + agentId = evolve(slaveRegisteredMessage->slave_id()); + } + + void TearDown() override + { + slave.reset(); + detector.reset(); + master.reset(); + + Clock::resume(); + + MesosTest::TearDown(); + } + + master::Flags CreateMasterFlags() override + { + // Turn off periodic allocations to avoid the race between + // `HierarchicalAllocator::updateAvailable()` and periodic allocations. + master::Flags flags = MesosTest::CreateMasterFlags(); + flags.allocation_interval = Seconds(1000); + return flags; + } + + // Helper function to post a request to "/api/v1" master endpoint and return + // the response. 
+ Future<http::Response> post( + const process::PID<master::Master>& pid, + const v1::master::Call& call, + const ContentType& contentType, + const Credential& credential = DEFAULT_CREDENTIAL) + { + http::Headers headers = createBasicAuthHeaders(credential); + headers["Accept"] = stringify(contentType); + + return http::post( + pid, + "api/v1", + headers, + serialize(contentType, call), + stringify(contentType)); + } + +protected: + master::Flags masterFlags; + Owned<cluster::Master> master; + Owned<MasterDetector> detector; + + slave::Flags agentFlags; + Owned<cluster::Slave> slave; + v1::AgentID agentId; +}; + + +// These tests are parameterized by the content type of the HTTP request. +INSTANTIATE_TEST_CASE_P( + ContentType, + MasterAlreadyDrainedTest, + ::testing::Values(ContentType::PROTOBUF, ContentType::JSON)); + + +// When an operator submits a DRAIN_AGENT call, the agent with nothing running +// should be immediately transitioned to the DRAINED state. +TEST_P(MasterAlreadyDrainedTest, DrainAgent) +{ + Future<Nothing> registrarApplyDrained; + EXPECT_CALL(*master->registrar, apply(_)) + .WillOnce(DoDefault()) + .WillOnce(DoAll( + FutureSatisfy(&registrarApplyDrained), + Invoke(master->registrar.get(), &MockRegistrar::unmocked_apply))); + + ContentType contentType = GetParam(); + + { + v1::master::Call::DrainAgent drainAgent; + drainAgent.mutable_agent_id()->CopyFrom(agentId); + drainAgent.mutable_max_grace_period()->set_seconds(10); + + v1::master::Call call; + call.set_type(v1::master::Call::DRAIN_AGENT); + call.mutable_drain_agent()->CopyFrom(drainAgent); + + AWAIT_EXPECT_RESPONSE_STATUS_EQ( + http::OK().status, + post(master->pid, call, contentType)); + } + + AWAIT_READY(registrarApplyDrained); + + mesos::v1::DrainInfo drainInfo; + drainInfo.set_state(mesos::v1::DRAINED); + drainInfo.mutable_config()->set_mark_gone(false); + drainInfo.mutable_config()->mutable_max_grace_period() + ->set_nanoseconds(Seconds(10).ns()); + + // Ensure that the agent's drain info 
is reflected in the master's + // GET_AGENTS response. + { + v1::master::Call call; + call.set_type(v1::master::Call::GET_AGENTS); + + Future<http::Response> response = + post(master->pid, call, contentType); + AWAIT_ASSERT_RESPONSE_STATUS_EQ(http::OK().status, response); + + Try<v1::master::Response> getAgents = + deserialize<v1::master::Response>(contentType, response->body); + ASSERT_SOME(getAgents); + + ASSERT_EQ(v1::master::Response::GET_AGENTS, getAgents->type()); + ASSERT_EQ(getAgents->get_agents().agents_size(), 1); + + const v1::master::Response::GetAgents::Agent& agent = + getAgents->get_agents().agents(0); + + EXPECT_EQ(agent.deactivated(), true); + + EXPECT_EQ(agent.drain_info(), drainInfo); + EXPECT_LT(0, agent.estimated_drain_start_time().nanoseconds()); + } +} + + +// When an operator submits a DRAIN_AGENT call with 'mark_gone == true', +// and the agent is not running anything, the agent should immediately be +// marked gone. +TEST_P(MasterAlreadyDrainedTest, DrainAgentMarkGone) +{ + // When the terminal ACK is received by the master, the agent should be marked + // gone, which entails sending a `ShutdownMessage`. + Future<ShutdownMessage> shutdownMessage = + FUTURE_PROTOBUF(ShutdownMessage(), _, _); + + ContentType contentType = GetParam(); + + { + v1::master::Call::DrainAgent drainAgent; + drainAgent.mutable_agent_id()->CopyFrom(agentId); + drainAgent.set_mark_gone(true); + + v1::master::Call call; + call.set_type(v1::master::Call::DRAIN_AGENT); + call.mutable_drain_agent()->CopyFrom(drainAgent); + + AWAIT_EXPECT_RESPONSE_STATUS_EQ( + http::OK().status, + post(master->pid, call, contentType)); + } + + AWAIT_READY(shutdownMessage); +} + + +// When an operator submits a DRAIN_AGENT call for an agent that has gone +// unreachable, the call should succeed, and the agent should be drained +// if/when it returns to the cluster. 
+TEST_P(MasterAlreadyDrainedTest, DrainAgentUnreachable) +{ + Future<Owned<master::RegistryOperation>> registrarApplyUnreachable; + EXPECT_CALL(*master->registrar, apply(_)) + .WillOnce(DoAll( + FutureArg<0>(&registrarApplyUnreachable), + Invoke(master->registrar.get(), &MockRegistrar::unmocked_apply))) + .WillRepeatedly(DoDefault()); + + // Simulate an agent crash, so that it disconnects from the master. + slave->terminate(); + slave.reset(); + + Clock::advance(masterFlags.agent_reregister_timeout); + AWAIT_READY(registrarApplyUnreachable); + ASSERT_NE( + nullptr, + dynamic_cast<master::MarkSlaveUnreachable*>( + registrarApplyUnreachable->get())); + + // Start draining the unreachable agent. + ContentType contentType = GetParam(); + + { + v1::master::Call::DrainAgent drainAgent; + drainAgent.mutable_agent_id()->CopyFrom(agentId); + + v1::master::Call call; + call.set_type(v1::master::Call::DRAIN_AGENT); + call.mutable_drain_agent()->CopyFrom(drainAgent); + + AWAIT_EXPECT_RESPONSE_STATUS_EQ( + http::OK().status, + post(master->pid, call, contentType)); + } + + // Bring the agent back. + Future<SlaveReregisteredMessage> slaveReregisteredMessage = + FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _); + + Future<DrainSlaveMessage> drainSlaveMesage = + FUTURE_PROTOBUF(DrainSlaveMessage(), _, _); + + Try<Owned<cluster::Slave>> recoveredSlave = + StartSlave(detector.get(), agentFlags); + ASSERT_SOME(recoveredSlave); + + Clock::advance(agentFlags.executor_reregistration_timeout); + Clock::settle(); + Clock::advance(agentFlags.registration_backoff_factor); + Clock::settle(); + AWAIT_READY(slaveReregisteredMessage); + + // The agent should be told to drain once it reregisters. + AWAIT_READY(drainSlaveMesage); + + // Ensure that the agent is marked as DRAINED in the master now. 
+ { + v1::master::Call call; + call.set_type(v1::master::Call::GET_AGENTS); + + Future<http::Response> response = + post(master->pid, call, contentType); + AWAIT_ASSERT_RESPONSE_STATUS_EQ(http::OK().status, response); + + Try<v1::master::Response> getAgents = + deserialize<v1::master::Response>(contentType, response->body); + ASSERT_SOME(getAgents); + + ASSERT_EQ(v1::master::Response::GET_AGENTS, getAgents->type()); + ASSERT_EQ(getAgents->get_agents().agents_size(), 1); + + const v1::master::Response::GetAgents::Agent& agent = + getAgents->get_agents().agents(0); + + EXPECT_EQ(agent.deactivated(), true); + EXPECT_EQ(mesos::v1::DRAINED, agent.drain_info().state()); + } +} + + +class MasterDrainingTest + : public MasterAlreadyDrainedTest +{ +public: + // Creates a master, agent, framework, and launches one sleep task. + void SetUp() override + { + MasterAlreadyDrainedTest::SetUp(); // Create the framework. scheduler = std::make_shared<v1::MockHTTPScheduler>(); @@ -134,7 +381,6 @@ public: ASSERT_FALSE(offers->offers().empty()); const v1::Offer& offer = offers->offers(0); - agentId = offer.agent_id(); Try<v1::Resources> resources = v1::Resources::parse("cpus:0.1;mem:64;disk:64"); @@ -189,51 +435,11 @@ public: { mesos.reset(); scheduler.reset(); - slave.reset(); - detector.reset(); - master.reset(); - - Clock::resume(); - - MesosTest::TearDown(); - } - - master::Flags CreateMasterFlags() override - { - // Turn off periodic allocations to avoid the race between - // `HierarchicalAllocator::updateAvailable()` and periodic allocations. - master::Flags flags = MesosTest::CreateMasterFlags(); - flags.allocation_interval = Seconds(1000); - return flags; - } - - // Helper function to post a request to "/api/v1" master endpoint and return - // the response. 
- Future<http::Response> post( - const process::PID<master::Master>& pid, - const v1::master::Call& call, - const ContentType& contentType) - { - http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL); - headers["Accept"] = stringify(contentType); - return http::post( - pid, - "api/v1", - headers, - serialize(contentType, call), - stringify(contentType)); + MasterAlreadyDrainedTest::TearDown(); } protected: - master::Flags masterFlags; - Owned<cluster::Master> master; - Owned<MasterDetector> detector; - - slave::Flags agentFlags; - Owned<cluster::Slave> slave; - v1::AgentID agentId; - std::shared_ptr<v1::MockHTTPScheduler> scheduler; v1::FrameworkInfo frameworkInfo; std::shared_ptr<v1::scheduler::TestMesos> mesos;