This is an automated email from the ASF dual-hosted git repository. qianzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git
commit d2c84d14b63f137546c00d69b0309c4543811732 Author: Qian Zhang <zhq527...@gmail.com> AuthorDate: Wed Jul 15 16:02:48 2020 +0800 Improved CSI service manager to support unmanaged CSI plugins. Review: https://reviews.apache.org/r/72683 --- src/csi/service_manager.cpp | 92 ++++++++++++++++++++++++++++++++++++++++++++- src/csi/service_manager.hpp | 12 +++++- 2 files changed, 101 insertions(+), 3 deletions(-) diff --git a/src/csi/service_manager.cpp b/src/csi/service_manager.cpp index a87df96..7a8d8e5 100644 --- a/src/csi/service_manager.cpp +++ b/src/csi/service_manager.cpp @@ -137,6 +137,12 @@ public: const Runtime& _runtime, Metrics* _metrics); + ServiceManagerProcess( + const CSIPluginInfo& _info, + const hashset<Service>& services, + const Runtime& _runtime, + Metrics* _metrics); + Future<Nothing> recover(); Future<string> getServiceEndpoint(const Service& service); @@ -180,8 +186,15 @@ private: http::Headers headers; Option<string> apiVersion; + + // This is for the managed CSI plugin which will be launched as + // standalone containers. hashmap<Service, ContainerID> serviceContainers; + // This is for the unmanaged CSI plugin which is already deployed + // out of Mesos. + hashmap<Service, string> serviceEndpoints; + hashmap<ContainerID, Owned<ContainerDaemon>> daemons; hashmap<ContainerID, Owned<Promise<string>>> endpoints; }; @@ -233,8 +246,45 @@ ServiceManagerProcess::ServiceManagerProcess( } +ServiceManagerProcess::ServiceManagerProcess( + const CSIPluginInfo& _info, + const hashset<Service>& services, + const Runtime& _runtime, + Metrics* _metrics) + : ProcessBase(process::ID::generate("csi-service-manager")), + agentUrl(), + rootDir(), + info(_info), + containerPrefix(), + authToken(), + contentType(ContentType::PROTOBUF), + runtime(_runtime), + metrics(_metrics) +{ + foreach (const Service& service, services) { + foreach (const CSIPluginEndpoint& serviceEndpoint, info.endpoints()) { + if (serviceEndpoint.csi_service() == service) { + serviceEndpoints[service] = serviceEndpoint.endpoint(); + break; + } + } + + CHECK(serviceEndpoints.contains(service)) + << service << " not found for CSI plugin type '" << info.type() + << "' and name '" << info.name() << "'"; + } +} + + Future<Nothing> ServiceManagerProcess::recover() { + // For the unmanaged CSI plugin, we do not need to recover anything. + if (!serviceEndpoints.empty()) { + return Nothing(); + } + + CHECK(!serviceContainers.empty()); + return getContainers() .then(process::defer(self(), [=]( const hashmap<ContainerID, Option<ContainerStatus>>& containers) @@ -346,6 +396,21 @@ Future<Nothing> ServiceManagerProcess::recover() Future<string> ServiceManagerProcess::getServiceEndpoint(const Service& service) { + // For the unmanaged CSI plugin, get its endpoint from + // `serviceEndpoints` directly. + if (!serviceEndpoints.empty()) { + if (serviceEndpoints.contains(service)) { + return serviceEndpoints.at(service); + } else { + return Failure( + stringify(service) + " not found for CSI plugin type '" + + info.type() + "' and name '" + info.name() + "'"); + } + } + + // For the managed CSI plugin, get its endpoint via its corresponding + // standalone container ID. + CHECK(!serviceContainers.empty()); if (!serviceContainers.contains(service)) { return Failure( stringify(service) + " not found for CSI plugin type '" + info.type() + @@ -362,8 +427,15 @@ Future<string> ServiceManagerProcess::getApiVersion() return apiVersion.get(); } - // Ensure that the plugin has been probed (which does the API version - // detection) through `getEndpoint` before returning the API version. + // Ensure that the unmanaged CSI plugin has been probed (which does the API + // version detection) before returning the API version. + if (!serviceEndpoints.empty()) { + return probeEndpoint(serviceEndpoints.begin()->second) + .then(process::defer(self(), [=] { return CHECK_NOTNONE(apiVersion); })); + } + + // For the managed CSI plugin, `probeEndpoint` will be internally called by + // `getEndpoint` to do the API version detection. CHECK(!serviceContainers.empty()); return getEndpoint(serviceContainers.begin()->second) .then(process::defer(self(), [=] { return CHECK_NOTNONE(apiVersion); })); @@ -790,6 +862,22 @@ ServiceManager::ServiceManager( } +ServiceManager::ServiceManager( + const CSIPluginInfo& info, + const hashset<Service>& services, + const process::grpc::client::Runtime& runtime, + Metrics* metrics) + : process(new ServiceManagerProcess( + info, + services, + runtime, + metrics)) +{ + process::spawn(CHECK_NOTNULL(process.get())); + recovered = process::dispatch(process.get(), &ServiceManagerProcess::recover); +} + + ServiceManager::~ServiceManager() { recovered.discard(); diff --git a/src/csi/service_manager.hpp b/src/csi/service_manager.hpp index 60a0805..76a80fb 100644 --- a/src/csi/service_manager.hpp +++ b/src/csi/service_manager.hpp @@ -47,10 +47,12 @@ constexpr Service NODE_SERVICE = CSIPluginContainerInfo::NODE_SERVICE; class ServiceManagerProcess; -// Manages the service containers of a CSI plugin instance. +// Manages the services of a CSI plugin instance. class ServiceManager { public: + // This is for the managed CSI plugins which will be + // launched as standalone containers. ServiceManager( const process::http::URL& agentUrl, const std::string& rootDir, @@ -61,6 +63,14 @@ public: const process::grpc::client::Runtime& runtime, Metrics* metrics); + // This is for the unmanaged CSI plugins which we assume + // are already launched out of Mesos. + ServiceManager( + const CSIPluginInfo& info, + const hashset<Service>& services, + const process::grpc::client::Runtime& runtime, + Metrics* metrics); + // Since this class contains `Owned` members which should not but can be // copied, explicitly make this class non-copyable. //