This is an automated email from the ASF dual-hosted git repository.

grag pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit fe0cd02a0697a4c4fcf5087fcafd6729beec0b41
Author: Greg Mann <g...@mesosphere.io>
AuthorDate: Mon Aug 10 20:11:50 2020 -0700

    Added implementation of the CSI server.
    
    Review: https://reviews.apache.org/r/72716/
---
 src/CMakeLists.txt       |   1 +
 src/Makefile.am          |   2 +
 src/slave/csi_server.cpp | 455 +++++++++++++++++++++++++++++++++++++++++++++++
 src/slave/csi_server.hpp |   8 +-
 4 files changed, 465 insertions(+), 1 deletion(-)

diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 4e15e3d..c60d98a 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -150,6 +150,7 @@ set(AGENT_SRC
   slave/constants.cpp
   slave/container_daemon.cpp
   slave/container_logger.cpp
+  slave/csi_server.cpp
   slave/flags.cpp
   slave/gc.cpp
   slave/http.cpp
diff --git a/src/Makefile.am b/src/Makefile.am
index 447db32..49dab4b 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1208,6 +1208,8 @@ libmesos_no_3rdparty_la_SOURCES +=                        
                \
   slave/container_daemon.hpp                                           \
   slave/container_daemon_process.hpp                                   \
   slave/container_logger.cpp                                           \
+  slave/csi_server.cpp                                                 \
+  slave/csi_server.hpp                                                 \
   slave/container_loggers/sandbox.cpp                                  \
   slave/container_loggers/sandbox.hpp                                  \
   slave/containerizer/composing.cpp                                    \
diff --git a/src/slave/csi_server.cpp b/src/slave/csi_server.cpp
new file mode 100644
index 0000000..a9a3995
--- /dev/null
+++ b/src/slave/csi_server.cpp
@@ -0,0 +1,455 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <list>
+#include <string>
+#include <vector>
+
+#include <mesos/mesos.hpp>
+
+#include <mesos/secret/resolver.hpp>
+
+#include <process/collect.hpp>
+#include <process/defer.hpp>
+#include <process/dispatch.hpp>
+#include <process/future.hpp>
+#include <process/grpc.hpp>
+#include <process/http.hpp>
+#include <process/id.hpp>
+#include <process/owned.hpp>
+#include <process/sequence.hpp>
+
+#include <stout/hashmap.hpp>
+#include <stout/hashset.hpp>
+#include <stout/json.hpp>
+#include <stout/os.hpp>
+#include <stout/protobuf.hpp>
+#include <stout/try.hpp>
+
+#include "common/validation.hpp"
+
+#include "csi/metrics.hpp"
+#include "csi/paths.hpp"
+#include "csi/service_manager.hpp"
+#include "csi/volume_manager.hpp"
+
+#include "slave/csi_server.hpp"
+#include "slave/flags.hpp"
+#include "slave/paths.hpp"
+
+using mesos::csi::ServiceManager;
+using mesos::csi::VolumeManager;
+
+using mesos::csi::state::VolumeState;
+
+using process::Failure;
+using process::Future;
+using process::Owned;
+
+using process::grpc::client::Runtime;
+
+using process::http::authentication::Principal;
+
+using std::list;
+using std::string;
+using std::vector;
+
+namespace mesos {
+namespace internal {
+namespace slave {
+
+static VolumeState createVolumeState(
+    const Volume::Source::CSIVolume::StaticProvisioning& volume);
+
+
+static hashset<CSIPluginContainerInfo::Service> extractServices(
+    const CSIPluginInfo& plugin);
+
+
+class CSIServerProcess : public process::Process<CSIServerProcess>
+{
+public:
+  CSIServerProcess(
+      const process::http::URL& _agentUrl,
+      const string& _rootDir,
+      SecretGenerator* _secretGenerator,
+      SecretResolver* _secretResolver,
+      hashmap<string, CSIPluginInfo> _pluginConfigs)
+    : process::ProcessBase(process::ID::generate("csi-server")),
+      agentUrl(_agentUrl),
+      rootDir(_rootDir),
+      secretGenerator(_secretGenerator),
+      secretResolver(_secretResolver),
+      pluginConfigs(_pluginConfigs) {}
+
+  Future<Nothing> start();
+
+  Future<string> publishVolume(const Volume::Source::CSIVolume& volume);
+
+  Future<Nothing> unpublishVolume(
+      const string& pluginName,
+      const string& volumeId);
+
+private:
+  struct CSIPlugin
+  {
+    CSIPlugin(const string& metricsPrefix) : metrics(metricsPrefix) {}
+
+    CSIPluginInfo info;
+    Owned<ServiceManager> serviceManager;
+    Owned<VolumeManager> volumeManager;
+    Runtime runtime;
+    csi::Metrics metrics;
+  };
+
+  // Contains the plugins loaded by the server. The key of this map is the
+  // plugin name.
+  hashmap<string, CSIPlugin> plugins;
+
+  const process::http::URL agentUrl;
+  const string rootDir;
+  SecretGenerator* secretGenerator;
+  SecretResolver* secretResolver;
+  Option<string> authToken;
+  hashmap<string, CSIPluginInfo> pluginConfigs;
+};
+
+
+Future<Nothing> CSIServerProcess::start()
+{
+  Future<Nothing> result = Nothing();
+
+  // The contents of this principal are arbitrary. We choose to avoid a
+  // principal with a 'value' string so that we do not unintentionally collide
+  // with another real principal with restricted permissions.
+  Principal principal(Option<string>::none(), {{"key", "csi-server"}});
+
+  if (secretGenerator) {
+    result = secretGenerator->generate(principal)
+      .then([=](const Secret& secret) -> Future<Nothing> {
+        Option<Error> error = common::validation::validateSecret(secret);
+        if (error.isSome()) {
+          return Failure(
+              "CSI server failed to validate generated secret: " +
+              error->message);
+        }
+
+        if (secret.type() != Secret::VALUE) {
+          return Failure(
+              "CSI server expecting generated secret to be of VALUE type "
+              "instead of " + stringify(secret.type()) + " type; " +
+              "only VALUE type secrets are supported at this time");
+        }
+
+        CHECK(secret.has_value());
+
+        authToken = secret.value().data();
+
+        return Nothing();
+    });
+  }
+
+  // Initialize CSI plugins.
+  vector<Future<Nothing>> initializations;
+
+  foreachpair (const string& name, const CSIPluginInfo& info, pluginConfigs) {
+    // Default-construct the plugin struct so that we have a valid runtime
+    // to pass into the service manager.
+    plugins.put(name, CSIPlugin("csi_plugins/" + name + "/"));
+
+    if (info.containers_size() > 0) {
+      plugins.at(name).serviceManager.reset(new ServiceManager(
+          agentUrl,
+          rootDir,
+          info,
+          extractServices(info),
+          "org-apache-mesos-internal-",
+          authToken,
+          plugins.at(name).runtime,
+          &plugins.at(name).metrics));
+    } else {
+      CHECK(info.endpoints_size() > 0);
+
+      plugins.at(name).serviceManager.reset(new ServiceManager(
+          info,
+          extractServices(info),
+          plugins.at(name).runtime,
+          &plugins.at(name).metrics));
+    }
+
+    initializations.push_back(plugins.at(name).serviceManager->recover()
+      .then(defer(self(), [=]() {
+        CHECK(plugins.contains(name));
+
+        return plugins.at(name).serviceManager->getApiVersion();
+      }))
+      .then(defer(self(), [=](const string& apiVersion) -> Future<Nothing> {
+        CHECK(plugins.contains(name));
+
+        Try<Owned<VolumeManager>> volumeManager = VolumeManager::create(
+            rootDir,
+            info,
+            extractServices(info),
+            apiVersion,
+            plugins.at(name).runtime,
+            plugins.at(name).serviceManager.get(),
+            &plugins.at(name).metrics,
+            secretResolver);
+
+        if (volumeManager.isError()) {
+          return Failure(
+              "CSI server failed to create volume manager for plugin"
+              " '" + info.name() + "': " + volumeManager.error());
+        }
+
+        plugins.at(name).volumeManager = std::move(volumeManager.get());
+
+        return plugins.at(name).volumeManager->recover();
+      })));
+  }
+
+  return result
+    .then([=]() {
+      return process::collect(initializations);
+    })
+    .then([=]() {
+      return Nothing();
+    });
+}
+
+
+Future<string> CSIServerProcess::publishVolume(
+    const Volume::Source::CSIVolume& volume)
+{
+  CHECK(volume.has_static_provisioning());
+
+  if (!plugins.contains(volume.plugin_name())) {
+    return Failure("Invalid CSI plugin '" + volume.plugin_name() + "'");
+  }
+
+  return plugins.at(volume.plugin_name()).volumeManager->publishVolume(
+      volume.static_provisioning().volume_id(),
+      createVolumeState(volume.static_provisioning()))
+    .then([=]() {
+      CHECK(plugins.contains(volume.plugin_name()));
+
+      const string targetRootDir =
+        plugins.at(volume.plugin_name()).info.has_target_path_root()
+          ? plugins.at(volume.plugin_name()).info.target_path_root()
+          : rootDir;
+
+      return csi::paths::getMountTargetPath(
+          csi::paths::getMountRootDir(
+              targetRootDir,
+              plugins.at(volume.plugin_name()).info.type(),
+              plugins.at(volume.plugin_name()).info.name()),
+          volume.static_provisioning().volume_id());
+    });
+}
+
+
+Future<Nothing> CSIServerProcess::unpublishVolume(
+    const string& pluginName,
+    const string& volumeId)
+{
+  if (!plugins.contains(pluginName)) {
+    return Failure("Invalid CSI plugin '" + pluginName + "'");
+  }
+
+  return plugins.at(pluginName).volumeManager->unpublishVolume(volumeId);
+}
+
+
+VolumeState createVolumeState(
+    const Volume::Source::CSIVolume::StaticProvisioning& volume)
+{
+  VolumeState result;
+  result.set_state(VolumeState::NODE_READY);
+  *result.mutable_volume_capability() = volume.volume_capability();
+  *result.mutable_volume_context() = volume.volume_context();
+  result.set_readonly(volume.readonly());
+  result.set_pre_provisioned(true);
+
+  return result;
+}
+
+
+hashset<CSIPluginContainerInfo::Service> extractServices(
+    const CSIPluginInfo& plugin)
+{
+  hashset<CSIPluginContainerInfo::Service> result;
+
+  if (plugin.containers_size() > 0) {
+    foreach (const CSIPluginContainerInfo& container, plugin.containers()) {
+      for (int i = 0; i < container.services_size(); ++i) {
+        result.insert(container.services(i));
+      }
+    }
+  } else {
+    CHECK(plugin.endpoints_size() > 0);
+
+    foreach (const CSIPluginEndpoint& endpoint, plugin.endpoints()) {
+      result.insert(endpoint.csi_service());
+    }
+  }
+
+  return result;
+}
+
+
+CSIServer::CSIServer(
+    const process::http::URL& agentUrl,
+    const string& rootDir,
+    SecretGenerator* secretGenerator,
+    SecretResolver* secretResolver,
+    const hashmap<string, CSIPluginInfo>& pluginConfigs)
+  : process(new CSIServerProcess(
+        agentUrl,
+        rootDir,
+        secretGenerator,
+        secretResolver,
+        pluginConfigs))
+{
+  process::spawn(CHECK_NOTNULL(process.get()));
+}
+
+
+CSIServer::~CSIServer()
+{
+  process::terminate(process.get());
+  process::wait(process.get());
+}
+
+
+Try<Owned<CSIServer>> CSIServer::create(
+    const Flags& flags,
+    const process::http::URL& agentUrl,
+    SecretGenerator* secretGenerator,
+    SecretResolver* secretResolver)
+{
+  if (!strings::contains(flags.isolation, "volume/csi")) {
+    return Error("Missing required isolator 'volume/csi'");
+  }
+
+  if (flags.csi_plugin_config_dir.isNone() ||
+      flags.csi_plugin_config_dir->empty()) {
+    return Error("Missing required '--csi_plugin_config_dir' flag");
+  }
+
+  if (!os::exists(flags.csi_plugin_config_dir.get())) {
+    return Error(
+        "The CSI plugin configuration directory '" +
+        flags.csi_plugin_config_dir.get() + "' does not exist");
+  }
+
+  Try<list<string>> entries = os::ls(flags.csi_plugin_config_dir.get());
+  if (entries.isError()) {
+    return Error(
+        "Unable to list the CSI plugin configuration directory '" +
+        flags.csi_plugin_config_dir.get()+ "': " + entries.error());
+  }
+
+  hashmap<std::string, CSIPluginInfo> pluginConfigs;
+
+  foreach (const string& entry, entries.get()) {
+    const string path = path::join(flags.csi_plugin_config_dir.get(), entry);
+
+    // Ignore directory entries.
+    if (os::stat::isdir(path)) {
+      continue;
+    }
+
+    Try<string> read = os::read(path);
+    if (read.isError()) {
+      // In case of an error we log and skip to the next entry.
+      LOG(ERROR) << "Failed to read CSI plugin configuration file '"
+                 << path << "': " << read.error();
+
+      continue;
+    }
+
+    Try<JSON::Object> json = JSON::parse<JSON::Object>(read.get());
+    if (json.isError()) {
+      return Error("JSON parse failed: " + json.error());
+    }
+
+    Try<CSIPluginInfo> parse = ::protobuf::parse<CSIPluginInfo>(json.get());
+    if (parse.isError()) {
+      return Error("Protobuf parse failed: " + parse.error());
+    }
+
+    const CSIPluginInfo& csiPluginConfig = parse.get();
+    const string& type = csiPluginConfig.type();
+
+    if (pluginConfigs.contains(type)) {
+      LOG(ERROR) << "Multiple configurations for a CSI plugin are not allowed. 
"
+                 << "Skipping configuration file '" << path << "' since CSI "
+                 << "plugin '" << type << "' already exists";
+      continue;
+    }
+
+    pluginConfigs[type] = csiPluginConfig;
+  }
+
+  if (pluginConfigs.empty()) {
+    return Error(
+        "No valid CSI plugin configurations found in '" +
+        flags.csi_plugin_config_dir.get() + "'");
+  }
+
+  return new CSIServer(
+      agentUrl,
+      slave::paths::getCsiRootDir(flags.work_dir),
+      secretGenerator,
+      secretResolver,
+      pluginConfigs);
+}
+
+
+Future<Nothing> CSIServer::start()
+{
+  started.associate(process::dispatch(process.get(), 
&CSIServerProcess::start));
+
+  return started.future();
+}
+
+
+Future<string> CSIServer::publishVolume(
+    const Volume::Source::CSIVolume& volume)
+{
+  return started.future()
+    .then(process::defer(
+        process.get(),
+        &CSIServerProcess::publishVolume,
+        volume));
+}
+
+
+Future<Nothing> CSIServer::unpublishVolume(
+    const string& pluginName,
+    const string& volumeId)
+{
+  return started.future()
+    .then(process::defer(
+        process.get(),
+        &CSIServerProcess::unpublishVolume,
+        pluginName,
+        volumeId));
+}
+
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
diff --git a/src/slave/csi_server.hpp b/src/slave/csi_server.hpp
index 17882e1..f5ec766 100644
--- a/src/slave/csi_server.hpp
+++ b/src/slave/csi_server.hpp
@@ -23,6 +23,8 @@
 
 #include <mesos/authentication/secret_generator.hpp>
 
+#include <mesos/secret/resolver.hpp>
+
 #include <process/future.hpp>
 #include <process/http.hpp>
 #include <process/owned.hpp>
@@ -52,7 +54,8 @@ public:
   static Try<process::Owned<CSIServer>> create(
       const Flags& flags,
       const process::http::URL& agentUrl,
-      SecretGenerator* secretGenerator);
+      SecretGenerator* secretGenerator,
+      SecretResolver* secretResolver);
 
   // Starts the CSI server. Any `publishVolume()` or `unpublishVolume()` calls
   // which were made previously will be executed after this method is called.
@@ -78,9 +81,12 @@ private:
       const process::http::URL& agentUrl,
       const std::string& csiRootDir,
       SecretGenerator* secretGenerator,
+      SecretResolver* secretResolver,
       const hashmap<std::string, CSIPluginInfo>& csiPluginConfigs);
 
   process::Owned<CSIServerProcess> process;
+
+  process::Promise<Nothing> started;
 };
 
 } // namespace slave {

Reply via email to