Repository: mesos
Updated Branches:
  refs/heads/master 79a98ef73 -> c6e9ce168
Updated Master to deal with Accept Offer vs InverseOffer correctly. Review: https://reviews.apache.org/r/46759 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c6e9ce16 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c6e9ce16 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c6e9ce16 Branch: refs/heads/master Commit: c6e9ce16850f69fda719d4e32be3f2a2e1d80387 Parents: 82e9f45 Author: Joris Van Remoortere <joris.van.remoort...@gmail.com> Authored: Fri Apr 1 10:22:00 2016 +0200 Committer: Joris Van Remoortere <joris.van.remoort...@gmail.com> Committed: Tue May 31 15:25:54 2016 -0700 ---------------------------------------------------------------------- src/master/http.cpp | 8 +++ src/master/master.cpp | 139 ++++++++++++++++++++++++++++++----------- src/master/master.hpp | 15 ++++- src/master/validation.cpp | 121 +++++++++++++++++++++++++++++++---- src/master/validation.hpp | 8 +++ 5 files changed, 241 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/c6e9ce16/src/master/http.cpp ---------------------------------------------------------------------- diff --git a/src/master/http.cpp b/src/master/http.cpp index d26e26d..1245326 100644 --- a/src/master/http.cpp +++ b/src/master/http.cpp @@ -898,6 +898,14 @@ Future<Response> Master::Http::scheduler( master->decline(framework, call.decline()); return Accepted(); + case scheduler::Call::ACCEPT_INVERSE_OFFERS: + master->acceptInverseOffers(framework, call.accept_inverse_offers()); + return Accepted(); + + case scheduler::Call::DECLINE_INVERSE_OFFERS: + master->declineInverseOffers(framework, call.decline_inverse_offers()); + return Accepted(); + case scheduler::Call::REVIVE: master->revive(framework); return Accepted(); http://git-wip-us.apache.org/repos/asf/mesos/blob/c6e9ce16/src/master/master.cpp 
---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index 3e992aa..feabcc8 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -2128,6 +2128,14 @@ void Master::receive( decline(framework, call.decline()); break; + case scheduler::Call::ACCEPT_INVERSE_OFFERS: + acceptInverseOffers(framework, call.accept_inverse_offers()); + break; + + case scheduler::Call::DECLINE_INVERSE_OFFERS: + declineInverseOffers(framework, call.decline_inverse_offers()); + break; + case scheduler::Call::REVIVE: revive(framework); break; @@ -3365,11 +3373,6 @@ void Master::accept( // validation failed, return resources to the allocator. foreach (const OfferID& offerId, accept.offer_ids()) { Offer* offer = getOffer(offerId); - - // Since we re-use `OfferID`s, it is possible to arrive here with either - // a resource offer, or an inverse offer. We first try as a resource offer - // and if that fails, then we assume it is an inverse offer. This is - // correct as those are currently the only 2 ways to get an `OfferID`. if (offer != NULL) { slaveId = offer->slave_id(); offeredResources += offer->resources(); @@ -3385,30 +3388,8 @@ void Master::accept( continue; } - // Try it as an inverse offer. If this fails then the offer is no longer - // valid. 
- InverseOffer* inverseOffer = getInverseOffer(offerId); - if (inverseOffer != NULL) { - mesos::master::InverseOfferStatus status; - status.set_status(mesos::master::InverseOfferStatus::ACCEPT); - status.mutable_framework_id()->CopyFrom(inverseOffer->framework_id()); - status.mutable_timestamp()->CopyFrom(protobuf::getCurrentTime()); - - allocator->updateInverseOffer( - inverseOffer->slave_id(), - inverseOffer->framework_id(), - UnavailableResources{ - inverseOffer->resources(), - inverseOffer->unavailability()}, - status, - accept.filters()); - - removeInverseOffer(inverseOffer); - continue; - } - - // If the offer was neither in our offer or inverse offer sets, then this - // offer is no longer valid. + // If the offer was not in our offer set, then this offer is no + // longer valid. LOG(WARNING) << "Ignoring accept of offer " << offerId << " since it is no longer valid"; } @@ -3992,6 +3973,74 @@ void Master::_accept( } +void Master::acceptInverseOffers( + Framework* framework, + const scheduler::Call::AcceptInverseOffers& accept) +{ + CHECK_NOTNULL(framework); + + Option<Error> error = None(); + + if (accept.inverse_offer_ids().size() == 0) { + error = Error("No inverse offers specified"); + } else { + // Validate the inverse offers. + error = validation::offer::validateInverseOffers( + accept.inverse_offer_ids(), + this, + framework); + + Option<SlaveID> slaveId; + + // Update each inverse offer in the allocator with the accept and + // filter. 
+ foreach (const OfferID& offerId, accept.inverse_offer_ids()) { + InverseOffer* inverseOffer = getInverseOffer(offerId); + if (inverseOffer != NULL) { + CHECK(inverseOffer->has_slave_id()); + slaveId = inverseOffer->slave_id(); + + mesos::master::InverseOfferStatus status; + status.set_status(mesos::master::InverseOfferStatus::ACCEPT); + status.mutable_framework_id()->CopyFrom(inverseOffer->framework_id()); + status.mutable_timestamp()->CopyFrom(protobuf::getCurrentTime()); + + allocator->updateInverseOffer( + inverseOffer->slave_id(), + inverseOffer->framework_id(), + UnavailableResources{ + inverseOffer->resources(), + inverseOffer->unavailability()}, + status, + accept.filters()); + + removeInverseOffer(inverseOffer); + continue; + } + + // If the offer was not in our inverse offer set, then this + // offer is no longer valid. + LOG(WARNING) << "Ignoring accept of inverse offer " << offerId + << " since it is no longer valid"; + } + + CHECK_SOME(slaveId); + Slave* slave = slaves.registered.get(slaveId.get()); + CHECK_NOTNULL(slave); + + LOG(INFO) + << "Processing ACCEPT_INVERSE_OFFERS call for inverse offers: " + << accept.inverse_offer_ids() << " on slave " << *slave + << " for framework " << *framework; + } + + if (error.isSome()) { + LOG(WARNING) << "ACCEPT_INVERSE_OFFERS call used invalid offers '" + << accept.inverse_offer_ids() << "': " << error.get().message; + } +} + + void Master::decline( Framework* framework, const scheduler::Call::Decline& decline) @@ -4005,10 +4054,6 @@ void Master::decline( // Return resources to the allocator. foreach (const OfferID& offerId, decline.offer_ids()) { - // Since we re-use `OfferID`s, it is possible to arrive here with either a - // resource offer, or an inverse offer. We first try as a resource offer and - // if that fails, then we assume it is an inverse offer. This is correct as - // those are currently the only 2 ways to get an `OfferID`. 
Offer* offer = getOffer(offerId); if (offer != NULL) { allocator->recoverResources( @@ -4021,8 +4066,28 @@ void Master::decline( continue; } - // Try it as an inverse offer. If this fails then the offer is no longer - // valid. + // If the offer was not in our offer set, then this offer is no + // longer valid. + LOG(WARNING) << "Ignoring decline of offer " << offerId + << " since it is no longer valid"; + } +} + + +void Master::declineInverseOffers( + Framework* framework, + const scheduler::Call::DeclineInverseOffers& decline) +{ + CHECK_NOTNULL(framework); + + LOG(INFO) << "Processing DECLINE_INVERSE_OFFERS call for inverse offers: " + << decline.inverse_offer_ids() << " for framework " << *framework; + + // Update each inverse offer in the allocator with the decline and + // filter. + foreach (const OfferID& offerId, decline.inverse_offer_ids()) { + // Try it as an inverse offer. If this fails then the offer is no + // longer valid. InverseOffer* inverseOffer = getInverseOffer(offerId); if (inverseOffer != NULL) { // If this is an inverse offer. mesos::master::InverseOfferStatus status; @@ -4043,9 +4108,9 @@ void Master::decline( continue; } - // If the offer was neither in our offer or inverse offer sets, then this + // If the offer was not in our inverse offer set, then this // offer is no longer valid. 
- LOG(WARNING) << "Ignoring decline of offer " << offerId + LOG(WARNING) << "Ignoring decline of inverse offer " << offerId << " since it is no longer valid"; } } http://git-wip-us.apache.org/repos/asf/mesos/blob/c6e9ce16/src/master/master.hpp ---------------------------------------------------------------------- diff --git a/src/master/master.hpp b/src/master/master.hpp index 3be0497..846edf3 100644 --- a/src/master/master.hpp +++ b/src/master/master.hpp @@ -880,10 +880,18 @@ private: const scheduler::Call::Accept& accept, const process::Future<std::list<process::Future<bool>>>& authorizations); + void acceptInverseOffers( + Framework* framework, + const scheduler::Call::AcceptInverseOffers& accept); + void decline( Framework* framework, const scheduler::Call::Decline& decline); + void declineInverseOffers( + Framework* framework, + const scheduler::Call::DeclineInverseOffers& decline); + void revive(Framework* framework); void kill( @@ -1311,11 +1319,14 @@ private: friend struct Framework; friend struct Metrics; - // NOTE: Since 'getOffer' and 'slaves' are protected, - // we need to make the following functions friends. + // NOTE: Since 'getOffer', 'getInverseOffer' and 'slaves' are + // protected, we need to make the following functions friends. 
friend Offer* validation::offer::getOffer( Master* master, const OfferID& offerId); + friend InverseOffer* validation::offer::getInverseOffer( + Master* master, const OfferID& offerId); + friend Slave* validation::offer::getSlave( Master* master, const SlaveID& slaveId); http://git-wip-us.apache.org/repos/asf/mesos/blob/c6e9ce16/src/master/validation.cpp ---------------------------------------------------------------------- diff --git a/src/master/validation.cpp b/src/master/validation.cpp index 4f52084..309fbed 100644 --- a/src/master/validation.cpp +++ b/src/master/validation.cpp @@ -765,6 +765,13 @@ Offer* getOffer(Master* master, const OfferID& offerId) } +InverseOffer* getInverseOffer(Master* master, const OfferID& offerId) +{ + CHECK_NOTNULL(master); + return master->getInverseOffer(offerId); +} + + Slave* getSlave(Master* master, const SlaveID& slaveId) { CHECK_NOTNULL(master); @@ -772,6 +779,71 @@ Slave* getSlave(Master* master, const SlaveID& slaveId) } +Try<SlaveID> getSlaveId(Master* master, const OfferID& offerId) +{ + // Try as an offer. + Offer* offer = getOffer(master, offerId); + if (offer != NULL) { + return offer->slave_id(); + } + + InverseOffer* inverseOffer = getInverseOffer(master, offerId); + if (inverseOffer != NULL) { + return inverseOffer->slave_id(); + } + + return Error("Offer id no longer valid"); +} + + +Try<FrameworkID> getFrameworkId(Master* master, const OfferID& offerId) +{ + // Try as an offer. 
+ Offer* offer = getOffer(master, offerId); + if (offer != NULL) { + return offer->framework_id(); + } + + InverseOffer* inverseOffer = getInverseOffer(master, offerId); + if (inverseOffer != NULL) { + return inverseOffer->framework_id(); + } + + return Error("Offer id no longer valid"); +} + + +Option<Error> validateOfferIds( + Master* master, + const RepeatedPtrField<OfferID>& offerIds) +{ + foreach (const OfferID& offerId, offerIds) { + Offer* offer = getOffer(master, offerId); + if (offer == NULL) { + return Error("Offer " + stringify(offerId) + " is no longer valid"); + } + } + + return None(); +} + + +Option<Error> validateInverseOfferIds( + Master* master, + const RepeatedPtrField<OfferID>& offerIds) +{ + foreach (const OfferID& offerId, offerIds) { + InverseOffer* inverseOffer = getInverseOffer(master, offerId); + if (inverseOffer == NULL) { + return Error( + "Inverse offer " + stringify(offerId) + " is no longer valid"); + } + } + + return None(); +} + + // Validates that an offer only appears once in offer list. 
Option<Error> validateUniqueOfferID(const RepeatedPtrField<OfferID>& offerIds) { @@ -796,15 +868,15 @@ Option<Error> validateFramework( Framework* framework) { foreach (const OfferID& offerId, offerIds) { - Offer* offer = getOffer(master, offerId); - if (offer == NULL) { - return Error("Offer " + stringify(offerId) + " is no longer valid"); + Try<FrameworkID> offerFrameworkId = getFrameworkId(master, offerId); + if (offerFrameworkId.isError()) { + return offerFrameworkId.error(); } - if (framework->id() != offer->framework_id()) { + if (framework->id() != offerFrameworkId.get()) { return Error( - "Offer " + stringify(offer->id()) + - " has invalid framework " + stringify(offer->framework_id()) + + "Offer " + stringify(offerId) + + " has invalid framework " + stringify(offerFrameworkId.get()) + " while framework " + stringify(framework->id()) + " is expected"); } } @@ -820,17 +892,17 @@ Option<Error> validateSlave( Option<SlaveID> slaveId; foreach (const OfferID& offerId, offerIds) { - Offer* offer = getOffer(master, offerId); - if (offer == NULL) { - return Error("Offer " + stringify(offerId) + " is no longer valid"); + Try<SlaveID> offerSlaveId = getSlaveId(master, offerId); + if (offerSlaveId.isError()) { + return offerSlaveId.error(); } - Slave* slave = getSlave(master, offer->slave_id()); + Slave* slave = getSlave(master, offerSlaveId.get()); // This is not possible because the offer should've been removed. CHECK(slave != NULL) << "Offer " << offerId - << " outlived agent " << offer->slave_id(); + << " outlived agent " << offerSlaveId.get(); // This is not possible because the offer should've been removed. 
CHECK(slave->connected) @@ -865,6 +937,33 @@ Option<Error> validate( vector<lambda::function<Option<Error>()>> validators = { lambda::bind(validateUniqueOfferID, offerIds), + lambda::bind(validateOfferIds, master, offerIds), + lambda::bind(validateFramework, offerIds, master, framework), + lambda::bind(validateSlave, offerIds, master) + }; + + foreach (const lambda::function<Option<Error>()>& validator, validators) { + Option<Error> error = validator(); + if (error.isSome()) { + return error; + } + } + + return None(); +} + + +Option<Error> validateInverseOffers( + const RepeatedPtrField<OfferID>& offerIds, + Master* master, + Framework* framework) +{ + CHECK_NOTNULL(master); + CHECK_NOTNULL(framework); + + vector<lambda::function<Option<Error>()>> validators = { + lambda::bind(validateUniqueOfferID, offerIds), + lambda::bind(validateInverseOfferIds, master, offerIds), lambda::bind(validateFramework, offerIds, master, framework), lambda::bind(validateSlave, offerIds, master) }; http://git-wip-us.apache.org/repos/asf/mesos/blob/c6e9ce16/src/master/validation.hpp ---------------------------------------------------------------------- diff --git a/src/master/validation.hpp b/src/master/validation.hpp index 104263b..e1271bb 100644 --- a/src/master/validation.hpp +++ b/src/master/validation.hpp @@ -108,6 +108,7 @@ namespace offer { // NOTE: These two functions are placed in the header file because we // need to declare them as friends of Master. Offer* getOffer(Master* master, const OfferID& offerId); +InverseOffer* getInverseOffer(Master* master, const OfferID& offerId); Slave* getSlave(Master* master, const SlaveID& slaveId); @@ -117,6 +118,13 @@ Option<Error> validate( Master* master, Framework* framework); + +// Validates the given inverse offers. +Option<Error> validateInverseOffers( + const google::protobuf::RepeatedPtrField<OfferID>& offerIds, + Master* master, + Framework* framework); + } // namespace offer {