Repository: mesos
Updated Branches:
  refs/heads/master 79a98ef73 -> c6e9ce168


Updated Master to deal with Accept Offer vs InverseOffer correctly.

Review: https://reviews.apache.org/r/46759


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c6e9ce16
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c6e9ce16
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c6e9ce16

Branch: refs/heads/master
Commit: c6e9ce16850f69fda719d4e32be3f2a2e1d80387
Parents: 82e9f45
Author: Joris Van Remoortere <joris.van.remoort...@gmail.com>
Authored: Fri Apr 1 10:22:00 2016 +0200
Committer: Joris Van Remoortere <joris.van.remoort...@gmail.com>
Committed: Tue May 31 15:25:54 2016 -0700

----------------------------------------------------------------------
 src/master/http.cpp       |   8 +++
 src/master/master.cpp     | 139 ++++++++++++++++++++++++++++++-----------
 src/master/master.hpp     |  15 ++++-
 src/master/validation.cpp | 121 +++++++++++++++++++++++++++++++----
 src/master/validation.hpp |   8 +++
 5 files changed, 241 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/c6e9ce16/src/master/http.cpp
----------------------------------------------------------------------
diff --git a/src/master/http.cpp b/src/master/http.cpp
index d26e26d..1245326 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -898,6 +898,14 @@ Future<Response> Master::Http::scheduler(
       master->decline(framework, call.decline());
       return Accepted();
 
+    case scheduler::Call::ACCEPT_INVERSE_OFFERS:
+      master->acceptInverseOffers(framework, call.accept_inverse_offers());
+      return Accepted();
+
+    case scheduler::Call::DECLINE_INVERSE_OFFERS:
+      master->declineInverseOffers(framework, call.decline_inverse_offers());
+      return Accepted();
+
     case scheduler::Call::REVIVE:
       master->revive(framework);
       return Accepted();

http://git-wip-us.apache.org/repos/asf/mesos/blob/c6e9ce16/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 3e992aa..feabcc8 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -2128,6 +2128,14 @@ void Master::receive(
       decline(framework, call.decline());
       break;
 
+    case scheduler::Call::ACCEPT_INVERSE_OFFERS:
+      acceptInverseOffers(framework, call.accept_inverse_offers());
+      break;
+
+    case scheduler::Call::DECLINE_INVERSE_OFFERS:
+      declineInverseOffers(framework, call.decline_inverse_offers());
+      break;
+
     case scheduler::Call::REVIVE:
       revive(framework);
       break;
@@ -3365,11 +3373,6 @@ void Master::accept(
     // validation failed, return resources to the allocator.
     foreach (const OfferID& offerId, accept.offer_ids()) {
       Offer* offer = getOffer(offerId);
-
-      // Since we re-use `OfferID`s, it is possible to arrive here with either
-      // a resource offer, or an inverse offer. We first try as a resource offer
-      // and if that fails, then we assume it is an inverse offer. This is
-      // correct as those are currently the only 2 ways to get an `OfferID`.
       if (offer != NULL) {
         slaveId = offer->slave_id();
         offeredResources += offer->resources();
@@ -3385,30 +3388,8 @@ void Master::accept(
         continue;
       }
 
-      // Try it as an inverse offer. If this fails then the offer is no longer
-      // valid.
-      InverseOffer* inverseOffer = getInverseOffer(offerId);
-      if (inverseOffer != NULL) {
-        mesos::master::InverseOfferStatus status;
-        status.set_status(mesos::master::InverseOfferStatus::ACCEPT);
-        status.mutable_framework_id()->CopyFrom(inverseOffer->framework_id());
-        status.mutable_timestamp()->CopyFrom(protobuf::getCurrentTime());
-
-        allocator->updateInverseOffer(
-            inverseOffer->slave_id(),
-            inverseOffer->framework_id(),
-            UnavailableResources{
-                inverseOffer->resources(),
-                inverseOffer->unavailability()},
-            status,
-            accept.filters());
-
-        removeInverseOffer(inverseOffer);
-        continue;
-      }
-
-      // If the offer was neither in our offer or inverse offer sets, then this
-      // offer is no longer valid.
+      // If the offer was not in our offer set, then this offer is no
+      // longer valid.
       LOG(WARNING) << "Ignoring accept of offer " << offerId
                    << " since it is no longer valid";
     }
@@ -3992,6 +3973,74 @@ void Master::_accept(
 }
 
 
+void Master::acceptInverseOffers(
+    Framework* framework,
+    const scheduler::Call::AcceptInverseOffers& accept)
+{
+  CHECK_NOTNULL(framework);
+
+  Option<Error> error = None();
+
+  if (accept.inverse_offer_ids().size() == 0) {
+    error = Error("No inverse offers specified");
+  } else {
+    // Validate the inverse offers.
+    error = validation::offer::validateInverseOffers(
+        accept.inverse_offer_ids(),
+        this,
+        framework);
+
+    Option<SlaveID> slaveId;
+
+    // Update each inverse offer in the allocator with the accept and
+    // filter.
+    foreach (const OfferID& offerId, accept.inverse_offer_ids()) {
+      InverseOffer* inverseOffer = getInverseOffer(offerId);
+      if (inverseOffer != NULL) {
+        CHECK(inverseOffer->has_slave_id());
+        slaveId = inverseOffer->slave_id();
+
+        mesos::master::InverseOfferStatus status;
+        status.set_status(mesos::master::InverseOfferStatus::ACCEPT);
+        status.mutable_framework_id()->CopyFrom(inverseOffer->framework_id());
+        status.mutable_timestamp()->CopyFrom(protobuf::getCurrentTime());
+
+        allocator->updateInverseOffer(
+            inverseOffer->slave_id(),
+            inverseOffer->framework_id(),
+            UnavailableResources{
+                inverseOffer->resources(),
+                inverseOffer->unavailability()},
+            status,
+            accept.filters());
+
+        removeInverseOffer(inverseOffer);
+        continue;
+      }
+
+      // If the offer was not in our inverse offer set, then this
+      // offer is no longer valid.
+      LOG(WARNING) << "Ignoring accept of inverse offer " << offerId
+                   << " since it is no longer valid";
+    }
+
+    CHECK_SOME(slaveId);
+    Slave* slave = slaves.registered.get(slaveId.get());
+    CHECK_NOTNULL(slave);
+
+    LOG(INFO)
+        << "Processing ACCEPT_INVERSE_OFFERS call for inverse offers: "
+        << accept.inverse_offer_ids() << " on slave " << *slave
+        << " for framework " << *framework;
+  }
+
+  if (error.isSome()) {
+    LOG(WARNING) << "ACCEPT_INVERSE_OFFERS call used invalid offers '"
+                 << accept.inverse_offer_ids() << "': " << error.get().message;
+  }
+}
+
+
 void Master::decline(
     Framework* framework,
     const scheduler::Call::Decline& decline)
@@ -4005,10 +4054,6 @@ void Master::decline(
 
   //  Return resources to the allocator.
   foreach (const OfferID& offerId, decline.offer_ids()) {
-    // Since we re-use `OfferID`s, it is possible to arrive here with either a
-    // resource offer, or an inverse offer. We first try as a resource offer and
-    // if that fails, then we assume it is an inverse offer. This is correct as
-    // those are currently the only 2 ways to get an `OfferID`.
     Offer* offer = getOffer(offerId);
     if (offer != NULL) {
       allocator->recoverResources(
@@ -4021,8 +4066,28 @@ void Master::decline(
       continue;
     }
 
-    // Try it as an inverse offer. If this fails then the offer is no longer
-    // valid.
+    // If the offer was not in our offer set, then this offer is no
+    // longer valid.
+    LOG(WARNING) << "Ignoring decline of offer " << offerId
+                 << " since it is no longer valid";
+  }
+}
+
+
+void Master::declineInverseOffers(
+    Framework* framework,
+    const scheduler::Call::DeclineInverseOffers& decline)
+{
+  CHECK_NOTNULL(framework);
+
+  LOG(INFO) << "Processing DECLINE_INVERSE_OFFERS call for inverse offers: "
+            << decline.inverse_offer_ids() << " for framework " << *framework;
+
+  // Update each inverse offer in the allocator with the decline and
+  // filter.
+  foreach (const OfferID& offerId, decline.inverse_offer_ids()) {
+    // Try it as an inverse offer. If this fails then the offer is no
+    // longer valid.
     InverseOffer* inverseOffer = getInverseOffer(offerId);
     if (inverseOffer != NULL) { // If this is an inverse offer.
       mesos::master::InverseOfferStatus status;
@@ -4043,9 +4108,9 @@ void Master::decline(
       continue;
     }
 
-    // If the offer was neither in our offer or inverse offer sets, then this
+    // If the offer was not in our inverse offer set, then this
     // offer is no longer valid.
-    LOG(WARNING) << "Ignoring decline of offer " << offerId
+    LOG(WARNING) << "Ignoring decline of inverse offer " << offerId
                  << " since it is no longer valid";
   }
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/c6e9ce16/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 3be0497..846edf3 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -880,10 +880,18 @@ private:
     const scheduler::Call::Accept& accept,
     const process::Future<std::list<process::Future<bool>>>& authorizations);
 
+  void acceptInverseOffers(
+      Framework* framework,
+      const scheduler::Call::AcceptInverseOffers& accept);
+
   void decline(
       Framework* framework,
       const scheduler::Call::Decline& decline);
 
+  void declineInverseOffers(
+      Framework* framework,
+      const scheduler::Call::DeclineInverseOffers& decline);
+
   void revive(Framework* framework);
 
   void kill(
@@ -1311,11 +1319,14 @@ private:
   friend struct Framework;
   friend struct Metrics;
 
-  // NOTE: Since 'getOffer' and 'slaves' are protected,
-  // we need to make the following functions friends.
+  // NOTE: Since 'getOffer', 'getInverseOffer' and 'slaves' are
+  // protected, we need to make the following functions friends.
   friend Offer* validation::offer::getOffer(
       Master* master, const OfferID& offerId);
 
+  friend InverseOffer* validation::offer::getInverseOffer(
+      Master* master, const OfferID& offerId);
+
   friend Slave* validation::offer::getSlave(
       Master* master, const SlaveID& slaveId);
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/c6e9ce16/src/master/validation.cpp
----------------------------------------------------------------------
diff --git a/src/master/validation.cpp b/src/master/validation.cpp
index 4f52084..309fbed 100644
--- a/src/master/validation.cpp
+++ b/src/master/validation.cpp
@@ -765,6 +765,13 @@ Offer* getOffer(Master* master, const OfferID& offerId)
 }
 
 
+InverseOffer* getInverseOffer(Master* master, const OfferID& offerId)
+{
+  CHECK_NOTNULL(master);
+  return master->getInverseOffer(offerId);
+}
+
+
 Slave* getSlave(Master* master, const SlaveID& slaveId)
 {
   CHECK_NOTNULL(master);
@@ -772,6 +779,71 @@ Slave* getSlave(Master* master, const SlaveID& slaveId)
 }
 
 
+Try<SlaveID> getSlaveId(Master* master, const OfferID& offerId)
+{
+  // Try as an offer.
+  Offer* offer = getOffer(master, offerId);
+  if (offer != NULL) {
+    return offer->slave_id();
+  }
+
+  InverseOffer* inverseOffer = getInverseOffer(master, offerId);
+  if (inverseOffer != NULL) {
+    return inverseOffer->slave_id();
+  }
+
+  return Error("Offer id no longer valid");
+}
+
+
+Try<FrameworkID> getFrameworkId(Master* master, const OfferID& offerId)
+{
+  // Try as an offer.
+  Offer* offer = getOffer(master, offerId);
+  if (offer != NULL) {
+    return offer->framework_id();
+  }
+
+  InverseOffer* inverseOffer = getInverseOffer(master, offerId);
+  if (inverseOffer != NULL) {
+    return inverseOffer->framework_id();
+  }
+
+  return Error("Offer id no longer valid");
+}
+
+
+Option<Error> validateOfferIds(
+    Master* master,
+    const RepeatedPtrField<OfferID>& offerIds)
+{
+  foreach (const OfferID& offerId, offerIds) {
+    Offer* offer = getOffer(master, offerId);
+    if (offer == NULL) {
+      return Error("Offer " + stringify(offerId) + " is no longer valid");
+    }
+  }
+
+  return None();
+}
+
+
+Option<Error> validateInverseOfferIds(
+    Master* master,
+    const RepeatedPtrField<OfferID>& offerIds)
+{
+  foreach (const OfferID& offerId, offerIds) {
+    InverseOffer* inverseOffer = getInverseOffer(master, offerId);
+    if (inverseOffer == NULL) {
+      return Error(
+          "Inverse offer " + stringify(offerId) + " is no longer valid");
+    }
+  }
+
+  return None();
+}
+
+
 // Validates that an offer only appears once in offer list.
 Option<Error> validateUniqueOfferID(const RepeatedPtrField<OfferID>& offerIds)
 {
@@ -796,15 +868,15 @@ Option<Error> validateFramework(
     Framework* framework)
 {
   foreach (const OfferID& offerId, offerIds) {
-    Offer* offer = getOffer(master, offerId);
-    if (offer == NULL) {
-      return Error("Offer " + stringify(offerId) + " is no longer valid");
+    Try<FrameworkID> offerFrameworkId = getFrameworkId(master, offerId);
+    if (offerFrameworkId.isError()) {
+      return offerFrameworkId.error();
     }
 
-    if (framework->id() != offer->framework_id()) {
+    if (framework->id() != offerFrameworkId.get()) {
       return Error(
-          "Offer " + stringify(offer->id()) +
-          " has invalid framework " + stringify(offer->framework_id()) +
+          "Offer " + stringify(offerId) +
+          " has invalid framework " + stringify(offerFrameworkId.get()) +
           " while framework " + stringify(framework->id()) + " is expected");
     }
   }
@@ -820,17 +892,17 @@ Option<Error> validateSlave(
   Option<SlaveID> slaveId;
 
   foreach (const OfferID& offerId, offerIds) {
-    Offer* offer = getOffer(master, offerId);
-    if (offer == NULL) {
-      return Error("Offer " + stringify(offerId) + " is no longer valid");
+    Try<SlaveID> offerSlaveId = getSlaveId(master, offerId);
+    if (offerSlaveId.isError()) {
+      return offerSlaveId.error();
     }
 
-    Slave* slave = getSlave(master, offer->slave_id());
+    Slave* slave = getSlave(master, offerSlaveId.get());
 
     // This is not possible because the offer should've been removed.
     CHECK(slave != NULL)
       << "Offer " << offerId
-      << " outlived agent " << offer->slave_id();
+      << " outlived agent " << offerSlaveId.get();
 
     // This is not possible because the offer should've been removed.
     CHECK(slave->connected)
@@ -865,6 +937,33 @@ Option<Error> validate(
 
   vector<lambda::function<Option<Error>()>> validators = {
     lambda::bind(validateUniqueOfferID, offerIds),
+    lambda::bind(validateOfferIds, master, offerIds),
+    lambda::bind(validateFramework, offerIds, master, framework),
+    lambda::bind(validateSlave, offerIds, master)
+  };
+
+  foreach (const lambda::function<Option<Error>()>& validator, validators) {
+    Option<Error> error = validator();
+    if (error.isSome()) {
+      return error;
+    }
+  }
+
+  return None();
+}
+
+
+Option<Error> validateInverseOffers(
+    const RepeatedPtrField<OfferID>& offerIds,
+    Master* master,
+    Framework* framework)
+{
+  CHECK_NOTNULL(master);
+  CHECK_NOTNULL(framework);
+
+  vector<lambda::function<Option<Error>()>> validators = {
+    lambda::bind(validateUniqueOfferID, offerIds),
+    lambda::bind(validateInverseOfferIds, master, offerIds),
     lambda::bind(validateFramework, offerIds, master, framework),
     lambda::bind(validateSlave, offerIds, master)
   };

http://git-wip-us.apache.org/repos/asf/mesos/blob/c6e9ce16/src/master/validation.hpp
----------------------------------------------------------------------
diff --git a/src/master/validation.hpp b/src/master/validation.hpp
index 104263b..e1271bb 100644
--- a/src/master/validation.hpp
+++ b/src/master/validation.hpp
@@ -108,6 +108,7 @@ namespace offer {
 // NOTE: These two functions are placed in the header file because we
 // need to declare them as friends of Master.
 Offer* getOffer(Master* master, const OfferID& offerId);
+InverseOffer* getInverseOffer(Master* master, const OfferID& offerId);
 Slave* getSlave(Master* master, const SlaveID& slaveId);
 
 
@@ -117,6 +118,13 @@ Option<Error> validate(
     Master* master,
     Framework* framework);
 
+
+// Validates the given inverse offers.
+Option<Error> validateInverseOffers(
+    const google::protobuf::RepeatedPtrField<OfferID>& offerIds,
+    Master* master,
+    Framework* framework);
+
 } // namespace offer {
 
 

Reply via email to