Adapted storage local resource provider to use CSI v0.2. This patch contains the necessary changes for the storage local resource provider to use CSI v0.2. Support for the `STAGE_UNSTAGE_VOLUME` CSI node service capability is not yet implemented in this patch.
Review: https://reviews.apache.org/r/66410/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/aeffcd7d Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/aeffcd7d Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/aeffcd7d Branch: refs/heads/master Commit: aeffcd7d9a1f9c97e5e347063ceab71c43c00e2d Parents: 6dfd259 Author: Chun-Hung Hsiao <chhs...@apache.org> Authored: Thu Apr 12 12:07:19 2018 -0700 Committer: Chun-Hung Hsiao <chhs...@mesosphere.io> Committed: Thu Apr 12 14:01:52 2018 -0700 ---------------------------------------------------------------------- src/resource_provider/storage/provider.cpp | 512 ++++++++++++------------ 1 file changed, 257 insertions(+), 255 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/aeffcd7d/src/resource_provider/storage/provider.cpp ---------------------------------------------------------------------- diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp index a07620d..40544e0 100644 --- a/src/resource_provider/storage/provider.cpp +++ b/src/resource_provider/storage/provider.cpp @@ -366,10 +366,11 @@ private: void reconcileOperations( const Event::ReconcileOperations& reconcile); - Future<csi::Client> connect(const string& endpoint); - Future<csi::Client> getService(const ContainerID& containerId); + Future<csi::v0::Client> connect(const string& endpoint); + Future<csi::v0::Client> getService(const ContainerID& containerId); Future<Nothing> killService(const ContainerID& containerId); + Future<Nothing> prepareIdentityService(); Future<Nothing> prepareControllerService(); Future<Nothing> prepareNodeService(); Future<Nothing> controllerPublish(const string& volumeId); @@ -384,7 +385,7 @@ private: Future<string> validateCapability( const string& volumeId, const Option<Labels>& metadata, - const csi::VolumeCapability& 
capability); + const csi::v0::VolumeCapability& capability); Future<Resources> listVolumes(); Future<Resources> getCapacities(); @@ -436,9 +437,8 @@ private: shared_ptr<DiskProfileAdaptor> diskProfileAdaptor; - csi::Version csiVersion; - csi::VolumeCapability defaultMountCapability; - csi::VolumeCapability defaultBlockCapability; + csi::v0::VolumeCapability defaultMountCapability; + csi::v0::VolumeCapability defaultBlockCapability; string bootId; process::grpc::client::Runtime runtime; Owned<v1::resource_provider::Driver> driver; @@ -453,14 +453,15 @@ private: // True if a reconciliation of storage pools is happening. bool reconciling; - ContainerID controllerContainerId; - ContainerID nodeContainerId; hashmap<ContainerID, Owned<ContainerDaemon>> daemons; - hashmap<ContainerID, Owned<Promise<csi::Client>>> services; - - Option<csi::GetPluginInfoResponse> controllerInfo; - Option<csi::GetPluginInfoResponse> nodeInfo; - Option<csi::ControllerCapabilities> controllerCapabilities; + hashmap<ContainerID, Owned<Promise<csi::v0::Client>>> services; + + Option<ContainerID> nodeContainerId; + Option<ContainerID> controllerContainerId; + Option<csi::v0::GetPluginInfoResponse> pluginInfo; + csi::v0::PluginCapabilities pluginCapabilities; + csi::v0::ControllerCapabilities controllerCapabilities; + csi::v0::NodeCapabilities nodeCapabilities; Option<string> nodeId; // We maintain the following invariant: if one operation depends on @@ -554,18 +555,13 @@ void StorageLocalResourceProviderProcess::received(const Event& event) void StorageLocalResourceProviderProcess::initialize() { - // Set CSI version to 0.1.0. - csiVersion.set_major(0); - csiVersion.set_minor(1); - csiVersion.set_patch(0); - // Default mount and block capabilities for pre-existing volumes. 
defaultMountCapability.mutable_mount(); defaultMountCapability.mutable_access_mode() - ->set_mode(csi::VolumeCapability::AccessMode::SINGLE_NODE_WRITER); + ->set_mode(csi::v0::VolumeCapability::AccessMode::SINGLE_NODE_WRITER); defaultBlockCapability.mutable_block(); defaultBlockCapability.mutable_access_mode() - ->set_mode(csi::VolumeCapability::AccessMode::SINGLE_NODE_WRITER); + ->set_mode(csi::v0::VolumeCapability::AccessMode::SINGLE_NODE_WRITER); Try<string> _bootId = os::bootId(); if (_bootId.isError()) { @@ -577,24 +573,24 @@ void StorageLocalResourceProviderProcess::initialize() foreach (const CSIPluginContainerInfo& container, info.storage().plugin().containers()) { - auto it = find( - container.services().begin(), - container.services().end(), - CSIPluginContainerInfo::CONTROLLER_SERVICE); - if (it != container.services().end()) { - controllerContainerId = getContainerId(info, container); + if (container.services().end() != find( + container.services().begin(), + container.services().end(), + CSIPluginContainerInfo::NODE_SERVICE)) { + nodeContainerId = getContainerId(info, container); break; } } + CHECK_SOME(nodeContainerId); + foreach (const CSIPluginContainerInfo& container, info.storage().plugin().containers()) { - auto it = find( - container.services().begin(), - container.services().end(), - CSIPluginContainerInfo::NODE_SERVICE); - if (it != container.services().end()) { - nodeContainerId = getContainerId(info, container); + if (container.services().end() != find( + container.services().begin(), + container.services().end(), + CSIPluginContainerInfo::CONTROLLER_SERVICE)) { + controllerContainerId = getContainerId(info, container); break; } } @@ -720,10 +716,12 @@ Future<Nothing> StorageLocalResourceProviderProcess::recoverServices() const ContainerID& containerId = containerPath->containerId; + CHECK_SOME(nodeContainerId); + // Do not kill the up-to-date controller or node container. // Otherwise, kill them and perform cleanups. 
- if (containerId == controllerContainerId || - containerId == nodeContainerId) { + if (nodeContainerId == containerId || + controllerContainerId == containerId) { const string configPath = csi::paths::getContainerInfoPath( slave::paths::getCsiRootDir(workDir), info.storage().plugin().type(), @@ -776,11 +774,14 @@ Future<Nothing> StorageLocalResourceProviderProcess::recoverServices() }))); } - // NOTE: The `GetNodeID` CSI call is only supported if the plugin has - // the `PUBLISH_UNPUBLISH_VOLUME` controller capability. So to decide - // if `GetNodeID` should be called in `prepareNodeService`, we need to - // run `prepareControllerService` first. + // NOTE: The `Controller` service is supported if the plugin has the + // `CONTROLLER_SERVICE` capability, and the `NodeGetId` call is + // supported if the `Controller` service has the + // `PUBLISH_UNPUBLISH_VOLUME` capability. Therefore, we first launch + // the node plugin to get the plugin capabilities, then decide if we + // need to launch the controller plugin and get the node ID. return collect(futures) + .then(defer(self(), &Self::prepareIdentityService)) .then(defer(self(), &Self::prepareControllerService)) .then(defer(self(), &Self::prepareNodeService)); } @@ -1672,19 +1673,19 @@ void StorageLocalResourceProviderProcess::reconcileOperations( // Returns a future of a CSI client that waits for the endpoint socket // to appear if necessary, then connects to the socket and check its -// supported version. -Future<csi::Client> StorageLocalResourceProviderProcess::connect( +// readiness. +Future<csi::v0::Client> StorageLocalResourceProviderProcess::connect( const string& endpoint) { - Future<csi::Client> client; + Future<csi::v0::Client> future; if (os::exists(endpoint)) { - client = csi::Client("unix://" + endpoint, runtime); + future = csi::v0::Client("unix://" + endpoint, runtime); } else { // Wait for the endpoint socket to appear until the timeout expires. 
Timeout timeout = Timeout::in(CSI_ENDPOINT_CREATION_TIMEOUT); - client = loop( + future = loop( self(), [=]() -> Future<Nothing> { if (timeout.expired()) { @@ -1693,30 +1694,19 @@ Future<csi::Client> StorageLocalResourceProviderProcess::connect( return after(Milliseconds(10)); }, - [=](const Nothing&) -> ControlFlow<csi::Client> { + [=](const Nothing&) -> ControlFlow<csi::v0::Client> { if (os::exists(endpoint)) { - return Break(csi::Client("unix://" + endpoint, runtime)); + return Break(csi::v0::Client("unix://" + endpoint, runtime)); } return Continue(); }); } - return client - .then(defer(self(), [=](csi::Client client) { - return client.GetSupportedVersions(csi::GetSupportedVersionsRequest()) - .then(defer(self(), [=]( - const csi::GetSupportedVersionsResponse& response) - -> Future<csi::Client> { - auto it = find( - response.supported_versions().begin(), - response.supported_versions().end(), - csiVersion); - if (it == response.supported_versions().end()) { - return Failure( - "CSI version " + stringify(csiVersion) + " is not supported"); - } - + return future + .then(defer(self(), [=](csi::v0::Client client) { + return client.Probe(csi::v0::ProbeRequest()) + .then(defer(self(), [=](const csi::v0::ProbeResponse& response) { return client; })); })); @@ -1726,7 +1716,7 @@ Future<csi::Client> StorageLocalResourceProviderProcess::connect( // Returns a future of the latest CSI client for the specified plugin // container. If the container is not already running, this method will // start a new a new container daemon. 
-Future<csi::Client> StorageLocalResourceProviderProcess::getService( +Future<csi::v0::Client> StorageLocalResourceProviderProcess::getService( const ContainerID& containerId) { if (daemons.contains(containerId)) { @@ -1802,7 +1792,7 @@ Future<csi::Client> StorageLocalResourceProviderProcess::getService( ->mutable_mount_propagation()->set_mode(MountPropagation::BIDIRECTIONAL); CHECK(!services.contains(containerId)); - services[containerId].reset(new Promise<csi::Client>()); + services[containerId].reset(new Promise<csi::v0::Client>()); Try<Owned<ContainerDaemon>> daemon = ContainerDaemon::create( extractParentEndpoint(url), @@ -1815,7 +1805,7 @@ Future<csi::Client> StorageLocalResourceProviderProcess::getService( CHECK(services.at(containerId)->future().isPending()); return connect(endpointPath) - .then(defer(self(), [=](const csi::Client& client) { + .then(defer(self(), [=](const csi::v0::Client& client) { services.at(containerId)->set(client); return Nothing(); })) @@ -1827,16 +1817,16 @@ Future<csi::Client> StorageLocalResourceProviderProcess::getService( })); })), std::function<Future<Nothing>()>(defer(self(), [=]() -> Future<Nothing> { - if (containerId == controllerContainerId) { + if (containerId == controllerContainerId.get()) { metrics.csi_controller_plugin_terminations++; } - if (containerId == nodeContainerId) { + if (containerId == nodeContainerId.get()) { metrics.csi_node_plugin_terminations++; } services.at(containerId)->discard(); - services.at(containerId).reset(new Promise<csi::Client>()); + services.at(containerId).reset(new Promise<csi::v0::Client>()); if (os::exists(endpointPath)) { Try<Nothing> rm = os::rm(endpointPath); @@ -1940,54 +1930,31 @@ Future<Nothing> StorageLocalResourceProviderProcess::killService( } -Future<Nothing> StorageLocalResourceProviderProcess::prepareControllerService() +Future<Nothing> StorageLocalResourceProviderProcess::prepareIdentityService() { - return getService(controllerContainerId) - .then(defer(self(), 
[=](csi::Client client) { - // Get the plugin info and check for consistency. - csi::GetPluginInfoRequest request; - request.mutable_version()->CopyFrom(csiVersion); - - return client.GetPluginInfo(request) - .then(defer(self(), [=](const csi::GetPluginInfoResponse& response) { - controllerInfo = response; + CHECK_SOME(nodeContainerId); - LOG(INFO) - << "Controller plugin loaded: " << stringify(controllerInfo.get()); + return getService(nodeContainerId.get()) + .then(defer(self(), [=](csi::v0::Client client) { + // Get the plugin info. + return client.GetPluginInfo(csi::v0::GetPluginInfoRequest()) + .then(defer(self(), [=]( + const csi::v0::GetPluginInfoResponse& response) { + pluginInfo = response; - if (nodeInfo.isSome() && - (controllerInfo->name() != nodeInfo->name() || - controllerInfo->vendor_version() != - nodeInfo->vendor_version())) { - LOG(WARNING) - << "Inconsistent controller and node plugin components. Please " - "check with the plugin vendor to ensure compatibility."; - } + LOG(INFO) << "Node plugin loaded: " << stringify(pluginInfo.get()); - // NOTE: We always get the latest service future before - // proceeding to the next step. - return getService(controllerContainerId); + // Get the latest service future before proceeding to the next step. + return getService(nodeContainerId.get()); })); })) - .then(defer(self(), [=](csi::Client client) { - // Probe the plugin to validate the runtime environment. - csi::ControllerProbeRequest request; - request.mutable_version()->CopyFrom(csiVersion); - - return client.ControllerProbe(request) - .then(defer(self(), [=](const csi::ControllerProbeResponse& response) { - return getService(controllerContainerId); - })); - })) - .then(defer(self(), [=](csi::Client client) { - // Get the controller capabilities. 
- csi::ControllerGetCapabilitiesRequest request; - request.mutable_version()->CopyFrom(csiVersion); - - return client.ControllerGetCapabilities(request) + .then(defer(self(), [=](csi::v0::Client client) { + // Get the plugin capabilities. + return client.GetPluginCapabilities( + csi::v0::GetPluginCapabilitiesRequest()) .then(defer(self(), [=]( - const csi::ControllerGetCapabilitiesResponse& response) { - controllerCapabilities = response.capabilities(); + const csi::v0::GetPluginCapabilitiesResponse& response) { + pluginCapabilities = response.capabilities(); return Nothing(); })); @@ -1995,73 +1962,101 @@ Future<Nothing> StorageLocalResourceProviderProcess::prepareControllerService() } -Future<Nothing> StorageLocalResourceProviderProcess::prepareNodeService() +// NOTE: This can only be called after `prepareIdentityService`. +Future<Nothing> StorageLocalResourceProviderProcess::prepareControllerService() { - // NOTE: This can only be called after `prepareControllerService`. - CHECK_SOME(controllerCapabilities); + CHECK_SOME(pluginInfo); - return getService(nodeContainerId) - .then(defer(self(), [=](csi::Client client) { - // Get the plugin info and check for consistency. - csi::GetPluginInfoRequest request; - request.mutable_version()->CopyFrom(csiVersion); + if (!pluginCapabilities.controllerService) { + return Nothing(); + } - return client.GetPluginInfo(request) - .then(defer(self(), [=](const csi::GetPluginInfoResponse& response) { - nodeInfo = response; + if (controllerContainerId.isNone()) { + return Failure( + stringify(CSIPluginContainerInfo::CONTROLLER_SERVICE) + " not found"); + } - LOG(INFO) << "Node plugin loaded: " << stringify(nodeInfo.get()); + return getService(controllerContainerId.get()) + .then(defer(self(), [=](csi::v0::Client client) { + // Get the controller plugin info and check for consistency. 
+ return client.GetPluginInfo(csi::v0::GetPluginInfoRequest()) + .then(defer(self(), [=]( + const csi::v0::GetPluginInfoResponse& response) { + LOG(INFO) << "Controller plugin loaded: " << stringify(response); - if (controllerInfo.isSome() && - (controllerInfo->name() != nodeInfo->name() || - controllerInfo->vendor_version() != - nodeInfo->vendor_version())) { + if (pluginInfo->name() != response.name() || + pluginInfo->vendor_version() != response.vendor_version()) { LOG(WARNING) << "Inconsistent controller and node plugin components. Please " "check with the plugin vendor to ensure compatibility."; } - // NOTE: We always get the latest service future before - // proceeding to the next step. - return getService(nodeContainerId); + // Get the latest service future before proceeding to the next step. + return getService(controllerContainerId.get()); })); })) - .then(defer(self(), [=](csi::Client client) { - // Probe the plugin to validate the runtime environment. - csi::NodeProbeRequest request; - request.mutable_version()->CopyFrom(csiVersion); - - return client.NodeProbe(request) - .then(defer(self(), [=](const csi::NodeProbeResponse& response) { - return getService(nodeContainerId); + .then(defer(self(), [=](csi::v0::Client client) { + // Get the controller capabilities. + return client.ControllerGetCapabilities( + csi::v0::ControllerGetCapabilitiesRequest()) + .then(defer(self(), [=]( + const csi::v0::ControllerGetCapabilitiesResponse& response) { + controllerCapabilities = response.capabilities(); + + return Nothing(); })); - })) - .then(defer(self(), [=](csi::Client client) -> Future<Nothing> { - if (!controllerCapabilities->publishUnpublishVolume) { - return Nothing(); - } + })); +} - // Get the node ID. 
- csi::GetNodeIDRequest request; - request.mutable_version()->CopyFrom(csiVersion); - return client.GetNodeID(request) - .then(defer(self(), [=](const csi::GetNodeIDResponse& response) { - nodeId = response.node_id(); +// NOTE: This can only be called after `prepareIdentityService` and +// `prepareControllerService`. +Future<Nothing> StorageLocalResourceProviderProcess::prepareNodeService() +{ + CHECK_SOME(nodeContainerId); - return Nothing(); + return getService(nodeContainerId.get()) + .then(defer(self(), [=](csi::v0::Client client) { + // Get the node capabilities. + return client.NodeGetCapabilities(csi::v0::NodeGetCapabilitiesRequest()) + .then(defer(self(), [=]( + const csi::v0::NodeGetCapabilitiesResponse& response) + -> Future<csi::v0::Client> { + nodeCapabilities = response.capabilities(); + + // TODO(chhsiao): Implement `STAGE_UNSTAGE_VOLUME` support. + if (nodeCapabilities.stageUnstageVolume) { + return Failure( + "Node capability 'STAGE_UNSTAGE_VOLUME' is not supported"); + } + + // Get the latest service future before proceeding to the next step. + return getService(nodeContainerId.get()); + })) + .then(defer(self(), [=](csi::v0::Client client) -> Future<Nothing> { + if (!controllerCapabilities.publishUnpublishVolume) { + return Nothing(); + } + + // Get the node ID. + return client.NodeGetId(csi::v0::NodeGetIdRequest()) + .then(defer(self(), [=]( + const csi::v0::NodeGetIdResponse& response) { + nodeId = response.node_id(); + + return Nothing(); + })); })); })); } +// NOTE: This can only be called after `prepareControllerService` and +// `prepareNodeService`. Future<Nothing> StorageLocalResourceProviderProcess::controllerPublish( const string& volumeId) { - // NOTE: This can only be called after `prepareControllerService` and - // `prepareNodeService`. 
- CHECK_SOME(controllerCapabilities); - CHECK(!controllerCapabilities->publishUnpublishVolume || nodeId.isSome()); + CHECK(!controllerCapabilities.publishUnpublishVolume || nodeId.isSome()); CHECK(volumes.contains(volumeId)); if (volumes.at(volumeId).state.state() == @@ -2080,11 +2075,12 @@ Future<Nothing> StorageLocalResourceProviderProcess::controllerPublish( Future<Nothing> controllerPublished; - if (controllerCapabilities->publishUnpublishVolume) { - controllerPublished = getService(controllerContainerId) - .then(defer(self(), [=](csi::Client client) { - csi::ControllerPublishVolumeRequest request; - request.mutable_version()->CopyFrom(csiVersion); + if (controllerCapabilities.publishUnpublishVolume) { + CHECK_SOME(controllerContainerId); + + controllerPublished = getService(controllerContainerId.get()) + .then(defer(self(), [=](csi::v0::Client client) { + csi::v0::ControllerPublishVolumeRequest request; request.set_volume_id(volumeId); request.set_node_id(nodeId.get()); request.mutable_volume_capability() @@ -2095,9 +2091,9 @@ Future<Nothing> StorageLocalResourceProviderProcess::controllerPublish( return client.ControllerPublishVolume(request) .then(defer(self(), [=]( - const csi::ControllerPublishVolumeResponse& response) { - *volumes.at(volumeId).state.mutable_publish_volume_info() = - response.publish_volume_info(); + const csi::v0::ControllerPublishVolumeResponse& response) { + *volumes.at(volumeId).state.mutable_publish_info() = + response.publish_info(); return Nothing(); })); @@ -2122,13 +2118,13 @@ Future<Nothing> StorageLocalResourceProviderProcess::controllerPublish( } +// NOTE: This can only be called after `prepareControllerService` and +// `prepareNodeService`. Future<Nothing> StorageLocalResourceProviderProcess::controllerUnpublish( const string& volumeId) { - // NOTE: This can only be called after `prepareControllerService` and - // `prepareNodeService`. 
- CHECK_SOME(controllerCapabilities); - CHECK(!controllerCapabilities->publishUnpublishVolume || nodeId.isSome()); + CHECK_SOME(controllerContainerId); + CHECK(!controllerCapabilities.publishUnpublishVolume || nodeId.isSome()); CHECK(volumes.contains(volumeId)); if (volumes.at(volumeId).state.state() == @@ -2147,11 +2143,10 @@ Future<Nothing> StorageLocalResourceProviderProcess::controllerUnpublish( Future<Nothing> controllerUnpublished; - if (controllerCapabilities->publishUnpublishVolume) { - controllerUnpublished = getService(controllerContainerId) - .then(defer(self(), [=](csi::Client client) { - csi::ControllerUnpublishVolumeRequest request; - request.mutable_version()->CopyFrom(csiVersion); + if (controllerCapabilities.publishUnpublishVolume) { + controllerUnpublished = getService(controllerContainerId.get()) + .then(defer(self(), [=](csi::v0::Client client) { + csi::v0::ControllerUnpublishVolumeRequest request; request.set_volume_id(volumeId); request.set_node_id(nodeId.get()); @@ -2165,7 +2160,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::controllerUnpublish( return controllerUnpublished .then(defer(self(), [=] { volumes.at(volumeId).state.set_state(csi::state::VolumeState::CREATED); - volumes.at(volumeId).state.mutable_publish_volume_info()->clear(); + volumes.at(volumeId).state.mutable_publish_info()->clear(); checkpointVolumeState(volumeId); return Nothing(); @@ -2182,6 +2177,10 @@ Future<Nothing> StorageLocalResourceProviderProcess::controllerUnpublish( Future<Nothing> StorageLocalResourceProviderProcess::nodePublish( const string& volumeId) { + // TODO(chhsiao): Implement `STAGE_UNSTAGE_VOLUME` support. 
+ + CHECK_SOME(nodeContainerId); + CHECK(volumes.contains(volumeId)); if (volumes.at(volumeId).state.state() == csi::state::VolumeState::NODE_PUBLISH) { @@ -2208,13 +2207,12 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodePublish( "Failed to create mount point '" + mountPath + "': " + mkdir.error()); } - return getService(nodeContainerId) - .then(defer(self(), [=](csi::Client client) { - csi::NodePublishVolumeRequest request; - request.mutable_version()->CopyFrom(csiVersion); + return getService(nodeContainerId.get()) + .then(defer(self(), [=](csi::v0::Client client) { + csi::v0::NodePublishVolumeRequest request; request.set_volume_id(volumeId); - *request.mutable_publish_volume_info() = - volumes.at(volumeId).state.publish_volume_info(); + *request.mutable_publish_info() = + volumes.at(volumeId).state.publish_info(); request.set_target_path(mountPath); request.mutable_volume_capability() ->CopyFrom(volumes.at(volumeId).state.volume_capability()); @@ -2243,6 +2241,10 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodePublish( Future<Nothing> StorageLocalResourceProviderProcess::nodeUnpublish( const string& volumeId) { + // TODO(chhsiao): Implement `STAGE_UNSTAGE_VOLUME` support. 
+ + CHECK_SOME(nodeContainerId); + CHECK(volumes.contains(volumeId)); if (volumes.at(volumeId).state.state() == csi::state::VolumeState::NODE_UNPUBLISH) { @@ -2267,10 +2269,9 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodeUnpublish( Future<Nothing> nodeUnpublished; if (os::exists(mountPath)) { - nodeUnpublished = getService(nodeContainerId) - .then(defer(self(), [=](csi::Client client) { - csi::NodeUnpublishVolumeRequest request; - request.mutable_version()->CopyFrom(csiVersion); + nodeUnpublished = getService(nodeContainerId.get()) + .then(defer(self(), [=](csi::v0::Client client) { + csi::v0::NodeUnpublishVolumeRequest request; request.set_volume_id(volumeId); request.set_target_path(mountPath); @@ -2310,22 +2311,22 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodeUnpublish( // Returns a CSI volume ID. +// NOTE: This can only be called after `prepareControllerService`. Future<string> StorageLocalResourceProviderProcess::createVolume( const string& name, const Bytes& capacity, const DiskProfileAdaptor::ProfileInfo& profileInfo) { - // NOTE: This can only be called after `prepareControllerService`. 
- CHECK_SOME(controllerCapabilities); - - if (!controllerCapabilities->createDeleteVolume) { - return Failure("Capability 'CREATE_DELETE_VOLUME' is not supported"); + if (!controllerCapabilities.createDeleteVolume) { + return Failure( + "Controller capability 'CREATE_DELETE_VOLUME' is not supported"); } - return getService(controllerContainerId) - .then(defer(self(), [=](csi::Client client) { - csi::CreateVolumeRequest request; - request.mutable_version()->CopyFrom(csiVersion); + CHECK_SOME(controllerContainerId); + + return getService(controllerContainerId.get()) + .then(defer(self(), [=](csi::v0::Client client) { + csi::v0::CreateVolumeRequest request; request.set_name(name); request.mutable_capacity_range() ->set_required_bytes(capacity.bytes()); @@ -2335,47 +2336,49 @@ Future<string> StorageLocalResourceProviderProcess::createVolume( *request.mutable_parameters() = profileInfo.parameters; return client.CreateVolume(request) - .then(defer(self(), [=](const csi::CreateVolumeResponse& response) { - const csi::VolumeInfo& volumeInfo = response.volume_info(); + .then(defer(self(), [=](const csi::v0::CreateVolumeResponse& response) { + const csi::v0::Volume& volume = response.volume(); - if (volumes.contains(volumeInfo.id())) { + if (volumes.contains(volume.id())) { // The resource provider failed over after the last // `CreateVolume` call, but before the operation status was // checkpointed. 
CHECK_EQ(csi::state::VolumeState::CREATED, - volumes.at(volumeInfo.id()).state.state()); + volumes.at(volume.id()).state.state()); } else { csi::state::VolumeState volumeState; volumeState.set_state(csi::state::VolumeState::CREATED); volumeState.mutable_volume_capability() ->CopyFrom(profileInfo.capability); - *volumeState.mutable_volume_attributes() = volumeInfo.attributes(); + *volumeState.mutable_volume_attributes() = volume.attributes(); - volumes.put(volumeInfo.id(), std::move(volumeState)); - checkpointVolumeState(volumeInfo.id()); + volumes.put(volume.id(), std::move(volumeState)); + checkpointVolumeState(volume.id()); } - return volumeInfo.id(); + return volume.id(); })); })); } +// NOTE: This can only be called after `prepareControllerService` and +// `prepareNodeService` (since it may require `NodeUnpublishVolume`). Future<Nothing> StorageLocalResourceProviderProcess::deleteVolume( const string& volumeId, bool preExisting) { - // NOTE: This can only be called after `prepareControllerService` and - // `prepareNodeService` (since it may require `NodeUnpublishVolume`). - CHECK_SOME(controllerCapabilities); - CHECK(!controllerCapabilities->publishUnpublishVolume || nodeId.isSome()); + CHECK(!controllerCapabilities.publishUnpublishVolume || nodeId.isSome()); // We do not need the capability for pre-existing volumes since no // actual `DeleteVolume` call will be made. 
- if (!preExisting && !controllerCapabilities->createDeleteVolume) { - return Failure("Capability 'CREATE_DELETE_VOLUME' is not supported"); + if (!preExisting && !controllerCapabilities.createDeleteVolume) { + return Failure( + "Controller capability 'CREATE_DELETE_VOLUME' is not supported"); } + CHECK_SOME(controllerContainerId); + const string volumePath = csi::paths::getVolumePath( slave::paths::getCsiRootDir(workDir), info.storage().plugin().type(), @@ -2399,10 +2402,9 @@ Future<Nothing> StorageLocalResourceProviderProcess::deleteVolume( case csi::state::VolumeState::CREATED: { if (!preExisting) { deleted = deleted - .then(defer(self(), &Self::getService, controllerContainerId)) - .then(defer(self(), [=](csi::Client client) { - csi::DeleteVolumeRequest request; - request.mutable_version()->CopyFrom(csiVersion); + .then(defer(self(), &Self::getService, controllerContainerId.get())) + .then(defer(self(), [=](csi::v0::Client client) { + csi::v0::DeleteVolumeRequest request; request.set_volume_id(volumeId); return client.DeleteVolume(request) @@ -2443,34 +2445,41 @@ Future<Nothing> StorageLocalResourceProviderProcess::deleteVolume( } -// Validates if a volume has the specified capability. This is called -// when applying `CREATE_VOLUME` or `CREATE_BLOCK` on a pre-existing -// volume, so we make it returns a volume ID, similar to `createVolume`. +// Validates if a volume has the specified capability. This is called when +// applying `CREATE_VOLUME` or `CREATE_BLOCK` on a pre-existing volume, so we +// make it returns a volume ID, similar to `createVolume`. +// NOTE: This can only be called after `prepareIdentityService` and only for +// newly discovered volumes. Future<string> StorageLocalResourceProviderProcess::validateCapability( const string& volumeId, const Option<Labels>& metadata, - const csi::VolumeCapability& capability) + const csi::v0::VolumeCapability& capability) { - // NOTE: This can only be called for newly discovered volumes. 
CHECK(!volumes.contains(volumeId)); - return getService(controllerContainerId) - .then(defer(self(), [=](csi::Client client) { + if (!pluginCapabilities.controllerService) { + return Failure( + "Plugin capability 'CONTROLLER_SERVICE' is not supported"); + } + + CHECK_SOME(controllerContainerId); + + return getService(controllerContainerId.get()) + .then(defer(self(), [=](csi::v0::Client client) { google::protobuf::Map<string, string> volumeAttributes; if (metadata.isSome()) { volumeAttributes = convertLabelsToStringMap(metadata.get()).get(); } - csi::ValidateVolumeCapabilitiesRequest request; - request.mutable_version()->CopyFrom(csiVersion); + csi::v0::ValidateVolumeCapabilitiesRequest request; request.set_volume_id(volumeId); request.add_volume_capabilities()->CopyFrom(capability); *request.mutable_volume_attributes() = volumeAttributes; return client.ValidateVolumeCapabilities(request) .then(defer(self(), [=]( - const csi::ValidateVolumeCapabilitiesResponse& response) + const csi::v0::ValidateVolumeCapabilitiesResponse& response) -> Future<string> { if (!response.supported()) { return Failure( @@ -2492,27 +2501,25 @@ Future<string> StorageLocalResourceProviderProcess::validateCapability( } +// NOTE: This can only be called after `prepareControllerService` and +// the resource provider ID has been obtained. Future<Resources> StorageLocalResourceProviderProcess::listVolumes() { - // NOTE: This can only be called after `prepareControllerService` and - // the resource provider ID has been obtained. - CHECK_SOME(controllerCapabilities); CHECK(info.has_id()); // This is only used for reconciliation so no failure is returned. 
- if (!controllerCapabilities->listVolumes) { + if (!controllerCapabilities.listVolumes) { return Resources(); } - return getService(controllerContainerId) - .then(defer(self(), [=](csi::Client client) { + CHECK_SOME(controllerContainerId); + + return getService(controllerContainerId.get()) + .then(defer(self(), [=](csi::v0::Client client) { // TODO(chhsiao): Set the max entries and use a loop to do // mutliple `ListVolumes` calls. - csi::ListVolumesRequest request; - request.mutable_version()->CopyFrom(csiVersion); - - return client.ListVolumes(request) - .then(defer(self(), [=](const csi::ListVolumesResponse& response) { + return client.ListVolumes(csi::v0::ListVolumesRequest()) + .then(defer(self(), [=](const csi::v0::ListVolumesResponse& response) { Resources resources; // Recover disk profiles from the checkpointed state. @@ -2529,15 +2536,14 @@ Future<Resources> StorageLocalResourceProviderProcess::listVolumes() foreach (const auto& entry, response.entries()) { resources += createRawDiskResource( info, - Bytes(entry.volume_info().capacity_bytes()), - volumesToProfiles.contains(entry.volume_info().id()) - ? volumesToProfiles.at(entry.volume_info().id()) + Bytes(entry.volume().capacity_bytes()), + volumesToProfiles.contains(entry.volume().id()) + ? volumesToProfiles.at(entry.volume().id()) : Option<string>::none(), - entry.volume_info().id(), - entry.volume_info().attributes().empty() + entry.volume().id(), + entry.volume().attributes().empty() ? Option<Labels>::none() - : convertStringMapToLabels( - entry.volume_info().attributes())); + : convertStringMapToLabels(entry.volume().attributes())); } return resources; @@ -2546,20 +2552,21 @@ Future<Resources> StorageLocalResourceProviderProcess::listVolumes() } +// NOTE: This can only be called after `prepareControllerService` and +// the resource provider ID has been obtained. 
Future<Resources> StorageLocalResourceProviderProcess::getCapacities() { - // NOTE: This can only be called after `prepareControllerService` and - // the resource provider ID has been obtained. - CHECK_SOME(controllerCapabilities); CHECK(info.has_id()); // This is only used for reconciliation so no failure is returned. - if (!controllerCapabilities->getCapacity) { + if (!controllerCapabilities.getCapacity) { return Resources(); } - return getService(controllerContainerId) - .then(defer(self(), [=](csi::Client client) { + CHECK_SOME(controllerContainerId); + + return getService(controllerContainerId.get()) + .then(defer(self(), [=](csi::v0::Client client) { list<Future<Resources>> futures; foreach (const string& profile, knownProfiles) { @@ -2570,14 +2577,13 @@ Future<Resources> StorageLocalResourceProviderProcess::getCapacities() const DiskProfileAdaptor::ProfileInfo& profileInfo = profileInfos.at(profile); - csi::GetCapacityRequest request; - request.mutable_version()->CopyFrom(csiVersion); + csi::v0::GetCapacityRequest request; request.add_volume_capabilities()->CopyFrom(profileInfo.capability); *request.mutable_parameters() = profileInfo.parameters; futures.push_back(client.GetCapacity(request) .then(defer(self(), [=]( - const csi::GetCapacityResponse& response) -> Resources { + const csi::v0::GetCapacityResponse& response) -> Resources { if (response.available_capacity() == 0) { return Resources(); } @@ -3218,26 +3224,22 @@ Try<Owned<LocalResourceProvider>> StorageLocalResourceProvider::create( "' does not follow Java package naming convention"); } - bool hasControllerService = false; + // Verify that the plugin provides the CSI node service. + // TODO(chhsiao): We should move this check to a validation function + // for `CSIPluginInfo`. 
bool hasNodeService = false; foreach (const CSIPluginContainerInfo& container, info.storage().plugin().containers()) { - for (int i = 0; i < container.services_size(); i++) { - const CSIPluginContainerInfo::Service service = container.services(i); - if (service == CSIPluginContainerInfo::CONTROLLER_SERVICE) { - hasControllerService = true; - } else if (service == CSIPluginContainerInfo::NODE_SERVICE) { - hasNodeService = true; - } + if (container.services().end() != find( + container.services().begin(), + container.services().end(), + CSIPluginContainerInfo::NODE_SERVICE)) { + hasNodeService = true; + break; } } - if (!hasControllerService) { - return Error( - stringify(CSIPluginContainerInfo::CONTROLLER_SERVICE) + " not found"); - } - if (!hasNodeService) { return Error( stringify(CSIPluginContainerInfo::NODE_SERVICE) + " not found");