This is an automated email from the ASF dual-hosted git repository. asekretenko pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 495db0e983222a5038a26b93e0ca689f2ce009a0 Author: Andrei Sekretenko <asekrete...@mesosphere.com> AuthorDate: Wed Jan 29 15:24:32 2020 +0100 Converted ACCEPT to synchronous authorization. This patch converts the ACCEPT call to synchronous authorization (see MESOS-10056), thus fixing a race between ACCEPT and REVIVE (MESOS-10023) and removing the potential for other similar races. It also moves the authorization of scheduler API operations to after their validation (thus fixing MESOS-10083) and effectively gets rid of the concept of a "task pending authorization". Tests are converted from mocking `Authorizer::authorized(...)` to mocking `Authorizer::provideObjectApprover(...)` as necessary. Review: https://reviews.apache.org/r/72098 --- src/master/authorization.cpp | 16 +- src/master/framework.cpp | 16 +- src/master/master.cpp | 478 +++++----------------- src/master/master.hpp | 4 +- src/tests/master_authorization_tests.cpp | 657 ++----------------------------- src/tests/master_tests.cpp | 103 ----- src/tests/reconciliation_tests.cpp | 86 ---- 7 files changed, 149 insertions(+), 1211 deletions(-) diff --git a/src/master/authorization.cpp b/src/master/authorization.cpp index 77719eb..6dfa59a 100644 --- a/src/master/authorization.cpp +++ b/src/master/authorization.cpp @@ -387,10 +387,20 @@ ostream& operator<<(ostream& stream, const ActionObject& actionObject) } switch (actionObject.action()) { - case authorization::RUN_TASK: + case authorization::RUN_TASK: { + const TaskInfo& task = object->task_info(); + const FrameworkInfo& framework = object->framework_info(); return stream - << "launch task " << object->task_info().task_id() - << " of framework " << object->framework_info().id(); + << "launch task " << task.task_id() + << " of framework " << framework.id() + << " under user '" + << (task.has_command() && task.command().has_user() + ? task.command().user() + : task.has_executor() && task.executor().command().has_user() + ?
task.executor().command().user() + : framework.user()) + << "'"; + } case authorization::REGISTER_FRAMEWORK: return stream diff --git a/src/master/framework.cpp b/src/master/framework.cpp index ffcf367..7e46469 100644 --- a/src/master/framework.cpp +++ b/src/master/framework.cpp @@ -740,7 +740,21 @@ Try<bool> Framework::approved(const ActionObject& actionObject) const constexpr std::initializer_list<authorization::Action> SCHEDULER_API_ACTIONS{ - authorization::REGISTER_FRAMEWORK}; + authorization::REGISTER_FRAMEWORK, + authorization::RUN_TASK, + + authorization::UNRESERVE_RESOURCES, + authorization::RESERVE_RESOURCES, + + authorization::CREATE_VOLUME, + authorization::DESTROY_VOLUME, + authorization::RESIZE_VOLUME, + + authorization::CREATE_MOUNT_DISK, + authorization::CREATE_BLOCK_DISK, + authorization::DESTROY_MOUNT_DISK, + authorization::DESTROY_BLOCK_DISK, + authorization::DESTROY_RAW_DISK}; Future<Owned<ObjectApprovers>> Framework::createObjectApprovers( diff --git a/src/master/master.cpp b/src/master/master.cpp index 07685fe..b09ce8e 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -3721,8 +3721,6 @@ Future<bool> Master::authorize( *request.mutable_object() = *(std::move(actionObject).object()); } - // TODO(asekretenko): Use a background-refreshed ObjectApprover - // when they become available (see MESOS-10056). return authorizer.get()->authorized(request); } @@ -4317,126 +4315,18 @@ void Master::accept( } } - const Option<Principal> principal = framework->info.has_principal() - ? Principal(framework->info.principal()) - : Option<Principal>::none(); - - // TODO(asekretenko): Use background-refreshed ObjectApprovers - // instead of asynchronous authorization. 
- vector<Future<bool>> futures; - for (const Offer::Operation& operation : accept.operations()) { - switch (operation.type()) { - case Offer::Operation::LAUNCH: - case Offer::Operation::LAUNCH_GROUP: { - for (const TaskInfo& task : getOperationTasks(operation)) { - futures.emplace_back(authorize( - principal, ActionObject::taskLaunch(task, framework->info))); - } - break; - } - - // NOTE: When handling RESERVE and UNRESERVE operations, authorization - // will proceed even if no principal is specified, although currently - // resources cannot be reserved or unreserved unless a principal is - // provided. Any RESERVE/UNRESERVE operation with no associated principal - // will be found invalid when `validate()` is called in `_accept()` below. - - // The RESERVE operation allows a principal to reserve resources. - case Offer::Operation::RESERVE: { - futures.push_back(authorize( - principal, ActionObject::reserve(operation.reserve()))); - - break; - } - - // The UNRESERVE operation allows a principal to unreserve resources. - case Offer::Operation::UNRESERVE: { - futures.push_back(authorize( - principal, ActionObject::unreserve(operation.unreserve()))); - - break; - } - - // The CREATE operation allows the creation of a persistent volume. - case Offer::Operation::CREATE: { - futures.push_back(authorize( - principal, ActionObject::createVolume(operation.create()))); - - break; - } - - // The DESTROY operation allows the destruction of a persistent volume. 
- case Offer::Operation::DESTROY: { - futures.push_back(authorize( - principal, ActionObject::destroyVolume(operation.destroy()))); - - break; - } - - case Offer::Operation::GROW_VOLUME: { - futures.push_back(authorize( - principal, ActionObject::growVolume(operation.grow_volume()))); - - break; - } - - case Offer::Operation::SHRINK_VOLUME: { - futures.push_back(authorize( - principal, ActionObject::shrinkVolume(operation.shrink_volume()))); - - break; - } - - case Offer::Operation::CREATE_DISK: { - Try<ActionObject> actionObject = - ActionObject::createDisk(operation.create_disk()); - - if (actionObject.isError()) { - futures.push_back(Failure(actionObject.error())); - } else { - futures.push_back(authorize(principal, std::move(*actionObject))); - } - - break; - } - - case Offer::Operation::DESTROY_DISK: { - Try<ActionObject> actionObject = - ActionObject::destroyDisk(operation.destroy_disk()); - - if (actionObject.isError()) { - futures.push_back(Failure(actionObject.error())); - } else { - futures.push_back(authorize(principal, std::move(*actionObject))); - } - - break; - } - - case Offer::Operation::UNKNOWN: { - // TODO(vinod): Send an error event to the scheduler? - LOG(WARNING) << "Ignoring unknown operation"; - break; - } - } - } - - // Wait for all the tasks to be authorized. - await(futures) - .onAny(defer(self(), - &Master::_accept, - framework->id(), - slaveId, - std::move(accept), - lambda::_1)); + // TODO(asekretenko): Dismantle `_accept(...)` (which, before synchronous + // authorization was introduced, used to be a deferred continuation of ACCEPT + // call processing, but now is kept only for limiting variable scopes) and + // handle operations one-by-one. 
+ _accept(framework->id(), slaveId, std::move(accept)); } void Master::_accept( const FrameworkID& frameworkId, const SlaveID& slaveId, - scheduler::Call::Accept&& accept, - const Future<vector<Future<bool>>>& _authorizations) + scheduler::Call::Accept&& accept) { auto discardOffers = [this](const RepeatedPtrField<OfferID>& ids) { for (const OfferID& offerId : ids) { @@ -4583,56 +4473,60 @@ void Master::_accept( // The order of the conversions is important and preserved. vector<ResourceConversion> conversions; - // The order of `authorizations` must match the order of the operations and/or - // tasks in `accept.operations()` as they are iterated through simultaneously. - CHECK_READY(_authorizations); - std::deque<Future<bool>> authorizations( - _authorizations->begin(), _authorizations->end()); - foreach (const Offer::Operation& operation, accept.operations()) { - switch (operation.type()) { - // The RESERVE operation allows a principal to reserve resources. - case Offer::Operation::RESERVE: { - CHECK(!authorizations.empty()); - Future<bool> authorization = authorizations.front(); - authorizations.pop_front(); + auto authorized_ = + [&framework, &operation](const ActionObject& actionObject) + -> Option<Error> { + const Try<bool> authorized = framework->approved(actionObject); + if (authorized.isError()) { + return Error( + "Failed to authorize principal '" + framework->info.principal() + + "' to perform " + Offer::Operation::Type_Name(operation.type()) + + ": " + authorized.error()); + } - CHECK(!authorization.isDiscarded()); + if (!*authorized) { + return Error( + "Principal '" + framework->info.principal() + + "' no authorized to " + stringify(actionObject)); + } - if (authorization.isFailed()) { - // TODO(greggomann): We may want to retry this failed authorization - // request rather than dropping it immediately. 
- drop(framework, - operation, - "Authorization of principal '" + framework->info.principal() + - "' to reserve resources failed: " + authorization.failure()); + return None(); + }; - continue; - } else if (!authorization.get()) { - drop(framework, - operation, - "Not authorized to reserve resources as '" + - framework->info.principal() + "'"); + auto authorized = overload( + authorized_, + [&authorized_](const vector<ActionObject>& actionObjects) { + for (const ActionObject& actionObject : actionObjects) { + const Option<Error> error = authorized_(actionObject); + if (error.isSome()) { + return error; + } + } + + return Option<Error>::none(); + }); - continue; - } + switch (operation.type()) { + // The RESERVE operation allows a principal to reserve resources. + case Offer::Operation::RESERVE: { Option<Principal> principal = framework->info.has_principal() ? Principal(framework->info.principal()) : Option<Principal>::none(); - // Make sure this reserve operation is valid. Option<Error> error = validation::operation::validate( operation.reserve(), principal, slave->capabilities, framework->info); + error = error.isSome() + ? Error(error->message + "; on agent " + stringify(*slave)) + : authorized(ActionObject::reserve(operation.reserve())); + if (error.isSome()) { - drop( - framework, - operation, - error->message + "; on agent " + stringify(*slave)); + drop(framework, operation, error->message); continue; } @@ -4669,35 +4563,13 @@ void Master::_accept( // The UNRESERVE operation allows a principal to unreserve resources. case Offer::Operation::UNRESERVE: { - CHECK(!authorizations.empty()); - Future<bool> authorization = authorizations.front(); - authorizations.pop_front(); - - CHECK(!authorization.isDiscarded()); - - if (authorization.isFailed()) { - // TODO(greggomann): We may want to retry this failed authorization - // request rather than dropping it immediately. 
- drop(framework, - operation, - "Authorization of principal '" + framework->info.principal() + - "' to unreserve resources failed: " + - authorization.failure()); - - continue; - } else if (!authorization.get()) { - drop(framework, - operation, - "Not authorized to unreserve resources as '" + - framework->info.principal() + "'"); - - continue; - } - - // Make sure this unreserve operation is valid. Option<Error> error = validation::operation::validate(operation.unreserve()); + error = error.isSome() + ? Error(error->message + "; on agent " + stringify(*slave)) + : authorized(ActionObject::unreserve(operation.unreserve())); + if (error.isSome()) { drop(framework, operation, error->message); continue; @@ -4735,31 +4607,6 @@ void Master::_accept( } case Offer::Operation::CREATE: { - CHECK(!authorizations.empty()); - Future<bool> authorization = authorizations.front(); - authorizations.pop_front(); - - CHECK(!authorization.isDiscarded()); - - if (authorization.isFailed()) { - // TODO(greggomann): We may want to retry this failed authorization - // request rather than dropping it immediately. - drop(framework, - operation, - "Authorization of principal '" + framework->info.principal() + - "' to create persistent volumes failed: " + - authorization.failure()); - - continue; - } else if (!authorization.get()) { - drop(framework, - operation, - "Not authorized to create persistent volumes as '" + - framework->info.principal() + "'"); - - continue; - } - Option<Principal> principal = framework->info.has_principal() ? Principal(framework->info.principal()) : Option<Principal>::none(); @@ -4772,11 +4619,12 @@ void Master::_accept( slave->capabilities, framework->info); + error = error.isSome() + ? 
Error(error->message + "; on agent " + stringify(*slave)) + : authorized(ActionObject::createVolume(operation.create())); + if (error.isSome()) { - drop( - framework, - operation, - error->message + "; on agent " + stringify(*slave)); + drop(framework, operation, error->message); continue; } @@ -4813,38 +4661,16 @@ void Master::_accept( } case Offer::Operation::DESTROY: { - CHECK(!authorizations.empty()); - Future<bool> authorization = authorizations.front(); - authorizations.pop_front(); - - CHECK(!authorization.isDiscarded()); - - if (authorization.isFailed()) { - // TODO(greggomann): We may want to retry this failed authorization - // request rather than dropping it immediately. - drop(framework, - operation, - "Authorization of principal '" + framework->info.principal() + - "' to destroy persistent volumes failed: " + - authorization.failure()); - - continue; - } else if (!authorization.get()) { - drop(framework, - operation, - "Not authorized to destroy persistent volumes as '" + - framework->info.principal() + "'"); - - continue; - } - - // Make sure this destroy operation is valid. Option<Error> error = validation::operation::validate( operation.destroy(), slave->checkpointedResources, slave->usedResources, slave->pendingTasks); + error = error.isSome() + ? Error(error->message + "; on agent " + stringify(*slave)) + : authorized(ActionObject::destroyVolume(operation.destroy())); + if (error.isSome()) { drop(framework, operation, error->message); continue; @@ -4901,40 +4727,15 @@ void Master::_accept( } case Offer::Operation::GROW_VOLUME: { - CHECK(!authorizations.empty()); - Future<bool> authorization = authorizations.front(); - authorizations.pop_front(); - - CHECK(!authorization.isDiscarded()); - - if (authorization.isFailed()) { - // TODO(greggomann): We may want to retry this failed authorization - // request rather than dropping it immediately. 
- drop(framework, - operation, - "Authorization of principal '" + framework->info.principal() + - "' to grow a volume failed: " + - authorization.failure()); - - continue; - } else if (!authorization.get()) { - drop(framework, - operation, - "Not authorized to grow a volume as '" + - framework->info.principal() + "'"); - - continue; - } - - // Make sure this grow volume operation is valid. Option<Error> error = validation::operation::validate( operation.grow_volume(), slave->capabilities); + error = error.isSome() ? + Error(error->message + "; on agent " + stringify(*slave)) + : authorized(ActionObject::growVolume(operation.grow_volume())); + if (error.isSome()) { - drop( - framework, - operation, - error->message + "; on agent " + stringify(*slave)); + drop(framework, operation, error->message); continue; } @@ -4984,40 +4785,15 @@ void Master::_accept( } case Offer::Operation::SHRINK_VOLUME: { - CHECK(!authorizations.empty()); - Future<bool> authorization = authorizations.front(); - authorizations.pop_front(); - - CHECK(!authorization.isDiscarded()); - - if (authorization.isFailed()) { - // TODO(greggomann): We may want to retry this failed authorization - // request rather than dropping it immediately. - drop(framework, - operation, - "Authorization of principal '" + framework->info.principal() + - "' to shrink a volume failed: " + - authorization.failure()); - - continue; - } else if (!authorization.get()) { - drop(framework, - operation, - "Not authorized to shrink a volume as '" + - framework->info.principal() + "'"); - - continue; - } - - // Make sure this shrink volume operation is valid. Option<Error> error = validation::operation::validate( operation.shrink_volume(), slave->capabilities); + error = error.isSome() + ? 
Error(error->message + "; on agent " + stringify(*slave)) + : authorized(ActionObject::shrinkVolume(operation.shrink_volume())); + if (error.isSome()) { - drop( - framework, - operation, - error->message + "; on agent " + stringify(*slave)); + drop(framework, operation, error->message); continue; } @@ -5068,10 +4844,6 @@ void Master::_accept( case Offer::Operation::LAUNCH: { foreach (const TaskInfo& task, operation.launch().task_infos()) { - CHECK(!authorizations.empty()); - Future<bool> authorization = authorizations.front(); - authorizations.pop_front(); - // The task will not be in `pendingTasks` if it has been // killed in the interim. No need to send TASK_KILLED in // this case as it has already been sent. Note however that @@ -5084,7 +4856,10 @@ void Master::_accept( // TODO(bmahler): We may send TASK_ERROR after a TASK_KILLED // if a task was killed (removed from `pendingTasks`) *and* // the task is invalid or unauthorized here. - + // + // TODO(asekretenko): Now that ACCEPT is authorized synchronously, + // master state cannot change while the task is being authorized, + // and all the code for tracking pending tasks can be removed. bool pending = framework->pendingTasks.contains(task.task_id()); framework->pendingTasks.erase(task.task_id()); slave->pendingTasks[framework->id()].erase(task.task_id()); @@ -5092,17 +4867,10 @@ void Master::_accept( slave->pendingTasks.erase(framework->id()); } - CHECK(!authorization.isDiscarded()); - - if (authorization.isFailed() || !authorization.get()) { - string user = framework->info.user(); // Default user. 
- if (task.has_command() && task.command().has_user()) { - user = task.command().user(); - } else if (task.has_executor() && - task.executor().command().has_user()) { - user = task.executor().command().user(); - } + const Option<Error> authorizationError = + authorized(ActionObject::taskLaunch(task, framework->info)); + if (authorizationError.isSome()) { const StatusUpdate& update = protobuf::createStatusUpdate( framework->id(), task.slave_id(), @@ -5110,9 +4878,7 @@ void Master::_accept( TASK_ERROR, TaskStatus::SOURCE_MASTER, None(), - authorization.isFailed() ? - "Authorization failure: " + authorization.failure() : - "Not authorized to launch as user '" + user + "'", + authorizationError->message, TaskStatus::REASON_TASK_UNAUTHORIZED); metrics->tasks_error++; @@ -5315,29 +5081,19 @@ void Master::_accept( Option<Error> error; Option<TaskStatus::Reason> reason; - // NOTE: We check for the authorization errors first and never break the - // loop to ensure that all authorization futures for this task group are - // iterated through. foreach (const TaskInfo& task, taskGroup.tasks()) { - CHECK(!authorizations.empty()); - Future<bool> authorization = authorizations.front(); - authorizations.pop_front(); + const ActionObject actionObject = + ActionObject::taskLaunch(task, framework->info); - CHECK(!authorization.isDiscarded()); + const Try<bool> approval = framework->approved(actionObject); - if (authorization.isFailed()) { + if (approval.isError()) { error = Error("Failed to authorize task" " '" + stringify(task.task_id()) + "'" - ": " + authorization.failure()); - } else if (!authorization.get()) { - string user = framework->info.user(); // Default user. 
- if (task.has_command() && task.command().has_user()) { - user = task.command().user(); - } - + ": " + approval.error()); + } else if (!*approval) { error = Error("Task '" + stringify(task.task_id()) + "'" - " is not authorized to launch as" - " user '" + user + "'"); + " is not authorized to" + stringify(actionObject)); } } @@ -5498,34 +5254,6 @@ void Master::_accept( } case Offer::Operation::CREATE_DISK: { - const Resource::DiskInfo::Source::Type diskType = - operation.create_disk().target_type(); - - CHECK(!authorizations.empty()); - Future<bool> authorization = authorizations.front(); - authorizations.pop_front(); - - CHECK(!authorization.isDiscarded()); - - if (authorization.isFailed()) { - // TODO(greggomann): We may want to retry this failed authorization - // request rather than dropping it immediately. - drop(framework, - operation, - "Authorization of principal '" + framework->info.principal() + - "' to create a " + stringify(diskType) + " disk failed: " + - authorization.failure()); - - continue; - } else if (!authorization.get()) { - drop(framework, - operation, - "Not authorized to create a " + stringify(diskType) + - " disk as '" + framework->info.principal() + "'"); - - continue; - } - if (!slave->capabilities.resourceProvider) { drop(framework, operation, @@ -5537,6 +5265,11 @@ void Master::_accept( Option<Error> error = validation::operation::validate( operation.create_disk()); + error = error.isSome() + ? 
Error(error->message + "; on agent " + stringify(*slave)) + : authorized(CHECK_NOTERROR( + ActionObject::createDisk(operation.create_disk()))); + if (error.isSome()) { drop(framework, operation, error->message); continue; @@ -5565,34 +5298,6 @@ void Master::_accept( } case Offer::Operation::DESTROY_DISK: { - const Resource::DiskInfo::Source::Type diskType = - operation.destroy_disk().source().disk().source().type(); - - CHECK(!authorizations.empty()); - Future<bool> authorization = authorizations.front(); - authorizations.pop_front(); - - CHECK(!authorization.isDiscarded()); - - if (authorization.isFailed()) { - // TODO(greggomann): We may want to retry this failed authorization - // request rather than dropping it immediately. - drop(framework, - operation, - "Authorization of principal '" + framework->info.principal() + - "' to destroy a " + stringify(diskType) + " disk failed: " + - authorization.failure()); - - continue; - } else if (!authorization.get()) { - drop(framework, - operation, - "Not authorized to destroy a " + stringify(diskType) + - " disk as '" + framework->info.principal() + "'"); - - continue; - } - if (!slave->capabilities.resourceProvider) { drop(framework, operation, @@ -5604,6 +5309,12 @@ void Master::_accept( Option<Error> error = validation::operation::validate( operation.destroy_disk()); + error = error.isSome() + ? Error(error->message + "; on agent " + stringify(*slave)) + : authorized(CHECK_NOTERROR( + ActionObject::destroyDisk(operation.destroy_disk()))); + + if (error.isSome()) { drop(framework, operation, error->message); continue; @@ -5638,11 +5349,6 @@ void Master::_accept( } } - CHECK(authorizations.empty()) - << "Authorization results not processed: " - << stringify( - vector<Future<bool>>(authorizations.begin(), authorizations.end())); - // Update the allocator based on the operations. 
if (!conversions.empty()) { allocator->updateAllocation( diff --git a/src/master/master.hpp b/src/master/master.hpp index f766b5c..7281815 100644 --- a/src/master/master.hpp +++ b/src/master/master.hpp @@ -956,9 +956,7 @@ private: void _accept( const FrameworkID& frameworkId, const SlaveID& slaveId, - mesos::scheduler::Call::Accept&& accept, - const process::Future< - std::vector<process::Future<bool>>>& authorizations); + mesos::scheduler::Call::Accept&& accept); void acceptInverseOffers( Framework* framework, diff --git a/src/tests/master_authorization_tests.cpp b/src/tests/master_authorization_tests.cpp index 4074a18..2d8fa7c 100644 --- a/src/tests/master_authorization_tests.cpp +++ b/src/tests/master_authorization_tests.cpp @@ -390,628 +390,6 @@ TEST_F(MasterAuthorizationTest, UnauthorizedTaskGroup) driver.join(); } - -// This test verifies that a 'killTask()' that comes before -// '_accept()' is called results in TASK_KILLED. -TEST_F(MasterAuthorizationTest, KillTask) -{ - MockAuthorizer authorizer; - Try<Owned<cluster::Master>> master = StartMaster(&authorizer); - ASSERT_SOME(master); - - MockExecutor exec(DEFAULT_EXECUTOR_ID); - TestContainerizer containerizer(&exec); - - Owned<MasterDetector> detector = master.get()->createDetector(); - Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), &containerizer); - ASSERT_SOME(slave); - - MockScheduler sched; - MesosSchedulerDriver driver( - &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL); - - EXPECT_CALL(sched, registered(&driver, _, _)); - - Future<vector<Offer>> offers; - EXPECT_CALL(sched, resourceOffers(&driver, _)) - .WillOnce(FutureArg<1>(&offers)) - .WillRepeatedly(Return()); // Ignore subsequent offers. - - driver.start(); - - AWAIT_READY(offers); - ASSERT_FALSE(offers->empty()); - - TaskInfo task = createTask(offers.get()[0], "", DEFAULT_EXECUTOR_ID); - - // Return a pending future from authorizer. 
- Future<Nothing> authorize; - Promise<bool> promise; - EXPECT_CALL(authorizer, authorized(_)) - .WillOnce(DoAll(FutureSatisfy(&authorize), - Return(promise.future()))); - - driver.launchTasks(offers.get()[0].id(), {task}); - - // Wait until authorization is in progress. - AWAIT_READY(authorize); - - Future<TaskStatus> status; - EXPECT_CALL(sched, statusUpdate(&driver, _)) - .WillOnce(FutureArg<1>(&status)); - - Future<Nothing> recoverResources = - FUTURE_DISPATCH(_, &MesosAllocatorProcess::recoverResources); - - // Now kill the task. - driver.killTask(task.task_id()); - - // Framework should get a TASK_KILLED right away. - AWAIT_READY(status); - EXPECT_EQ(TASK_KILLED, status->state()); - EXPECT_EQ(TaskStatus::REASON_TASK_KILLED_DURING_LAUNCH, status->reason()); - - // Now complete authorization. - promise.set(true); - - // No task launch should happen resulting in all resources being - // returned to the allocator. - AWAIT_READY(recoverResources); - - // Make sure the task is not known to master anymore. - EXPECT_CALL(sched, statusUpdate(&driver, _)) - .Times(0); - - driver.reconcileTasks({}); - - // We settle the clock here to ensure any updates sent by the master - // are received. There shouldn't be any updates in this case. - Clock::pause(); - Clock::settle(); - - driver.stop(); - driver.join(); -} - - -// This test verifies that if a pending task in a task group -// is killed, then the entire group will be killed. 
-TEST_F(MasterAuthorizationTest, KillPendingTaskInTaskGroup) -{ - MockAuthorizer authorizer; - Try<Owned<cluster::Master>> master = StartMaster(&authorizer); - ASSERT_SOME(master); - - Owned<MasterDetector> detector = master.get()->createDetector(); - Try<Owned<cluster::Slave>> slave = StartSlave(detector.get()); - ASSERT_SOME(slave); - - MockScheduler sched; - MesosSchedulerDriver driver( - &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL); - - Future<FrameworkID> frameworkId; - EXPECT_CALL(sched, registered(&driver, _, _)) - .WillOnce(FutureArg<1>(&frameworkId)); - - Future<vector<Offer>> offers; - EXPECT_CALL(sched, resourceOffers(&driver, _)) - .WillOnce(FutureArg<1>(&offers)) - .WillRepeatedly(Return()); // Ignore subsequent offers. - - driver.start(); - - AWAIT_READY(frameworkId); - - AWAIT_READY(offers); - ASSERT_FALSE(offers->empty()); - - Resources resources = - Resources::parse("cpus:0.1;mem:32;disk:32").get(); - - ExecutorInfo executor; - executor.set_type(ExecutorInfo::DEFAULT); - executor.mutable_executor_id()->set_value("E"); - executor.mutable_framework_id()->CopyFrom(frameworkId.get()); - executor.mutable_resources()->CopyFrom(resources); - - TaskInfo task1; - task1.set_name("1"); - task1.mutable_task_id()->set_value("1"); - task1.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id()); - task1.mutable_resources()->MergeFrom(resources); - - TaskInfo task2; - task2.set_name("2"); - task2.mutable_task_id()->set_value("2"); - task2.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id()); - task2.mutable_resources()->MergeFrom(resources); - - TaskGroupInfo taskGroup; - taskGroup.add_tasks()->CopyFrom(task1); - taskGroup.add_tasks()->CopyFrom(task2); - - // Return a pending future from authorizer. 
- Future<Nothing> authorize1; - Future<Nothing> authorize2; - Promise<bool> promise1; - Promise<bool> promise2; - EXPECT_CALL(authorizer, authorized(_)) - .WillOnce(DoAll(FutureSatisfy(&authorize1), - Return(promise1.future()))) - .WillOnce(DoAll(FutureSatisfy(&authorize2), - Return(promise2.future()))); - - Future<TaskStatus> task1Status; - Future<TaskStatus> task2Status; - EXPECT_CALL(sched, statusUpdate(&driver, _)) - .WillOnce(FutureArg<1>(&task1Status)) - .WillOnce(FutureArg<1>(&task2Status)); - - Offer::Operation operation; - operation.set_type(Offer::Operation::LAUNCH_GROUP); - - Offer::Operation::LaunchGroup* launchGroup = - operation.mutable_launch_group(); - - launchGroup->mutable_executor()->CopyFrom(executor); - launchGroup->mutable_task_group()->CopyFrom(taskGroup); - - driver.acceptOffers({offers.get()[0].id()}, {operation}); - - // Wait until all authorizations are in progress. - AWAIT_READY(authorize1); - AWAIT_READY(authorize2); - - Future<Nothing> recoverResources = - FUTURE_DISPATCH(_, &MesosAllocatorProcess::recoverResources); - - // Now kill task1. - driver.killTask(task1.task_id()); - - AWAIT_READY(task1Status); - EXPECT_EQ(TASK_KILLED, task1Status->state()); - EXPECT_TRUE(strings::contains( - task1Status->message(), "Killed before delivery to the agent")); - EXPECT_EQ(TaskStatus::REASON_TASK_KILLED_DURING_LAUNCH, - task1Status->reason()); - - // Now complete authorizations for task1 and task2. - promise1.set(true); - promise2.set(true); - - AWAIT_READY(task2Status); - EXPECT_EQ(TASK_KILLED, task2Status->state()); - EXPECT_TRUE(strings::contains( - task2Status->message(), - "A task within the task group was killed before delivery to the agent")); - EXPECT_EQ(TaskStatus::REASON_TASK_KILLED_DURING_LAUNCH, - task2Status->reason()); - - // No task launch should happen resulting in all resources being - // returned to the allocator. - AWAIT_READY(recoverResources); - - // Make sure the task group is not known to master anymore. 
- EXPECT_CALL(sched, statusUpdate(&driver, _)) - .Times(0); - - driver.reconcileTasks({}); - - // We settle the clock here to ensure any updates sent by the master - // are received. There shouldn't be any updates in this case. - Clock::pause(); - Clock::settle(); - - driver.stop(); - driver.join(); -} - - -// This test verifies that a slave removal that comes before -// '_accept()' is called results in TASK_LOST for a framework that is -// not partition-aware. -TEST_F(MasterAuthorizationTest, SlaveRemovedLost) -{ - MockAuthorizer authorizer; - Try<Owned<cluster::Master>> master = StartMaster(&authorizer); - ASSERT_SOME(master); - - MockExecutor exec(DEFAULT_EXECUTOR_ID); - TestContainerizer containerizer(&exec); - - Owned<MasterDetector> detector = master.get()->createDetector(); - Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), &containerizer); - ASSERT_SOME(slave); - - MockScheduler sched; - MesosSchedulerDriver driver( - &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL); - - EXPECT_CALL(sched, registered(&driver, _, _)); - - Future<vector<Offer>> offers; - EXPECT_CALL(sched, resourceOffers(&driver, _)) - .WillOnce(FutureArg<1>(&offers)) - .WillRepeatedly(Return()); // Ignore subsequent offers. - - driver.start(); - - AWAIT_READY(offers); - ASSERT_FALSE(offers->empty()); - - TaskInfo task = createTask(offers.get()[0], "", DEFAULT_EXECUTOR_ID); - - // Return a pending future from authorizer. - Future<Nothing> authorize; - Promise<bool> promise; - EXPECT_CALL(authorizer, authorized(_)) - .WillOnce(DoAll(FutureSatisfy(&authorize), - Return(promise.future()))); - - driver.launchTasks(offers.get()[0].id(), {task}); - - // Wait until authorization is in progress. 
- AWAIT_READY(authorize); - - Future<Nothing> slaveLost; - EXPECT_CALL(sched, slaveLost(&driver, _)) - .WillOnce(FutureSatisfy(&slaveLost)); - - Future<Nothing> recoverResources = - FUTURE_DISPATCH(_, &MesosAllocatorProcess::recoverResources); - - // Stop the slave with explicit shutdown as otherwise with - // checkpointing the master will wait for the slave to reconnect. - slave.get()->shutdown(); - slave->reset(); - - AWAIT_READY(slaveLost); - - Future<TaskStatus> status; - EXPECT_CALL(sched, statusUpdate(&driver, _)) - .WillOnce(FutureArg<1>(&status)); - - // Now complete authorization. - promise.set(true); - - // Framework should get a TASK_LOST. - AWAIT_READY(status); - - EXPECT_EQ(TASK_LOST, status->state()); - EXPECT_EQ(TaskStatus::SOURCE_MASTER, status->source()); - EXPECT_EQ(TaskStatus::REASON_SLAVE_REMOVED, status->reason()); - - // No task launch should happen resulting in all resources being - // returned to the allocator. - AWAIT_READY(recoverResources); - - // Check metrics. - JSON::Object stats = Metrics(); - EXPECT_EQ(0u, stats.values["master/tasks_dropped"]); - EXPECT_EQ(1u, stats.values["master/tasks_lost"]); - EXPECT_EQ( - 1u, stats.values["master/task_lost/source_master/reason_slave_removed"]); - - // Make sure the task is not known to master anymore. - EXPECT_CALL(sched, statusUpdate(&driver, _)) - .Times(0); - - driver.reconcileTasks({}); - - // We settle the clock here to ensure any updates sent by the master - // are received. There shouldn't be any updates in this case. - Clock::pause(); - Clock::settle(); - - driver.stop(); - driver.join(); -} - - -// This test verifies that a slave removal that comes before -// '_accept()' is called results in TASK_DROPPED for a framework that -// is partition-aware. 
-TEST_F(MasterAuthorizationTest, SlaveRemovedDropped) -{ - MockAuthorizer authorizer; - Try<Owned<cluster::Master>> master = StartMaster(&authorizer); - ASSERT_SOME(master); - - MockExecutor exec(DEFAULT_EXECUTOR_ID); - TestContainerizer containerizer(&exec); - - Owned<MasterDetector> detector = master.get()->createDetector(); - Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), &containerizer); - ASSERT_SOME(slave); - - FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO; - frameworkInfo.add_capabilities()->set_type( - FrameworkInfo::Capability::PARTITION_AWARE); - - MockScheduler sched; - MesosSchedulerDriver driver( - &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL); - - EXPECT_CALL(sched, registered(&driver, _, _)); - - Future<vector<Offer>> offers; - EXPECT_CALL(sched, resourceOffers(&driver, _)) - .WillOnce(FutureArg<1>(&offers)) - .WillRepeatedly(Return()); // Ignore subsequent offers. - - driver.start(); - - AWAIT_READY(offers); - ASSERT_FALSE(offers->empty()); - - TaskInfo task = createTask(offers.get()[0], "", DEFAULT_EXECUTOR_ID); - - // Return a pending future from authorizer. - Future<Nothing> authorize; - Promise<bool> promise; - EXPECT_CALL(authorizer, authorized(_)) - .WillOnce(DoAll(FutureSatisfy(&authorize), - Return(promise.future()))); - - driver.launchTasks(offers.get()[0].id(), {task}); - - // Wait until authorization is in progress. - AWAIT_READY(authorize); - - Future<Nothing> slaveLost; - EXPECT_CALL(sched, slaveLost(&driver, _)) - .WillOnce(FutureSatisfy(&slaveLost)); - - Future<Nothing> recoverResources = - FUTURE_DISPATCH(_, &MesosAllocatorProcess::recoverResources); - - // Stop the slave with explicit shutdown as otherwise with - // checkpointing the master will wait for the slave to reconnect. - slave.get()->shutdown(); - slave->reset(); - - AWAIT_READY(slaveLost); - - Future<TaskStatus> status; - EXPECT_CALL(sched, statusUpdate(&driver, _)) - .WillOnce(FutureArg<1>(&status)); - - // Now complete authorization. 
- promise.set(true); - - // Framework should get a TASK_DROPPED. - AWAIT_READY(status); - - EXPECT_EQ(TASK_DROPPED, status->state()); - EXPECT_EQ(TaskStatus::SOURCE_MASTER, status->source()); - EXPECT_EQ(TaskStatus::REASON_SLAVE_REMOVED, status->reason()); - - // No task launch should happen resulting in all resources being - // returned to the allocator. - AWAIT_READY(recoverResources); - - // Check metrics. - JSON::Object stats = Metrics(); - EXPECT_EQ(0u, stats.values["master/tasks_lost"]); - EXPECT_EQ(1u, stats.values["master/tasks_dropped"]); - EXPECT_EQ( - 1u, - stats.values["master/task_dropped/source_master/reason_slave_removed"]); - - // Make sure the task is not known to master anymore. - EXPECT_CALL(sched, statusUpdate(&driver, _)) - .Times(0); - - driver.reconcileTasks({}); - - // We settle the clock here to ensure any updates sent by the master - // are received. There shouldn't be any updates in this case. - Clock::pause(); - Clock::settle(); - - driver.stop(); - driver.join(); -} - - -// This test verifies that a framework removal that comes before -// '_accept()' is called results in recovery of resources. -TEST_F(MasterAuthorizationTest, FrameworkRemoved) -{ - MockAuthorizer authorizer; - Try<Owned<cluster::Master>> master = StartMaster(&authorizer); - ASSERT_SOME(master); - - MockExecutor exec(DEFAULT_EXECUTOR_ID); - TestContainerizer containerizer(&exec); - - Owned<MasterDetector> detector = master.get()->createDetector(); - Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), &containerizer); - ASSERT_SOME(slave); - - MockScheduler sched; - MesosSchedulerDriver driver( - &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL); - - EXPECT_CALL(sched, registered(&driver, _, _)); - - Future<vector<Offer>> offers; - EXPECT_CALL(sched, resourceOffers(&driver, _)) - .WillOnce(FutureArg<1>(&offers)) - .WillRepeatedly(Return()); // Ignore subsequent offers. 
- - driver.start(); - - AWAIT_READY(offers); - ASSERT_FALSE(offers->empty()); - - TaskInfo task = createTask(offers.get()[0], "", DEFAULT_EXECUTOR_ID); - - // Return a pending future from authorizer. - Future<Nothing> authorize; - Promise<bool> promise; - EXPECT_CALL(authorizer, authorized(_)) - .WillOnce(DoAll(FutureSatisfy(&authorize), - Return(promise.future()))); - - driver.launchTasks(offers.get()[0].id(), {task}); - - // Wait until authorization is in progress. - AWAIT_READY(authorize); - - Future<Nothing> removeFramework = - FUTURE_DISPATCH(_, &MesosAllocatorProcess::removeFramework); - - Future<Nothing> recoverResources = - FUTURE_DISPATCH(_, &MesosAllocatorProcess::recoverResources); - - // Now stop the framework. - driver.stop(); - driver.join(); - - AWAIT_READY(removeFramework); - - // Now complete authorization. - promise.set(true); - - // No task launch should happen resulting in all resources being - // returned to the allocator. - AWAIT_READY(recoverResources); -} - - -// This test verifies that two tasks each launched on a different -// slave with same executor id but different executor info are -// allowed even when the first task is pending due to authorization. -TEST_F(MasterAuthorizationTest, PendingExecutorInfoDiffersOnDifferentSlaves) -{ - MockAuthorizer authorizer; - Try<Owned<cluster::Master>> master = StartMaster(&authorizer); - ASSERT_SOME(master); - - MockScheduler sched; - MesosSchedulerDriver driver( - &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL); - - Future<Nothing> registered; - EXPECT_CALL(sched, registered(&driver, _, _)) - .WillOnce(FutureSatisfy(®istered)); - - driver.start(); - - AWAIT_READY(registered); - - Future<vector<Offer>> offers1; - EXPECT_CALL(sched, resourceOffers(&driver, _)) - .WillOnce(FutureArg<1>(&offers1)); - - // Start the first slave. 
- MockExecutor exec1(DEFAULT_EXECUTOR_ID); - TestContainerizer containerizer1(&exec1); - - Owned<MasterDetector> detector = master.get()->createDetector(); - Try<Owned<cluster::Slave>> slave1 = - StartSlave(detector.get(), &containerizer1); - ASSERT_SOME(slave1); - - AWAIT_READY(offers1); - ASSERT_FALSE(offers1->empty()); - - // Launch the first task with the default executor id. - ExecutorInfo executor1; - executor1 = DEFAULT_EXECUTOR_INFO; - executor1.mutable_command()->set_value("exit 1"); - - TaskInfo task1 = createTask( - offers1.get()[0], executor1.command().value(), executor1.executor_id()); - - // Return a pending future from authorizer. - // Note that we retire this expectation after its use because - // the authorizer will next be called when `slave2` registers and - // this expectation would be hit again (and be oversaturated) if - // we don't retire. New expectations on `authorizer` will be set - // after `slave2` is registered. - Future<Nothing> authorize; - Promise<bool> promise; - EXPECT_CALL(authorizer, authorized(_)) - .WillOnce(DoAll(FutureSatisfy(&authorize), - Return(promise.future()))) - .RetiresOnSaturation(); - - driver.launchTasks(offers1.get()[0].id(), {task1}); - - // Wait until authorization is in progress. - AWAIT_READY(authorize); - - Future<vector<Offer>> offers2; - EXPECT_CALL(sched, resourceOffers(&driver, _)) - .WillOnce(FutureArg<1>(&offers2)) - .WillRepeatedly(Return()); // Ignore subsequent offers. - - // Now start the second slave. - MockExecutor exec2(DEFAULT_EXECUTOR_ID); - TestContainerizer containerizer2(&exec2); - - Try<Owned<cluster::Slave>> slave2 = - StartSlave(detector.get(), &containerizer2); - ASSERT_SOME(slave2); - - AWAIT_READY(offers2); - ASSERT_FALSE(offers2->empty()); - - // Now launch the second task with the same executor id but - // a different executor command. 
- ExecutorInfo executor2; - executor2 = executor1; - executor2.mutable_command()->set_value("exit 2"); - - TaskInfo task2 = createTask( - offers2.get()[0], executor2.command().value(), executor2.executor_id()); - - EXPECT_CALL(exec2, registered(_, _, _, _)); - - EXPECT_CALL(exec2, launchTask(_, _)) - .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING)); - - Future<TaskStatus> status2; - EXPECT_CALL(sched, statusUpdate(&driver, _)) - .WillOnce(FutureArg<1>(&status2)); - - EXPECT_CALL(authorizer, authorized(_)) - .WillOnce(Return(true)); - - driver.launchTasks(offers2.get()[0].id(), {task2}); - - AWAIT_READY(status2); - ASSERT_EQ(TASK_RUNNING, status2->state()); - - EXPECT_CALL(exec1, registered(_, _, _, _)); - - EXPECT_CALL(exec1, launchTask(_, _)) - .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING)); - - Future<TaskStatus> status1; - EXPECT_CALL(sched, statusUpdate(&driver, _)) - .WillOnce(FutureArg<1>(&status1)); - - // Complete authorization of 'task1'. - promise.set(true); - - AWAIT_READY(status1); - ASSERT_EQ(TASK_RUNNING, status1->state()); - - EXPECT_CALL(exec1, shutdown(_)) - .Times(AtMost(1)); - - EXPECT_CALL(exec2, shutdown(_)) - .Times(AtMost(1)); - - driver.stop(); - driver.join(); -} - - // This test verifies that a framework registration with authorized // role is successful. 
TEST_F(MasterAuthorizationTest, AuthorizedRole) @@ -3206,6 +2584,23 @@ public: }; +class ControllableObjectApprover : public ObjectApprover +{ +public: + ControllableObjectApprover(bool permissive_) : permissive(permissive_) {} + void disable() { permissive.store(false); } + + Try<bool> approved( + const Option<ObjectApprover::Object>&) const noexcept override + { + return permissive.load(); + } + +private: + std::atomic_bool permissive; +}; + + INSTANTIATE_TEST_CASE_P( AllowedAction, MasterOperationAuthorizationTest, @@ -3232,14 +2627,18 @@ TEST_P(MasterOperationAuthorizationTest, Accept) { Clock::pause(); - // We use this flag to control when the mock authorizer starts to deny - // disallowed actions. - std::atomic_bool permissive(true); + const auto controllableApprover = + std::make_shared<ControllableObjectApprover>(true); MockAuthorizer authorizer; - EXPECT_CALL(authorizer, authorized(_)) - .WillRepeatedly(Invoke([&](const authorization::Request& request) { - return permissive || request.action() == GetParam(); + + const authorization::Action allowedAction = GetParam(); + EXPECT_CALL(authorizer, getApprover(_, _)) + .WillRepeatedly(Invoke([controllableApprover, allowedAction]( + const Option<authorization::Subject>&, + const authorization::Action& action) { + return action == allowedAction ? getAcceptingObjectApprover() + : controllableApprover; })); Try<Owned<cluster::Master>> master = StartMaster(&authorizer); @@ -3493,7 +2892,7 @@ TEST_P(MasterOperationAuthorizationTest, Accept) EXPECT_EQ(frameworkId, subscribed->framework_id()); // Start to deny disallowed actions. 
- permissive = false; + controllableApprover->disable(); AWAIT_READY(offers); ASSERT_EQ(1, offers->offers_size()); diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp index 9688f5f..c47d4c3 100644 --- a/src/tests/master_tests.cpp +++ b/src/tests/master_tests.cpp @@ -10572,109 +10572,6 @@ TEST_P(MasterTestPrePostReservationRefinement, } -// This test verifies that hitting the `/state` endpoint before '_accept()' -// is called results in pending tasks being reported correctly. -TEST_P(MasterTestPrePostReservationRefinement, StateEndpointPendingTasks) -{ - FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO; - frameworkInfo.set_roles(0, DEFAULT_TEST_ROLE); - - // TODO(mpark): Remove this once `RESERVATION_REFINEMENT` - // is removed from `DEFAULT_FRAMEWORK_INFO`. - frameworkInfo.clear_capabilities(); - frameworkInfo.add_capabilities()->set_type( - FrameworkInfo::Capability::MULTI_ROLE); - - if (GetParam()) { - frameworkInfo.add_capabilities()->set_type( - FrameworkInfo::Capability::RESERVATION_REFINEMENT); - } - - MockAuthorizer authorizer; - Try<Owned<cluster::Master>> master = StartMaster(&authorizer); - ASSERT_SOME(master); - - Owned<MasterDetector> detector = master.get()->createDetector(); - Try<Owned<cluster::Slave>> slave = StartSlave(detector.get()); - ASSERT_SOME(slave); - - MockScheduler sched; - MesosSchedulerDriver driver( - &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL); - - EXPECT_CALL(sched, registered(&driver, _, _)); - - Future<vector<Offer>> offers; - EXPECT_CALL(sched, resourceOffers(&driver, _)) - .WillOnce(FutureArg<1>(&offers)) - .WillRepeatedly(Return()); // Ignore subsequent offers. 
- - driver.start(); - - AWAIT_READY(offers); - ASSERT_FALSE(offers->empty()); - - Offer offer = offers->front(); - - TaskInfo task; - task.set_name(""); - task.mutable_task_id()->set_value("1"); - task.mutable_slave_id()->MergeFrom(offer.slave_id()); - task.mutable_resources()->MergeFrom(offer.resources()); - task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO); - - // Return a pending future from authorizer. - Future<Nothing> authorize; - Promise<bool> promise; - EXPECT_CALL(authorizer, authorized(_)) - .WillOnce(DoAll(FutureSatisfy(&authorize), - Return(promise.future()))); - - driver.launchTasks(offer.id(), {task}); - - // Wait until authorization is in progress. - AWAIT_READY(authorize); - - Future<Response> response = process::http::get( - master.get()->pid, - "state", - None(), - createBasicAuthHeaders(DEFAULT_CREDENTIAL)); - - AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response); - - Try<JSON::Object> parse = JSON::parse<JSON::Object>(response->body); - ASSERT_SOME(parse); - - JSON::Value result = parse.get(); - - JSON::Object expected = { - { - "frameworks", - JSON::Array { - JSON::Object { - { - "tasks", - JSON::Array { - JSON::Object { - { "id", "1" }, - { "role", frameworkInfo.roles(0) }, - { "state", "TASK_STAGING" } - } - } - } - } - } - } - }; - - EXPECT_TRUE(result.contains(expected)); - - driver.stop(); - driver.join(); -} - - // This test verifies that an operator can reserve and unreserve // resources through the master operator API in both // "(pre|post)-reservation-refinement" formats. diff --git a/src/tests/reconciliation_tests.cpp b/src/tests/reconciliation_tests.cpp index cdff370..4c84b1a 100644 --- a/src/tests/reconciliation_tests.cpp +++ b/src/tests/reconciliation_tests.cpp @@ -846,92 +846,6 @@ TEST_F(ReconciliationTest, ImplicitTerminalTask) } -// This test ensures that reconciliation requests for tasks that are -// pending are exposed in reconciliation. 
-TEST_F(ReconciliationTest, PendingTask) -{ - MockAuthorizer authorizer; - Try<Owned<cluster::Master>> master = StartMaster(&authorizer); - ASSERT_SOME(master); - - MockExecutor exec(DEFAULT_EXECUTOR_ID); - TestContainerizer containerizer(&exec); - - Future<SlaveRegisteredMessage> slaveRegisteredMessage = - FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _); - - Owned<MasterDetector> detector = master.get()->createDetector(); - Try<Owned<cluster::Slave>> slave = StartSlave(detector.get()); - ASSERT_SOME(slave); - - // Wait for the slave to register and get the slave id. - AWAIT_READY(slaveRegisteredMessage); - const SlaveID slaveId = slaveRegisteredMessage->slave_id(); - - MockScheduler sched; - MesosSchedulerDriver driver( - &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL); - - EXPECT_CALL(sched, registered(&driver, _, _)); - - Future<vector<Offer>> offers; - EXPECT_CALL(sched, resourceOffers(&driver, _)) - .WillOnce(FutureArg<1>(&offers)) - .WillRepeatedly(Return()); // Ignore subsequent offers. - - driver.start(); - - AWAIT_READY(offers); - ASSERT_FALSE(offers->empty()); - - // Return a pending future from authorizer. - Future<Nothing> authorize; - Promise<bool> promise; - EXPECT_CALL(authorizer, authorized(_)) - .WillOnce(DoAll(FutureSatisfy(&authorize), - Return(promise.future()))); - - TaskInfo task = createTask(offers.get()[0], "", DEFAULT_EXECUTOR_ID); - - driver.launchTasks(offers.get()[0].id(), {task}); - - // Wait until authorization is in progress. - AWAIT_READY(authorize); - - // First send an implicit reconciliation request for this task. - Future<TaskStatus> update; - EXPECT_CALL(sched, statusUpdate(&driver, _)) - .WillOnce(FutureArg<1>(&update)); - - driver.reconcileTasks({}); - - AWAIT_READY(update); - EXPECT_EQ(TASK_STAGING, update->state()); - EXPECT_EQ(TaskStatus::REASON_RECONCILIATION, update->reason()); - EXPECT_TRUE(update->has_slave_id()); - - // Now send an explicit reconciliation request for this task. 
- Future<TaskStatus> update2; - EXPECT_CALL(sched, statusUpdate(&driver, _)) - .WillOnce(FutureArg<1>(&update2)); - - TaskStatus status; - status.mutable_task_id()->CopyFrom(task.task_id()); - status.mutable_slave_id()->CopyFrom(slaveId); - status.set_state(TASK_STAGING); // Dummy value. - - driver.reconcileTasks({status}); - - AWAIT_READY(update2); - EXPECT_EQ(TASK_STAGING, update2->state()); - EXPECT_EQ(TaskStatus::REASON_RECONCILIATION, update2->reason()); - EXPECT_TRUE(update2->has_slave_id()); - - driver.stop(); - driver.join(); -} - - // This test ensures that the master responds with the latest state // for tasks that are terminal at the master, but have not been // acknowledged by the framework. See MESOS-1389.