Repository: mesos Updated Branches: refs/heads/master 1fdea7dec -> 06d2e23dc
Recover resources when offer is rescinded on DESTROY of shared volume. When a framework issues a DESTROY of a shared volume, and that volume is not in use by a running or a pending task, we rescind the pending offers in which the shared volume is present so that the deleted volume is not assigned to any task in a future ACCEPT call. When rescinding those offers, we must also recover their resources so that the allocator accounts for them correctly. Since `removeOffer()` deletes the offer, we stop examining an offer as soon as it has been rescinded, so that an offer containing several of the destroyed volumes is not recovered or removed twice. Review: https://reviews.apache.org/r/52288/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/06d2e23d Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/06d2e23d Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/06d2e23d Branch: refs/heads/master Commit: 06d2e23dccc1ecc3c1d0e0cfb22ccca18bb6e56b Parents: 1fdea7d Author: Anindya Sinha <anindya_si...@apple.com> Authored: Wed Oct 12 00:20:07 2016 -0700 Committer: Jiang Yan Xu <xuj...@apple.com> Committed: Wed Oct 12 10:49:12 2016 -0700 ---------------------------------------------------------------------- src/master/master.cpp | 10 ++ src/tests/persistent_volume_tests.cpp | 146 +++++++++++++++++++++++++++++ 2 files changed, 156 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/06d2e23d/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index ad8993a..7ef8987 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -3975,6 +3975,16 @@ void Master::_accept( const Resources& offered = offer->resources(); foreach (const Resource& volume, operation.destroy().volumes()) { if (offered.contains(volume)) { + allocator->recoverResources( + offer->framework_id(), + offer->slave_id(), + offer->resources(), + None()); + removeOffer(offer, true); + + // `removeOffer()` deletes `offer`, so stop examining it even if + // it also contains other volumes that are being destroyed. + break; } } 
http://git-wip-us.apache.org/repos/asf/mesos/blob/06d2e23d/src/tests/persistent_volume_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/persistent_volume_tests.cpp b/src/tests/persistent_volume_tests.cpp index e10a79e..b7d1c1a 100644 --- a/src/tests/persistent_volume_tests.cpp +++ b/src/tests/persistent_volume_tests.cpp @@ -908,6 +908,8 @@ TEST_P(PersistentVolumeTest, SharedPersistentVolumeMultipleTasks) FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO; frameworkInfo.set_role(DEFAULT_TEST_ROLE); + frameworkInfo.add_capabilities()->set_type( + FrameworkInfo::Capability::SHARED_RESOURCES); MockScheduler sched; MesosSchedulerDriver driver( @@ -998,6 +1000,150 @@ TEST_P(PersistentVolumeTest, SharedPersistentVolumeMultipleTasks) driver.join(); } +// This test verifies that pending offers with shared persistent volumes +// are rescinded when the volumes are destroyed. +TEST_P(PersistentVolumeTest, SharedPersistentVolumeRescindOnDestroy) +{ + Clock::pause(); + + master::Flags masterFlags = CreateMasterFlags(); + Try<Owned<cluster::Master>> master = StartMaster(masterFlags); + ASSERT_SOME(master); + + slave::Flags slaveFlags = CreateSlaveFlags(); + + slaveFlags.resources = getSlaveResources(); + + Owned<MasterDetector> detector = master.get()->createDetector(); + Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags); + ASSERT_SOME(slave); + + // 1. Create framework1 so that all resources are offered to this framework. 
+ FrameworkInfo frameworkInfo1 = DEFAULT_FRAMEWORK_INFO; + frameworkInfo1.set_role(DEFAULT_TEST_ROLE); + frameworkInfo1.add_capabilities()->set_type( + FrameworkInfo::Capability::SHARED_RESOURCES); + + MockScheduler sched1; + MesosSchedulerDriver driver1( + &sched1, frameworkInfo1, master.get()->pid, DEFAULT_CREDENTIAL); + + EXPECT_CALL(sched1, registered(&driver1, _, _)); + + Future<vector<Offer>> offers1; + EXPECT_CALL(sched1, resourceOffers(&driver1, _)) + .WillOnce(FutureArg<1>(&offers1)) + .WillRepeatedly(Return()); // Ignore subsequent offers. + + driver1.start(); + + AWAIT_READY(offers1); + ASSERT_FALSE(offers1.get().empty()); + + Offer offer1 = offers1.get()[0]; + + // 2. framework1 CREATEs a shared volume, and LAUNCHes a task with a subset + // of resources from the offer. + Resource volume = createPersistentVolume( + getDiskResource(Megabytes(2048)), + "id1", + "path1", + None(), + frameworkInfo1.principal(), + true); // Shared volume. + + // Create a task which uses a portion of the offered resources, so that + // the remaining resources can be offered to framework2. It's not important + // whether the volume is used (the task is killed soon and its purpose is + // only for splitting the offer). + TaskInfo task = createTask( + offer1.slave_id(), + Resources::parse("cpus:1;mem:128").get(), + "sleep 1000"); + + // Expect an offer containing the persistent volume. + EXPECT_CALL(sched1, resourceOffers(&driver1, _)) + .WillOnce(FutureArg<1>(&offers1)) + .WillRepeatedly(Return()); // Ignore subsequent offers. + + // We use a filter of 0 seconds so the resources will be available + // in the next allocation cycle. + Filters filters; + filters.set_refuse_seconds(0); + + driver1.acceptOffers( + {offer1.id()}, + {CREATE(volume), + LAUNCH({task})}, + filters); + + // Make sure the call is processed before framework2 registers. + Clock::settle(); + + // 3. Create framework2 of the same role. It would be offered resources + // recovered from the framework1 call. 
+ FrameworkInfo frameworkInfo2 = DEFAULT_FRAMEWORK_INFO; + frameworkInfo2.set_role(DEFAULT_TEST_ROLE); + frameworkInfo2.add_capabilities()->set_type( + FrameworkInfo::Capability::SHARED_RESOURCES); + + MockScheduler sched2; + MesosSchedulerDriver driver2( + &sched2, frameworkInfo2, master.get()->pid, DEFAULT_CREDENTIAL); + + EXPECT_CALL(sched2, registered(&driver2, _, _)); + + Future<vector<Offer>> offers2; + EXPECT_CALL(sched2, resourceOffers(&driver2, _)) + .WillOnce(FutureArg<1>(&offers2)) + .WillRepeatedly(Return()); // Ignore subsequent offers. + + driver2.start(); + + AWAIT_READY(offers2); + ASSERT_FALSE(offers2.get().empty()); + + Offer offer2 = offers2.get()[0]; + EXPECT_TRUE(Resources(offer2.resources()).contains(volume)); + + // 4. framework1 kills the task which results in an offer to framework1 + // with the shared volume. At this point, both frameworks will have + // the shared resource in their pending offers. + EXPECT_CALL(sched1, statusUpdate(&driver1, _)) + .WillRepeatedly(Return()); // Ignore TASK_RUNNING and TASK_KILLED. + + driver1.killTask(task.task_id()); + + // Advance the clock until the allocator allocates + // the recovered resources. + Clock::advance(masterFlags.allocation_interval); + + AWAIT_READY(offers1); + ASSERT_FALSE(offers1.get().empty()); + + offer1 = offers1.get()[0]; + EXPECT_TRUE(Resources(offer1.resources()).contains(volume)); + + // 5. DESTROY the shared volume via framework2 which would result in + // framework1 being rescinded the offer. + Future<Nothing> rescinded; + EXPECT_CALL(sched1, offerRescinded(&driver1, _)) + .WillOnce(FutureSatisfy(&rescinded)); + + driver2.acceptOffers( + {offer2.id()}, + {DESTROY(volume)}, + filters); + + AWAIT_READY(rescinded); + + driver1.stop(); + driver1.join(); + + driver2.stop(); + driver2.join(); +} + // This test verifies that persistent volumes are recovered properly // after the slave restarts. The idea is to launch a command which