This is an automated email from the ASF dual-hosted git repository. chhsiao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 5ae178611246b4a1ea928ff45f29cef4f9b55bf7 Author: Chun-Hung Hsiao <chhs...@mesosphere.io> AuthorDate: Tue Jan 22 20:18:06 2019 -0800 Preliminary SLRP refactoring for RPC retry. This patch refactors the `StorageLocalResourceProvider::call` function to obtain the latest service future through `getService` before making the actual RPC call. The subsequent patch would utilize this to support RPC retry across plugin restarts. Review: https://reviews.apache.org/r/69811 --- src/resource_provider/storage/provider.cpp | 821 ++++++++++++++--------------- 1 file changed, 396 insertions(+), 425 deletions(-) diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp index 6f0c062..811b87e 100644 --- a/src/resource_provider/storage/provider.cpp +++ b/src/resource_provider/storage/provider.cpp @@ -390,12 +390,32 @@ private: void reconcileOperations( const Event::ReconcileOperations& reconcile); - template <csi::v0::RPC rpc> + // Wrapper functions to make CSI calls and update RPC metrics. + // + // The call is made asynchronously and thus no guarantee is provided on the + // order in which calls are sent. Callers need to either ensure to not have + // multiple conflicting calls in flight, or treat results idempotently. + // + // NOTE: We currently ensure this by 1) resource locking to forbid concurrent + // calls on the same volume, and 2) no profile update while there are ongoing + // `CREATE_DISK` or `DESTROY_DISK` operations. + // + // NOTE: Since this function uses `getService` to obtain the latest service + // future, which depends on probe results, it is disabled for making probe + // calls; `_call` should be used directly instead. + template < + csi::v0::RPC rpc, + typename std::enable_if<rpc != csi::v0::PROBE, int>::type = 0> Future<typename csi::v0::RPCTraits<rpc>::response_type> call( + const ContainerID& containerId, + const typename csi::v0::RPCTraits<rpc>::request_type& request); + + template <csi::v0::RPC rpc> + Future<typename csi::v0::RPCTraits<rpc>::response_type> _call( csi::v0::Client client, - typename csi::v0::RPCTraits<rpc>::request_type&& request); + const typename csi::v0::RPCTraits<rpc>::request_type& request); - Future<csi::v0::Client> connect(const string& endpoint); + Future<csi::v0::Client> waitService(const string& endpoint); Future<csi::v0::Client> getService(const ContainerID& containerId); Future<hashmap<ContainerID, Option<ContainerStatus>>> getContainers(); Future<Nothing> waitContainer(const ContainerID& containerId); @@ -1851,15 +1871,29 @@ void StorageLocalResourceProviderProcess::reconcileOperations( } -template <csi::v0::RPC rpc> +template < + csi::v0::RPC rpc, + typename std::enable_if<rpc != csi::v0::PROBE, int>::type> Future<typename csi::v0::RPCTraits<rpc>::response_type> StorageLocalResourceProviderProcess::call( + const ContainerID& containerId, + const typename csi::v0::RPCTraits<rpc>::request_type& request) +{ + // Get the latest service future before making the call. + return getService(containerId) + .then(defer(self(), &Self::_call<rpc>, lambda::_1, request)); +} + + +template <csi::v0::RPC rpc> +Future<typename csi::v0::RPCTraits<rpc>::response_type> +StorageLocalResourceProviderProcess::_call( csi::v0::Client client, - typename csi::v0::RPCTraits<rpc>::request_type&& request) + const typename csi::v0::RPCTraits<rpc>::request_type& request) { ++metrics.csi_plugin_rpcs_pending.at(rpc); - return client.call<rpc>(std::move(request)) + return client.call<rpc>(request) .onAny(defer(self(), [=]( const Future<typename csi::v0::RPCTraits<rpc>::response_type>& future) { --metrics.csi_plugin_rpcs_pending.at(rpc); @@ -1877,18 +1911,18 @@ StorageLocalResourceProviderProcess::call( // Returns a future of a CSI client that waits for the endpoint socket // to appear if necessary, then connects to the socket and check its // readiness. -Future<csi::v0::Client> StorageLocalResourceProviderProcess::connect( +Future<csi::v0::Client> StorageLocalResourceProviderProcess::waitService( const string& endpoint) { - Future<csi::v0::Client> future; + Future<csi::v0::Client> service; if (os::exists(endpoint)) { - future = csi::v0::Client("unix://" + endpoint, runtime); + service = csi::v0::Client("unix://" + endpoint, runtime); } else { // Wait for the endpoint socket to appear until the timeout expires. Timeout timeout = Timeout::in(CSI_ENDPOINT_CREATION_TIMEOUT); - future = loop( + service = loop( self(), [=]() -> Future<Nothing> { if (timeout.expired()) { @@ -1906,13 +1940,10 @@ Future<csi::v0::Client> StorageLocalResourceProviderProcess::connect( }); } - return future + return service .then(defer(self(), [=](csi::v0::Client client) { - return call<csi::v0::PROBE>(client, csi::v0::ProbeRequest()) - .then(defer(self(), [=]( - const csi::v0::ProbeResponse& response) -> csi::v0::Client { - return client; - })); + return _call<csi::v0::PROBE>(client, csi::v0::ProbeRequest()) + .then([=]() -> csi::v0::Client { return client; }); })); } @@ -2022,7 +2053,7 @@ Future<csi::v0::Client> StorageLocalResourceProviderProcess::getService( << " type '" << info.storage().plugin().type() << "' and " << " name '" << info.storage().plugin().name() << "'"; - CHECK(services.at(containerId)->associate(connect(endpointPath))); + CHECK(services.at(containerId)->associate(waitService(endpointPath))); return services.at(containerId)->future() .then([] { return Nothing(); }); })), @@ -2183,31 +2214,23 @@ Future<Nothing> StorageLocalResourceProviderProcess::prepareIdentityService() { CHECK_SOME(nodeContainerId); - return getService(nodeContainerId.get()) - .then(defer(self(), [=](csi::v0::Client client) { - // Get the plugin info. - return call<csi::v0::GET_PLUGIN_INFO>( - client, csi::v0::GetPluginInfoRequest()) - .then(defer(self(), [=]( - const csi::v0::GetPluginInfoResponse& response) { - pluginInfo = response; + // Get the plugin info. + return call<csi::v0::GET_PLUGIN_INFO>( + nodeContainerId.get(), csi::v0::GetPluginInfoRequest()) + .then(defer(self(), [=](const csi::v0::GetPluginInfoResponse& response) { + pluginInfo = response; - LOG(INFO) << "Node plugin loaded: " << stringify(pluginInfo.get()); + LOG(INFO) << "Node plugin loaded: " << stringify(pluginInfo.get()); - // Get the latest service future before proceeding to the next step. - return getService(nodeContainerId.get()); - })); - })) - .then(defer(self(), [=](csi::v0::Client client) { // Get the plugin capabilities. return call<csi::v0::GET_PLUGIN_CAPABILITIES>( - client, csi::v0::GetPluginCapabilitiesRequest()) - .then(defer(self(), [=]( - const csi::v0::GetPluginCapabilitiesResponse& response) { - pluginCapabilities = response.capabilities(); + nodeContainerId.get(), csi::v0::GetPluginCapabilitiesRequest()); + })) + .then(defer(self(), [=]( + const csi::v0::GetPluginCapabilitiesResponse& response) { + pluginCapabilities = response.capabilities(); - return Nothing(); - })); + return Nothing(); })); } @@ -2226,36 +2249,29 @@ Future<Nothing> StorageLocalResourceProviderProcess::prepareControllerService() stringify(CSIPluginContainerInfo::CONTROLLER_SERVICE) + " not found"); } - return getService(controllerContainerId.get()) - .then(defer(self(), [=](csi::v0::Client client) { - // Get the controller plugin info and check for consistency. - return call<csi::v0::GET_PLUGIN_INFO>( - client, csi::v0::GetPluginInfoRequest()) - .then(defer(self(), [=]( - const csi::v0::GetPluginInfoResponse& response) { - LOG(INFO) << "Controller plugin loaded: " << stringify(response); - - if (pluginInfo->name() != response.name() || - pluginInfo->vendor_version() != response.vendor_version()) { - LOG(WARNING) - << "Inconsistent controller and node plugin components. Please " - "check with the plugin vendor to ensure compatibility."; - } + // Get the controller plugin info and check for consistency. + return call<csi::v0::GET_PLUGIN_INFO>( + controllerContainerId.get(), csi::v0::GetPluginInfoRequest()) + .then(defer(self(), [=](const csi::v0::GetPluginInfoResponse& response) { + LOG(INFO) << "Controller plugin loaded: " << stringify(response); + + if (pluginInfo->name() != response.name() || + pluginInfo->vendor_version() != response.vendor_version()) { + LOG(WARNING) + << "Inconsistent controller and node plugin components. Please check " + "with the plugin vendor to ensure compatibility."; + } - // Get the latest service future before proceeding to the next step. - return getService(controllerContainerId.get()); - })); - })) - .then(defer(self(), [=](csi::v0::Client client) { // Get the controller capabilities. return call<csi::v0::CONTROLLER_GET_CAPABILITIES>( - client, csi::v0::ControllerGetCapabilitiesRequest()) - .then(defer(self(), [=]( - const csi::v0::ControllerGetCapabilitiesResponse& response) { - controllerCapabilities = response.capabilities(); + controllerContainerId.get(), + csi::v0::ControllerGetCapabilitiesRequest()); + })) + .then(defer(self(), [=]( + const csi::v0::ControllerGetCapabilitiesResponse& response) { + controllerCapabilities = response.capabilities(); - return Nothing(); - })); + return Nothing(); })); } @@ -2266,32 +2282,25 @@ Future<Nothing> StorageLocalResourceProviderProcess::prepareNodeService() { CHECK_SOME(nodeContainerId); - return getService(nodeContainerId.get()) - .then(defer(self(), [=](csi::v0::Client client) { - // Get the node capabilities. - return call<csi::v0::NODE_GET_CAPABILITIES>( - client, csi::v0::NodeGetCapabilitiesRequest()) - .then(defer(self(), [=]( - const csi::v0::NodeGetCapabilitiesResponse& response) - -> Future<csi::v0::Client> { - nodeCapabilities = response.capabilities(); - - // Get the latest service future before proceeding to the next step. - return getService(nodeContainerId.get()); - })) - .then(defer(self(), [=](csi::v0::Client client) -> Future<Nothing> { - if (!controllerCapabilities.publishUnpublishVolume) { - return Nothing(); - } + // Get the node capabilities. + return call<csi::v0::NODE_GET_CAPABILITIES>( + nodeContainerId.get(), csi::v0::NodeGetCapabilitiesRequest()) + .then(defer(self(), [=]( + const csi::v0::NodeGetCapabilitiesResponse& response) + -> Future<Nothing> { + nodeCapabilities = response.capabilities(); - // Get the node ID. - return call<csi::v0::NODE_GET_ID>(client, csi::v0::NodeGetIdRequest()) - .then(defer(self(), [=]( - const csi::v0::NodeGetIdResponse& response) { - nodeId = response.node_id(); + if (!controllerCapabilities.publishUnpublishVolume) { + return Nothing(); + } - return Nothing(); - })); + // Get the node ID. + return call<csi::v0::NODE_GET_ID>( + nodeContainerId.get(), csi::v0::NodeGetIdRequest()) + .then(defer(self(), [=](const csi::v0::NodeGetIdResponse& response) { + nodeId = response.node_id(); + + return Nothing(); })); })); } @@ -2299,6 +2308,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::prepareNodeService() // Transitions the state of the specified volume from `CREATED` or // `CONTROLLER_PUBLISH` to `NODE_READY`. +// // NOTE: This can only be called after `prepareControllerService` and // `prepareNodeService`. Future<Nothing> StorageLocalResourceProviderProcess::controllerPublish( @@ -2316,47 +2326,42 @@ Future<Nothing> StorageLocalResourceProviderProcess::controllerPublish( return Nothing(); } - CHECK_SOME(controllerContainerId); - CHECK_SOME(nodeId); + if (volume.state.state() == VolumeState::CREATED) { + volume.state.set_state(VolumeState::CONTROLLER_PUBLISH); + checkpointVolumeState(volumeId); + } - return getService(controllerContainerId.get()) - .then(defer(self(), [this, volumeId]( - csi::v0::Client client) -> Future<Nothing> { - VolumeData& volume = volumes.at(volumeId); + CHECK_EQ(VolumeState::CONTROLLER_PUBLISH, volume.state.state()); - if (volume.state.state() == VolumeState::CREATED) { - volume.state.set_state(VolumeState::CONTROLLER_PUBLISH); - checkpointVolumeState(volumeId); - } + CHECK_SOME(nodeId); - CHECK_EQ(VolumeState::CONTROLLER_PUBLISH, volume.state.state()); + csi::v0::ControllerPublishVolumeRequest request; + request.set_volume_id(volumeId); + request.set_node_id(nodeId.get()); + *request.mutable_volume_capability() = volume.state.volume_capability(); + request.set_readonly(false); + *request.mutable_volume_attributes() = volume.state.volume_attributes(); - csi::v0::ControllerPublishVolumeRequest request; - request.set_volume_id(volumeId); - request.set_node_id(nodeId.get()); - request.mutable_volume_capability() - ->CopyFrom(volume.state.volume_capability()); - request.set_readonly(false); - *request.mutable_volume_attributes() = volume.state.volume_attributes(); + CHECK_SOME(controllerContainerId); - return call<csi::v0::CONTROLLER_PUBLISH_VOLUME>( - client, std::move(request)) - .then(defer(self(), [this, volumeId]( - const csi::v0::ControllerPublishVolumeResponse& response) { - VolumeData& volume = volumes.at(volumeId); + return call<csi::v0::CONTROLLER_PUBLISH_VOLUME>( + controllerContainerId.get(), std::move(request)) + .then(defer(self(), [this, volumeId]( + const csi::v0::ControllerPublishVolumeResponse& response) { + VolumeData& volume = volumes.at(volumeId); - volume.state.set_state(VolumeState::NODE_READY); - *volume.state.mutable_publish_info() = response.publish_info(); - checkpointVolumeState(volumeId); + volume.state.set_state(VolumeState::NODE_READY); + *volume.state.mutable_publish_info() = response.publish_info(); + checkpointVolumeState(volumeId); - return Nothing(); - })); + return Nothing(); })); } // Transitions the state of the specified volume from `NODE_READY`, // `CONTROLLER_PUBLISH` or `CONTROLLER_UNPUBLISH` to `CREATED`. +// // NOTE: This can only be called after `prepareControllerService` and // `prepareNodeService`. Future<Nothing> StorageLocalResourceProviderProcess::controllerUnpublish( @@ -2374,45 +2379,42 @@ Future<Nothing> StorageLocalResourceProviderProcess::controllerUnpublish( return Nothing(); } - CHECK_SOME(controllerContainerId); - CHECK_SOME(nodeId); + // A previously failed `ControllerPublishVolume` call can be recovered through + // the current `ControllerUnpublishVolume` call. See: + // https://github.com/container-storage-interface/spec/blob/v0.2.0/spec.md#controllerpublishvolume // NOLINT + if (volume.state.state() == VolumeState::NODE_READY || + volume.state.state() == VolumeState::CONTROLLER_PUBLISH) { + volume.state.set_state(VolumeState::CONTROLLER_UNPUBLISH); + checkpointVolumeState(volumeId); + } - return getService(controllerContainerId.get()) - .then(defer(self(), [this, volumeId](csi::v0::Client client) { - VolumeData& volume = volumes.at(volumeId); + CHECK_EQ(VolumeState::CONTROLLER_UNPUBLISH, volume.state.state()); - // A previously failed `ControllerPublishVolume` call can be recovered - // through the current `ControllerUnpublishVolume` call. See: - // https://github.com/container-storage-interface/spec/blob/v0.2.0/spec.md#controllerpublishvolume // NOLINT - if (volume.state.state() == VolumeState::NODE_READY || - volume.state.state() == VolumeState::CONTROLLER_PUBLISH) { - volume.state.set_state(VolumeState::CONTROLLER_UNPUBLISH); - checkpointVolumeState(volumeId); - } + CHECK_SOME(nodeId); - CHECK_EQ(VolumeState::CONTROLLER_UNPUBLISH, volume.state.state()); + csi::v0::ControllerUnpublishVolumeRequest request; + request.set_volume_id(volumeId); + request.set_node_id(nodeId.get()); - csi::v0::ControllerUnpublishVolumeRequest request; - request.set_volume_id(volumeId); - request.set_node_id(nodeId.get()); + CHECK_SOME(controllerContainerId); - return call<csi::v0::CONTROLLER_UNPUBLISH_VOLUME>( - client, std::move(request)) - .then(defer(self(), [this, volumeId] { - VolumeData& volume = volumes.at(volumeId); + return call<csi::v0::CONTROLLER_UNPUBLISH_VOLUME>( + controllerContainerId.get(), std::move(request)) + .then(defer(self(), [this, volumeId] { + VolumeData& volume = volumes.at(volumeId); - volume.state.set_state(VolumeState::CREATED); - volume.state.mutable_publish_info()->clear(); - checkpointVolumeState(volumeId); + volume.state.set_state(VolumeState::CREATED); + volume.state.mutable_publish_info()->clear(); + checkpointVolumeState(volumeId); - return Nothing(); - })); + return Nothing(); })); } // Transitions the state of the specified volume from `NODE_READY` or // `NODE_STAGE` to `VOL_READY`. +// // NOTE: This can only be called after `prepareNodeService`. Future<Nothing> StorageLocalResourceProviderProcess::nodeStage( const string& volumeId) @@ -2430,58 +2432,53 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodeStage( return Nothing(); } - CHECK_SOME(nodeContainerId); - - return getService(nodeContainerId.get()) - .then(defer(self(), [this, volumeId]( - csi::v0::Client client) -> Future<Nothing> { - VolumeData& volume = volumes.at(volumeId); + const string stagingPath = csi::paths::getMountStagingPath( + csi::paths::getMountRootDir( + slave::paths::getCsiRootDir(workDir), + info.storage().plugin().type(), + info.storage().plugin().name()), + volumeId); - const string stagingPath = csi::paths::getMountStagingPath( - csi::paths::getMountRootDir( - slave::paths::getCsiRootDir(workDir), - info.storage().plugin().type(), - info.storage().plugin().name()), - volumeId); + Try<Nothing> mkdir = os::mkdir(stagingPath); + if (mkdir.isError()) { + return Failure( + "Failed to create mount staging path '" + stagingPath + + "': " + mkdir.error()); + } - Try<Nothing> mkdir = os::mkdir(stagingPath); - if (mkdir.isError()) { - return Failure( - "Failed to create mount staging path '" + stagingPath + "': " + - mkdir.error()); - } + if (volume.state.state() == VolumeState::NODE_READY) { + volume.state.set_state(VolumeState::NODE_STAGE); + checkpointVolumeState(volumeId); + } - if (volume.state.state() == VolumeState::NODE_READY) { - volume.state.set_state(VolumeState::NODE_STAGE); - checkpointVolumeState(volumeId); - } + CHECK_EQ(VolumeState::NODE_STAGE, volume.state.state()); - CHECK_EQ(VolumeState::NODE_STAGE, volume.state.state()); + csi::v0::NodeStageVolumeRequest request; + request.set_volume_id(volumeId); + *request.mutable_publish_info() = volume.state.publish_info(); + request.set_staging_target_path(stagingPath); + *request.mutable_volume_capability() = volume.state.volume_capability(); + *request.mutable_volume_attributes() = volume.state.volume_attributes(); - csi::v0::NodeStageVolumeRequest request; - request.set_volume_id(volumeId); - *request.mutable_publish_info() = volume.state.publish_info(); - request.set_staging_target_path(stagingPath); - request.mutable_volume_capability() - ->CopyFrom(volume.state.volume_capability()); - *request.mutable_volume_attributes() = volume.state.volume_attributes(); + CHECK_SOME(nodeContainerId); - return call<csi::v0::NODE_STAGE_VOLUME>(client, std::move(request)) - .then(defer(self(), [this, volumeId] { - VolumeData& volume = volumes.at(volumeId); + return call<csi::v0::NODE_STAGE_VOLUME>( + nodeContainerId.get(), std::move(request)) + .then(defer(self(), [this, volumeId] { + VolumeData& volume = volumes.at(volumeId); - volume.state.set_state(VolumeState::VOL_READY); - volume.state.set_boot_id(bootId); - checkpointVolumeState(volumeId); + volume.state.set_state(VolumeState::VOL_READY); + volume.state.set_boot_id(bootId); + checkpointVolumeState(volumeId); - return Nothing(); - })); + return Nothing(); })); } // Transitions the state of the specified volume from `VOL_READY`, `NODE_STAGE` // or `NODE_UNSTAGE` to `NODE_READY`. +// // NOTE: This can only be called after `prepareNodeService`. Future<Nothing> StorageLocalResourceProviderProcess::nodeUnstage( const string& volumeId) @@ -2499,173 +2496,165 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodeUnstage( return Nothing(); } - CHECK_SOME(nodeContainerId); + const string stagingPath = csi::paths::getMountStagingPath( + csi::paths::getMountRootDir( + slave::paths::getCsiRootDir(workDir), + info.storage().plugin().type(), + info.storage().plugin().name()), + volumeId); - return getService(nodeContainerId.get()) - .then(defer(self(), [this, volumeId](csi::v0::Client client) { - VolumeData& volume = volumes.at(volumeId); + CHECK(os::exists(stagingPath)); - const string stagingPath = csi::paths::getMountStagingPath( - csi::paths::getMountRootDir( - slave::paths::getCsiRootDir(workDir), - info.storage().plugin().type(), - info.storage().plugin().name()), - volumeId); - - CHECK(os::exists(stagingPath)); - - // A previously failed `NodeStageVolume` call can be recovered through the - // current `NodeUnstageVolume` call. See: - // https://github.com/container-storage-interface/spec/blob/v0.2.0/spec.md#nodestagevolume // NOLINT - if (volume.state.state() == VolumeState::VOL_READY || - volume.state.state() == VolumeState::NODE_STAGE) { - volume.state.set_state(VolumeState::NODE_UNSTAGE); - checkpointVolumeState(volumeId); - } + // A previously failed `NodeStageVolume` call can be recovered through the + // current `NodeUnstageVolume` call. See: + // https://github.com/container-storage-interface/spec/blob/v0.2.0/spec.md#nodestagevolume // NOLINT + if (volume.state.state() == VolumeState::VOL_READY || + volume.state.state() == VolumeState::NODE_STAGE) { + volume.state.set_state(VolumeState::NODE_UNSTAGE); + checkpointVolumeState(volumeId); + } - CHECK_EQ(VolumeState::NODE_UNSTAGE, volume.state.state()); + CHECK_EQ(VolumeState::NODE_UNSTAGE, volume.state.state()); - csi::v0::NodeUnstageVolumeRequest request; - request.set_volume_id(volumeId); - request.set_staging_target_path(stagingPath); + csi::v0::NodeUnstageVolumeRequest request; + request.set_volume_id(volumeId); + request.set_staging_target_path(stagingPath); - return call<csi::v0::NODE_UNSTAGE_VOLUME>(client, std::move(request)) - .then(defer(self(), [this, volumeId] { - VolumeData& volume = volumes.at(volumeId); + CHECK_SOME(nodeContainerId); - volume.state.set_state(VolumeState::NODE_READY); - volume.state.clear_boot_id(); - checkpointVolumeState(volumeId); + return call<csi::v0::NODE_UNSTAGE_VOLUME>( + nodeContainerId.get(), std::move(request)) + .then(defer(self(), [this, volumeId] { + VolumeData& volume = volumes.at(volumeId); - return Nothing(); - })); + volume.state.set_state(VolumeState::NODE_READY); + volume.state.clear_boot_id(); + checkpointVolumeState(volumeId); + + return Nothing(); })); } // Transitions the state of the specified volume from `VOL_READY` or // `NODE_PUBLISH` to `PUBLISHED`. +// // NOTE: This can only be called after `prepareNodeService`. Future<Nothing> StorageLocalResourceProviderProcess::nodePublish( const string& volumeId) { CHECK(volumes.contains(volumeId)); - CHECK_SOME(nodeContainerId); + VolumeData& volume = volumes.at(volumeId); - return getService(nodeContainerId.get()) - .then(defer(self(), [this, volumeId]( - csi::v0::Client client) -> Future<Nothing> { - VolumeData& volume = volumes.at(volumeId); + const string targetPath = csi::paths::getMountTargetPath( + csi::paths::getMountRootDir( + slave::paths::getCsiRootDir(workDir), + info.storage().plugin().type(), + info.storage().plugin().name()), + volumeId); - const string targetPath = csi::paths::getMountTargetPath( - csi::paths::getMountRootDir( - slave::paths::getCsiRootDir(workDir), - info.storage().plugin().type(), - info.storage().plugin().name()), - volumeId); + Try<Nothing> mkdir = os::mkdir(targetPath); + if (mkdir.isError()) { + return Failure( + "Failed to create mount target path '" + targetPath + + "': " + mkdir.error()); + } - Try<Nothing> mkdir = os::mkdir(targetPath); - if (mkdir.isError()) { - return Failure( - "Failed to create mount target path '" + targetPath + "': " + - mkdir.error()); - } + if (volume.state.state() == VolumeState::VOL_READY) { + volume.state.set_state(VolumeState::NODE_PUBLISH); + checkpointVolumeState(volumeId); + } - if (volume.state.state() == VolumeState::VOL_READY) { - volume.state.set_state(VolumeState::NODE_PUBLISH); - checkpointVolumeState(volumeId); - } + CHECK_EQ(VolumeState::NODE_PUBLISH, volume.state.state()); - CHECK_EQ(VolumeState::NODE_PUBLISH, volume.state.state()); + csi::v0::NodePublishVolumeRequest request; + request.set_volume_id(volumeId); + *request.mutable_publish_info() = volume.state.publish_info(); + request.set_target_path(targetPath); + *request.mutable_volume_capability() = volume.state.volume_capability(); + request.set_readonly(false); + *request.mutable_volume_attributes() = volume.state.volume_attributes(); - csi::v0::NodePublishVolumeRequest request; - request.set_volume_id(volumeId); - *request.mutable_publish_info() = volume.state.publish_info(); - request.set_target_path(targetPath); - request.mutable_volume_capability() - ->CopyFrom(volume.state.volume_capability()); - request.set_readonly(false); - *request.mutable_volume_attributes() = volume.state.volume_attributes(); + if (nodeCapabilities.stageUnstageVolume) { + const string stagingPath = csi::paths::getMountStagingPath( + csi::paths::getMountRootDir( + slave::paths::getCsiRootDir(workDir), + info.storage().plugin().type(), + info.storage().plugin().name()), + volumeId); - if (nodeCapabilities.stageUnstageVolume) { - const string stagingPath = csi::paths::getMountStagingPath( - csi::paths::getMountRootDir( - slave::paths::getCsiRootDir(workDir), - info.storage().plugin().type(), - info.storage().plugin().name()), - volumeId); + CHECK(os::exists(stagingPath)); - CHECK(os::exists(stagingPath)); + request.set_staging_target_path(stagingPath); + } - request.set_staging_target_path(stagingPath); - } + CHECK_SOME(nodeContainerId); - return call<csi::v0::NODE_PUBLISH_VOLUME>(client, std::move(request)) - .then(defer(self(), [this, volumeId] { - VolumeData& volume = volumes.at(volumeId); + return call<csi::v0::NODE_PUBLISH_VOLUME>( + nodeContainerId.get(), std::move(request)) + .then(defer(self(), [this, volumeId] { + VolumeData& volume = volumes.at(volumeId); - volume.state.set_state(VolumeState::PUBLISHED); - checkpointVolumeState(volumeId); + volume.state.set_state(VolumeState::PUBLISHED); + checkpointVolumeState(volumeId); - return Nothing(); - })); + return Nothing(); })); } // Transitions the state of the specified volume from `PUBLISHED`, // `NODE_PUBLISH` or `NODE_UNPUBLISH` to `VOL_READY`. +// // NOTE: This can only be called after `prepareNodeService`. Future<Nothing> StorageLocalResourceProviderProcess::nodeUnpublish( const string& volumeId) { CHECK(volumes.contains(volumeId)); - CHECK_SOME(nodeContainerId); + VolumeData& volume = volumes.at(volumeId); - return getService(nodeContainerId.get()) - .then(defer(self(), [this, volumeId](csi::v0::Client client) { - VolumeData& volume = volumes.at(volumeId); + const string targetPath = csi::paths::getMountTargetPath( + csi::paths::getMountRootDir( + slave::paths::getCsiRootDir(workDir), + info.storage().plugin().type(), + info.storage().plugin().name()), + volumeId); - const string targetPath = csi::paths::getMountTargetPath( - csi::paths::getMountRootDir( - slave::paths::getCsiRootDir(workDir), - info.storage().plugin().type(), - info.storage().plugin().name()), - volumeId); - - CHECK(os::exists(targetPath)); - - // A previously failed `NodePublishVolume` call can be recovered through - // the current `NodeUnpublishVolume` call. See: - // https://github.com/container-storage-interface/spec/blob/v0.2.0/spec.md#nodepublishvolume // NOLINT - if (volume.state.state() == VolumeState::PUBLISHED || - volume.state.state() == VolumeState::NODE_PUBLISH) { - volume.state.set_state(VolumeState::NODE_UNPUBLISH); - checkpointVolumeState(volumeId); - } + CHECK(os::exists(targetPath)); - CHECK_EQ(VolumeState::NODE_UNPUBLISH, volume.state.state()); + // A previously failed `NodePublishVolume` call can be recovered through the + // current `NodeUnpublishVolume` call. See: + // https://github.com/container-storage-interface/spec/blob/v0.2.0/spec.md#nodepublishvolume // NOLINT + if (volume.state.state() == VolumeState::PUBLISHED || + volume.state.state() == VolumeState::NODE_PUBLISH) { + volume.state.set_state(VolumeState::NODE_UNPUBLISH); + checkpointVolumeState(volumeId); + } - csi::v0::NodeUnpublishVolumeRequest request; - request.set_volume_id(volumeId); - request.set_target_path(targetPath); + CHECK_EQ(VolumeState::NODE_UNPUBLISH, volume.state.state()); - return call<csi::v0::NODE_UNPUBLISH_VOLUME>(client, std::move(request)) - .then(defer(self(), [this, volumeId, targetPath]() -> Future<Nothing> { - VolumeData& volume = volumes.at(volumeId); + csi::v0::NodeUnpublishVolumeRequest request; + request.set_volume_id(volumeId); + request.set_target_path(targetPath); - volume.state.set_state(VolumeState::VOL_READY); - checkpointVolumeState(volumeId); + CHECK_SOME(nodeContainerId); - Try<Nothing> rmdir = os::rmdir(targetPath); - if (rmdir.isError()) { - return Failure( - "Failed to remove mount point '" + targetPath + "': " + - rmdir.error()); - } + return call<csi::v0::NODE_UNPUBLISH_VOLUME>( + nodeContainerId.get(), std::move(request)) + .then(defer(self(), [this, volumeId, targetPath]() -> Future<Nothing> { + VolumeData& volume = volumes.at(volumeId); - return Nothing(); - })); + volume.state.set_state(VolumeState::VOL_READY); + checkpointVolumeState(volumeId); + + Try<Nothing> rmdir = os::rmdir(targetPath); + if (rmdir.isError()) { + return Failure( + "Failed to remove mount point '" + targetPath + "': " + + rmdir.error()); + } + + return Nothing(); })); } @@ -2683,43 +2672,37 @@ Future<string> StorageLocalResourceProviderProcess::createVolume( "Controller capability 'CREATE_DELETE_VOLUME' is not supported"); } + csi::v0::CreateVolumeRequest request; + request.set_name(name); + request.mutable_capacity_range()->set_required_bytes(capacity.bytes()); + request.mutable_capacity_range()->set_limit_bytes(capacity.bytes()); + *request.add_volume_capabilities() = profileInfo.capability; + *request.mutable_parameters() = profileInfo.parameters; + CHECK_SOME(controllerContainerId); - return getService(controllerContainerId.get()) - .then(defer(self(), [=](csi::v0::Client client) { - csi::v0::CreateVolumeRequest request; - request.set_name(name); - request.mutable_capacity_range() - ->set_required_bytes(capacity.bytes()); - request.mutable_capacity_range() - ->set_limit_bytes(capacity.bytes()); - request.add_volume_capabilities()->CopyFrom(profileInfo.capability); - *request.mutable_parameters() = profileInfo.parameters; - - return call<csi::v0::CREATE_VOLUME>(client, std::move(request)) - .then(defer(self(), [=]( - const csi::v0::CreateVolumeResponse& response) -> string { - const csi::v0::Volume& volume = response.volume(); - - if (volumes.contains(volume.id())) { - // The resource provider failed over after the last `createVolume` - // call, but before the operation status was checkpointed. - CHECK_EQ(VolumeState::CREATED, - volumes.at(volume.id()).state.state()); - } else { - VolumeState volumeState; - volumeState.set_state(VolumeState::CREATED); - volumeState.mutable_volume_capability() - ->CopyFrom(profileInfo.capability); - *volumeState.mutable_parameters() = profileInfo.parameters; - *volumeState.mutable_volume_attributes() = volume.attributes(); - - volumes.put(volume.id(), std::move(volumeState)); - checkpointVolumeState(volume.id()); - } + return call<csi::v0::CREATE_VOLUME>( + controllerContainerId.get(), std::move(request)) + .then(defer(self(), [=]( + const csi::v0::CreateVolumeResponse& response) -> string { + const csi::v0::Volume& volume = response.volume(); - return volume.id(); - })); + if (volumes.contains(volume.id())) { + // The resource provider failed over after the last `createVolume` call, + // but before the operation status was checkpointed. + CHECK_EQ(VolumeState::CREATED, volumes.at(volume.id()).state.state()); + } else { + VolumeState volumeState; + volumeState.set_state(VolumeState::CREATED); + *volumeState.mutable_volume_capability() = profileInfo.capability; + *volumeState.mutable_parameters() = profileInfo.parameters; + *volumeState.mutable_volume_attributes() = volume.attributes(); + + volumes.put(volume.id(), std::move(volumeState)); + checkpointVolumeState(volume.id()); + } + + return volume.id(); })); } @@ -2731,8 +2714,6 @@ Future<string> StorageLocalResourceProviderProcess::createVolume( Future<bool> StorageLocalResourceProviderProcess::deleteVolume( const string& volumeId) { - CHECK_SOME(controllerContainerId); - const string volumePath = csi::paths::getVolumePath( slave::paths::getCsiRootDir(workDir), info.storage().plugin().type(), @@ -2749,10 +2730,10 @@ Future<bool> StorageLocalResourceProviderProcess::deleteVolume( const VolumeData& volume = volumes.at(volumeId); - Future<Nothing> deleted = Nothing(); - CHECK(VolumeState::State_IsValid(volume.state.state())); + Future<Nothing> deleted = Nothing(); + switch (volume.state.state()) { case VolumeState::PUBLISHED: case VolumeState::NODE_PUBLISH: @@ -2786,12 +2767,14 @@ Future<bool> StorageLocalResourceProviderProcess::deleteVolume( // supported. Otherwise, we simply leave it as a preprovisioned volume. if (controllerCapabilities.createDeleteVolume) { deleted = deleted - .then(defer(self(), &Self::getService, controllerContainerId.get())) - .then(defer(self(), [this, volumeId](csi::v0::Client client) { + .then(defer(self(), [this, volumeId] { csi::v0::DeleteVolumeRequest request; request.set_volume_id(volumeId); - return call<csi::v0::DELETE_VOLUME>(client, std::move(request)) + CHECK_SOME(controllerContainerId); + + return call<csi::v0::DELETE_VOLUME>( + controllerContainerId.get(), std::move(request)) .then([] { return Nothing(); }); })); } @@ -2860,45 +2843,40 @@ Future<Nothing> StorageLocalResourceProviderProcess::validateVolume( "Plugin capability 'CONTROLLER_SERVICE' is not supported"); } - CHECK_SOME(controllerContainerId); + google::protobuf::Map<string, string> volumeAttributes; - return getService(controllerContainerId.get()) - .then(defer(self(), [=](csi::v0::Client client) { - google::protobuf::Map<string, string> volumeAttributes; + if (metadata.isSome()) { + volumeAttributes = CHECK_NOTERROR(convertLabelsToStringMap(metadata.get())); + } - if (metadata.isSome()) { - volumeAttributes = - CHECK_NOTERROR(convertLabelsToStringMap(metadata.get())); - } + csi::v0::ValidateVolumeCapabilitiesRequest request; + request.set_volume_id(volumeId); + *request.add_volume_capabilities() = profileInfo.capability; + *request.mutable_volume_attributes() = volumeAttributes; - csi::v0::ValidateVolumeCapabilitiesRequest request; - request.set_volume_id(volumeId); - request.add_volume_capabilities()->CopyFrom(profileInfo.capability); - *request.mutable_volume_attributes() = volumeAttributes; - - return call<csi::v0::VALIDATE_VOLUME_CAPABILITIES>( - client, std::move(request)) - .then(defer(self(), [=]( - const csi::v0::ValidateVolumeCapabilitiesResponse& response) - -> Future<Nothing> { - if (!response.supported()) { - return Failure( - "Unsupported volume capability for volume '" + volumeId + - "': " + response.message()); - } + CHECK_SOME(controllerContainerId); - VolumeState volumeState; - volumeState.set_state(VolumeState::CREATED); - volumeState.mutable_volume_capability() - ->CopyFrom(profileInfo.capability); - *volumeState.mutable_parameters() = profileInfo.parameters; - *volumeState.mutable_volume_attributes() = volumeAttributes; + return call<csi::v0::VALIDATE_VOLUME_CAPABILITIES>( + controllerContainerId.get(), std::move(request)) + .then(defer(self(), [=]( + const csi::v0::ValidateVolumeCapabilitiesResponse& response) + -> Future<Nothing> { + if (!response.supported()) { + return Failure( + "Unsupported volume capability for volume '" + volumeId + "': " + + response.message()); + } - volumes.put(volumeId, std::move(volumeState)); - checkpointVolumeState(volumeId); + VolumeState volumeState; + volumeState.set_state(VolumeState::CREATED); + *volumeState.mutable_volume_capability() = profileInfo.capability; + *volumeState.mutable_parameters() = profileInfo.parameters; + *volumeState.mutable_volume_attributes() = volumeAttributes; - return Nothing(); - })); + volumes.put(volumeId, std::move(volumeState)); + checkpointVolumeState(volumeId); + + return Nothing(); })); } @@ -2916,41 +2894,39 @@ Future<Resources> StorageLocalResourceProviderProcess::listVolumes() CHECK_SOME(controllerContainerId); - return getService(controllerContainerId.get()) - .then(defer(self(), [=](csi::v0::Client client) { - // TODO(chhsiao): Set the max entries and use a loop to do - // multiple `ListVolumes` calls. - return call<csi::v0::LIST_VOLUMES>(client, csi::v0::ListVolumesRequest()) - .then(defer(self(), [=](const csi::v0::ListVolumesResponse& response) { - Resources resources; - - // Recover disk profiles from the checkpointed state. - hashmap<string, string> volumesToProfiles; - foreach (const Resource& resource, totalResources) { - if (resource.disk().source().has_id() && - resource.disk().source().has_profile()) { - volumesToProfiles.put( - resource.disk().source().id(), - resource.disk().source().profile()); - } - } + // TODO(chhsiao): Set the max entries and use a loop to do + // multiple `ListVolumes` calls. + return call<csi::v0::LIST_VOLUMES>( + controllerContainerId.get(), csi::v0::ListVolumesRequest()) + .then(defer(self(), [=](const csi::v0::ListVolumesResponse& response) { + Resources resources; - foreach (const auto& entry, response.entries()) { - resources += createRawDiskResource( - info, - Bytes(entry.volume().capacity_bytes()), - volumesToProfiles.contains(entry.volume().id()) - ? volumesToProfiles.at(entry.volume().id()) - : Option<string>::none(), - vendor, - entry.volume().id(), - entry.volume().attributes().empty() - ? Option<Labels>::none() - : convertStringMapToLabels(entry.volume().attributes())); - } + // Recover disk profiles from the checkpointed state. + hashmap<string, string> volumesToProfiles; + foreach (const Resource& resource, totalResources) { + if (resource.disk().source().has_id() && + resource.disk().source().has_profile()) { + volumesToProfiles.put( + resource.disk().source().id(), + resource.disk().source().profile()); + } + } - return resources; - })); + foreach (const auto& entry, response.entries()) { + resources += createRawDiskResource( + info, + Bytes(entry.volume().capacity_bytes()), + volumesToProfiles.contains(entry.volume().id()) + ? volumesToProfiles.at(entry.volume().id()) + : Option<string>::none(), + vendor, + entry.volume().id(), + entry.volume().attributes().empty() + ? Option<Labels>::none() + : convertStringMapToLabels(entry.volume().attributes())); + } + + return resources; })); } @@ -2968,19 +2944,18 @@ Future<Resources> StorageLocalResourceProviderProcess::getCapacities() CHECK_SOME(controllerContainerId); - return getService(controllerContainerId.get()) - .then(defer(self(), [=](csi::v0::Client client) { - vector<Future<Resources>> futures; + vector<Future<Resources>> futures; - foreachpair (const string& profile, - const DiskProfileAdaptor::ProfileInfo& profileInfo, - profileInfos) { - csi::v0::GetCapacityRequest request; - request.add_volume_capabilities()->CopyFrom(profileInfo.capability); - *request.mutable_parameters() = profileInfo.parameters; + foreachpair (const string& profile, + const DiskProfileAdaptor::ProfileInfo& profileInfo, + profileInfos) { + csi::v0::GetCapacityRequest request; + *request.add_volume_capabilities() = profileInfo.capability; + *request.mutable_parameters() = profileInfo.parameters; - futures.push_back(call<csi::v0::GET_CAPACITY>( - client, std::move(request)) + futures.push_back( + call<csi::v0::GET_CAPACITY>( + controllerContainerId.get(), std::move(request)) .then(defer(self(), [=]( const csi::v0::GetCapacityResponse& response) -> Resources { if (response.available_capacity() == 0) { @@ -2988,18 +2963,14 @@ Future<Resources> StorageLocalResourceProviderProcess::getCapacities() } return createRawDiskResource( - info, - Bytes(response.available_capacity()), - profile, - vendor); + info, Bytes(response.available_capacity()), profile, vendor); }))); - } + } - return collect(futures) - .then([](const vector<Resources>& resources) { - return accumulate(resources.begin(), resources.end(), Resources()); - }); - })); + return collect(futures) + .then([](const vector<Resources>& resources) { + return accumulate(resources.begin(), resources.end(), Resources()); + }); }