This is an automated email from the ASF dual-hosted git repository. josephwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 5124b290ddc368e2e7cc3d56173fb4b3137af620 Author: Joseph Wu <josep...@apache.org> AuthorDate: Wed Jul 24 15:45:22 2019 -0700 Moved master-side agent draining tests into a separate file. The test bodies were not changed, besides renaming the test class. Review: https://reviews.apache.org/r/71314 --- src/Makefile.am | 3 +- src/tests/CMakeLists.txt | 1 + src/tests/api_tests.cpp | 541 ----------------------------- src/tests/master_draining_tests.cpp | 662 ++++++++++++++++++++++++++++++++++++ 4 files changed, 665 insertions(+), 542 deletions(-) diff --git a/src/Makefile.am b/src/Makefile.am index a89cd61..577acfd 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -2608,7 +2608,8 @@ mesos_tests_SOURCES = \ tests/master_allocator_tests.cpp \ tests/master_authorization_tests.cpp \ tests/master_benchmarks.cpp \ - tests/master_contender_detector_tests.cpp \ + tests/master_contender_detector_tests.cpp \ + tests/master_draining_tests.cpp \ tests/master_load_tests.cpp \ tests/master_maintenance_tests.cpp \ tests/master_quota_tests.cpp \ diff --git a/src/tests/CMakeLists.txt b/src/tests/CMakeLists.txt index 04c552a..1e53b39 100644 --- a/src/tests/CMakeLists.txt +++ b/src/tests/CMakeLists.txt @@ -105,6 +105,7 @@ set(MESOS_TESTS_SRC hook_tests.cpp http_authentication_tests.cpp http_fault_tolerance_tests.cpp + master_draining_tests.cpp master_load_tests.cpp master_maintenance_tests.cpp master_slave_reconciliation_tests.cpp diff --git a/src/tests/api_tests.cpp b/src/tests/api_tests.cpp index a735a20..bd207ea 100644 --- a/src/tests/api_tests.cpp +++ b/src/tests/api_tests.cpp @@ -5470,547 +5470,6 @@ TEST_P(MasterAPITest, OperationUpdatesUponUnreachable) } -// When an operator submits a DRAIN_AGENT call, the agent should kill all -// running tasks. 
-TEST_P(MasterAPITest, DrainAgent) -{ - Clock::pause(); - - master::Flags masterFlags = CreateMasterFlags(); - Try<Owned<cluster::Master>> master = StartMaster(masterFlags); - ASSERT_SOME(master); - - Future<SlaveRegisteredMessage> slaveRegisteredMessage = - FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _); - - slave::Flags agentFlags = CreateSlaveFlags(); - Owned<MasterDetector> detector = master.get()->createDetector(); - Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), agentFlags); - ASSERT_SOME(slave); - - Clock::advance(agentFlags.registration_backoff_factor); - - AWAIT_READY(slaveRegisteredMessage); - - auto scheduler = std::make_shared<v1::MockHTTPScheduler>(); - - v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO; - frameworkInfo.add_capabilities()->set_type( - v1::FrameworkInfo::Capability::PARTITION_AWARE); - - EXPECT_CALL(*scheduler, connected(_)) - .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo)); - - Future<v1::scheduler::Event::Subscribed> subscribed; - EXPECT_CALL(*scheduler, subscribed(_, _)) - .WillOnce(FutureArg<1>(&subscribed)); - - EXPECT_CALL(*scheduler, heartbeat(_)) - .WillRepeatedly(Return()); // Ignore heartbeats. 
- - Future<v1::scheduler::Event::Offers> offers; - EXPECT_CALL(*scheduler, offers(_, _)) - .WillOnce(FutureArg<1>(&offers)) - .WillRepeatedly(Return()); - - auto mesos = std::make_shared<v1::scheduler::TestMesos>( - master.get()->pid, ContentType::PROTOBUF, scheduler); - - AWAIT_READY(subscribed); - v1::FrameworkID frameworkId(subscribed->framework_id()); - - AWAIT_READY(offers); - ASSERT_FALSE(offers->offers().empty()); - - const v1::Offer& offer = offers->offers(0); - const v1::AgentID& agentId = offer.agent_id(); - - Try<v1::Resources> resources = - v1::Resources::parse("cpus:0.1;mem:64;disk:64"); - - ASSERT_SOME(resources); - - v1::TaskInfo taskInfo = - v1::createTask(agentId, resources.get(), SLEEP_COMMAND(1000)); - - testing::Sequence updateSequence; - Future<v1::scheduler::Event::Update> startingUpdate; - Future<v1::scheduler::Event::Update> runningUpdate; - - EXPECT_CALL( - *scheduler, - update(_, AllOf( - TaskStatusUpdateTaskIdEq(taskInfo.task_id()), - TaskStatusUpdateStateEq(v1::TASK_STARTING)))) - .InSequence(updateSequence) - .WillOnce(DoAll( - FutureArg<1>(&startingUpdate), - v1::scheduler::SendAcknowledge(frameworkId, agentId))); - - EXPECT_CALL( - *scheduler, - update(_, AllOf( - TaskStatusUpdateTaskIdEq(taskInfo.task_id()), - TaskStatusUpdateStateEq(v1::TASK_RUNNING)))) - .InSequence(updateSequence) - .WillOnce(DoAll( - FutureArg<1>(&runningUpdate), - v1::scheduler::SendAcknowledge(frameworkId, agentId))) - .WillRepeatedly(Return()); - - mesos->send( - v1::createCallAccept( - frameworkId, - offer, - {v1::LAUNCH({taskInfo})})); - - AWAIT_READY(startingUpdate); - AWAIT_READY(runningUpdate); - - Future<v1::scheduler::Event::Update> killedUpdate; - EXPECT_CALL( - *scheduler, - update(_, AllOf( - TaskStatusUpdateTaskIdEq(taskInfo.task_id()), - TaskStatusUpdateStateEq(v1::TASK_KILLED)))) - .WillOnce(DoAll( - FutureArg<1>(&killedUpdate), - v1::scheduler::SendAcknowledge(frameworkId, agentId))); - - Future<Nothing> registrarApplyDrained; - 
EXPECT_CALL(*master.get()->registrar, apply(_)) - .WillOnce(DoDefault()) - .WillOnce(DoAll( - FutureSatisfy(®istrarApplyDrained), - Invoke(master.get()->registrar.get(), &MockRegistrar::unmocked_apply))); - - ContentType contentType = GetParam(); - - { - v1::master::Call::DrainAgent drainAgent; - drainAgent.mutable_agent_id()->CopyFrom(agentId); - drainAgent.mutable_max_grace_period()->set_seconds(0); - - v1::master::Call call; - call.set_type(v1::master::Call::DRAIN_AGENT); - call.mutable_drain_agent()->CopyFrom(drainAgent); - - post(master.get()->pid, call, contentType); - } - - AWAIT_READY(killedUpdate); - AWAIT_READY(registrarApplyDrained); - - // Ensure that the update acknowledgement has been processed. - Clock::settle(); - - mesos::v1::DrainInfo drainInfo; - drainInfo.set_state(mesos::v1::DRAINED); - drainInfo.mutable_config()->set_mark_gone(false); - drainInfo.mutable_config()->mutable_max_grace_period()->set_nanoseconds(0); - - // Ensure that the agent's drain info is reflected in the master's - // GET_AGENTS response. - { - v1::master::Call call; - call.set_type(v1::master::Call::GET_AGENTS); - - Future<v1::master::Response> response = - post(master.get()->pid, call, contentType); - - AWAIT_READY(response); - ASSERT_TRUE(response->IsInitialized()); - ASSERT_EQ(v1::master::Response::GET_AGENTS, response->type()); - ASSERT_EQ(response->get_agents().agents_size(), 1); - - const v1::master::Response::GetAgents::Agent& agent = - response->get_agents().agents(0); - - EXPECT_EQ(agent.deactivated(), true); - - EXPECT_EQ(agent.drain_info(), drainInfo); - EXPECT_LT(0, agent.estimated_drain_start_time().nanoseconds()); - } - - // Ensure that the agent's drain info is reflected in the master's - // '/state' response. 
- { - Future<process::http::Response> response = process::http::get( - master.get()->pid, - "state", - None(), - createBasicAuthHeaders(DEFAULT_CREDENTIAL)); - - AWAIT_EXPECT_RESPONSE_STATUS_EQ(process::http::OK().status, response); - AWAIT_EXPECT_RESPONSE_HEADER_EQ(APPLICATION_JSON, "Content-Type", response); - - Try<JSON::Object> parse = JSON::parse<JSON::Object>(response->body); - ASSERT_SOME(parse); - - Result<JSON::Object> stateDrainInfo = parse->find<JSON::Object>( - "slaves[0].drain_info"); - - ASSERT_SOME_EQ(JSON::protobuf(drainInfo), stateDrainInfo); - - Result<JSON::Number> stateDrainStartTime = parse->find<JSON::Number>( - "slaves[0].estimated_drain_start_time_seconds"); - - ASSERT_SOME(stateDrainStartTime); - EXPECT_LT(0, stateDrainStartTime->as<int>()); - } - - // Ensure that the agent's drain info is reflected in the master's - // '/state-summary' response. - { - Future<process::http::Response> response = process::http::get( - master.get()->pid, - "state-summary", - None(), - createBasicAuthHeaders(DEFAULT_CREDENTIAL)); - - AWAIT_EXPECT_RESPONSE_STATUS_EQ(process::http::OK().status, response); - AWAIT_EXPECT_RESPONSE_HEADER_EQ(APPLICATION_JSON, "Content-Type", response); - - Try<JSON::Object> parse = JSON::parse<JSON::Object>(response->body); - ASSERT_SOME(parse); - - Result<JSON::Object> stateDrainInfo = parse->find<JSON::Object>( - "slaves[0].drain_info"); - - ASSERT_SOME_EQ(JSON::protobuf(drainInfo), stateDrainInfo); - - Result<JSON::Number> stateDrainStartTime = - parse->find<JSON::Number>("slaves[0].estimated_drain_start_time_seconds"); - - ASSERT_SOME(stateDrainStartTime); - EXPECT_LT(0, stateDrainStartTime->as<int>()); - } -} - - -// When an operator submits a DRAIN_AGENT call with 'mark_gone == true', the -// agent should kill all running tasks and the master should mark the agent gone -// once terminal ACKs have been received. 
-TEST_P(MasterAPITest, DrainAgentMarkGone) -{ - Clock::pause(); - - master::Flags masterFlags = CreateMasterFlags(); - Try<Owned<cluster::Master>> master = StartMaster(masterFlags); - ASSERT_SOME(master); - - Future<SlaveRegisteredMessage> slaveRegisteredMessage = - FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _); - - slave::Flags agentFlags = CreateSlaveFlags(); - Owned<MasterDetector> detector = master.get()->createDetector(); - Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), agentFlags); - ASSERT_SOME(slave); - - Clock::advance(agentFlags.registration_backoff_factor); - - AWAIT_READY(slaveRegisteredMessage); - - auto scheduler = std::make_shared<v1::MockHTTPScheduler>(); - - v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO; - frameworkInfo.add_capabilities()->set_type( - v1::FrameworkInfo::Capability::PARTITION_AWARE); - - EXPECT_CALL(*scheduler, connected(_)) - .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo)); - - Future<v1::scheduler::Event::Subscribed> subscribed; - EXPECT_CALL(*scheduler, subscribed(_, _)) - .WillOnce(FutureArg<1>(&subscribed)); - - EXPECT_CALL(*scheduler, heartbeat(_)) - .WillRepeatedly(Return()); // Ignore heartbeats. 
- - Future<v1::scheduler::Event::Offers> offers; - EXPECT_CALL(*scheduler, offers(_, _)) - .WillOnce(FutureArg<1>(&offers)) - .WillRepeatedly(Return()); - - auto mesos = std::make_shared<v1::scheduler::TestMesos>( - master.get()->pid, ContentType::PROTOBUF, scheduler); - - AWAIT_READY(subscribed); - v1::FrameworkID frameworkId(subscribed->framework_id()); - - AWAIT_READY(offers); - ASSERT_FALSE(offers->offers().empty()); - - const v1::Offer& offer = offers->offers(0); - const v1::AgentID& agentId = offer.agent_id(); - - Try<v1::Resources> resources = - v1::Resources::parse("cpus:0.1;mem:64;disk:64"); - - ASSERT_SOME(resources); - - v1::TaskInfo taskInfo = - v1::createTask(agentId, resources.get(), SLEEP_COMMAND(1000)); - - testing::Sequence updateSequence; - Future<v1::scheduler::Event::Update> startingUpdate; - Future<v1::scheduler::Event::Update> runningUpdate; - - EXPECT_CALL( - *scheduler, - update(_, AllOf( - TaskStatusUpdateTaskIdEq(taskInfo.task_id()), - TaskStatusUpdateStateEq(v1::TASK_STARTING)))) - .InSequence(updateSequence) - .WillOnce(DoAll( - FutureArg<1>(&startingUpdate), - v1::scheduler::SendAcknowledge(frameworkId, agentId))); - - EXPECT_CALL( - *scheduler, - update(_, AllOf( - TaskStatusUpdateTaskIdEq(taskInfo.task_id()), - TaskStatusUpdateStateEq(v1::TASK_RUNNING)))) - .InSequence(updateSequence) - .WillOnce(DoAll( - FutureArg<1>(&runningUpdate), - v1::scheduler::SendAcknowledge(frameworkId, agentId))) - .WillRepeatedly(Return()); - - mesos->send( - v1::createCallAccept( - frameworkId, - offer, - {v1::LAUNCH({taskInfo})})); - - AWAIT_READY(startingUpdate); - AWAIT_READY(runningUpdate); - - Future<v1::scheduler::Event::Update> goneUpdate; - EXPECT_CALL( - *scheduler, - update(_, AllOf( - TaskStatusUpdateTaskIdEq(taskInfo.task_id()), - TaskStatusUpdateStateEq(v1::TASK_GONE_BY_OPERATOR)))) - .WillOnce(DoAll( - FutureArg<1>(&goneUpdate), - v1::scheduler::SendAcknowledge(frameworkId, agentId))); - - // When the terminal ACK is received by the master, 
the agent should be marked - // gone, which entails sending a `ShutdownMessage`. - Future<ShutdownMessage> shutdownMessage = - FUTURE_PROTOBUF(ShutdownMessage(), _, _); - - ContentType contentType = GetParam(); - - { - v1::master::Call::DrainAgent drainAgent; - drainAgent.mutable_agent_id()->CopyFrom(agentId); - drainAgent.set_mark_gone(true); - - v1::master::Call call; - call.set_type(v1::master::Call::DRAIN_AGENT); - call.mutable_drain_agent()->CopyFrom(drainAgent); - - post(master.get()->pid, call, contentType); - } - - AWAIT_READY(goneUpdate); - AWAIT_READY(shutdownMessage); -} - - -// When an operator submits a DRAIN_AGENT call with an agent that has gone -// unreachable, the call should succeed, and the agent should be drained -// if/when it returns to the cluster. -TEST_P(MasterAPITest, DrainAgentUnreachable) -{ - Clock::pause(); - - master::Flags masterFlags = CreateMasterFlags(); - Try<Owned<cluster::Master>> master = StartMaster(masterFlags); - ASSERT_SOME(master); - - Future<SlaveRegisteredMessage> slaveRegisteredMessage = - FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _); - - slave::Flags agentFlags = CreateSlaveFlags(); - Owned<MasterDetector> detector = master.get()->createDetector(); - Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), agentFlags); - ASSERT_SOME(slave); - - Clock::advance(agentFlags.registration_backoff_factor); - - AWAIT_READY(slaveRegisteredMessage); - - auto scheduler = std::make_shared<v1::MockHTTPScheduler>(); - - v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO; - frameworkInfo.set_checkpoint(true); - frameworkInfo.add_capabilities()->set_type( - v1::FrameworkInfo::Capability::PARTITION_AWARE); - - EXPECT_CALL(*scheduler, connected(_)) - .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo)); - - Future<v1::scheduler::Event::Subscribed> subscribed; - EXPECT_CALL(*scheduler, subscribed(_, _)) - .WillOnce(FutureArg<1>(&subscribed)); - - EXPECT_CALL(*scheduler, heartbeat(_)) - .WillRepeatedly(Return()); // 
Ignore heartbeats. - - Future<v1::scheduler::Event::Offers> offers; - EXPECT_CALL(*scheduler, offers(_, _)) - .WillOnce(FutureArg<1>(&offers)) - .WillRepeatedly(Return()); - - auto mesos = std::make_shared<v1::scheduler::TestMesos>( - master.get()->pid, ContentType::PROTOBUF, scheduler); - - AWAIT_READY(subscribed); - v1::FrameworkID frameworkId(subscribed->framework_id()); - - AWAIT_READY(offers); - ASSERT_FALSE(offers->offers().empty()); - - const v1::Offer& offer = offers->offers(0); - const v1::AgentID& agentId = offer.agent_id(); - - Try<v1::Resources> resources = - v1::Resources::parse("cpus:0.1;mem:64;disk:64"); - - ASSERT_SOME(resources); - - v1::TaskInfo taskInfo = - v1::createTask(agentId, resources.get(), SLEEP_COMMAND(1000)); - - testing::Sequence updateSequence; - Future<v1::scheduler::Event::Update> startingUpdate; - Future<v1::scheduler::Event::Update> runningUpdate; - Future<v1::scheduler::Event::Update> unreachableUpdate; - Future<v1::scheduler::Event::Update> runningUpdate2; - Future<v1::scheduler::Event::Update> killedUpdate; - - // Make absolutely sure the agent receives these two acknowledgements - // before forcing the agent offline. 
- Future<StatusUpdateAcknowledgementMessage> startingAck = - FUTURE_PROTOBUF(StatusUpdateAcknowledgementMessage(), _, _); - Future<StatusUpdateAcknowledgementMessage> runningAck = - FUTURE_PROTOBUF(StatusUpdateAcknowledgementMessage(), _, _); - - EXPECT_CALL( - *scheduler, - update(_, AllOf( - TaskStatusUpdateTaskIdEq(taskInfo.task_id()), - TaskStatusUpdateStateEq(v1::TASK_STARTING)))) - .InSequence(updateSequence) - .WillOnce(DoAll( - FutureArg<1>(&startingUpdate), - v1::scheduler::SendAcknowledge(frameworkId, agentId))); - - EXPECT_CALL( - *scheduler, - update(_, AllOf( - TaskStatusUpdateTaskIdEq(taskInfo.task_id()), - TaskStatusUpdateStateEq(v1::TASK_RUNNING)))) - .InSequence(updateSequence) - .WillOnce(DoAll( - FutureArg<1>(&runningUpdate), - v1::scheduler::SendAcknowledge(frameworkId, agentId))); - - EXPECT_CALL( - *scheduler, - update(_, AllOf( - TaskStatusUpdateTaskIdEq(taskInfo.task_id()), - TaskStatusUpdateStateEq(v1::TASK_UNREACHABLE)))) - .InSequence(updateSequence) - .WillOnce(DoAll( - FutureArg<1>(&unreachableUpdate), - v1::scheduler::SendAcknowledge(frameworkId, agentId))); - - // When the agent is brought back, we expect a TASK_RUNNING followed by - // a TASK_KILLED (due to draining). 
- EXPECT_CALL( - *scheduler, - update(_, AllOf( - TaskStatusUpdateTaskIdEq(taskInfo.task_id()), - TaskStatusUpdateStateEq(v1::TASK_RUNNING)))) - .InSequence(updateSequence) - .WillOnce(DoAll( - FutureArg<1>(&runningUpdate2), - v1::scheduler::SendAcknowledge(frameworkId, agentId))); - - EXPECT_CALL( - *scheduler, - update(_, AllOf( - TaskStatusUpdateTaskIdEq(taskInfo.task_id()), - TaskStatusUpdateStateEq(v1::TASK_KILLED)))) - .InSequence(updateSequence) - .WillOnce(DoAll( - FutureArg<1>(&killedUpdate), - v1::scheduler::SendAcknowledge(frameworkId, agentId))); - - mesos->send( - v1::createCallAccept( - frameworkId, - offer, - {v1::LAUNCH({taskInfo})})); - - AWAIT_READY(startingUpdate); - AWAIT_READY(startingAck); - AWAIT_READY(runningUpdate); - AWAIT_READY(runningAck); - - // Simulate an agent crash, so that it disconnects from the master. - slave.get()->terminate(); - slave->reset(); - - Clock::advance(masterFlags.agent_reregister_timeout); - AWAIT_READY(unreachableUpdate); - - // Start draining the unreachable agent. - ContentType contentType = GetParam(); - - { - v1::master::Call::DrainAgent drainAgent; - drainAgent.mutable_agent_id()->CopyFrom(agentId); - - v1::master::Call call; - call.set_type(v1::master::Call::DRAIN_AGENT); - call.mutable_drain_agent()->CopyFrom(drainAgent); - - post(master.get()->pid, call, contentType); - } - - // Bring the agent back. 
- Future<ReregisterExecutorMessage> reregisterExecutor = - FUTURE_PROTOBUF(ReregisterExecutorMessage(), _, _); - - Future<SlaveReregisteredMessage> slaveReregisteredMessage = - FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _); - - Future<DrainSlaveMessage> drainSlaveMesage = - FUTURE_PROTOBUF(DrainSlaveMessage(), _, _); - - Try<Owned<cluster::Slave>> recoveredSlave = - StartSlave(detector.get(), agentFlags); - ASSERT_SOME(recoveredSlave); - - AWAIT_READY(reregisterExecutor); - Clock::advance(agentFlags.executor_reregistration_timeout); - Clock::settle(); - Clock::advance(agentFlags.registration_backoff_factor); - Clock::settle(); - AWAIT_READY(slaveReregisteredMessage); - - // The agent should be told to drain once it reregisters. - AWAIT_READY(drainSlaveMesage); - AWAIT_READY(runningUpdate2); - AWAIT_READY(killedUpdate); -} - - class AgentAPITest : public MesosTest, public WithParamInterface<ContentType> diff --git a/src/tests/master_draining_tests.cpp b/src/tests/master_draining_tests.cpp new file mode 100644 index 0000000..16d0c85 --- /dev/null +++ b/src/tests/master_draining_tests.cpp @@ -0,0 +1,662 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include <string> + +#include <mesos/http.hpp> + +#include <mesos/v1/mesos.hpp> +#include <mesos/v1/resources.hpp> + +#include <mesos/v1/master/master.hpp> + +#include <mesos/v1/scheduler/scheduler.hpp> + +#include <process/clock.hpp> +#include <process/future.hpp> +#include <process/gmock.hpp> +#include <process/gtest.hpp> +#include <process/http.hpp> +#include <process/owned.hpp> + +#include <stout/gtest.hpp> +#include <stout/nothing.hpp> +#include <stout/stringify.hpp> +#include <stout/try.hpp> + +#include "common/http.hpp" +#include "common/protobuf_utils.hpp" +#include "common/resources_utils.hpp" + +#include "messages/messages.hpp" + +#include "tests/cluster.hpp" +#include "tests/mesos.hpp" +#include "tests/resources_utils.hpp" + +namespace http = process::http; + + +using mesos::master::detector::MasterDetector; + +using process::Clock; +using process::Failure; +using process::Future; +using process::Owned; + +using testing::_; +using testing::AllOf; +using testing::DoAll; +using testing::Return; +using testing::Sequence; +using testing::WithParamInterface; + +namespace mesos { +namespace internal { +namespace tests { + +class MasterDrainingTest + : public MesosTest, + public WithParamInterface<ContentType> +{ +public: + master::Flags CreateMasterFlags() override + { + // Turn off periodic allocations to avoid the race between + // `HierarchicalAllocator::updateAvailable()` and periodic allocations. + master::Flags flags = MesosTest::CreateMasterFlags(); + flags.allocation_interval = Seconds(1000); + return flags; + } + + // Helper function to post a request to "/api/v1" master endpoint and return + // the response. 
+ Future<v1::master::Response> post( + const process::PID<master::Master>& pid, + const v1::master::Call& call, + const ContentType& contentType, + const Credential& credential = DEFAULT_CREDENTIAL) + { + http::Headers headers = createBasicAuthHeaders(credential); + headers["Accept"] = stringify(contentType); + + return http::post( + pid, + "api/v1", + headers, + serialize(contentType, call), + stringify(contentType)) + .then([contentType](const http::Response& response) + -> Future<v1::master::Response> { + if (response.status != http::OK().status) { + return Failure("Unexpected response status " + response.status); + } + return deserialize<v1::master::Response>(contentType, response.body); + }); + } +}; + + +// These tests are parameterized by the content type of the HTTP request. +INSTANTIATE_TEST_CASE_P( + ContentType, + MasterDrainingTest, + ::testing::Values(ContentType::PROTOBUF, ContentType::JSON)); + + +// When an operator submits a DRAIN_AGENT call, the agent should kill all +// running tasks. 
+TEST_P(MasterDrainingTest, DrainAgent) +{ + Clock::pause(); + + master::Flags masterFlags = CreateMasterFlags(); + Try<Owned<cluster::Master>> master = StartMaster(masterFlags); + ASSERT_SOME(master); + + Future<SlaveRegisteredMessage> slaveRegisteredMessage = + FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _); + + slave::Flags agentFlags = CreateSlaveFlags(); + Owned<MasterDetector> detector = master.get()->createDetector(); + Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), agentFlags); + ASSERT_SOME(slave); + + Clock::advance(agentFlags.registration_backoff_factor); + + AWAIT_READY(slaveRegisteredMessage); + + auto scheduler = std::make_shared<v1::MockHTTPScheduler>(); + + v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO; + frameworkInfo.add_capabilities()->set_type( + v1::FrameworkInfo::Capability::PARTITION_AWARE); + + EXPECT_CALL(*scheduler, connected(_)) + .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo)); + + Future<v1::scheduler::Event::Subscribed> subscribed; + EXPECT_CALL(*scheduler, subscribed(_, _)) + .WillOnce(FutureArg<1>(&subscribed)); + + EXPECT_CALL(*scheduler, heartbeat(_)) + .WillRepeatedly(Return()); // Ignore heartbeats. 
+ + Future<v1::scheduler::Event::Offers> offers; + EXPECT_CALL(*scheduler, offers(_, _)) + .WillOnce(FutureArg<1>(&offers)) + .WillRepeatedly(Return()); + + auto mesos = std::make_shared<v1::scheduler::TestMesos>( + master.get()->pid, ContentType::PROTOBUF, scheduler); + + AWAIT_READY(subscribed); + v1::FrameworkID frameworkId(subscribed->framework_id()); + + AWAIT_READY(offers); + ASSERT_FALSE(offers->offers().empty()); + + const v1::Offer& offer = offers->offers(0); + const v1::AgentID& agentId = offer.agent_id(); + + Try<v1::Resources> resources = + v1::Resources::parse("cpus:0.1;mem:64;disk:64"); + + ASSERT_SOME(resources); + + v1::TaskInfo taskInfo = + v1::createTask(agentId, resources.get(), SLEEP_COMMAND(1000)); + + testing::Sequence updateSequence; + Future<v1::scheduler::Event::Update> startingUpdate; + Future<v1::scheduler::Event::Update> runningUpdate; + + EXPECT_CALL( + *scheduler, + update(_, AllOf( + TaskStatusUpdateTaskIdEq(taskInfo.task_id()), + TaskStatusUpdateStateEq(v1::TASK_STARTING)))) + .InSequence(updateSequence) + .WillOnce(DoAll( + FutureArg<1>(&startingUpdate), + v1::scheduler::SendAcknowledge(frameworkId, agentId))); + + EXPECT_CALL( + *scheduler, + update(_, AllOf( + TaskStatusUpdateTaskIdEq(taskInfo.task_id()), + TaskStatusUpdateStateEq(v1::TASK_RUNNING)))) + .InSequence(updateSequence) + .WillOnce(DoAll( + FutureArg<1>(&runningUpdate), + v1::scheduler::SendAcknowledge(frameworkId, agentId))) + .WillRepeatedly(Return()); + + mesos->send( + v1::createCallAccept( + frameworkId, + offer, + {v1::LAUNCH({taskInfo})})); + + AWAIT_READY(startingUpdate); + AWAIT_READY(runningUpdate); + + Future<v1::scheduler::Event::Update> killedUpdate; + EXPECT_CALL( + *scheduler, + update(_, AllOf( + TaskStatusUpdateTaskIdEq(taskInfo.task_id()), + TaskStatusUpdateStateEq(v1::TASK_KILLED)))) + .WillOnce(DoAll( + FutureArg<1>(&killedUpdate), + v1::scheduler::SendAcknowledge(frameworkId, agentId))); + + Future<Nothing> registrarApplyDrained; + 
EXPECT_CALL(*master.get()->registrar, apply(_)) + .WillOnce(DoDefault()) + .WillOnce(DoAll( + FutureSatisfy(®istrarApplyDrained), + Invoke(master.get()->registrar.get(), &MockRegistrar::unmocked_apply))); + + ContentType contentType = GetParam(); + + { + v1::master::Call::DrainAgent drainAgent; + drainAgent.mutable_agent_id()->CopyFrom(agentId); + drainAgent.mutable_max_grace_period()->set_seconds(0); + + v1::master::Call call; + call.set_type(v1::master::Call::DRAIN_AGENT); + call.mutable_drain_agent()->CopyFrom(drainAgent); + + post(master.get()->pid, call, contentType); + } + + AWAIT_READY(killedUpdate); + AWAIT_READY(registrarApplyDrained); + + // Ensure that the update acknowledgement has been processed. + Clock::settle(); + + mesos::v1::DrainInfo drainInfo; + drainInfo.set_state(mesos::v1::DRAINED); + drainInfo.mutable_config()->set_mark_gone(false); + drainInfo.mutable_config()->mutable_max_grace_period()->set_nanoseconds(0); + + // Ensure that the agent's drain info is reflected in the master's + // GET_AGENTS response. + { + v1::master::Call call; + call.set_type(v1::master::Call::GET_AGENTS); + + Future<v1::master::Response> response = + post(master.get()->pid, call, contentType); + + AWAIT_READY(response); + ASSERT_TRUE(response->IsInitialized()); + ASSERT_EQ(v1::master::Response::GET_AGENTS, response->type()); + ASSERT_EQ(response->get_agents().agents_size(), 1); + + const v1::master::Response::GetAgents::Agent& agent = + response->get_agents().agents(0); + + EXPECT_EQ(agent.deactivated(), true); + + EXPECT_EQ(agent.drain_info(), drainInfo); + EXPECT_LT(0, agent.estimated_drain_start_time().nanoseconds()); + } + + // Ensure that the agent's drain info is reflected in the master's + // '/state' response. 
+ { + Future<process::http::Response> response = process::http::get( + master.get()->pid, + "state", + None(), + createBasicAuthHeaders(DEFAULT_CREDENTIAL)); + + AWAIT_EXPECT_RESPONSE_STATUS_EQ(process::http::OK().status, response); + AWAIT_EXPECT_RESPONSE_HEADER_EQ(APPLICATION_JSON, "Content-Type", response); + + Try<JSON::Object> parse = JSON::parse<JSON::Object>(response->body); + ASSERT_SOME(parse); + + Result<JSON::Object> stateDrainInfo = parse->find<JSON::Object>( + "slaves[0].drain_info"); + + ASSERT_SOME_EQ(JSON::protobuf(drainInfo), stateDrainInfo); + + Result<JSON::Number> stateDrainStartTime = parse->find<JSON::Number>( + "slaves[0].estimated_drain_start_time_seconds"); + + ASSERT_SOME(stateDrainStartTime); + EXPECT_LT(0, stateDrainStartTime->as<int>()); + } + + // Ensure that the agent's drain info is reflected in the master's + // '/state-summary' response. + { + Future<process::http::Response> response = process::http::get( + master.get()->pid, + "state-summary", + None(), + createBasicAuthHeaders(DEFAULT_CREDENTIAL)); + + AWAIT_EXPECT_RESPONSE_STATUS_EQ(process::http::OK().status, response); + AWAIT_EXPECT_RESPONSE_HEADER_EQ(APPLICATION_JSON, "Content-Type", response); + + Try<JSON::Object> parse = JSON::parse<JSON::Object>(response->body); + ASSERT_SOME(parse); + + Result<JSON::Object> stateDrainInfo = parse->find<JSON::Object>( + "slaves[0].drain_info"); + + ASSERT_SOME_EQ(JSON::protobuf(drainInfo), stateDrainInfo); + + Result<JSON::Number> stateDrainStartTime = + parse->find<JSON::Number>("slaves[0].estimated_drain_start_time_seconds"); + + ASSERT_SOME(stateDrainStartTime); + EXPECT_LT(0, stateDrainStartTime->as<int>()); + } +} + + +// When an operator submits a DRAIN_AGENT call with 'mark_gone == true', the +// agent should kill all running tasks and the master should mark the agent gone +// once terminal ACKs have been received. 
+TEST_P(MasterDrainingTest, DrainAgentMarkGone) +{ + Clock::pause(); + + master::Flags masterFlags = CreateMasterFlags(); + Try<Owned<cluster::Master>> master = StartMaster(masterFlags); + ASSERT_SOME(master); + + Future<SlaveRegisteredMessage> slaveRegisteredMessage = + FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _); + + slave::Flags agentFlags = CreateSlaveFlags(); + Owned<MasterDetector> detector = master.get()->createDetector(); + Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), agentFlags); + ASSERT_SOME(slave); + + Clock::advance(agentFlags.registration_backoff_factor); + + AWAIT_READY(slaveRegisteredMessage); + + auto scheduler = std::make_shared<v1::MockHTTPScheduler>(); + + v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO; + frameworkInfo.add_capabilities()->set_type( + v1::FrameworkInfo::Capability::PARTITION_AWARE); + + EXPECT_CALL(*scheduler, connected(_)) + .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo)); + + Future<v1::scheduler::Event::Subscribed> subscribed; + EXPECT_CALL(*scheduler, subscribed(_, _)) + .WillOnce(FutureArg<1>(&subscribed)); + + EXPECT_CALL(*scheduler, heartbeat(_)) + .WillRepeatedly(Return()); // Ignore heartbeats. 
+ + Future<v1::scheduler::Event::Offers> offers; + EXPECT_CALL(*scheduler, offers(_, _)) + .WillOnce(FutureArg<1>(&offers)) + .WillRepeatedly(Return()); + + auto mesos = std::make_shared<v1::scheduler::TestMesos>( + master.get()->pid, ContentType::PROTOBUF, scheduler); + + AWAIT_READY(subscribed); + v1::FrameworkID frameworkId(subscribed->framework_id()); + + AWAIT_READY(offers); + ASSERT_FALSE(offers->offers().empty()); + + const v1::Offer& offer = offers->offers(0); + const v1::AgentID& agentId = offer.agent_id(); + + Try<v1::Resources> resources = + v1::Resources::parse("cpus:0.1;mem:64;disk:64"); + + ASSERT_SOME(resources); + + v1::TaskInfo taskInfo = + v1::createTask(agentId, resources.get(), SLEEP_COMMAND(1000)); + + testing::Sequence updateSequence; + Future<v1::scheduler::Event::Update> startingUpdate; + Future<v1::scheduler::Event::Update> runningUpdate; + + EXPECT_CALL( + *scheduler, + update(_, AllOf( + TaskStatusUpdateTaskIdEq(taskInfo.task_id()), + TaskStatusUpdateStateEq(v1::TASK_STARTING)))) + .InSequence(updateSequence) + .WillOnce(DoAll( + FutureArg<1>(&startingUpdate), + v1::scheduler::SendAcknowledge(frameworkId, agentId))); + + EXPECT_CALL( + *scheduler, + update(_, AllOf( + TaskStatusUpdateTaskIdEq(taskInfo.task_id()), + TaskStatusUpdateStateEq(v1::TASK_RUNNING)))) + .InSequence(updateSequence) + .WillOnce(DoAll( + FutureArg<1>(&runningUpdate), + v1::scheduler::SendAcknowledge(frameworkId, agentId))) + .WillRepeatedly(Return()); + + mesos->send( + v1::createCallAccept( + frameworkId, + offer, + {v1::LAUNCH({taskInfo})})); + + AWAIT_READY(startingUpdate); + AWAIT_READY(runningUpdate); + + Future<v1::scheduler::Event::Update> goneUpdate; + EXPECT_CALL( + *scheduler, + update(_, AllOf( + TaskStatusUpdateTaskIdEq(taskInfo.task_id()), + TaskStatusUpdateStateEq(v1::TASK_GONE_BY_OPERATOR)))) + .WillOnce(DoAll( + FutureArg<1>(&goneUpdate), + v1::scheduler::SendAcknowledge(frameworkId, agentId))); + + // When the terminal ACK is received by the master, 
the agent should be marked
+  // gone, which entails sending a `ShutdownMessage`.
+  Future<ShutdownMessage> shutdownMessage =
+    FUTURE_PROTOBUF(ShutdownMessage(), _, _);
+
+  ContentType contentType = GetParam();
+
+  {
+    v1::master::Call::DrainAgent drainAgent;
+    drainAgent.mutable_agent_id()->CopyFrom(agentId);
+    drainAgent.set_mark_gone(true);
+
+    v1::master::Call call;
+    call.set_type(v1::master::Call::DRAIN_AGENT);
+    call.mutable_drain_agent()->CopyFrom(drainAgent);
+
+    post(master.get()->pid, call, contentType);
+  }
+
+  AWAIT_READY(goneUpdate);
+  AWAIT_READY(shutdownMessage);
+}
+
+
+// When an operator submits a DRAIN_AGENT call with an agent that has gone
+// unreachable, the call should succeed, and the agent should be drained
+// if/when it returns to the cluster.
+TEST_P(MasterDrainingTest, DrainAgentUnreachable)
+{
+  Clock::pause();
+
+  master::Flags masterFlags = CreateMasterFlags();
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  slave::Flags agentFlags = CreateSlaveFlags();
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), agentFlags);
+  ASSERT_SOME(slave);
+
+  Clock::advance(agentFlags.registration_backoff_factor);
+
+  AWAIT_READY(slaveRegisteredMessage);
+
+  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+  v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_checkpoint(true);
+  frameworkInfo.add_capabilities()->set_type(
+      v1::FrameworkInfo::Capability::PARTITION_AWARE);
+
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo));
+
+  Future<v1::scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  Future<v1::scheduler::Event::Offers> offers;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return());
+
+  auto mesos = std::make_shared<v1::scheduler::TestMesos>(
+      master.get()->pid, ContentType::PROTOBUF, scheduler);
+
+  AWAIT_READY(subscribed);
+  v1::FrameworkID frameworkId(subscribed->framework_id());
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->offers().empty());
+
+  const v1::Offer& offer = offers->offers(0);
+  const v1::AgentID& agentId = offer.agent_id();
+
+  Try<v1::Resources> resources =
+    v1::Resources::parse("cpus:0.1;mem:64;disk:64");
+
+  ASSERT_SOME(resources);
+
+  v1::TaskInfo taskInfo =
+    v1::createTask(agentId, resources.get(), SLEEP_COMMAND(1000));
+
+  testing::Sequence updateSequence;
+  Future<v1::scheduler::Event::Update> startingUpdate;
+  Future<v1::scheduler::Event::Update> runningUpdate;
+  Future<v1::scheduler::Event::Update> unreachableUpdate;
+  Future<v1::scheduler::Event::Update> runningUpdate2;
+  Future<v1::scheduler::Event::Update> killedUpdate;
+
+  // Make absolutely sure the agent receives these two acknowledgements
+  // before forcing the agent offline.
+  Future<StatusUpdateAcknowledgementMessage> startingAck =
+    FUTURE_PROTOBUF(StatusUpdateAcknowledgementMessage(), _, _);
+  Future<StatusUpdateAcknowledgementMessage> runningAck =
+    FUTURE_PROTOBUF(StatusUpdateAcknowledgementMessage(), _, _);
+
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
+          TaskStatusUpdateStateEq(v1::TASK_STARTING))))
+    .InSequence(updateSequence)
+    .WillOnce(DoAll(
+        FutureArg<1>(&startingUpdate),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
+          TaskStatusUpdateStateEq(v1::TASK_RUNNING))))
+    .InSequence(updateSequence)
+    .WillOnce(DoAll(
+        FutureArg<1>(&runningUpdate),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
+          TaskStatusUpdateStateEq(v1::TASK_UNREACHABLE))))
+    .InSequence(updateSequence)
+    .WillOnce(DoAll(
+        FutureArg<1>(&unreachableUpdate),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  // When the agent is brought back, we expect a TASK_RUNNING followed by
+  // a TASK_KILLED (due to draining).
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
+          TaskStatusUpdateStateEq(v1::TASK_RUNNING))))
+    .InSequence(updateSequence)
+    .WillOnce(DoAll(
+        FutureArg<1>(&runningUpdate2),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
+          TaskStatusUpdateStateEq(v1::TASK_KILLED))))
+    .InSequence(updateSequence)
+    .WillOnce(DoAll(
+        FutureArg<1>(&killedUpdate),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  mesos->send(
+      v1::createCallAccept(
+          frameworkId,
+          offer,
+          {v1::LAUNCH({taskInfo})}));
+
+  AWAIT_READY(startingUpdate);
+  AWAIT_READY(startingAck);
+  AWAIT_READY(runningUpdate);
+  AWAIT_READY(runningAck);
+
+  // Simulate an agent crash, so that it disconnects from the master.
+  slave.get()->terminate();
+  slave->reset();
+
+  // Let the master's `agent_reregister_timeout` elapse while the agent is
+  // down; the scheduler is then expected to see TASK_UNREACHABLE.
+  Clock::advance(masterFlags.agent_reregister_timeout);
+  AWAIT_READY(unreachableUpdate);
+
+  // Start draining the unreachable agent.
+  ContentType contentType = GetParam();
+
+  {
+    v1::master::Call::DrainAgent drainAgent;
+    drainAgent.mutable_agent_id()->CopyFrom(agentId);
+
+    v1::master::Call call;
+    call.set_type(v1::master::Call::DRAIN_AGENT);
+    call.mutable_drain_agent()->CopyFrom(drainAgent);
+
+    post(master.get()->pid, call, contentType);
+  }
+
+  // Bring the agent back.
+  Future<ReregisterExecutorMessage> reregisterExecutor =
+    FUTURE_PROTOBUF(ReregisterExecutorMessage(), _, _);
+
+  Future<SlaveReregisteredMessage> slaveReregisteredMessage =
+    FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
+
+  Future<DrainSlaveMessage> drainSlaveMessage =
+    FUTURE_PROTOBUF(DrainSlaveMessage(), _, _);
+
+  Try<Owned<cluster::Slave>> recoveredSlave =
+    StartSlave(detector.get(), agentFlags);
+  ASSERT_SOME(recoveredSlave);
+
+  // Once the executor has reregistered, advance the (paused) clock past the
+  // agent's executor reregistration timeout and registration backoff,
+  // settling after each step, before waiting for the agent to reregister.
+  AWAIT_READY(reregisterExecutor);
+  Clock::advance(agentFlags.executor_reregistration_timeout);
+  Clock::settle();
+  Clock::advance(agentFlags.registration_backoff_factor);
+  Clock::settle();
+  AWAIT_READY(slaveReregisteredMessage);
+
+  // The agent should be told to drain once it reregisters.
+  AWAIT_READY(drainSlaveMessage);
+  AWAIT_READY(runningUpdate2);
+  AWAIT_READY(killedUpdate);
+}
+
+} // namespace tests {
+} // namespace internal {
+} // namespace mesos {