V1 API: Split Resource offers and Inverse Offers.

Review: https://reviews.apache.org/r/46756


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e9c3b6df
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e9c3b6df
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e9c3b6df

Branch: refs/heads/master
Commit: e9c3b6df59c2d968bb1e5ee5f2f1ac74d121a660
Parents: 79a98ef
Author: Joris Van Remoortere <joris.van.remoort...@gmail.com>
Authored: Tue Mar 29 16:38:40 2016 +0200
Committer: Joris Van Remoortere <joris.van.remoort...@gmail.com>
Committed: Tue May 31 15:25:54 2016 -0700

----------------------------------------------------------------------
 include/mesos/scheduler/scheduler.proto    | 37 +++++++++-------
 include/mesos/v1/scheduler/scheduler.proto | 37 +++++++++-------
 src/cli/execute.cpp                        |  1 +
 src/examples/long_lived_framework.cpp      |  1 +
 src/examples/test_http_framework.cpp       |  5 +++
 src/internal/evolve.cpp                    | 15 ++++++-
 src/internal/evolve.hpp                    |  1 +
 src/master/master.cpp                      |  2 +-
 src/messages/messages.proto                | 19 +++++----
 src/sched/sched.cpp                        |  1 +
 src/tests/master_maintenance_tests.cpp     | 56 ++++++++++++-------------
 src/tests/mesos.hpp                        |  6 +++
 12 files changed, 115 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/e9c3b6df/include/mesos/scheduler/scheduler.proto
----------------------------------------------------------------------
diff --git a/include/mesos/scheduler/scheduler.proto 
b/include/mesos/scheduler/scheduler.proto
index fd99992..b56e5dc 100644
--- a/include/mesos/scheduler/scheduler.proto
+++ b/include/mesos/scheduler/scheduler.proto
@@ -39,13 +39,14 @@ message Event {
     // in a backwards-compatible way. See: MESOS-4997.
     UNKNOWN = 0;
 
-    SUBSCRIBED = 1; // See 'Subscribed' below.
-    OFFERS = 2;     // See 'Offers' below.
-    RESCIND = 3;    // See 'Rescind' below.
-    UPDATE = 4;     // See 'Update' below.
-    MESSAGE = 5;    // See 'Message' below.
-    FAILURE = 6;    // See 'Failure' below.
-    ERROR = 7;      // See 'Error' below.
+    SUBSCRIBED = 1;     // See 'Subscribed' below.
+    OFFERS = 2;         // See 'Offers' below.
+    INVERSE_OFFERS = 9; // See 'InverseOffers' below.
+    RESCIND = 3;        // See 'Rescind' below.
+    UPDATE = 4;         // See 'Update' below.
+    MESSAGE = 5;        // See 'Message' below.
+    FAILURE = 6;        // See 'Failure' below.
+    ERROR = 7;          // See 'Error' below.
 
     // Periodic message sent by the Mesos master according to
     // 'Subscribed.heartbeat_interval_seconds'. If the scheduler does
@@ -67,15 +68,22 @@ message Event {
   }
 
   // Received whenever there are new resources that are offered to the
-  // scheduler or resources requested back from the scheduler. Each
-  // offer corresponds to a set of resources on a slave. Until the
-  // scheduler accepts or declines an offer the resources are
-  // considered allocated to the scheduler. Accepting or Declining an
-  // inverse offer informs the allocator of the scheduler's ability to
-  // release the resources without violating an SLA.
+  // scheduler. Each offer corresponds to a set of resources on an
+  // agent. Until the scheduler accepts or declines an offer the
+  // resources are considered allocated to the scheduler.
   message Offers {
     repeated Offer offers = 1;
-    repeated InverseOffer inverse_offers = 2;
+  }
+
+  // Received whenever there are resources requested back from the
+  // scheduler. Each inverse offer specifies the agent, and
+  // optionally specific resources. Accepting or Declining an inverse
+  // offer informs the allocator of the scheduler's ability to release
+  // the specified resources without violating an SLA. If no resources
+  // are specified then all resources on the agent are requested to be
+  // released.
+  message InverseOffers {
+    repeated InverseOffer inverse_offers = 1;
   }
 
   // Received when a particular offer is no longer valid (e.g., the
@@ -144,6 +152,7 @@ message Event {
 
   optional Subscribed subscribed = 2;
   optional Offers offers = 3;
+  optional InverseOffers inverse_offers = 9;
   optional Rescind rescind = 4;
   optional Update update = 5;
   optional Message message = 6;

http://git-wip-us.apache.org/repos/asf/mesos/blob/e9c3b6df/include/mesos/v1/scheduler/scheduler.proto
----------------------------------------------------------------------
diff --git a/include/mesos/v1/scheduler/scheduler.proto 
b/include/mesos/v1/scheduler/scheduler.proto
index 131e1a4..0b44930 100644
--- a/include/mesos/v1/scheduler/scheduler.proto
+++ b/include/mesos/v1/scheduler/scheduler.proto
@@ -39,13 +39,14 @@ message Event {
     // in a backwards-compatible way. See: MESOS-4997.
     UNKNOWN = 0;
 
-    SUBSCRIBED = 1; // See 'Subscribed' below.
-    OFFERS = 2;     // See 'Offers' below.
-    RESCIND = 3;    // See 'Rescind' below.
-    UPDATE = 4;     // See 'Update' below.
-    MESSAGE = 5;    // See 'Message' below.
-    FAILURE = 6;    // See 'Failure' below.
-    ERROR = 7;      // See 'Error' below.
+    SUBSCRIBED = 1;     // See 'Subscribed' below.
+    OFFERS = 2;         // See 'Offers' below.
+    INVERSE_OFFERS = 9; // See 'InverseOffers' below.
+    RESCIND = 3;        // See 'Rescind' below.
+    UPDATE = 4;         // See 'Update' below.
+    MESSAGE = 5;        // See 'Message' below.
+    FAILURE = 6;        // See 'Failure' below.
+    ERROR = 7;          // See 'Error' below.
 
     // Periodic message sent by the Mesos master according to
     // 'Subscribed.heartbeat_interval_seconds'. If the scheduler does
@@ -67,15 +68,22 @@ message Event {
   }
 
   // Received whenever there are new resources that are offered to the
-  // scheduler or resources requested back from the scheduler. Each
-  // offer corresponds to a set of resources on an agent. Until the
-  // scheduler accepts or declines an offer the resources are
-  // considered allocated to the scheduler. Accepting or Declining an
-  // inverse offer informs the allocator of the scheduler's ability to
-  // release the resources without violating an SLA.
+  // scheduler. Each offer corresponds to a set of resources on an
+  // agent. Until the scheduler accepts or declines an offer the
+  // resources are considered allocated to the scheduler.
   message Offers {
     repeated Offer offers = 1;
-    repeated InverseOffer inverse_offers = 2;
+  }
+
+  // Received whenever there are resources requested back from the
+  // scheduler. Each inverse offer specifies the agent, and
+  // optionally specific resources. Accepting or Declining an inverse
+  // offer informs the allocator of the scheduler's ability to release
+  // the specified resources without violating an SLA. If no resources
+  // are specified then all resources on the agent are requested to be
+  // released.
+  message InverseOffers {
+    repeated InverseOffer inverse_offers = 1;
   }
 
   // Received when a particular offer is no longer valid (e.g., the
@@ -144,6 +152,7 @@ message Event {
 
   optional Subscribed subscribed = 2;
   optional Offers offers = 3;
+  optional InverseOffers inverse_offers = 9;
   optional Rescind rescind = 4;
   optional Update update = 5;
   optional Message message = 6;

http://git-wip-us.apache.org/repos/asf/mesos/blob/e9c3b6df/src/cli/execute.cpp
----------------------------------------------------------------------
diff --git a/src/cli/execute.cpp b/src/cli/execute.cpp
index 81a0388..95e1db9 100644
--- a/src/cli/execute.cpp
+++ b/src/cli/execute.cpp
@@ -446,6 +446,7 @@ protected:
         }
 
         case Event::HEARTBEAT:
+        case Event::INVERSE_OFFERS:
         case Event::FAILURE:
         case Event::RESCIND:
         case Event::MESSAGE: {

http://git-wip-us.apache.org/repos/asf/mesos/blob/e9c3b6df/src/examples/long_lived_framework.cpp
----------------------------------------------------------------------
diff --git a/src/examples/long_lived_framework.cpp 
b/src/examples/long_lived_framework.cpp
index 8269e38..848849f 100644
--- a/src/examples/long_lived_framework.cpp
+++ b/src/examples/long_lived_framework.cpp
@@ -218,6 +218,7 @@ protected:
         }
 
         case Event::HEARTBEAT:
+        case Event::INVERSE_OFFERS:
         case Event::RESCIND:
         case Event::MESSAGE: {
           break;

http://git-wip-us.apache.org/repos/asf/mesos/blob/e9c3b6df/src/examples/test_http_framework.cpp
----------------------------------------------------------------------
diff --git a/src/examples/test_http_framework.cpp 
b/src/examples/test_http_framework.cpp
index 6f70bbe..88e4495 100644
--- a/src/examples/test_http_framework.cpp
+++ b/src/examples/test_http_framework.cpp
@@ -126,6 +126,11 @@ public:
           break;
         }
 
+        case Event::INVERSE_OFFERS: {
+          cout << endl << "Received an INVERSE_OFFERS event" << endl;
+          break;
+        }
+
         case Event::RESCIND: {
           cout << endl << "Received a RESCIND event" << endl;
           break;

http://git-wip-us.apache.org/repos/asf/mesos/blob/e9c3b6df/src/internal/evolve.cpp
----------------------------------------------------------------------
diff --git a/src/internal/evolve.cpp b/src/internal/evolve.cpp
index 7d3d450..5e0be6b 100644
--- a/src/internal/evolve.cpp
+++ b/src/internal/evolve.cpp
@@ -192,7 +192,20 @@ v1::scheduler::Event evolve(const ResourceOffersMessage& 
message)
 
   v1::scheduler::Event::Offers* offers = event.mutable_offers();
   offers->mutable_offers()->CopyFrom(evolve<v1::Offer>(message.offers()));
-  offers->mutable_inverse_offers()->CopyFrom(evolve<v1::InverseOffer>(
+
+  return event;
+}
+
+
+v1::scheduler::Event evolve(const InverseOffersMessage& message)
+{
+  v1::scheduler::Event event;
+  event.set_type(v1::scheduler::Event::INVERSE_OFFERS);
+
+  v1::scheduler::Event::InverseOffers* inverse_offers =
+    event.mutable_inverse_offers();
+
+  inverse_offers->mutable_inverse_offers()->CopyFrom(evolve<v1::InverseOffer>(
       message.inverse_offers()));
 
   return event;

http://git-wip-us.apache.org/repos/asf/mesos/blob/e9c3b6df/src/internal/evolve.hpp
----------------------------------------------------------------------
diff --git a/src/internal/evolve.hpp b/src/internal/evolve.hpp
index 0bf0844..51c2d6c 100644
--- a/src/internal/evolve.hpp
+++ b/src/internal/evolve.hpp
@@ -83,6 +83,7 @@ v1::scheduler::Event evolve(const scheduler::Event& event);
 v1::scheduler::Event evolve(const FrameworkRegisteredMessage& message);
 v1::scheduler::Event evolve(const FrameworkReregisteredMessage& message);
 v1::scheduler::Event evolve(const ResourceOffersMessage& message);
+v1::scheduler::Event evolve(const InverseOffersMessage& message);
 v1::scheduler::Event evolve(const RescindResourceOfferMessage& message);
 v1::scheduler::Event evolve(const StatusUpdateMessage& message);
 v1::scheduler::Event evolve(const LostSlaveMessage& message);

http://git-wip-us.apache.org/repos/asf/mesos/blob/e9c3b6df/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index a6f740f..26e8d3c 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -5707,7 +5707,7 @@ void Master::inverseOffer(
   }
 
   // Create an inverse offer for each slave and add it to the message.
-  ResourceOffersMessage message;
+  InverseOffersMessage message;
 
   Framework* framework = CHECK_NOTNULL(frameworks.registered[frameworkId]);
   foreachpair (const SlaveID& slaveId,

http://git-wip-us.apache.org/repos/asf/mesos/blob/e9c3b6df/src/messages/messages.proto
----------------------------------------------------------------------
diff --git a/src/messages/messages.proto b/src/messages/messages.proto
index 05e6476..10e3776 100644
--- a/src/messages/messages.proto
+++ b/src/messages/messages.proto
@@ -212,14 +212,19 @@ message ResourceRequestMessage {
 message ResourceOffersMessage {
   repeated Offer offers = 1;
   repeated string pids = 2;
+}
+
 
-  // The `inverse_offers` field is added here because we currently use it in
-  // `master.cpp` when constructing the message to send to schedulers. We use
-  // the original version of the proto API until we do a full refactor of all
-  // the messages being sent.
-  // It is not fully implemented in the old scheduler; only the V1 scheduler
-  // currently implements inverse offers.
-  repeated InverseOffer inverse_offers = 3;
+/**
+ * Sends inverse offers to the scheduler.
+ * NOTE: This message is only sent through the V1 HTTP API. Driver
+ * based schedulers will not receive it.
+ *
+ * See scheduler::Event::InverseOffers.
+ */
+message InverseOffersMessage {
+  repeated InverseOffer inverse_offers = 1;
+  repeated string pids = 2;
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/e9c3b6df/src/sched/sched.cpp
----------------------------------------------------------------------
diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp
index 5eca265..2e2c16f 100644
--- a/src/sched/sched.cpp
+++ b/src/sched/sched.cpp
@@ -671,6 +671,7 @@ protected:
         break;
       }
 
+      case Event::INVERSE_OFFERS:
       case Event::HEARTBEAT: {
         break;
       }

http://git-wip-us.apache.org/repos/asf/mesos/blob/e9c3b6df/src/tests/master_maintenance_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_maintenance_tests.cpp 
b/src/tests/master_maintenance_tests.cpp
index 971c447..32a7a90 100644
--- a/src/tests/master_maintenance_tests.cpp
+++ b/src/tests/master_maintenance_tests.cpp
@@ -434,9 +434,11 @@ TEST_F(MasterMaintenanceTest, PendingUnavailabilityTest)
     .WillOnce(FutureSatisfy(&offerRescinded));
 
   Future<Event::Offers> unavailabilityOffers;
-  Future<Event::Offers> inverseOffers;
   EXPECT_CALL(*scheduler, offers(_, _))
-    .WillOnce(FutureArg<1>(&unavailabilityOffers))
+    .WillOnce(FutureArg<1>(&unavailabilityOffers));
+
+  Future<Event::InverseOffers> inverseOffers;
+  EXPECT_CALL(*scheduler, inverseOffers(_, _))
     .WillOnce(FutureArg<1>(&inverseOffers));
 
   // Schedule this slave for maintenance.
@@ -1172,7 +1174,6 @@ TEST_F(MasterMaintenanceTest, InverseOffers)
   AWAIT_READY(event);
   EXPECT_EQ(Event::OFFERS, event.get().type());
   EXPECT_NE(0, event.get().offers().offers().size());
-  EXPECT_EQ(0, event.get().offers().inverse_offers().size());
 
   // All the offers should have unavailability.
   foreach (const v1::Offer& offer, event.get().offers().offers()) {
@@ -1212,12 +1213,12 @@ TEST_F(MasterMaintenanceTest, InverseOffers)
   // Expect an inverse offer.
   event = events.get();
   AWAIT_READY(event);
-  EXPECT_EQ(Event::OFFERS, event.get().type());
-  EXPECT_EQ(0, event.get().offers().offers().size());
-  EXPECT_EQ(1, event.get().offers().inverse_offers().size());
+  EXPECT_EQ(Event::INVERSE_OFFERS, event.get().type());
+  EXPECT_EQ(1, event.get().inverse_offers().inverse_offers().size());
 
   // Save this inverse offer so we can decline it later.
-  v1::InverseOffer inverseOffer = event.get().offers().inverse_offers(0);
+  v1::InverseOffer inverseOffer =
+    event.get().inverse_offers().inverse_offers(0);
 
   // Wait for the task to start running.
   event = events.get();
@@ -1271,12 +1272,11 @@ TEST_F(MasterMaintenanceTest, InverseOffers)
   // Expect another inverse offer.
   event = events.get();
   AWAIT_READY(event);
-  EXPECT_EQ(Event::OFFERS, event.get().type());
+  EXPECT_EQ(Event::INVERSE_OFFERS, event.get().type());
   Clock::resume();
 
-  EXPECT_EQ(0, event.get().offers().offers().size());
-  EXPECT_EQ(1, event.get().offers().inverse_offers().size());
-  inverseOffer = event.get().offers().inverse_offers(0);
+  EXPECT_EQ(1, event.get().inverse_offers().inverse_offers().size());
+  inverseOffer = event.get().inverse_offers().inverse_offers(0);
 
   // Check that the status endpoint shows the inverse offer as declined.
   response = process::http::get(
@@ -1336,11 +1336,10 @@ TEST_F(MasterMaintenanceTest, InverseOffers)
   // Expect yet another inverse offer.
   event = events.get();
   AWAIT_READY(event);
-  EXPECT_EQ(Event::OFFERS, event.get().type());
+  EXPECT_EQ(Event::INVERSE_OFFERS, event.get().type());
   Clock::resume();
 
-  EXPECT_EQ(0, event.get().offers().offers().size());
-  EXPECT_EQ(1, event.get().offers().inverse_offers().size());
+  EXPECT_EQ(1, event.get().inverse_offers().inverse_offers().size());
 
   // Check that the status endpoint shows the inverse offer as accepted.
   response = process::http::get(
@@ -1517,7 +1516,6 @@ TEST_F(MasterMaintenanceTest, InverseOffersFilters)
   AWAIT_READY(event);
   EXPECT_EQ(Event::OFFERS, event.get().type());
   EXPECT_EQ(2, event.get().offers().offers().size());
-  EXPECT_EQ(0, event.get().offers().inverse_offers().size());
 
   // All the offers should have unavailability.
   foreach (const v1::Offer& offer, event.get().offers().offers()) {
@@ -1570,13 +1568,15 @@ TEST_F(MasterMaintenanceTest, InverseOffersFilters)
   // Expect two inverse offers.
   event = events.get();
   AWAIT_READY(event);
-  EXPECT_EQ(Event::OFFERS, event.get().type());
-  EXPECT_EQ(0, event.get().offers().offers().size());
-  EXPECT_EQ(2, event.get().offers().inverse_offers().size());
+  EXPECT_EQ(Event::INVERSE_OFFERS, event.get().type());
+  EXPECT_EQ(2, event.get().inverse_offers().inverse_offers().size());
 
   // Save these inverse offers.
-  v1::InverseOffer inverseOffer1 = event.get().offers().inverse_offers(0);
-  v1::InverseOffer inverseOffer2 = event.get().offers().inverse_offers(1);
+  v1::InverseOffer inverseOffer1 =
+    event.get().inverse_offers().inverse_offers(0);
+
+  v1::InverseOffer inverseOffer2 =
+    event.get().inverse_offers().inverse_offers(1);
 
   // We want to acknowledge TASK_RUNNING updates for the two tasks we
   // have launched. We don't know which task will be launched first,
@@ -1689,14 +1689,13 @@ TEST_F(MasterMaintenanceTest, InverseOffersFilters)
   event = events.get();
   AWAIT_READY(event);
 
-  EXPECT_EQ(Event::OFFERS, event.get().type());
-  EXPECT_EQ(0, event.get().offers().offers().size());
-  EXPECT_EQ(1, event.get().offers().inverse_offers().size());
+  EXPECT_EQ(Event::INVERSE_OFFERS, event.get().type());
+  EXPECT_EQ(1, event.get().inverse_offers().inverse_offers().size());
   EXPECT_EQ(
       inverseOffer1.agent_id(),
-      event.get().offers().inverse_offers(0).agent_id());
+      event.get().inverse_offers().inverse_offers(0).agent_id());
 
-  inverseOffer1 = event.get().offers().inverse_offers(0);
+  inverseOffer1 = event.get().inverse_offers().inverse_offers(0);
 
   updateInverseOffer =
     FUTURE_DISPATCH(_, &MesosAllocatorProcess::updateInverseOffer);
@@ -1725,12 +1724,11 @@ TEST_F(MasterMaintenanceTest, InverseOffersFilters)
   event = events.get();
   AWAIT_READY(event);
 
-  EXPECT_EQ(Event::OFFERS, event.get().type());
-  EXPECT_EQ(0, event.get().offers().offers().size());
-  EXPECT_EQ(1, event.get().offers().inverse_offers().size());
+  EXPECT_EQ(Event::INVERSE_OFFERS, event.get().type());
+  EXPECT_EQ(1, event.get().inverse_offers().inverse_offers().size());
   EXPECT_EQ(
       inverseOffer1.agent_id(),
-      event.get().offers().inverse_offers(0).agent_id());
+      event.get().inverse_offers().inverse_offers(0).agent_id());
 
   EXPECT_CALL(exec1, shutdown(_))
     .Times(AtMost(1));

http://git-wip-us.apache.org/repos/asf/mesos/blob/e9c3b6df/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index 83c5d41..fef2bec 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -972,6 +972,9 @@ public:
   MOCK_METHOD1_T(heartbeat, void(Mesos*));
   MOCK_METHOD2_T(subscribed, void(Mesos*, const typename Event::Subscribed&));
   MOCK_METHOD2_T(offers, void(Mesos*, const typename Event::Offers&));
+  MOCK_METHOD2_T(
+      inverseOffers,
+      void(Mesos*, const typename Event::InverseOffers&));
   MOCK_METHOD2_T(rescind, void(Mesos*, const typename Event::Rescind&));
   MOCK_METHOD2_T(update, void(Mesos*, const typename Event::Update&));
   MOCK_METHOD2_T(message, void(Mesos*, const typename Event::Message&));
@@ -987,6 +990,9 @@ public:
       case Event::OFFERS:
         offers(mesos, event.offers());
         break;
+      case Event::INVERSE_OFFERS:
+        inverseOffers(mesos, event.inverse_offers());
+        break;
       case Event::RESCIND:
         rescind(mesos, event.rescind());
         break;

Reply via email to