Added tests for `GROW_VOLUME` and `SHRINK_VOLUME` operations. Review: https://reviews.apache.org/r/66220/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/274f2e68 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/274f2e68 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/274f2e68 Branch: refs/heads/master Commit: 274f2e6804b3f9322ae9e19c2c0acecd179cbd10 Parents: fa1ca07 Author: Zhitao Li <zhitaoli...@gmail.com> Authored: Thu May 3 17:04:48 2018 -0700 Committer: Chun-Hung Hsiao <chhs...@mesosphere.io> Committed: Thu May 3 17:04:48 2018 -0700 ---------------------------------------------------------------------- src/tests/persistent_volume_tests.cpp | 537 +++++++++++++++++++++++++++++ 1 file changed, 537 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/274f2e68/src/tests/persistent_volume_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/persistent_volume_tests.cpp b/src/tests/persistent_volume_tests.cpp index c3de96e..5b2a23c 100644 --- a/src/tests/persistent_volume_tests.cpp +++ b/src/tests/persistent_volume_tests.cpp @@ -448,6 +448,543 @@ TEST_P(PersistentVolumeTest, CreateAndDestroyPersistentVolumes) } +// This test verifies that a framework can grow a persistent volume and receive +// the grown volume in further offers. +TEST_P(PersistentVolumeTest, GrowVolume) +{ + if (GetParam() == MOUNT) { + // It is not possible to have a valid `GrowVolume` on a MOUNT disk because + // the volume must use up all disk space at `Create` and no space will be + // left for `addition`. Therefore we skip this test. + // TODO(zhitao): Make MOUNT a meaningful parameter value for this test, or + // create a new fixture to avoid testing against it. + return; + } + + Clock::pause(); + + FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO; + frameworkInfo.set_roles(0, DEFAULT_TEST_ROLE); + + // Create a master. + master::Flags masterFlags = CreateMasterFlags(); + + Try<Owned<cluster::Master>> master = StartMaster(masterFlags); + ASSERT_SOME(master); + + slave::Flags slaveFlags = CreateSlaveFlags(); + slaveFlags.resources = getSlaveResources(); + + Owned<MasterDetector> detector = master.get()->createDetector(); + Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags); + ASSERT_SOME(slave); + + MockScheduler sched; + MesosSchedulerDriver driver( + &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL); + + EXPECT_CALL(sched, registered(&driver, _, _)); + + Future<vector<Offer>> offersBeforeCreate; + EXPECT_CALL(sched, resourceOffers(&driver, _)) + .WillOnce(FutureArg<1>(&offersBeforeCreate)); + + driver.start(); + + Clock::advance(masterFlags.allocation_interval); + + AWAIT_READY(offersBeforeCreate); + ASSERT_FALSE(offersBeforeCreate->empty()); + + Offer offer = offersBeforeCreate->at(0); + + // The disk spaces will be merged if the fixture parameter is `NONE`. + Bytes totalBytes = GetParam() == NONE ? Megabytes(4096) : Megabytes(2048); + + Bytes additionBytes = Megabytes(512); + + // Construct a persistent volume which does not use up all disk resources. + Resource volume = createPersistentVolume( + getDiskResource(totalBytes - additionBytes, 1), + "id1", + "path1", + None(), + frameworkInfo.principal()); + + Resource addition = getDiskResource(additionBytes, 1); + + Resource grownVolume = createPersistentVolume( + getDiskResource(totalBytes, 1), + "id1", + "path1", + None(), + frameworkInfo.principal()); + + Future<vector<Offer>> offersBeforeGrow; + + EXPECT_CALL(sched, resourceOffers(&driver, _)) + .WillOnce(FutureArg<1>(&offersBeforeGrow)); + + // We use the filter explicitly here so that the resources will not + // be filtered for 5 seconds (the default). + Filters filters; + filters.set_refuse_seconds(0); + + // Create the persistent volume. + driver.acceptOffers( + {offer.id()}, + {CREATE(volume)}, + filters); + + Clock::settle(); + Clock::advance(masterFlags.allocation_interval); + + AWAIT_READY(offersBeforeGrow); + ASSERT_FALSE(offersBeforeGrow->empty()); + + offer = offersBeforeGrow->at(0); + + EXPECT_EQ( + allocatedResources(Resources(volume), frameworkInfo.roles(0)), + Resources(offer.resources()).persistentVolumes()); + + EXPECT_TRUE( + Resources(offer.resources()).contains( + allocatedResources(addition, frameworkInfo.roles(0)))); + + // Make sure the volume exists, and leave a non-empty file there. + string volumePath = slave::paths::getPersistentVolumePath( + slaveFlags.work_dir, volume); + + EXPECT_TRUE(os::exists(volumePath)); + string filePath = path::join(volumePath, "file"); + ASSERT_SOME(os::write(filePath, "abc")); + + Future<vector<Offer>> offersAfterGrow; + + EXPECT_CALL(sched, resourceOffers(&driver, _)) + .WillOnce(FutureArg<1>(&offersAfterGrow)); + + // Grow the volume. + driver.acceptOffers( + {offer.id()}, + {GROW_VOLUME(volume, addition)}, + filters); + + Clock::settle(); + Clock::advance(masterFlags.allocation_interval); + + AWAIT_READY(offersAfterGrow); + ASSERT_FALSE(offersAfterGrow->empty()); + + EXPECT_TRUE(os::exists(volumePath)); + EXPECT_SOME_EQ("abc", os::read(filePath)); + + offer = offersAfterGrow->at(0); + + EXPECT_EQ( + allocatedResources(Resources(grownVolume), frameworkInfo.roles(0)), + Resources(offer.resources()).persistentVolumes()); + + EXPECT_FALSE( + Resources(offer.resources()).contains( + allocatedResources(addition, frameworkInfo.roles(0)))); + + Clock::resume(); + + driver.stop(); + driver.join(); +} + + +// This test verifies that a framework can shrink a persistent volume and see +// the shrunk volume in further offers. +TEST_P(PersistentVolumeTest, ShrinkVolume) +{ + Clock::pause(); + + FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO; + frameworkInfo.set_roles(0, DEFAULT_TEST_ROLE); + + // Create a master. + master::Flags masterFlags = CreateMasterFlags(); + + Try<Owned<cluster::Master>> master = StartMaster(masterFlags); + ASSERT_SOME(master); + + slave::Flags slaveFlags = CreateSlaveFlags(); + slaveFlags.resources = getSlaveResources(); + + Owned<MasterDetector> detector = master.get()->createDetector(); + Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags); + ASSERT_SOME(slave); + + MockScheduler sched; + MesosSchedulerDriver driver( + &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL); + + EXPECT_CALL(sched, registered(&driver, _, _)); + + Future<vector<Offer>> offersBeforeCreate; + EXPECT_CALL(sched, resourceOffers(&driver, _)) + .WillOnce(FutureArg<1>(&offersBeforeCreate)); + + driver.start(); + + Clock::advance(masterFlags.allocation_interval); + + AWAIT_READY(offersBeforeCreate); + ASSERT_FALSE(offersBeforeCreate->empty()); + + Offer offer = offersBeforeCreate->at(0); + + // The disk spaces will be merged if the fixture parameter is `NONE`. + Bytes totalBytes = GetParam() == NONE ? Megabytes(4096) : Megabytes(2048); + + Bytes shrinkBytes = Megabytes(512); + + // Construct a persistent volume which uses up all disk resources. + Resource volume = createPersistentVolume( + getDiskResource(totalBytes, 1), + "id1", + "path1", + None(), + frameworkInfo.principal()); + + Resource subtract = getDiskResource(shrinkBytes, 1); + + Resource shrunkVolume = createPersistentVolume( + getDiskResource(totalBytes - shrinkBytes, 1), + "id1", + "path1", + None(), + frameworkInfo.principal()); + + Future<vector<Offer>> offersBeforeShrink; + + // Expect an offer containing the original volume. + EXPECT_CALL(sched, resourceOffers(&driver, _)) + .WillOnce(FutureArg<1>(&offersBeforeShrink)); + + // We use the filter explicitly here so that the resources will not + // be filtered for 5 seconds (the default). + Filters filters; + filters.set_refuse_seconds(0); + + // Create the persistent volume. + driver.acceptOffers( + {offer.id()}, + {CREATE(volume)}, + filters); + + Clock::settle(); + Clock::advance(masterFlags.allocation_interval); + + AWAIT_READY(offersBeforeShrink); + ASSERT_FALSE(offersBeforeShrink->empty()); + + offer = offersBeforeShrink->at(0); + + EXPECT_EQ( + allocatedResources(Resources(volume), frameworkInfo.roles(0)), + Resources(offer.resources()).persistentVolumes()); + + EXPECT_FALSE( + Resources(offer.resources()).contains( + allocatedResources(subtract, frameworkInfo.roles(0)))); + + // Make sure the volume exists, and leaves a non-empty file there. + string volumePath = slave::paths::getPersistentVolumePath( + slaveFlags.work_dir, volume); + + EXPECT_TRUE(os::exists(volumePath)); + string filePath = path::join(volumePath, "file"); + ASSERT_SOME(os::write(filePath, "abc")); + + Future<vector<Offer>> offersAfterShrink; + + // Expect an offer containing shrunk volume. + EXPECT_CALL(sched, resourceOffers(&driver, _)) + .WillOnce(FutureArg<1>(&offersAfterShrink)); + + driver.acceptOffers( + {offer.id()}, + {SHRINK_VOLUME(volume, subtract.scalar())}, + filters); + + Clock::settle(); + Clock::advance(masterFlags.allocation_interval); + + AWAIT_READY(offersAfterShrink); + ASSERT_FALSE(offersAfterShrink->empty()); + + EXPECT_TRUE(os::exists(volumePath)); + EXPECT_SOME_EQ("abc", os::read(filePath)); + + offer = offersAfterShrink->at(0); + + if (GetParam() != MOUNT) { + EXPECT_EQ( + allocatedResources(Resources(shrunkVolume), frameworkInfo.roles(0)), + Resources(offer.resources()).persistentVolumes()); + + EXPECT_TRUE( + Resources(offer.resources()).contains( + allocatedResources(subtract, frameworkInfo.roles(0)))); + } else { + EXPECT_EQ( + allocatedResources(Resources(volume), frameworkInfo.roles(0)), + Resources(offer.resources()).persistentVolumes()); + + EXPECT_FALSE( + Resources(offer.resources()).contains( + allocatedResources(subtract, frameworkInfo.roles(0)))); + } + + Clock::resume(); + + driver.stop(); + driver.join(); +} + + +// This test verifies that a subsequent `LAUNCH` depending on a grown volume +// will be dropped because we intend to keep `GROW_VOLUME` non-speculative. +TEST_P(PersistentVolumeTest, NonSpeculativeGrowAndLaunch) +{ + if (GetParam() == MOUNT) { + // It is not possible to have a valid `GrowVolume` on a MOUNT disk because + // the volume must use up all disk space at `Create` and no space will be + // left for `addition`. Therefore we skip this test. + // TODO(zhitao): Make MOUNT a meaningful parameter value for this test, or + // create a new fixture to avoid testing against it. + return; + } + + Clock::pause(); + + FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO; + frameworkInfo.set_roles(0, DEFAULT_TEST_ROLE); + + // Create a master. + master::Flags masterFlags = CreateMasterFlags(); + + Try<Owned<cluster::Master>> master = StartMaster(masterFlags); + ASSERT_SOME(master); + + slave::Flags slaveFlags = CreateSlaveFlags(); + slaveFlags.resources = getSlaveResources(); + + Owned<MasterDetector> detector = master.get()->createDetector(); + Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags); + ASSERT_SOME(slave); + + MockScheduler sched; + MesosSchedulerDriver driver( + &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL); + + EXPECT_CALL(sched, registered(&driver, _, _)); + + Future<vector<Offer>> offersBeforeOperations; + EXPECT_CALL(sched, resourceOffers(&driver, _)) + .WillOnce(FutureArg<1>(&offersBeforeOperations)); + + driver.start(); + + Clock::settle(); + Clock::advance(masterFlags.allocation_interval); + + AWAIT_READY(offersBeforeOperations); + ASSERT_FALSE(offersBeforeOperations->empty()); + + Offer offer = offersBeforeOperations->at(0); + + // Disk spaces will be merged if fixture parameter is `NONE`. + Bytes totalBytes = GetParam() == NONE ? Megabytes(4096) : Megabytes(2048); + + Bytes additionBytes = Megabytes(512); + + // Construct a persistent volume which do not use up all disk resources. + Resource volume = createPersistentVolume( + getDiskResource(totalBytes - additionBytes, 1), + "id1", + "path1", + None(), + frameworkInfo.principal()); + + Resource addition = getDiskResource(additionBytes, 1); + + Resource grownVolume = createPersistentVolume( + getDiskResource(totalBytes, 1), + "id1", + "path1", + None(), + frameworkInfo.principal()); + + Future<TaskStatus> taskError; + + EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&taskError)); + + TaskInfo task = createTask( + offer.slave_id(), + Resources::parse("cpus:1;mem:128").get() + grownVolume, + "echo abc > path1/file"); + + Future<vector<Offer>> offersAfterOperations; + + EXPECT_CALL(sched, resourceOffers(&driver, _)) + .WillOnce(FutureArg<1>(&offersAfterOperations)); + + // We use the filter explicitly here so that the resources will not + // be filtered for 5 seconds (the default). + Filters filters; + filters.set_refuse_seconds(0); + + // Create and grow will succeed, but launch will be droppd with + // `TASK_ERROR`. + driver.acceptOffers( + {offer.id()}, + {CREATE(volume), GROW_VOLUME(volume, addition), LAUNCH({task})}, + filters); + + AWAIT_READY(taskError); + EXPECT_EQ(task.task_id(), taskError->task_id()); + EXPECT_EQ(TASK_ERROR, taskError->state()); + + Clock::settle(); + Clock::advance(masterFlags.allocation_interval); + + AWAIT_READY(offersAfterOperations); + ASSERT_FALSE(offersAfterOperations->empty()); + + offer = offersAfterOperations->at(0); + + EXPECT_EQ( + allocatedResources(Resources(grownVolume), frameworkInfo.roles(0)), + Resources(offer.resources()).persistentVolumes()); + + Clock::resume(); + + driver.stop(); + driver.join(); +} + + +// This test verifies that a subsequent `LAUNCH` depends on a shrunk volume +// will be dropped because we intend to keep `SHRINK_VOLUME` non-speculative. +TEST_P(PersistentVolumeTest, NonSpeculativeShrinkAndLaunch) +{ + if (GetParam() == MOUNT) { + // It is not possible to have a valid `GrowVolume` on a MOUNT disk because + // the volume must use up all disk space at `Create` and no space will be + // left for `addition`. Therefore we skip this test. + // TODO(zhitao): Make MOUNT a meaningful parameter value for this test, or + // create a new fixture to avoid testing against it. + return; + } + + Clock::pause(); + + FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO; + frameworkInfo.set_roles(0, DEFAULT_TEST_ROLE); + + // Create a master. + master::Flags masterFlags = CreateMasterFlags(); + Try<Owned<cluster::Master>> master = StartMaster(masterFlags); + ASSERT_SOME(master); + + slave::Flags slaveFlags = CreateSlaveFlags(); + slaveFlags.resources = getSlaveResources(); + + Owned<MasterDetector> detector = master.get()->createDetector(); + Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags); + ASSERT_SOME(slave); + + MockScheduler sched; + MesosSchedulerDriver driver( + &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL); + + EXPECT_CALL(sched, registered(&driver, _, _)); + + Future<vector<Offer>> offersBeforeOperations; + EXPECT_CALL(sched, resourceOffers(&driver, _)) + .WillOnce(FutureArg<1>(&offersBeforeOperations)); + + driver.start(); + + Clock::advance(masterFlags.allocation_interval); + + AWAIT_READY(offersBeforeOperations); + ASSERT_FALSE(offersBeforeOperations->empty()); + + Offer offer = offersBeforeOperations->at(0); + + Resource volume = createPersistentVolume( + getDiskResource(Megabytes(1024), 1), + "id1", + "path1", + None(), + frameworkInfo.principal()); + + Value::Scalar subtract; + subtract.set_value(512); + + Resource shrunkVolume = createPersistentVolume( + getDiskResource(Megabytes(512), 1), + "id1", + "path1", + None(), + frameworkInfo.principal()); + + Future<TaskStatus> taskError; + EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&taskError)); + + TaskInfo task = createTask( + offer.slave_id(), + Resources::parse("cpus:1;mem:128").get() + shrunkVolume, + "echo abc > path1/file"); + + Future<vector<Offer>> offersAfterOperations; + + EXPECT_CALL(sched, resourceOffers(&driver, _)) + .WillOnce(FutureArg<1>(&offersAfterOperations)); + + // We use the filter explicitly here so that the resources will not + // be filtered for 5 seconds (the default). + Filters filters; + filters.set_refuse_seconds(0); + + // Create and shrink volume will succeed, but launch will be dropped with + // `TASK_ERROR`. + driver.acceptOffers( + {offer.id()}, + {CREATE(volume), SHRINK_VOLUME(volume, subtract), LAUNCH({task})}, + filters); + + AWAIT_READY(taskError); + EXPECT_EQ(task.task_id(), taskError->task_id()); + EXPECT_EQ(TASK_ERROR, taskError->state()); + + Clock::settle(); + Clock::advance(masterFlags.allocation_interval); + + AWAIT_READY(offersAfterOperations); + ASSERT_FALSE(offersAfterOperations->empty()); + offer = offersAfterOperations->at(0); + + EXPECT_EQ( + allocatedResources(Resources(shrunkVolume), frameworkInfo.roles(0)), + Resources(offer.resources()).persistentVolumes()); + + Clock::resume(); + + driver.stop(); + driver.join(); +} + + // This test verifies that the slave checkpoints the resources for // persistent volumes to the disk, recovers them upon restart, and // sends them to the master during re-registration.