This is an automated email from the ASF dual-hosted git repository. grag pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 38ba19127ddb48244f7c6c699e3c41e5ea12b594 Author: Greg Mann <g...@mesosphere.io> AuthorDate: Mon Aug 10 20:26:26 2020 -0700 Added support for secrets to the CSI volume managers. Review: https://reviews.apache.org/r/72732/ --- src/csi/state.proto | 6 ++ src/csi/v0_volume_manager.cpp | 103 +++++++++++++++++++++++++++++++--- src/csi/v0_volume_manager.hpp | 5 +- src/csi/v0_volume_manager_process.hpp | 13 ++++- src/csi/v1_volume_manager.cpp | 96 +++++++++++++++++++++++++++++-- src/csi/v1_volume_manager.hpp | 5 +- src/csi/v1_volume_manager_process.hpp | 13 ++++- src/csi/volume_manager.cpp | 21 ++++++- src/csi/volume_manager.hpp | 5 +- 9 files changed, 246 insertions(+), 21 deletions(-) diff --git a/src/csi/state.proto b/src/csi/state.proto index 836e30c..630e4f5 100644 --- a/src/csi/state.proto +++ b/src/csi/state.proto @@ -78,4 +78,10 @@ message VolumeState { // Indicates that the volume must be mounted read-only. bool readonly = 9; + + // Secrets to be included in `NodeStageVolumeRequest`. + map<string, Secret> node_stage_secrets = 10; + + // Secrets to be included in `NodePublishVolumeRequest`. + map<string, Secret> node_publish_secrets = 11; } diff --git a/src/csi/v0_volume_manager.cpp b/src/csi/v0_volume_manager.cpp index 89a6da5..9e840a7 100644 --- a/src/csi/v0_volume_manager.cpp +++ b/src/csi/v0_volume_manager.cpp @@ -21,6 +21,8 @@ #include <functional> #include <list> +#include <mesos/secret/resolver.hpp> + #include <process/after.hpp> #include <process/collect.hpp> #include <process/defer.hpp> @@ -81,14 +83,16 @@ VolumeManagerProcess::VolumeManagerProcess( const hashset<Service> _services, const Runtime& _runtime, ServiceManager* _serviceManager, - Metrics* _metrics) + Metrics* _metrics, + SecretResolver* _secretResolver) : ProcessBase(process::ID::generate("csi-v0-volume-manager")), rootDir(_rootDir), info(_info), services(_services), runtime(_runtime), serviceManager(_serviceManager), - metrics(_metrics) + metrics(_metrics), + secretResolver(_secretResolver) { // This should have been validated in `VolumeManager::create`. CHECK(!services.empty()) @@ -961,8 +965,33 @@ Future<Nothing> VolumeManagerProcess::_publishVolume(const string& volumeId) request.set_staging_target_path(stagingPath); } - return call(NODE_SERVICE, &Client::nodePublishVolume, std::move(request)) - .then(defer(self(), [this, volumeId, targetPath] { + Future<NodePublishVolumeResponse> rpcResult; + + if (!volumeState.node_publish_secrets().empty()) { + rpcResult = resolveSecrets(volumeState.node_publish_secrets()) + .then(process::defer( + self(), + [this, request](const Map<string, string>& secrets) { + NodePublishVolumeRequest request_(request); + *request_.mutable_node_publish_secrets() = secrets; + + return call( + NODE_SERVICE, + &Client::nodePublishVolume, + std::move(request_)); + })); + } else { + rpcResult = + call(NODE_SERVICE, &Client::nodePublishVolume, std::move(request)); + } + + return rpcResult + .then(process::defer(self(), [this, volumeId, targetPath]() + -> Future<Nothing> { + if (!os::exists(targetPath)) { + return Failure("Target path '" + targetPath + "' not created"); + } + CHECK(volumes.contains(volumeId)); VolumeState& volumeState = volumes.at(volumeId).state; @@ -1042,7 +1071,25 @@ Future<Nothing> VolumeManagerProcess::__publishVolume(const string& volumeId) evolve(volumeState.volume_capability()); *request.mutable_volume_attributes() = volumeState.volume_context(); - return call(NODE_SERVICE, &Client::nodeStageVolume, std::move(request)) + Future<NodeStageVolumeResponse> rpcResult; + + if (!volumeState.node_stage_secrets().empty()) { + rpcResult = resolveSecrets(volumeState.node_stage_secrets()) + .then([=](const Map<string, string>& secrets) { + NodeStageVolumeRequest request_(request); + *request_.mutable_node_stage_secrets() = secrets; + + return call( + NODE_SERVICE, + &Client::nodeStageVolume, + std::move(request_)); + }); + } else { + rpcResult = + call(NODE_SERVICE, &Client::nodeStageVolume, std::move(request)); + } + + return rpcResult .then(process::defer(self(), [this, volumeId] { CHECK(volumes.contains(volumeId)); VolumeState& volumeState = volumes.at(volumeId).state; @@ -1236,20 +1283,62 @@ void VolumeManagerProcess::removeVolume(const string& volumeId) } +Future<Map<string, string>> VolumeManagerProcess::resolveSecrets( + const Map<string, Secret>& secrets) +{ + if (!secretResolver) { + return Failure( + "CSI volume included secrets but the agent was not initialized with " + "a secret resolver"); + } + + // This `futures` is used below with `process::collect()` to synchronize the + // continuation. Within the continuation itself, we need to have the + // key:value mapping of the secrets, so we use `resolvedSecrets` instead. + vector<Future<Secret::Value>> futures; + hashmap<string, Future<Secret::Value>> resolvedSecrets; + + for (auto it = secrets.begin(); it != secrets.end(); ++it) { + Future<Secret::Value> pendingSecret = secretResolver->resolve(it->second); + + futures.push_back(pendingSecret); + resolvedSecrets.insert({it->first, pendingSecret}); + } + + return process::collect(futures) + .then([=]() { + Map<string, string> result; + + foreachpair ( + const string& key, + const Future<Secret::Value>& secret, + resolvedSecrets) { + CHECK(secret.isReady()); + + result.insert({key, secret->data()}); + } + + return result; + }); +} + + VolumeManager::VolumeManager( const string& rootDir, const CSIPluginInfo& info, const hashset<Service>& services, const Runtime& runtime, ServiceManager* serviceManager, - Metrics* metrics) + Metrics* metrics, + SecretResolver* secretResolver) : process(new VolumeManagerProcess( rootDir, info, services, runtime, serviceManager, - metrics)) + metrics, + secretResolver)) { process::spawn(CHECK_NOTNULL(process.get())); recovered = process::dispatch(process.get(), &VolumeManagerProcess::recover); diff --git a/src/csi/v0_volume_manager.hpp b/src/csi/v0_volume_manager.hpp index 93183c2..a984711 100644 --- a/src/csi/v0_volume_manager.hpp +++ b/src/csi/v0_volume_manager.hpp @@ -24,6 +24,8 @@ #include <mesos/mesos.hpp> +#include <mesos/secret/resolver.hpp> + #include <process/future.hpp> #include <process/grpc.hpp> #include <process/http.hpp> @@ -57,7 +59,8 @@ public: const hashset<Service>& services, const process::grpc::client::Runtime& runtime, ServiceManager* serviceManager, - Metrics* metrics); + Metrics* metrics, + SecretResolver* secretResolver); // Since this class contains `Owned` members which should not but can be // copied, explicitly make this class non-copyable. diff --git a/src/csi/v0_volume_manager_process.hpp b/src/csi/v0_volume_manager_process.hpp index 7548c43..1162955 100644 --- a/src/csi/v0_volume_manager_process.hpp +++ b/src/csi/v0_volume_manager_process.hpp @@ -24,6 +24,8 @@ #include <mesos/mesos.hpp> +#include <mesos/secret/resolver.hpp> + #include <process/future.hpp> #include <process/grpc.hpp> #include <process/http.hpp> @@ -62,7 +64,8 @@ public: const hashset<Service> _services, const process::grpc::client::Runtime& _runtime, ServiceManager* _serviceManager, - Metrics* _metrics); + Metrics* _metrics, + SecretResolver* _secretResolver); process::Future<Nothing> recover(); @@ -173,6 +176,13 @@ private: // from memory and from disk. void removeVolume(const std::string& volumeId); + // If the volume manager was initialized with a non-null secret resolver, this + // helper function will resolve any secrets in the provided map. + // Returns a map containing the resolved secrets. + process::Future<google::protobuf::Map<std::string, std::string>> + resolveSecrets( + const google::protobuf::Map<std::string, Secret>& secrets); + const std::string rootDir; const CSIPluginInfo info; const hashset<Service> services; @@ -180,6 +190,7 @@ private: process::grpc::client::Runtime runtime; ServiceManager* serviceManager; Metrics* metrics; + SecretResolver* secretResolver; Option<std::string> bootId; Option<PluginCapabilities> pluginCapabilities; diff --git a/src/csi/v1_volume_manager.cpp b/src/csi/v1_volume_manager.cpp index 5178b2f..7230676 100644 --- a/src/csi/v1_volume_manager.cpp +++ b/src/csi/v1_volume_manager.cpp @@ -21,6 +21,8 @@ #include <functional> #include <list> +#include <mesos/secret/resolver.hpp> + #include <process/after.hpp> #include <process/collect.hpp> #include <process/defer.hpp> @@ -82,14 +84,16 @@ VolumeManagerProcess::VolumeManagerProcess( const hashset<Service> _services, const Runtime& _runtime, ServiceManager* _serviceManager, - Metrics* _metrics) + Metrics* _metrics, + SecretResolver* _secretResolver) : ProcessBase(process::ID::generate("csi-v1-volume-manager")), rootDir(_rootDir), info(_info), services(_services), runtime(_runtime), serviceManager(_serviceManager), - metrics(_metrics) + metrics(_metrics), + secretResolver(_secretResolver) { // This should have been validated in `VolumeManager::create`. CHECK(!services.empty()) @@ -987,7 +991,27 @@ Future<Nothing> VolumeManagerProcess::_publishVolume(const string& volumeId) request.set_staging_target_path(stagingPath); } - return call(NODE_SERVICE, &Client::nodePublishVolume, std::move(request)) + Future<NodePublishVolumeResponse> rpcResult; + + if (!volumeState.node_publish_secrets().empty()) { + rpcResult = resolveSecrets(volumeState.node_publish_secrets()) + .then(process::defer( + self(), + [this, request](const Map<string, string>& secrets) { + NodePublishVolumeRequest request_(request); + *request_.mutable_secrets() = secrets; + + return call( + NODE_SERVICE, + &Client::nodePublishVolume, + std::move(request_)); + })); + } else { + rpcResult = + call(NODE_SERVICE, &Client::nodePublishVolume, std::move(request)); + } + + return rpcResult .then(process::defer(self(), [this, volumeId, targetPath]() -> Future<Nothing> { if (!os::exists(targetPath)) { @@ -1073,7 +1097,25 @@ Future<Nothing> VolumeManagerProcess::__publishVolume(const string& volumeId) evolve(volumeState.volume_capability()); *request.mutable_volume_context() = volumeState.volume_context(); - return call(NODE_SERVICE, &Client::nodeStageVolume, std::move(request)) + Future<NodeStageVolumeResponse> rpcResult; + + if (!volumeState.node_stage_secrets().empty()) { + rpcResult = resolveSecrets(volumeState.node_stage_secrets()) + .then([=](const Map<string, string>& secrets) { + NodeStageVolumeRequest request_(request); + *request_.mutable_secrets() = secrets; + + return call( + NODE_SERVICE, + &Client::nodeStageVolume, + std::move(request_)); + }); + } else { + rpcResult = + call(NODE_SERVICE, &Client::nodeStageVolume, std::move(request)); + } + + return rpcResult .then(process::defer(self(), [this, volumeId] { CHECK(volumes.contains(volumeId)); VolumeState& volumeState = volumes.at(volumeId).state; @@ -1270,20 +1312,62 @@ void VolumeManagerProcess::removeVolume(const string& volumeId) } +Future<Map<string, string>> VolumeManagerProcess::resolveSecrets( + const Map<string, Secret>& secrets) +{ + if (!secretResolver) { + return Failure( + "CSI volume included secrets but the agent was not initialized with " + "a secret resolver"); + } + + // This `futures` is used below with `process::collect()` to synchronize the + // continuation. Within the continuation itself, we need to have the + // key:value mapping of the secrets, so we use `resolvedSecrets` instead. + vector<Future<Secret::Value>> futures; + hashmap<string, Future<Secret::Value>> resolvedSecrets; + + for (auto it = secrets.begin(); it != secrets.end(); ++it) { + Future<Secret::Value> pendingSecret = secretResolver->resolve(it->second); + + futures.push_back(pendingSecret); + resolvedSecrets.insert({it->first, pendingSecret}); + } + + return process::collect(futures) + .then([=]() { + Map<string, string> result; + + foreachpair ( + const string& key, + const Future<Secret::Value>& secret, + resolvedSecrets) { + CHECK(secret.isReady()); + + result.insert({key, secret->data()}); + } + + return result; + }); +} + + VolumeManager::VolumeManager( const string& rootDir, const CSIPluginInfo& info, const hashset<Service>& services, const Runtime& runtime, ServiceManager* serviceManager, - Metrics* metrics) + Metrics* metrics, + SecretResolver* secretResolver) : process(new VolumeManagerProcess( rootDir, info, services, runtime, serviceManager, - metrics)) + metrics, + secretResolver)) { process::spawn(CHECK_NOTNULL(process.get())); recovered = process::dispatch(process.get(), &VolumeManagerProcess::recover); diff --git a/src/csi/v1_volume_manager.hpp b/src/csi/v1_volume_manager.hpp index 2f7897d..03a6eee 100644 --- a/src/csi/v1_volume_manager.hpp +++ b/src/csi/v1_volume_manager.hpp @@ -24,6 +24,8 @@ #include <mesos/mesos.hpp> +#include <mesos/secret/resolver.hpp> + #include <process/future.hpp> #include <process/grpc.hpp> #include <process/http.hpp> @@ -57,7 +59,8 @@ public: const hashset<Service>& services, const process::grpc::client::Runtime& runtime, ServiceManager* serviceManager, - Metrics* metrics); + Metrics* metrics, + SecretResolver* secretResolver); // Since this class contains `Owned` members which should not but can be // copied, explicitly make this class non-copyable. diff --git a/src/csi/v1_volume_manager_process.hpp b/src/csi/v1_volume_manager_process.hpp index b8a1ef7..63dc03e 100644 --- a/src/csi/v1_volume_manager_process.hpp +++ b/src/csi/v1_volume_manager_process.hpp @@ -24,6 +24,8 @@ #include <mesos/mesos.hpp> +#include <mesos/secret/resolver.hpp> + #include <process/future.hpp> #include <process/grpc.hpp> #include <process/http.hpp> @@ -62,7 +64,8 @@ public: const hashset<Service> _services, const process::grpc::client::Runtime& _runtime, ServiceManager* _serviceManager, - Metrics* _metrics); + Metrics* _metrics, + SecretResolver* _secretResolver); process::Future<Nothing> recover(); @@ -173,6 +176,13 @@ private: // from memory and from disk. void removeVolume(const std::string& volumeId); + // If the volume manager was initialized with a non-null secret resolver, this + // helper function will resolve any secrets in the provided map. + // Returns a map containing the resolved secrets. + process::Future<google::protobuf::Map<std::string, std::string>> + resolveSecrets( + const google::protobuf::Map<std::string, Secret>& secrets); + const std::string rootDir; const CSIPluginInfo info; const hashset<Service> services; @@ -180,6 +190,7 @@ private: process::grpc::client::Runtime runtime; ServiceManager* serviceManager; Metrics* metrics; + SecretResolver* secretResolver; Option<std::string> bootId; Option<PluginCapabilities> pluginCapabilities; diff --git a/src/csi/volume_manager.cpp b/src/csi/volume_manager.cpp index c47adfe..1ac2209 100644 --- a/src/csi/volume_manager.cpp +++ b/src/csi/volume_manager.cpp @@ -21,6 +21,8 @@ #include <mesos/csi/v0.hpp> #include <mesos/csi/v1.hpp> +#include <mesos/secret/resolver.hpp> + #include "csi/service_manager.hpp" #include "csi/v0_volume_manager.hpp" #include "csi/v1_volume_manager.hpp" @@ -43,7 +45,8 @@ Try<Owned<VolumeManager>> VolumeManager::create( const string& apiVersion, const Runtime& runtime, ServiceManager* serviceManager, - Metrics* metrics) + Metrics* metrics, + SecretResolver* secretResolver) { if (services.empty()) { return Error( @@ -53,10 +56,22 @@ Try<Owned<VolumeManager>> VolumeManager::create( if (apiVersion == v0::API_VERSION) { return Try<Owned<VolumeManager>>(new v0::VolumeManager( - rootDir, info, services, runtime, serviceManager, metrics)); + rootDir, + info, + services, + runtime, + serviceManager, + metrics, + secretResolver)); } else if (apiVersion == v1::API_VERSION) { return Try<Owned<VolumeManager>>(new v1::VolumeManager( - rootDir, info, services, runtime, serviceManager, metrics)); + rootDir, + info, + services, + runtime, + serviceManager, + metrics, + secretResolver)); } return Error("Unsupported CSI API version: " + apiVersion); diff --git a/src/csi/volume_manager.hpp b/src/csi/volume_manager.hpp index 57e7c51..8262183 100644 --- a/src/csi/volume_manager.hpp +++ b/src/csi/volume_manager.hpp @@ -24,6 +24,8 @@ #include <mesos/mesos.hpp> +#include <mesos/secret/resolver.hpp> + #include <process/future.hpp> #include <process/grpc.hpp> #include <process/http.hpp> @@ -62,7 +64,8 @@ public: const std::string& apiVersion, const process::grpc::client::Runtime& runtime, ServiceManager* serviceManager, - Metrics* metrics); + Metrics* metrics, + SecretResolver* secretResolver = nullptr); virtual ~VolumeManager() = default;