This is an automated email from the ASF dual-hosted git repository.

grag pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 38ba19127ddb48244f7c6c699e3c41e5ea12b594
Author: Greg Mann <g...@mesosphere.io>
AuthorDate: Mon Aug 10 20:26:26 2020 -0700

    Added support for secrets to the CSI volume managers.
    
    Review: https://reviews.apache.org/r/72732/
---
 src/csi/state.proto                   |   6 ++
 src/csi/v0_volume_manager.cpp         | 103 +++++++++++++++++++++++++++++++---
 src/csi/v0_volume_manager.hpp         |   5 +-
 src/csi/v0_volume_manager_process.hpp |  13 ++++-
 src/csi/v1_volume_manager.cpp         |  96 +++++++++++++++++++++++++++++--
 src/csi/v1_volume_manager.hpp         |   5 +-
 src/csi/v1_volume_manager_process.hpp |  13 ++++-
 src/csi/volume_manager.cpp            |  21 ++++++-
 src/csi/volume_manager.hpp            |   5 +-
 9 files changed, 246 insertions(+), 21 deletions(-)

diff --git a/src/csi/state.proto b/src/csi/state.proto
index 836e30c..630e4f5 100644
--- a/src/csi/state.proto
+++ b/src/csi/state.proto
@@ -78,4 +78,10 @@ message VolumeState {
 
   // Indicates that the volume must be mounted read-only.
   bool readonly = 9;
+
+  // Secrets to be included in `NodeStageVolumeRequest`.
+  map<string, Secret> node_stage_secrets = 10;
+
+  // Secrets to be included in `NodePublishVolumeRequest`.
+  map<string, Secret> node_publish_secrets = 11;
 }
diff --git a/src/csi/v0_volume_manager.cpp b/src/csi/v0_volume_manager.cpp
index 89a6da5..9e840a7 100644
--- a/src/csi/v0_volume_manager.cpp
+++ b/src/csi/v0_volume_manager.cpp
@@ -21,6 +21,8 @@
 #include <functional>
 #include <list>
 
+#include <mesos/secret/resolver.hpp>
+
 #include <process/after.hpp>
 #include <process/collect.hpp>
 #include <process/defer.hpp>
@@ -81,14 +83,16 @@ VolumeManagerProcess::VolumeManagerProcess(
     const hashset<Service> _services,
     const Runtime& _runtime,
     ServiceManager* _serviceManager,
-    Metrics* _metrics)
+    Metrics* _metrics,
+    SecretResolver* _secretResolver)
   : ProcessBase(process::ID::generate("csi-v0-volume-manager")),
     rootDir(_rootDir),
     info(_info),
     services(_services),
     runtime(_runtime),
     serviceManager(_serviceManager),
-    metrics(_metrics)
+    metrics(_metrics),
+    secretResolver(_secretResolver)
 {
   // This should have been validated in `VolumeManager::create`.
   CHECK(!services.empty())
@@ -961,8 +965,33 @@ Future<Nothing> VolumeManagerProcess::_publishVolume(const string& volumeId)
     request.set_staging_target_path(stagingPath);
   }
 
-  return call(NODE_SERVICE, &Client::nodePublishVolume, std::move(request))
-    .then(defer(self(), [this, volumeId, targetPath] {
+  Future<NodePublishVolumeResponse> rpcResult;
+
+  if (!volumeState.node_publish_secrets().empty()) {
+    rpcResult = resolveSecrets(volumeState.node_publish_secrets())
+      .then(process::defer(
+          self(),
+          [this, request](const Map<string, string>& secrets) {
+            NodePublishVolumeRequest request_(request);
+            *request_.mutable_node_publish_secrets() = secrets;
+
+            return call(
+                NODE_SERVICE,
+                &Client::nodePublishVolume,
+                std::move(request_));
+          }));
+  } else {
+    rpcResult =
+      call(NODE_SERVICE, &Client::nodePublishVolume, std::move(request));
+  }
+
+  return rpcResult
+    .then(process::defer(self(), [this, volumeId, targetPath]()
+        -> Future<Nothing> {
+      if (!os::exists(targetPath)) {
+        return Failure("Target path '" + targetPath + "' not created");
+      }
+
       CHECK(volumes.contains(volumeId));
       VolumeState& volumeState = volumes.at(volumeId).state;
 
@@ -1042,7 +1071,25 @@ Future<Nothing> VolumeManagerProcess::__publishVolume(const string& volumeId)
     evolve(volumeState.volume_capability());
   *request.mutable_volume_attributes() = volumeState.volume_context();
 
-  return call(NODE_SERVICE, &Client::nodeStageVolume, std::move(request))
+  Future<NodeStageVolumeResponse> rpcResult;
+
+  if (!volumeState.node_stage_secrets().empty()) {
+    rpcResult = resolveSecrets(volumeState.node_stage_secrets())
+      .then([=](const Map<string, string>& secrets) {
+        NodeStageVolumeRequest request_(request);
+        *request_.mutable_node_stage_secrets() = secrets;
+
+        return call(
+            NODE_SERVICE,
+            &Client::nodeStageVolume,
+            std::move(request_));
+      });
+  } else {
+    rpcResult =
+      call(NODE_SERVICE, &Client::nodeStageVolume, std::move(request));
+  }
+
+  return rpcResult
     .then(process::defer(self(), [this, volumeId] {
       CHECK(volumes.contains(volumeId));
       VolumeState& volumeState = volumes.at(volumeId).state;
@@ -1236,20 +1283,62 @@ void VolumeManagerProcess::removeVolume(const string& volumeId)
 }
 
 
+Future<Map<string, string>> VolumeManagerProcess::resolveSecrets(
+    const Map<string, Secret>& secrets)
+{
+  // Secrets cannot be resolved without a resolver, so fail the request.
+  if (secretResolver == nullptr) {
+    return Failure(
+        "CSI volume included secrets but the agent was not initialized with "
+        "a secret resolver");
+  }
+
+  // Every resolution is tracked twice: `pending` feeds `process::collect()`
+  // so the continuation waits for all of them, while `byKey` remembers which
+  // secret name each future belongs to.
+  vector<Future<Secret::Value>> pending;
+  hashmap<string, Future<Secret::Value>> byKey;
+
+  for (const auto& entry : secrets) {
+    const Future<Secret::Value> value = secretResolver->resolve(entry.second);
+
+    pending.push_back(value);
+    byKey.insert({entry.first, value});
+  }
+
+  return process::collect(pending)
+    .then([byKey]() {
+      // Runs after `collect()`, so each future is expected to be ready.
+      Map<string, string> resolved;
+
+      for (const auto& entry : byKey) {
+        const Future<Secret::Value>& value = entry.second;
+        CHECK(value.isReady());
+
+        resolved.insert({entry.first, value->data()});
+      }
+
+      return resolved;
+    });
+}
+
+
 VolumeManager::VolumeManager(
     const string& rootDir,
     const CSIPluginInfo& info,
     const hashset<Service>& services,
     const Runtime& runtime,
     ServiceManager* serviceManager,
-    Metrics* metrics)
+    Metrics* metrics,
+    SecretResolver* secretResolver)
   : process(new VolumeManagerProcess(
         rootDir,
         info,
         services,
         runtime,
         serviceManager,
-        metrics))
+        metrics,
+        secretResolver))
 {
   process::spawn(CHECK_NOTNULL(process.get()));
   recovered = process::dispatch(process.get(), &VolumeManagerProcess::recover);
diff --git a/src/csi/v0_volume_manager.hpp b/src/csi/v0_volume_manager.hpp
index 93183c2..a984711 100644
--- a/src/csi/v0_volume_manager.hpp
+++ b/src/csi/v0_volume_manager.hpp
@@ -24,6 +24,8 @@
 
 #include <mesos/mesos.hpp>
 
+#include <mesos/secret/resolver.hpp>
+
 #include <process/future.hpp>
 #include <process/grpc.hpp>
 #include <process/http.hpp>
@@ -57,7 +59,8 @@ public:
       const hashset<Service>& services,
       const process::grpc::client::Runtime& runtime,
       ServiceManager* serviceManager,
-      Metrics* metrics);
+      Metrics* metrics,
+      SecretResolver* secretResolver);
 
   // Since this class contains `Owned` members which should not but can be
   // copied, explicitly make this class non-copyable.
diff --git a/src/csi/v0_volume_manager_process.hpp b/src/csi/v0_volume_manager_process.hpp
index 7548c43..1162955 100644
--- a/src/csi/v0_volume_manager_process.hpp
+++ b/src/csi/v0_volume_manager_process.hpp
@@ -24,6 +24,8 @@
 
 #include <mesos/mesos.hpp>
 
+#include <mesos/secret/resolver.hpp>
+
 #include <process/future.hpp>
 #include <process/grpc.hpp>
 #include <process/http.hpp>
@@ -62,7 +64,8 @@ public:
       const hashset<Service> _services,
       const process::grpc::client::Runtime& _runtime,
       ServiceManager* _serviceManager,
-      Metrics* _metrics);
+      Metrics* _metrics,
+      SecretResolver* _secretResolver);
 
   process::Future<Nothing> recover();
 
@@ -173,6 +176,13 @@ private:
   // from memory and from disk.
   void removeVolume(const std::string& volumeId);
 
+  // If the volume manager was initialized with a non-null secret resolver, this
+  // helper function will resolve any secrets in the provided map.
+  // Returns a map containing the resolved secrets.
+  process::Future<google::protobuf::Map<std::string, std::string>>
+    resolveSecrets(
+        const google::protobuf::Map<std::string, Secret>& secrets);
+
   const std::string rootDir;
   const CSIPluginInfo info;
   const hashset<Service> services;
@@ -180,6 +190,7 @@ private:
   process::grpc::client::Runtime runtime;
   ServiceManager* serviceManager;
   Metrics* metrics;
+  SecretResolver* secretResolver;
 
   Option<std::string> bootId;
   Option<PluginCapabilities> pluginCapabilities;
diff --git a/src/csi/v1_volume_manager.cpp b/src/csi/v1_volume_manager.cpp
index 5178b2f..7230676 100644
--- a/src/csi/v1_volume_manager.cpp
+++ b/src/csi/v1_volume_manager.cpp
@@ -21,6 +21,8 @@
 #include <functional>
 #include <list>
 
+#include <mesos/secret/resolver.hpp>
+
 #include <process/after.hpp>
 #include <process/collect.hpp>
 #include <process/defer.hpp>
@@ -82,14 +84,16 @@ VolumeManagerProcess::VolumeManagerProcess(
     const hashset<Service> _services,
     const Runtime& _runtime,
     ServiceManager* _serviceManager,
-    Metrics* _metrics)
+    Metrics* _metrics,
+    SecretResolver* _secretResolver)
   : ProcessBase(process::ID::generate("csi-v1-volume-manager")),
     rootDir(_rootDir),
     info(_info),
     services(_services),
     runtime(_runtime),
     serviceManager(_serviceManager),
-    metrics(_metrics)
+    metrics(_metrics),
+    secretResolver(_secretResolver)
 {
   // This should have been validated in `VolumeManager::create`.
   CHECK(!services.empty())
@@ -987,7 +991,27 @@ Future<Nothing> VolumeManagerProcess::_publishVolume(const string& volumeId)
     request.set_staging_target_path(stagingPath);
   }
 
-  return call(NODE_SERVICE, &Client::nodePublishVolume, std::move(request))
+  Future<NodePublishVolumeResponse> rpcResult;
+
+  if (!volumeState.node_publish_secrets().empty()) {
+    rpcResult = resolveSecrets(volumeState.node_publish_secrets())
+      .then(process::defer(
+          self(),
+          [this, request](const Map<string, string>& secrets) {
+            NodePublishVolumeRequest request_(request);
+            *request_.mutable_secrets() = secrets;
+
+            return call(
+                NODE_SERVICE,
+                &Client::nodePublishVolume,
+                std::move(request_));
+          }));
+  } else {
+    rpcResult =
+      call(NODE_SERVICE, &Client::nodePublishVolume, std::move(request));
+  }
+
+  return rpcResult
     .then(process::defer(self(), [this, volumeId, targetPath]()
         -> Future<Nothing> {
       if (!os::exists(targetPath)) {
@@ -1073,7 +1097,25 @@ Future<Nothing> VolumeManagerProcess::__publishVolume(const string& volumeId)
     evolve(volumeState.volume_capability());
   *request.mutable_volume_context() = volumeState.volume_context();
 
-  return call(NODE_SERVICE, &Client::nodeStageVolume, std::move(request))
+  Future<NodeStageVolumeResponse> rpcResult;
+
+  if (!volumeState.node_stage_secrets().empty()) {
+    rpcResult = resolveSecrets(volumeState.node_stage_secrets())
+      .then([=](const Map<string, string>& secrets) {
+        NodeStageVolumeRequest request_(request);
+        *request_.mutable_secrets() = secrets;
+
+        return call(
+            NODE_SERVICE,
+            &Client::nodeStageVolume,
+            std::move(request_));
+      });
+  } else {
+    rpcResult =
+      call(NODE_SERVICE, &Client::nodeStageVolume, std::move(request));
+  }
+
+  return rpcResult
     .then(process::defer(self(), [this, volumeId] {
       CHECK(volumes.contains(volumeId));
       VolumeState& volumeState = volumes.at(volumeId).state;
@@ -1270,20 +1312,62 @@ void VolumeManagerProcess::removeVolume(const string& volumeId)
 }
 
 
+Future<Map<string, string>> VolumeManagerProcess::resolveSecrets(
+    const Map<string, Secret>& secrets)
+{
+  // Secrets cannot be resolved without a resolver, so fail the request.
+  if (secretResolver == nullptr) {
+    return Failure(
+        "CSI volume included secrets but the agent was not initialized with "
+        "a secret resolver");
+  }
+
+  // Every resolution is tracked twice: `pending` feeds `process::collect()`
+  // so the continuation waits for all of them, while `byKey` remembers which
+  // secret name each future belongs to.
+  vector<Future<Secret::Value>> pending;
+  hashmap<string, Future<Secret::Value>> byKey;
+
+  for (const auto& entry : secrets) {
+    const Future<Secret::Value> value = secretResolver->resolve(entry.second);
+
+    pending.push_back(value);
+    byKey.insert({entry.first, value});
+  }
+
+  return process::collect(pending)
+    .then([byKey]() {
+      // Runs after `collect()`, so each future is expected to be ready.
+      Map<string, string> resolved;
+
+      for (const auto& entry : byKey) {
+        const Future<Secret::Value>& value = entry.second;
+        CHECK(value.isReady());
+
+        resolved.insert({entry.first, value->data()});
+      }
+
+      return resolved;
+    });
+}
+
+
 VolumeManager::VolumeManager(
     const string& rootDir,
     const CSIPluginInfo& info,
     const hashset<Service>& services,
     const Runtime& runtime,
     ServiceManager* serviceManager,
-    Metrics* metrics)
+    Metrics* metrics,
+    SecretResolver* secretResolver)
   : process(new VolumeManagerProcess(
         rootDir,
         info,
         services,
         runtime,
         serviceManager,
-        metrics))
+        metrics,
+        secretResolver))
 {
   process::spawn(CHECK_NOTNULL(process.get()));
   recovered = process::dispatch(process.get(), &VolumeManagerProcess::recover);
diff --git a/src/csi/v1_volume_manager.hpp b/src/csi/v1_volume_manager.hpp
index 2f7897d..03a6eee 100644
--- a/src/csi/v1_volume_manager.hpp
+++ b/src/csi/v1_volume_manager.hpp
@@ -24,6 +24,8 @@
 
 #include <mesos/mesos.hpp>
 
+#include <mesos/secret/resolver.hpp>
+
 #include <process/future.hpp>
 #include <process/grpc.hpp>
 #include <process/http.hpp>
@@ -57,7 +59,8 @@ public:
       const hashset<Service>& services,
       const process::grpc::client::Runtime& runtime,
       ServiceManager* serviceManager,
-      Metrics* metrics);
+      Metrics* metrics,
+      SecretResolver* secretResolver);
 
   // Since this class contains `Owned` members which should not but can be
   // copied, explicitly make this class non-copyable.
diff --git a/src/csi/v1_volume_manager_process.hpp b/src/csi/v1_volume_manager_process.hpp
index b8a1ef7..63dc03e 100644
--- a/src/csi/v1_volume_manager_process.hpp
+++ b/src/csi/v1_volume_manager_process.hpp
@@ -24,6 +24,8 @@
 
 #include <mesos/mesos.hpp>
 
+#include <mesos/secret/resolver.hpp>
+
 #include <process/future.hpp>
 #include <process/grpc.hpp>
 #include <process/http.hpp>
@@ -62,7 +64,8 @@ public:
       const hashset<Service> _services,
       const process::grpc::client::Runtime& _runtime,
       ServiceManager* _serviceManager,
-      Metrics* _metrics);
+      Metrics* _metrics,
+      SecretResolver* _secretResolver);
 
   process::Future<Nothing> recover();
 
@@ -173,6 +176,13 @@ private:
   // from memory and from disk.
   void removeVolume(const std::string& volumeId);
 
+  // If the volume manager was initialized with a non-null secret resolver, this
+  // helper function will resolve any secrets in the provided map.
+  // Returns a map containing the resolved secrets.
+  process::Future<google::protobuf::Map<std::string, std::string>>
+    resolveSecrets(
+        const google::protobuf::Map<std::string, Secret>& secrets);
+
   const std::string rootDir;
   const CSIPluginInfo info;
   const hashset<Service> services;
@@ -180,6 +190,7 @@ private:
   process::grpc::client::Runtime runtime;
   ServiceManager* serviceManager;
   Metrics* metrics;
+  SecretResolver* secretResolver;
 
   Option<std::string> bootId;
   Option<PluginCapabilities> pluginCapabilities;
diff --git a/src/csi/volume_manager.cpp b/src/csi/volume_manager.cpp
index c47adfe..1ac2209 100644
--- a/src/csi/volume_manager.cpp
+++ b/src/csi/volume_manager.cpp
@@ -21,6 +21,8 @@
 #include <mesos/csi/v0.hpp>
 #include <mesos/csi/v1.hpp>
 
+#include <mesos/secret/resolver.hpp>
+
 #include "csi/service_manager.hpp"
 #include "csi/v0_volume_manager.hpp"
 #include "csi/v1_volume_manager.hpp"
@@ -43,7 +45,8 @@ Try<Owned<VolumeManager>> VolumeManager::create(
     const string& apiVersion,
     const Runtime& runtime,
     ServiceManager* serviceManager,
-    Metrics* metrics)
+    Metrics* metrics,
+    SecretResolver* secretResolver)
 {
   if (services.empty()) {
     return Error(
@@ -53,10 +56,22 @@ Try<Owned<VolumeManager>> VolumeManager::create(
 
   if (apiVersion == v0::API_VERSION) {
     return Try<Owned<VolumeManager>>(new v0::VolumeManager(
-        rootDir, info, services, runtime, serviceManager, metrics));
+        rootDir,
+        info,
+        services,
+        runtime,
+        serviceManager,
+        metrics,
+        secretResolver));
   } else if (apiVersion == v1::API_VERSION) {
     return Try<Owned<VolumeManager>>(new v1::VolumeManager(
-        rootDir, info, services, runtime, serviceManager, metrics));
+        rootDir,
+        info,
+        services,
+        runtime,
+        serviceManager,
+        metrics,
+        secretResolver));
   }
 
   return Error("Unsupported CSI API version: " + apiVersion);
diff --git a/src/csi/volume_manager.hpp b/src/csi/volume_manager.hpp
index 57e7c51..8262183 100644
--- a/src/csi/volume_manager.hpp
+++ b/src/csi/volume_manager.hpp
@@ -24,6 +24,8 @@
 
 #include <mesos/mesos.hpp>
 
+#include <mesos/secret/resolver.hpp>
+
 #include <process/future.hpp>
 #include <process/grpc.hpp>
 #include <process/http.hpp>
@@ -62,7 +64,8 @@ public:
       const std::string& apiVersion,
       const process::grpc::client::Runtime& runtime,
       ServiceManager* serviceManager,
-      Metrics* metrics);
+      Metrics* metrics,
+      SecretResolver* secretResolver = nullptr);
 
   virtual ~VolumeManager() = default;
 

Reply via email to