V1 API: Split Resource offers and Inverse Offers. Review: https://reviews.apache.org/r/46756
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e9c3b6df Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e9c3b6df Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e9c3b6df Branch: refs/heads/master Commit: e9c3b6df59c2d968bb1e5ee5f2f1ac74d121a660 Parents: 79a98ef Author: Joris Van Remoortere <joris.van.remoort...@gmail.com> Authored: Tue Mar 29 16:38:40 2016 +0200 Committer: Joris Van Remoortere <joris.van.remoort...@gmail.com> Committed: Tue May 31 15:25:54 2016 -0700 ---------------------------------------------------------------------- include/mesos/scheduler/scheduler.proto | 37 +++++++++------- include/mesos/v1/scheduler/scheduler.proto | 37 +++++++++------- src/cli/execute.cpp | 1 + src/examples/long_lived_framework.cpp | 1 + src/examples/test_http_framework.cpp | 5 +++ src/internal/evolve.cpp | 15 ++++++- src/internal/evolve.hpp | 1 + src/master/master.cpp | 2 +- src/messages/messages.proto | 19 +++++---- src/sched/sched.cpp | 1 + src/tests/master_maintenance_tests.cpp | 56 ++++++++++++------------- src/tests/mesos.hpp | 6 +++ 12 files changed, 115 insertions(+), 66 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/e9c3b6df/include/mesos/scheduler/scheduler.proto ---------------------------------------------------------------------- diff --git a/include/mesos/scheduler/scheduler.proto b/include/mesos/scheduler/scheduler.proto index fd99992..b56e5dc 100644 --- a/include/mesos/scheduler/scheduler.proto +++ b/include/mesos/scheduler/scheduler.proto @@ -39,13 +39,14 @@ message Event { // in a backwards-compatible way. See: MESOS-4997. UNKNOWN = 0; - SUBSCRIBED = 1; // See 'Subscribed' below. - OFFERS = 2; // See 'Offers' below. - RESCIND = 3; // See 'Rescind' below. - UPDATE = 4; // See 'Update' below. - MESSAGE = 5; // See 'Message' below. - FAILURE = 6; // See 'Failure' below. - ERROR = 7; // See 'Error' below. + SUBSCRIBED = 1; // See 'Subscribed' below. + OFFERS = 2; // See 'Offers' below. + INVERSE_OFFERS = 9; // See 'InverseOffers' below. + RESCIND = 3; // See 'Rescind' below. + UPDATE = 4; // See 'Update' below. + MESSAGE = 5; // See 'Message' below. + FAILURE = 6; // See 'Failure' below. + ERROR = 7; // See 'Error' below. // Periodic message sent by the Mesos master according to // 'Subscribed.heartbeat_interval_seconds'. If the scheduler does @@ -67,15 +68,22 @@ message Event { } // Received whenever there are new resources that are offered to the - // scheduler or resources requested back from the scheduler. Each - // offer corresponds to a set of resources on a slave. Until the - // scheduler accepts or declines an offer the resources are - // considered allocated to the scheduler. Accepting or Declining an - // inverse offer informs the allocator of the scheduler's ability to - // release the resources without violating an SLA. + // scheduler. Each offer corresponds to a set of resources on an + // agent. Until the scheduler accepts or declines an offer the + // resources are considered allocated to the scheduler. message Offers { repeated Offer offers = 1; - repeated InverseOffer inverse_offers = 2; + } + + // Received whenever there are resources requested back from the + // scheduler. Each inverse offer specifies the agent, and + // optionally specific resources. Accepting or Declining an inverse + // offer informs the allocator of the scheduler's ability to release + // the specified resources without violating an SLA. If no resources + // are specified then all resources on the agent are requested to be + // released. + message InverseOffers { + repeated InverseOffer inverse_offers = 1; } // Received when a particular offer is no longer valid (e.g., the @@ -144,6 +152,7 @@ message Event { optional Subscribed subscribed = 2; optional Offers offers = 3; + optional InverseOffers inverse_offers = 9; optional Rescind rescind = 4; optional Update update = 5; optional Message message = 6; http://git-wip-us.apache.org/repos/asf/mesos/blob/e9c3b6df/include/mesos/v1/scheduler/scheduler.proto ---------------------------------------------------------------------- diff --git a/include/mesos/v1/scheduler/scheduler.proto b/include/mesos/v1/scheduler/scheduler.proto index 131e1a4..0b44930 100644 --- a/include/mesos/v1/scheduler/scheduler.proto +++ b/include/mesos/v1/scheduler/scheduler.proto @@ -39,13 +39,14 @@ message Event { // in a backwards-compatible way. See: MESOS-4997. UNKNOWN = 0; - SUBSCRIBED = 1; // See 'Subscribed' below. - OFFERS = 2; // See 'Offers' below. - RESCIND = 3; // See 'Rescind' below. - UPDATE = 4; // See 'Update' below. - MESSAGE = 5; // See 'Message' below. - FAILURE = 6; // See 'Failure' below. - ERROR = 7; // See 'Error' below. + SUBSCRIBED = 1; // See 'Subscribed' below. + OFFERS = 2; // See 'Offers' below. + INVERSE_OFFERS = 9; // See 'InverseOffers' below. + RESCIND = 3; // See 'Rescind' below. + UPDATE = 4; // See 'Update' below. + MESSAGE = 5; // See 'Message' below. + FAILURE = 6; // See 'Failure' below. + ERROR = 7; // See 'Error' below. // Periodic message sent by the Mesos master according to // 'Subscribed.heartbeat_interval_seconds'. If the scheduler does @@ -67,15 +68,22 @@ message Event { } // Received whenever there are new resources that are offered to the - // scheduler or resources requested back from the scheduler. Each - // offer corresponds to a set of resources on an agent. Until the - // scheduler accepts or declines an offer the resources are - // considered allocated to the scheduler. Accepting or Declining an - // inverse offer informs the allocator of the scheduler's ability to - // release the resources without violating an SLA. + // scheduler. Each offer corresponds to a set of resources on an + // agent. Until the scheduler accepts or declines an offer the + // resources are considered allocated to the scheduler. message Offers { repeated Offer offers = 1; - repeated InverseOffer inverse_offers = 2; + } + + // Received whenever there are resources requested back from the + // scheduler. Each inverse offer specifies the agent, and + // optionally specific resources. Accepting or Declining an inverse + // offer informs the allocator of the scheduler's ability to release + // the specified resources without violating an SLA. If no resources + // are specified then all resources on the agent are requested to be + // released. + message InverseOffers { + repeated InverseOffer inverse_offers = 1; } // Received when a particular offer is no longer valid (e.g., the @@ -144,6 +152,7 @@ message Event { optional Subscribed subscribed = 2; optional Offers offers = 3; + optional InverseOffers inverse_offers = 9; optional Rescind rescind = 4; optional Update update = 5; optional Message message = 6; http://git-wip-us.apache.org/repos/asf/mesos/blob/e9c3b6df/src/cli/execute.cpp ---------------------------------------------------------------------- diff --git a/src/cli/execute.cpp b/src/cli/execute.cpp index 81a0388..95e1db9 100644 --- a/src/cli/execute.cpp +++ b/src/cli/execute.cpp @@ -446,6 +446,7 @@ protected: } case Event::HEARTBEAT: + case Event::INVERSE_OFFERS: case Event::FAILURE: case Event::RESCIND: case Event::MESSAGE: { http://git-wip-us.apache.org/repos/asf/mesos/blob/e9c3b6df/src/examples/long_lived_framework.cpp ---------------------------------------------------------------------- diff --git a/src/examples/long_lived_framework.cpp b/src/examples/long_lived_framework.cpp index 8269e38..848849f 100644 --- a/src/examples/long_lived_framework.cpp +++ b/src/examples/long_lived_framework.cpp @@ -218,6 +218,7 @@ protected: } case Event::HEARTBEAT: + case Event::INVERSE_OFFERS: case Event::RESCIND: case Event::MESSAGE: { break; http://git-wip-us.apache.org/repos/asf/mesos/blob/e9c3b6df/src/examples/test_http_framework.cpp ---------------------------------------------------------------------- diff --git a/src/examples/test_http_framework.cpp b/src/examples/test_http_framework.cpp index 6f70bbe..88e4495 100644 --- a/src/examples/test_http_framework.cpp +++ b/src/examples/test_http_framework.cpp @@ -126,6 +126,11 @@ public: break; } + case Event::INVERSE_OFFERS: { + cout << endl << "Received an INVERSE_OFFERS event" << endl; + break; + } + case Event::RESCIND: { cout << endl << "Received a RESCIND event" << endl; break; http://git-wip-us.apache.org/repos/asf/mesos/blob/e9c3b6df/src/internal/evolve.cpp ---------------------------------------------------------------------- diff --git a/src/internal/evolve.cpp b/src/internal/evolve.cpp index 7d3d450..5e0be6b 100644 --- a/src/internal/evolve.cpp +++ b/src/internal/evolve.cpp @@ -192,7 +192,20 @@ v1::scheduler::Event evolve(const ResourceOffersMessage& message) v1::scheduler::Event::Offers* offers = event.mutable_offers(); offers->mutable_offers()->CopyFrom(evolve<v1::Offer>(message.offers())); - offers->mutable_inverse_offers()->CopyFrom(evolve<v1::InverseOffer>( + + return event; +} + + +v1::scheduler::Event evolve(const InverseOffersMessage& message) +{ + v1::scheduler::Event event; + event.set_type(v1::scheduler::Event::INVERSE_OFFERS); + + v1::scheduler::Event::InverseOffers* inverse_offers = + event.mutable_inverse_offers(); + + inverse_offers->mutable_inverse_offers()->CopyFrom(evolve<v1::InverseOffer>( message.inverse_offers())); return event; http://git-wip-us.apache.org/repos/asf/mesos/blob/e9c3b6df/src/internal/evolve.hpp ---------------------------------------------------------------------- diff --git a/src/internal/evolve.hpp b/src/internal/evolve.hpp index 0bf0844..51c2d6c 100644 --- a/src/internal/evolve.hpp +++ b/src/internal/evolve.hpp @@ -83,6 +83,7 @@ v1::scheduler::Event evolve(const scheduler::Event& event); v1::scheduler::Event evolve(const FrameworkRegisteredMessage& message); v1::scheduler::Event evolve(const FrameworkReregisteredMessage& message); v1::scheduler::Event evolve(const ResourceOffersMessage& message); +v1::scheduler::Event evolve(const InverseOffersMessage& message); v1::scheduler::Event evolve(const RescindResourceOfferMessage& message); v1::scheduler::Event evolve(const StatusUpdateMessage& message); v1::scheduler::Event evolve(const LostSlaveMessage& message); http://git-wip-us.apache.org/repos/asf/mesos/blob/e9c3b6df/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index a6f740f..26e8d3c 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -5707,7 +5707,7 @@ void Master::inverseOffer( } // Create an inverse offer for each slave and add it to the message. - ResourceOffersMessage message; + InverseOffersMessage message; Framework* framework = CHECK_NOTNULL(frameworks.registered[frameworkId]); foreachpair (const SlaveID& slaveId, http://git-wip-us.apache.org/repos/asf/mesos/blob/e9c3b6df/src/messages/messages.proto ---------------------------------------------------------------------- diff --git a/src/messages/messages.proto b/src/messages/messages.proto index 05e6476..10e3776 100644 --- a/src/messages/messages.proto +++ b/src/messages/messages.proto @@ -212,14 +212,19 @@ message ResourceRequestMessage { message ResourceOffersMessage { repeated Offer offers = 1; repeated string pids = 2; +} + - // The `inverse_offers` field is added here because we currently use it in - // `master.cpp` when constructing the message to send to schedulers. We use - // the original version of the proto API until we do a full refactor of all - // the messages being sent. - // It is not fully implemented in the old scheduler; only the V1 scheduler - // currently implements inverse offers. - repeated InverseOffer inverse_offers = 3; +/** + * Sends inverse offers to the scheduler. + * NOTE: This message is only sent through the V1 HTTP API. Driver + * based schedulers will not receive it. + * + * See scheduler::Event::InverseOffers. + */ +message InverseOffersMessage { + repeated InverseOffer inverse_offers = 1; + repeated string pids = 2; } http://git-wip-us.apache.org/repos/asf/mesos/blob/e9c3b6df/src/sched/sched.cpp ---------------------------------------------------------------------- diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp index 5eca265..2e2c16f 100644 --- a/src/sched/sched.cpp +++ b/src/sched/sched.cpp @@ -671,6 +671,7 @@ protected: break; } + case Event::INVERSE_OFFERS: case Event::HEARTBEAT: { break; } http://git-wip-us.apache.org/repos/asf/mesos/blob/e9c3b6df/src/tests/master_maintenance_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/master_maintenance_tests.cpp b/src/tests/master_maintenance_tests.cpp index 971c447..32a7a90 100644 --- a/src/tests/master_maintenance_tests.cpp +++ b/src/tests/master_maintenance_tests.cpp @@ -434,9 +434,11 @@ TEST_F(MasterMaintenanceTest, PendingUnavailabilityTest) .WillOnce(FutureSatisfy(&offerRescinded)); Future<Event::Offers> unavailabilityOffers; - Future<Event::Offers> inverseOffers; EXPECT_CALL(*scheduler, offers(_, _)) - .WillOnce(FutureArg<1>(&unavailabilityOffers)) + .WillOnce(FutureArg<1>(&unavailabilityOffers)); + + Future<Event::InverseOffers> inverseOffers; + EXPECT_CALL(*scheduler, inverseOffers(_, _)) .WillOnce(FutureArg<1>(&inverseOffers)); // Schedule this slave for maintenance. @@ -1172,7 +1174,6 @@ TEST_F(MasterMaintenanceTest, InverseOffers) AWAIT_READY(event); EXPECT_EQ(Event::OFFERS, event.get().type()); EXPECT_NE(0, event.get().offers().offers().size()); - EXPECT_EQ(0, event.get().offers().inverse_offers().size()); // All the offers should have unavailability. foreach (const v1::Offer& offer, event.get().offers().offers()) { @@ -1212,12 +1213,12 @@ TEST_F(MasterMaintenanceTest, InverseOffers) // Expect an inverse offer. event = events.get(); AWAIT_READY(event); - EXPECT_EQ(Event::OFFERS, event.get().type()); - EXPECT_EQ(0, event.get().offers().offers().size()); - EXPECT_EQ(1, event.get().offers().inverse_offers().size()); + EXPECT_EQ(Event::INVERSE_OFFERS, event.get().type()); + EXPECT_EQ(1, event.get().inverse_offers().inverse_offers().size()); // Save this inverse offer so we can decline it later. - v1::InverseOffer inverseOffer = event.get().offers().inverse_offers(0); + v1::InverseOffer inverseOffer = + event.get().inverse_offers().inverse_offers(0); // Wait for the task to start running. event = events.get(); @@ -1271,12 +1272,11 @@ TEST_F(MasterMaintenanceTest, InverseOffers) // Expect another inverse offer. event = events.get(); AWAIT_READY(event); - EXPECT_EQ(Event::OFFERS, event.get().type()); + EXPECT_EQ(Event::INVERSE_OFFERS, event.get().type()); Clock::resume(); - EXPECT_EQ(0, event.get().offers().offers().size()); - EXPECT_EQ(1, event.get().offers().inverse_offers().size()); - inverseOffer = event.get().offers().inverse_offers(0); + EXPECT_EQ(1, event.get().inverse_offers().inverse_offers().size()); + inverseOffer = event.get().inverse_offers().inverse_offers(0); // Check that the status endpoint shows the inverse offer as declined. response = process::http::get( @@ -1336,11 +1336,10 @@ TEST_F(MasterMaintenanceTest, InverseOffers) // Expect yet another inverse offer. event = events.get(); AWAIT_READY(event); - EXPECT_EQ(Event::OFFERS, event.get().type()); + EXPECT_EQ(Event::INVERSE_OFFERS, event.get().type()); Clock::resume(); - EXPECT_EQ(0, event.get().offers().offers().size()); - EXPECT_EQ(1, event.get().offers().inverse_offers().size()); + EXPECT_EQ(1, event.get().inverse_offers().inverse_offers().size()); // Check that the status endpoint shows the inverse offer as accepted. response = process::http::get( @@ -1517,7 +1516,6 @@ TEST_F(MasterMaintenanceTest, InverseOffersFilters) AWAIT_READY(event); EXPECT_EQ(Event::OFFERS, event.get().type()); EXPECT_EQ(2, event.get().offers().offers().size()); - EXPECT_EQ(0, event.get().offers().inverse_offers().size()); // All the offers should have unavailability. foreach (const v1::Offer& offer, event.get().offers().offers()) { @@ -1570,13 +1568,15 @@ TEST_F(MasterMaintenanceTest, InverseOffersFilters) // Expect two inverse offers. event = events.get(); AWAIT_READY(event); - EXPECT_EQ(Event::OFFERS, event.get().type()); - EXPECT_EQ(0, event.get().offers().offers().size()); - EXPECT_EQ(2, event.get().offers().inverse_offers().size()); + EXPECT_EQ(Event::INVERSE_OFFERS, event.get().type()); + EXPECT_EQ(2, event.get().inverse_offers().inverse_offers().size()); // Save these inverse offers. - v1::InverseOffer inverseOffer1 = event.get().offers().inverse_offers(0); - v1::InverseOffer inverseOffer2 = event.get().offers().inverse_offers(1); + v1::InverseOffer inverseOffer1 = + event.get().inverse_offers().inverse_offers(0); + + v1::InverseOffer inverseOffer2 = + event.get().inverse_offers().inverse_offers(1); // We want to acknowledge TASK_RUNNING updates for the two tasks we // have launched. We don't know which task will be launched first, @@ -1689,14 +1689,13 @@ TEST_F(MasterMaintenanceTest, InverseOffersFilters) event = events.get(); AWAIT_READY(event); - EXPECT_EQ(Event::OFFERS, event.get().type()); - EXPECT_EQ(0, event.get().offers().offers().size()); - EXPECT_EQ(1, event.get().offers().inverse_offers().size()); + EXPECT_EQ(Event::INVERSE_OFFERS, event.get().type()); + EXPECT_EQ(1, event.get().inverse_offers().inverse_offers().size()); EXPECT_EQ( inverseOffer1.agent_id(), - event.get().offers().inverse_offers(0).agent_id()); + event.get().inverse_offers().inverse_offers(0).agent_id()); - inverseOffer1 = event.get().offers().inverse_offers(0); + inverseOffer1 = event.get().inverse_offers().inverse_offers(0); updateInverseOffer = FUTURE_DISPATCH(_, &MesosAllocatorProcess::updateInverseOffer); @@ -1725,12 +1724,11 @@ TEST_F(MasterMaintenanceTest, InverseOffersFilters) event = events.get(); AWAIT_READY(event); - EXPECT_EQ(Event::OFFERS, event.get().type()); - EXPECT_EQ(0, event.get().offers().offers().size()); - EXPECT_EQ(1, event.get().offers().inverse_offers().size()); + EXPECT_EQ(Event::INVERSE_OFFERS, event.get().type()); + EXPECT_EQ(1, event.get().inverse_offers().inverse_offers().size()); EXPECT_EQ( inverseOffer1.agent_id(), - event.get().offers().inverse_offers(0).agent_id()); + event.get().inverse_offers().inverse_offers(0).agent_id()); EXPECT_CALL(exec1, shutdown(_)) .Times(AtMost(1)); http://git-wip-us.apache.org/repos/asf/mesos/blob/e9c3b6df/src/tests/mesos.hpp ---------------------------------------------------------------------- diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp index 83c5d41..fef2bec 100644 --- a/src/tests/mesos.hpp +++ b/src/tests/mesos.hpp @@ -972,6 +972,9 @@ public: MOCK_METHOD1_T(heartbeat, void(Mesos*)); MOCK_METHOD2_T(subscribed, void(Mesos*, const typename Event::Subscribed&)); MOCK_METHOD2_T(offers, void(Mesos*, const typename Event::Offers&)); + MOCK_METHOD2_T( + inverseOffers, + void(Mesos*, const typename Event::InverseOffers&)); MOCK_METHOD2_T(rescind, void(Mesos*, const typename Event::Rescind&)); MOCK_METHOD2_T(update, void(Mesos*, const typename Event::Update&)); MOCK_METHOD2_T(message, void(Mesos*, const typename Event::Message&)); @@ -987,6 +990,9 @@ public: case Event::OFFERS: offers(mesos, event.offers()); break; + case Event::INVERSE_OFFERS: + inverseOffers(mesos, event.inverse_offers()); + break; case Event::RESCIND: rescind(mesos, event.rescind()); break;