This is an automated email from the ASF dual-hosted git repository. chhsiao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 18bc6c95a67e2ac2dd4d5557608d75b7fb01d383 Author: Chun-Hung Hsiao <chhs...@mesosphere.io> AuthorDate: Mon Feb 11 20:52:26 2019 -0800 Added SLRP unit tests for destroying unpublished persistent volumes. This patch adds 3 unit tests: `DestroyUnpublishedPersistentVolume`, `DestroyUnpublishedPersistentVolumeWithRecovery`, and `DestroyUnpublishedPersistentVolumeWithReboot` to test that the SLRP is resilient to misbehaved CSI plugins that fail to publish volumes. Review: https://reviews.apache.org/r/69955 --- .../storage_local_resource_provider_tests.cpp | 667 +++++++++++++++++++++ 1 file changed, 667 insertions(+) diff --git a/src/tests/storage_local_resource_provider_tests.cpp b/src/tests/storage_local_resource_provider_tests.cpp index efc03c2..a2d2705 100644 --- a/src/tests/storage_local_resource_provider_tests.cpp +++ b/src/tests/storage_local_resource_provider_tests.cpp @@ -416,6 +416,46 @@ public: UNREACHABLE(); } + // Set up an expected `NodePublishVolume` CSI call for a given mock CSI + // plugin. When the call is made to the mock plugin, `result` will be + // responded. When the response is received by the volume manager, the + // returned future will be satisfied. + Future<Nothing> futureNodePublishVolumeCall( + MockCSIPlugin* plugin, const Try<Nothing, StatusError>& result) + { + if (GetParam() == csi::v0::API_VERSION) { + EXPECT_CALL(*plugin, NodePublishVolume( + _, _, A<csi::v0::NodePublishVolumeResponse*>())) + .WillOnce(Invoke([result]( + grpc::ServerContext* context, + const csi::v0::NodePublishVolumeRequest* request, + csi::v0::NodePublishVolumeResponse* response) { + return result.isError() ? 
result.error().status : grpc::Status::OK; })); + + return FUTURE_DISPATCH(_, &csi::v0::VolumeManagerProcess::__call< + csi::v0::NodePublishVolumeResponse>); + } else if (GetParam() == csi::v1::API_VERSION) { + EXPECT_CALL(*plugin, NodePublishVolume( + _, _, A<csi::v1::NodePublishVolumeResponse*>())) + .WillOnce(Invoke([result]( + grpc::ServerContext* context, + const csi::v1::NodePublishVolumeRequest* request, + csi::v1::NodePublishVolumeResponse* response) { + return result.isError() ? result.error().status : grpc::Status::OK; + })); + + return FUTURE_DISPATCH(_, &csi::v1::VolumeManagerProcess::__call< + csi::v1::NodePublishVolumeResponse>); + } + + // This extra closure is necessary in order to use `FAIL` as it requires a + // void return type. + [&] { FAIL() << "Unsupported CSI API version " << GetParam(); }(); + + UNREACHABLE(); + } + + // Create a JSON string representing a disk profile mapping containing the // given profile-parameter pairs. static string createDiskProfileMapping( @@ -3130,6 +3170,633 @@ TEST_P(StorageLocalResourceProviderTest, CreatePersistentBlockVolume) } + +// This test verifies that if a persistent volume is never published by the +// storage local resource provider, the volume can be destroyed. +// +// To accomplish this: +// 1. Create a MOUNT disk from a RAW disk resource. +// 2. Create a persistent volume on the MOUNT disk then launch a task to +// write a file into it. +// 3. Return `UNIMPLEMENTED` for the `NodePublishVolume` call. The task will +// fail to launch. +// 4. Destroy the persistent volume and the MOUNT disk. 
+TEST_P(StorageLocalResourceProviderTest, DestroyUnpublishedPersistentVolume) +{ + const string profilesPath = path::join(sandbox.get(), "profiles.json"); + + ASSERT_SOME( + os::write(profilesPath, createDiskProfileMapping({{"test", None()}}))); + + loadUriDiskProfileAdaptorModule(profilesPath); + + const string mockCsiEndpoint = + "unix://" + path::join(sandbox.get(), "mock_csi.sock"); + + MockCSIPlugin plugin; + ASSERT_SOME(plugin.startup(mockCsiEndpoint)); + + setupResourceProviderConfig(Bytes(0), None(), None(), mockCsiEndpoint); + + Try<Owned<cluster::Master>> master = StartMaster(); + ASSERT_SOME(master); + + Owned<MasterDetector> detector = master.get()->createDetector(); + + slave::Flags slaveFlags = CreateSlaveFlags(); + slaveFlags.disk_profile_adaptor = URI_DISK_PROFILE_ADAPTOR_NAME; + + Future<SlaveRegisteredMessage> slaveRegisteredMessage = + FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _); + + Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags); + ASSERT_SOME(slave); + + AWAIT_READY(slaveRegisteredMessage); + + // Register a framework to exercise operations. + FrameworkInfo framework = DEFAULT_FRAMEWORK_INFO; + framework.set_roles(0, "storage"); + + MockScheduler sched; + MesosSchedulerDriver driver( + &sched, framework, master.get()->pid, DEFAULT_CREDENTIAL); + + EXPECT_CALL(sched, registered(&driver, _, _)); + + // We use the following filter to filter offers that do not have wanted + // resources for 365 days (the maximum). + Filters declineFilters; + declineFilters.set_refuse_seconds(Days(365).secs()); + + // Decline unwanted offers. The master can send such offers before the + // resource provider receives profile updates. 
+ EXPECT_CALL(sched, resourceOffers(&driver, _)) + .WillRepeatedly(DeclineOffers(declineFilters)); + + Future<vector<Offer>> offers; + EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource( + std::bind(isStoragePool<Resource>, lambda::_1, "test")))) + .WillOnce(FutureArg<1>(&offers)); + + driver.start(); + + AWAIT_READY(offers); + ASSERT_EQ(1u, offers->size()); + + Offer offer = offers->at(0); + + // Create a MOUNT disk. + Resource raw = *Resources(offer.resources()) + .filter(std::bind(isStoragePool<Resource>, lambda::_1, "test")) + .begin(); + + EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource( + std::bind(isMountDisk<Resource>, lambda::_1, "test")))) + .WillOnce(FutureArg<1>(&offers)); + + driver.acceptOffers( + {offer.id()}, {CREATE_DISK(raw, Resource::DiskInfo::Source::MOUNT)}); + + AWAIT_READY(offers); + ASSERT_EQ(1u, offers->size()); + + offer = offers->at(0); + + Resource created = *Resources(offer.resources()) + .filter(std::bind(isMountDisk<Resource>, lambda::_1, "test")) + .begin(); + + // Create a persistent MOUNT volume then launch a task to write a file. + Resource persistentVolume = created; + persistentVolume.mutable_disk()->mutable_persistence() + ->set_id(id::UUID::random().toString()); + persistentVolume.mutable_disk()->mutable_persistence() + ->set_principal(framework.principal()); + persistentVolume.mutable_disk()->mutable_volume() + ->set_container_path("volume"); + persistentVolume.mutable_disk()->mutable_volume()->set_mode(Volume::RW); + + Future<Nothing> taskFailed; + EXPECT_CALL(sched, statusUpdate(&driver, TaskStatusStateEq(TASK_FAILED))) + .WillOnce(FutureSatisfy(&taskFailed)); + + EXPECT_CALL( + sched, resourceOffers(&driver, OffersHaveResource(persistentVolume))) + .WillOnce(FutureArg<1>(&offers)); + + // Fail resource publishing. 
+ Future<Nothing> nodePublishVolumeCall = futureNodePublishVolumeCall( + &plugin, StatusError(grpc::Status(grpc::UNIMPLEMENTED, ""))); + + driver.acceptOffers( + {offer.id()}, + {CREATE(persistentVolume), + LAUNCH({createTask( + offer.slave_id(), + persistentVolume, + createCommandInfo("touch " + path::join("volume", "file")))})}); + + AWAIT_READY(nodePublishVolumeCall); + + AWAIT_READY(taskFailed); + + AWAIT_READY(offers); + ASSERT_EQ(1u, offers->size()); + + offer = offers->at(0); + + // Destroy the persistent volume and the MOUNT disk. + // + // NOTE: The order of the two `FUTURE_PROTOBUF`s is reversed because + // Google Mock will search the expectations in reverse order. + Future<UpdateOperationStatusMessage> destroyDiskOperationStatus = + FUTURE_PROTOBUF(UpdateOperationStatusMessage(), _, _); + Future<UpdateOperationStatusMessage> destroyOperationStatus = + FUTURE_PROTOBUF(UpdateOperationStatusMessage(), _, _); + + EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveResource(raw))) + .WillOnce(FutureArg<1>(&offers)); + + driver.acceptOffers( + {offer.id()}, {DESTROY(persistentVolume), DESTROY_DISK(created)}); + + AWAIT_READY(destroyOperationStatus); + EXPECT_EQ(OPERATION_FINISHED, destroyOperationStatus->status().state()); + + AWAIT_READY(destroyDiskOperationStatus); + EXPECT_EQ(OPERATION_FINISHED, destroyDiskOperationStatus->status().state()); + + AWAIT_EXPECT_READY(offers) + << "Failed to wait for an offer containing resource '" << raw << "'"; +} + + +// This test verifies that if a persistent volume is never published by the +// storage local resource provider, the volume can be destroyed after recovery. +// +// To accomplish this: +// 1. Create a MOUNT disk from a RAW disk resource. +// 2. Create a persistent volume on the MOUNT disk then launch a task to +// write a file into it. +// 3. Return `UNIMPLEMENTED` for the `NodePublishVolume` call. The task will +// fail to launch. +// 4. Restart the agent. +// 5. 
Destroy the persistent volume and the MOUNT disk. +TEST_P( + StorageLocalResourceProviderTest, + DestroyUnpublishedPersistentVolumeWithRecovery) +{ + const string profilesPath = path::join(sandbox.get(), "profiles.json"); + + ASSERT_SOME( + os::write(profilesPath, createDiskProfileMapping({{"test", None()}}))); + + loadUriDiskProfileAdaptorModule(profilesPath); + + const string mockCsiEndpoint = + "unix://" + path::join(sandbox.get(), "mock_csi.sock"); + + MockCSIPlugin plugin; + ASSERT_SOME(plugin.startup(mockCsiEndpoint)); + + setupResourceProviderConfig(Bytes(0), None(), None(), mockCsiEndpoint); + + Try<Owned<cluster::Master>> master = StartMaster(); + ASSERT_SOME(master); + + Owned<MasterDetector> detector = master.get()->createDetector(); + + slave::Flags slaveFlags = CreateSlaveFlags(); + slaveFlags.disk_profile_adaptor = URI_DISK_PROFILE_ADAPTOR_NAME; + + Future<SlaveRegisteredMessage> slaveRegisteredMessage = + FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _); + + Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags); + ASSERT_SOME(slave); + + AWAIT_READY(slaveRegisteredMessage); + + // Register a framework to exercise operations. + FrameworkInfo framework = DEFAULT_FRAMEWORK_INFO; + framework.set_roles(0, "storage"); + + MockScheduler sched; + MesosSchedulerDriver driver( + &sched, framework, master.get()->pid, DEFAULT_CREDENTIAL); + + EXPECT_CALL(sched, registered(&driver, _, _)); + + // We use the following filter to filter offers that do not have wanted + // resources for 365 days (the maximum). + Filters declineFilters; + declineFilters.set_refuse_seconds(Days(365).secs()); + + // Decline unwanted offers. The master can send such offers before the + // resource provider receives profile updates. 
+ EXPECT_CALL(sched, resourceOffers(&driver, _)) + .WillRepeatedly(DeclineOffers(declineFilters)); + + Future<vector<Offer>> offers; + EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource( + std::bind(isStoragePool<Resource>, lambda::_1, "test")))) + .WillOnce(FutureArg<1>(&offers)); + + driver.start(); + + AWAIT_READY(offers); + ASSERT_EQ(1u, offers->size()); + + Offer offer = offers->at(0); + + // Create a MOUNT disk. + Resource raw = *Resources(offer.resources()) + .filter(std::bind(isStoragePool<Resource>, lambda::_1, "test")) + .begin(); + + EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource( + std::bind(isMountDisk<Resource>, lambda::_1, "test")))) + .WillOnce(FutureArg<1>(&offers)); + + driver.acceptOffers( + {offer.id()}, {CREATE_DISK(raw, Resource::DiskInfo::Source::MOUNT)}); + + AWAIT_READY(offers); + ASSERT_EQ(1u, offers->size()); + + offer = offers->at(0); + + Resource created = *Resources(offer.resources()) + .filter(std::bind(isMountDisk<Resource>, lambda::_1, "test")) + .begin(); + + // Create a persistent MOUNT volume then launch a task to write a file. + Resource persistentVolume = created; + persistentVolume.mutable_disk()->mutable_persistence() + ->set_id(id::UUID::random().toString()); + persistentVolume.mutable_disk()->mutable_persistence() + ->set_principal(framework.principal()); + persistentVolume.mutable_disk()->mutable_volume() + ->set_container_path("volume"); + persistentVolume.mutable_disk()->mutable_volume()->set_mode(Volume::RW); + + Future<Nothing> taskFailed; + EXPECT_CALL(sched, statusUpdate(&driver, TaskStatusStateEq(TASK_FAILED))) + .WillOnce(FutureSatisfy(&taskFailed)); + + EXPECT_CALL( + sched, resourceOffers(&driver, OffersHaveResource(persistentVolume))) + .WillOnce(FutureArg<1>(&offers)); + + // Fail resource publishing. 
+ Future<Nothing> nodePublishVolumeCall = futureNodePublishVolumeCall( + &plugin, StatusError(grpc::Status(grpc::UNIMPLEMENTED, ""))); + + driver.acceptOffers( + {offer.id()}, + {CREATE(persistentVolume), + LAUNCH({createTask( + offer.slave_id(), + persistentVolume, + createCommandInfo("touch " + path::join("volume", "file")))})}); + + AWAIT_READY(nodePublishVolumeCall); + + AWAIT_READY(taskFailed); + + AWAIT_READY(offers); + + // Restart the agent. + EXPECT_CALL(sched, offerRescinded(_, _)); + + slave.get()->terminate(); + + // NOTE: The mock CSI plugin always returns 4GB for `GetCapacity` calls, hence + // the resource provider would report an extra storage pool, and when it shows + // up, we know that the resource provider has finished reconciling storage + // pools and thus operations won't be dropped. + // + // To achieve this, we drop `SlaveRegisteredMessage`s other than the first one + // to avoid unexpected `UpdateSlaveMessage`s. Then, we also drop the first two + // `UpdateSlaveMessage`s (one sent after agent reregistration and one after + // resource provider reregistration) and wait for the third one, which should + // contain the extra storage pool. We let it fall through to trigger an offer + // allocation for the persistent volume. + // + // Since the extra storage pool is never used, we reject the offers if only + // the storage pool is presented. + // + // TODO(chhsiao): Remove this workaround once MESOS-9553 is done. + EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource( + std::bind(isStoragePool<Resource>, lambda::_1, "test")))) + .WillRepeatedly(DeclineOffers(declineFilters)); + + // NOTE: The order of these expectations is reversed because Google Mock will + // search the expectations in reverse order. 
+ Future<UpdateSlaveMessage> updateSlaveMessage = + FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _); + + DROP_PROTOBUF(UpdateSlaveMessage(), _, _); + DROP_PROTOBUF(UpdateSlaveMessage(), _, _); + DROP_PROTOBUFS(SlaveReregisteredMessage(), _, _); + FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _); + + EXPECT_CALL( + sched, resourceOffers(&driver, OffersHaveResource(persistentVolume))) + .WillOnce(FutureArg<1>(&offers)); + + slave = StartSlave(detector.get(), slaveFlags); + ASSERT_SOME(slave); + + AWAIT_READY(updateSlaveMessage); + ASSERT_EQ(1, updateSlaveMessage->resource_providers().providers_size()); + ASSERT_FALSE(Resources( + updateSlaveMessage->resource_providers().providers(0).total_resources()) + .filter(std::bind(isStoragePool<Resource>, lambda::_1, "test")) + .empty()); + + AWAIT_READY(offers); + ASSERT_EQ(1u, offers->size()); + + offer = offers->at(0); + + // Destroy the persistent volume and the MOUNT disk. + // + // NOTE: The order of the two `FUTURE_PROTOBUF`s is reversed because + // Google Mock will search the expectations in reverse order. 
+ Future<UpdateOperationStatusMessage> destroyDiskOperationStatus = + FUTURE_PROTOBUF(UpdateOperationStatusMessage(), _, _); + Future<UpdateOperationStatusMessage> destroyOperationStatus = + FUTURE_PROTOBUF(UpdateOperationStatusMessage(), _, _); + + EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveResource(raw))) + .WillOnce(FutureArg<1>(&offers)); + + driver.acceptOffers( + {offer.id()}, {DESTROY(persistentVolume), DESTROY_DISK(created)}); + + AWAIT_READY(destroyOperationStatus); + EXPECT_EQ(OPERATION_FINISHED, destroyOperationStatus->status().state()); + + AWAIT_READY(destroyDiskOperationStatus); + EXPECT_EQ(OPERATION_FINISHED, destroyDiskOperationStatus->status().state()); + + AWAIT_EXPECT_READY(offers) + << "Failed to wait for an offer containing resource '" << raw << "'"; +} + + +// This test verifies that if a persistent volume is never published by the +// storage local resource provider, the volume can be destroyed after agent +// reboot. +// +// To accomplish this: +// 1. Create a MOUNT disk from a RAW disk resource. +// 2. Create a persistent volume on the MOUNT disk then launch a task to +// write a file into it. +// 3. Return `UNIMPLEMENTED` for the `NodePublishVolume` call. The task will +// fail to launch. +// 4. Simulate an agent reboot. +// 5. Destroy the persistent volume and the MOUNT disk. 
+TEST_P( + StorageLocalResourceProviderTest, + DestroyUnpublishedPersistentVolumeWithReboot) +{ + const string profilesPath = path::join(sandbox.get(), "profiles.json"); + + ASSERT_SOME( + os::write(profilesPath, createDiskProfileMapping({{"test", None()}}))); + + loadUriDiskProfileAdaptorModule(profilesPath); + + const string mockCsiEndpoint = + "unix://" + path::join(sandbox.get(), "mock_csi.sock"); + + MockCSIPlugin plugin; + ASSERT_SOME(plugin.startup(mockCsiEndpoint)); + + setupResourceProviderConfig(Bytes(0), None(), None(), mockCsiEndpoint); + + Try<Owned<cluster::Master>> master = StartMaster(); + ASSERT_SOME(master); + + Owned<MasterDetector> detector = master.get()->createDetector(); + + slave::Flags slaveFlags = CreateSlaveFlags(); + slaveFlags.disk_profile_adaptor = URI_DISK_PROFILE_ADAPTOR_NAME; + + Future<SlaveRegisteredMessage> slaveRegisteredMessage = + FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _); + + Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags); + ASSERT_SOME(slave); + + AWAIT_READY(slaveRegisteredMessage); + + // Register a framework to exercise operations. + FrameworkInfo framework = DEFAULT_FRAMEWORK_INFO; + framework.set_roles(0, "storage"); + + MockScheduler sched; + MesosSchedulerDriver driver( + &sched, framework, master.get()->pid, DEFAULT_CREDENTIAL); + + EXPECT_CALL(sched, registered(&driver, _, _)); + + // We use the following filter to filter offers that do not have wanted + // resources for 365 days (the maximum). + Filters declineFilters; + declineFilters.set_refuse_seconds(Days(365).secs()); + + // Decline unwanted offers. The master can send such offers before the + // resource provider receives profile updates. 
+ EXPECT_CALL(sched, resourceOffers(&driver, _)) + .WillRepeatedly(DeclineOffers(declineFilters)); + + Future<vector<Offer>> offers; + EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource( + std::bind(isStoragePool<Resource>, lambda::_1, "test")))) + .WillOnce(FutureArg<1>(&offers)); + + driver.start(); + + AWAIT_READY(offers); + ASSERT_EQ(1u, offers->size()); + + Offer offer = offers->at(0); + + // Create a MOUNT disk. + Resource raw = *Resources(offer.resources()) + .filter(std::bind(isStoragePool<Resource>, lambda::_1, "test")) + .begin(); + + EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource( + std::bind(isMountDisk<Resource>, lambda::_1, "test")))) + .WillOnce(FutureArg<1>(&offers)); + + driver.acceptOffers( + {offer.id()}, {CREATE_DISK(raw, Resource::DiskInfo::Source::MOUNT)}); + + AWAIT_READY(offers); + ASSERT_EQ(1u, offers->size()); + + offer = offers->at(0); + + Resource created = *Resources(offer.resources()) + .filter(std::bind(isMountDisk<Resource>, lambda::_1, "test")) + .begin(); + + // Create a persistent MOUNT volume then launch a task to write a file. + Resource persistentVolume = created; + persistentVolume.mutable_disk()->mutable_persistence() + ->set_id(id::UUID::random().toString()); + persistentVolume.mutable_disk()->mutable_persistence() + ->set_principal(framework.principal()); + persistentVolume.mutable_disk()->mutable_volume() + ->set_container_path("volume"); + persistentVolume.mutable_disk()->mutable_volume()->set_mode(Volume::RW); + + Future<Nothing> taskFailed; + EXPECT_CALL(sched, statusUpdate(&driver, TaskStatusStateEq(TASK_FAILED))) + .WillOnce(FutureSatisfy(&taskFailed)); + + EXPECT_CALL( + sched, resourceOffers(&driver, OffersHaveResource(persistentVolume))) + .WillOnce(FutureArg<1>(&offers)); + + // Fail resource publishing. 
+ Future<Nothing> nodePublishVolumeCall = futureNodePublishVolumeCall( + &plugin, StatusError(grpc::Status(grpc::UNIMPLEMENTED, ""))); + + driver.acceptOffers( + {offer.id()}, + {CREATE(persistentVolume), + LAUNCH({createTask( + offer.slave_id(), + persistentVolume, + createCommandInfo("touch " + path::join("volume", "file")))})}); + + AWAIT_READY(nodePublishVolumeCall); + + AWAIT_READY(taskFailed); + + AWAIT_READY(offers); + + // Shutdown the agent and unmount all CSI volumes to simulate a reboot. + EXPECT_CALL(sched, offerRescinded(_, _)); + + slave->reset(); + + const string csiRootDir = slave::paths::getCsiRootDir(slaveFlags.work_dir); + ASSERT_SOME(fs::unmountAll(csiRootDir)); + + // Inject the boot IDs to simulate a reboot. + ASSERT_SOME(os::write( + slave::paths::getBootIdPath( + slave::paths::getMetaRootDir(slaveFlags.work_dir)), + "rebooted! ;)")); + + Try<list<string>> volumePaths = + csi::paths::getVolumePaths(csiRootDir, "*", "*"); + ASSERT_SOME(volumePaths); + ASSERT_FALSE(volumePaths->empty()); + + foreach (const string& path, volumePaths.get()) { + Try<csi::paths::VolumePath> volumePath = + csi::paths::parseVolumePath(csiRootDir, path); + ASSERT_SOME(volumePath); + + const string volumeStatePath = csi::paths::getVolumeStatePath( + csiRootDir, volumePath->type, volumePath->name, volumePath->volumeId); + + Result<csi::state::VolumeState> volumeState = + slave::state::read<csi::state::VolumeState>(volumeStatePath); + + ASSERT_SOME(volumeState); + + if (volumeState->state() == csi::state::VolumeState::PUBLISHED) { + volumeState->set_boot_id("rebooted! ;)"); + ASSERT_SOME(slave::state::checkpoint(volumeStatePath, volumeState.get())); + } + } + + // NOTE: The mock CSI plugin always returns 4GB for `GetCapacity` calls, hence + // the resource provider would report an extra storage pool, and when it shows + // up, we know that the resource provider has finished reconciling storage + // pools and thus operations won't be dropped. 
+ // + // To achieve this, we drop `SlaveRegisteredMessage`s other than the first one + // to avoid unexpected `UpdateSlaveMessage`s. Then, we also drop the first two + // `UpdateSlaveMessage`s (one sent after agent reregistration and one after + // resource provider reregistration) and wait for the third one, which should + // contain the extra storage pool. We let it fall through to trigger an offer + // allocation for the persistent volume. + // + // Since the extra storage pool is never used, we reject the offers if only + // the storage pool is presented. + // + // TODO(chhsiao): Remove this workaround once MESOS-9553 is done. + EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource( + std::bind(isStoragePool<Resource>, lambda::_1, "test")))) + .WillRepeatedly(DeclineOffers(declineFilters)); + + // NOTE: The order of these expectations is reversed because Google Mock will + // search the expectations in reverse order. + Future<UpdateSlaveMessage> updateSlaveMessage = + FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _); + + DROP_PROTOBUF(UpdateSlaveMessage(), _, _); + DROP_PROTOBUF(UpdateSlaveMessage(), _, _); + DROP_PROTOBUFS(SlaveReregisteredMessage(), _, _); + FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _); + + // Restart the agent. + EXPECT_CALL( + sched, resourceOffers(&driver, OffersHaveResource(persistentVolume))) + .WillOnce(FutureArg<1>(&offers)); + + slave = StartSlave(detector.get(), slaveFlags); + ASSERT_SOME(slave); + + AWAIT_READY(updateSlaveMessage); + ASSERT_EQ(1, updateSlaveMessage->resource_providers().providers_size()); + ASSERT_FALSE(Resources( + updateSlaveMessage->resource_providers().providers(0).total_resources()) + .filter(std::bind(isStoragePool<Resource>, lambda::_1, "test")) + .empty()); + + AWAIT_READY(offers); + ASSERT_EQ(1u, offers->size()); + + offer = offers->at(0); + + // Destroy the persistent volume and the MOUNT disk. 
+ // + // NOTE: The order of the two `FUTURE_PROTOBUF`s is reversed because + // Google Mock will search the expectations in reverse order. + Future<UpdateOperationStatusMessage> destroyDiskOperationStatus = + FUTURE_PROTOBUF(UpdateOperationStatusMessage(), _, _); + Future<UpdateOperationStatusMessage> destroyOperationStatus = + FUTURE_PROTOBUF(UpdateOperationStatusMessage(), _, _); + + EXPECT_CALL( + sched, resourceOffers(&driver, OffersHaveResource(raw))) + .WillOnce(FutureArg<1>(&offers)); + + driver.acceptOffers( + {offer.id()}, {DESTROY(persistentVolume), DESTROY_DISK(created)}); + + AWAIT_READY(destroyOperationStatus); + EXPECT_EQ(OPERATION_FINISHED, destroyOperationStatus->status().state()); + + AWAIT_READY(destroyDiskOperationStatus); + EXPECT_EQ(OPERATION_FINISHED, destroyDiskOperationStatus->status().state()); + + AWAIT_EXPECT_READY(offers) + << "Failed to wait for an offer containing resource '" << raw << "'"; +} + + // This test verifies that if the storage local resource provider fails to clean // up a persistent volume, the volume will not be destroyed. //