This is an automated email from the ASF dual-hosted git repository. grag pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 5ed30db48785007e35805886a024ebb8a61a7037 Author: Greg Mann <g...@mesosphere.io> AuthorDate: Thu Aug 20 19:27:02 2020 -0700 Added the CSI server to the Mesos agent. This patch adds a CSI server to the Mesos agent in both the agent binary and in tests. Review: https://reviews.apache.org/r/72761/ --- src/local/local.cpp | 1 + src/slave/main.cpp | 101 ++++++++++++++++++++++++++----------- src/slave/slave.cpp | 18 +++++++ src/slave/slave.hpp | 3 ++ src/tests/cluster.cpp | 128 ++++++++++++++++++++++++++++++++++------------- src/tests/cluster.hpp | 3 ++ src/tests/mesos.cpp | 1 + src/tests/mesos.hpp | 9 ++++ src/tests/mock_slave.cpp | 7 +++ src/tests/mock_slave.hpp | 3 ++ 10 files changed, 208 insertions(+), 66 deletions(-) diff --git a/src/local/local.cpp b/src/local/local.cpp index 8950570..9535399 100644 --- a/src/local/local.cpp +++ b/src/local/local.cpp @@ -535,6 +535,7 @@ PID<Master> launch(const Flags& flags, Allocator* _allocator) secretGenerators->back(), nullptr, nullptr, + nullptr, #ifndef __WINDOWS__ None(), #endif // __WINDOWS__ diff --git a/src/slave/main.cpp b/src/slave/main.cpp index 0aa2cc9..84b813c 100644 --- a/src/slave/main.cpp +++ b/src/slave/main.cpp @@ -37,6 +37,8 @@ #include <process/owned.hpp> #include <process/process.hpp> +#include <process/ssl/flags.hpp> + #include <stout/check.hpp> #include <stout/flags.hpp> #include <stout/hashset.hpp> @@ -84,6 +86,7 @@ #include "module/manager.hpp" #include "slave/constants.hpp" +#include "slave/csi_server.hpp" #include "slave/gc.hpp" #include "slave/slave.hpp" #include "slave/task_status_update_manager.hpp" @@ -111,6 +114,8 @@ using mesos::Authorizer; using mesos::SecretResolver; using mesos::SlaveInfo; +using net::IP; + using process::Owned; using process::firewall::DisabledEndpointsFirewallRule; @@ -528,6 +533,69 @@ int main(int argc, char** argv) << futureTracker.error(); } + SecretGenerator* secretGenerator = nullptr; + +#ifdef USE_SSL_SOCKET + if (flags.jwt_secret_key.isSome()) { + Try<string> jwtSecretKey = os::read(flags.jwt_secret_key.get()); + if (jwtSecretKey.isError()) { + EXIT(EXIT_FAILURE) << "Failed to read the file specified by " + << "--jwt_secret_key"; + } + + // TODO(greggomann): Factor the following code out into a common helper, + // since we also do this when loading credentials. + Try<os::Permissions> permissions = + os::permissions(flags.jwt_secret_key.get()); + if (permissions.isError()) { + LOG(WARNING) << "Failed to stat jwt secret key file '" + << flags.jwt_secret_key.get() + << "': " << permissions.error(); + } else if (permissions->others.rwx) { + LOG(WARNING) << "Permissions on executor secret key file '" + << flags.jwt_secret_key.get() + << "' are too open; it is recommended that your" + << " key file is NOT accessible by others"; + } + + secretGenerator = new JWTSecretGenerator(jwtSecretKey.get()); + } +#endif // USE_SSL_SOCKET + + // The agent will hold ownership of the CSI server, but we also pass a pointer + // to it into the containerizer for use by the 'volume/csi' isolator. + Owned<CSIServer> csiServer; + + if (flags.csi_plugin_config_dir.isSome()) { + // Initialize the CSI server, which manages any configured CSI plugins. + string scheme = "http"; + +#ifdef USE_SSL_SOCKET + if (process::network::openssl::flags().enabled) { + scheme = "https"; + } +#endif + + const process::http::URL agentUrl( + scheme, + process::address().ip, + process::address().port, + id + "/api/v1"); + + Try<Owned<CSIServer>> csiServer_ = CSIServer::create( + flags, + agentUrl, + secretGenerator, + secretResolver.get()); + + if (csiServer_.isError()) { + EXIT(EXIT_FAILURE) + << "Failed to initialize the CSI server: " << csiServer_.error(); + } + + csiServer = std::move(csiServer_.get()); + } + Try<Containerizer*> containerizer = Containerizer::create( flags, false, @@ -535,7 +603,8 @@ int main(int argc, char** argv) gc, secretResolver.get(), volumeGidManager, - futureTracker.get()); + futureTracker.get(), + csiServer.get()); if (containerizer.isError()) { EXIT(EXIT_FAILURE) @@ -608,35 +677,6 @@ int main(int argc, char** argv) << qosController.error(); } - SecretGenerator* secretGenerator = nullptr; - -#ifdef USE_SSL_SOCKET - if (flags.jwt_secret_key.isSome()) { - Try<string> jwtSecretKey = os::read(flags.jwt_secret_key.get()); - if (jwtSecretKey.isError()) { - EXIT(EXIT_FAILURE) << "Failed to read the file specified by " - << "--jwt_secret_key"; - } - - // TODO(greggomann): Factor the following code out into a common helper, - // since we also do this when loading credentials. - Try<os::Permissions> permissions = - os::permissions(flags.jwt_secret_key.get()); - if (permissions.isError()) { - LOG(WARNING) << "Failed to stat jwt secret key file '" - << flags.jwt_secret_key.get() - << "': " << permissions.error(); - } else if (permissions->others.rwx) { - LOG(WARNING) << "Permissions on executor secret key file '" - << flags.jwt_secret_key.get() - << "' are too open; it is recommended that your" - << " key file is NOT accessible by others"; - } - - secretGenerator = new JWTSecretGenerator(jwtSecretKey.get()); - } -#endif // USE_SSL_SOCKET - #ifndef __WINDOWS__ // Create executor domain socket if the user so desires. Option<Socket> executorSocket = None(); @@ -723,6 +763,7 @@ int main(int argc, char** argv) secretGenerator, volumeGidManager, futureTracker.get(), + std::move(csiServer), #ifndef __WINDOWS__ executorSocket, #endif // __WINDOWS__ diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp index c828d99..a69937b 100644 --- a/src/slave/slave.cpp +++ b/src/slave/slave.cpp @@ -205,6 +205,7 @@ Slave::Slave(const string& id, SecretGenerator* _secretGenerator, VolumeGidManager* _volumeGidManager, PendingFutureTracker* _futureTracker, + Owned<CSIServer>&& _csiServer, #ifndef __WINDOWS__ const Option<process::network::unix::Socket>& _executorSocket, #endif // __WINDOWS__ @@ -239,6 +240,7 @@ Slave::Slave(const string& id, secretGenerator(_secretGenerator), volumeGidManager(_volumeGidManager), futureTracker(_futureTracker), + csiServer(std::move(_csiServer)), #ifndef __WINDOWS__ executorSocket(_executorSocket), #endif // __WINDOWS__ @@ -1741,6 +1743,14 @@ void Slave::registered( // running, so the resource providers can use the agent API. localResourceProviderDaemon->start(info.id()); + if (csiServer.get()) { + csiServer->start(info.id()) + .onFailed([=](const string& failure) { + EXIT(EXIT_FAILURE) + << "CSI server initialization failed: " << failure; + }); + } + // Setup a timer so that the agent attempts to reregister if it // doesn't receive a ping from the master for an extended period // of time. This needs to be done once registered, in case we @@ -1826,6 +1836,14 @@ void Slave::reregistered( // running, so the resource providers can use the agent API. localResourceProviderDaemon->start(info.id()); + if (csiServer.get()) { + csiServer->start(info.id()) + .onFailed([=](const string& failure) { + EXIT(EXIT_FAILURE) + << "CSI server initialization failed: " << failure; + }); + } + // Setup a timer so that the agent attempts to reregister if it // doesn't receive a ping from the master for an extended period // of time. This needs to be done once reregistered, in case we diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp index 2cf45c6..7946668 100644 --- a/src/slave/slave.hpp +++ b/src/slave/slave.hpp @@ -130,6 +130,7 @@ public: mesos::SecretGenerator* secretGenerator, VolumeGidManager* volumeGidManager, PendingFutureTracker* futureTracker, + process::Owned<CSIServer>&& csiServer, #ifndef __WINDOWS__ const Option<process::network::unix::Socket>& executorSocket, #endif // __WINDOWS__ @@ -888,6 +889,8 @@ private: PendingFutureTracker* futureTracker; + process::Owned<CSIServer> csiServer; + #ifndef __WINDOWS__ Option<process::network::unix::Socket> executorSocket; #endif // __WINDOWS__ diff --git a/src/tests/cluster.cpp b/src/tests/cluster.cpp index ab4cea4..3c86855 100644 --- a/src/tests/cluster.cpp +++ b/src/tests/cluster.cpp @@ -16,6 +16,7 @@ #include <memory> #include <string> +#include <utility> #include <vector> #include <mesos/mesos.hpp> @@ -48,12 +49,15 @@ #include <process/pid.hpp> #include <process/process.hpp> +#include <process/ssl/flags.hpp> + #include <stout/duration.hpp> #include <stout/error.hpp> #include <stout/gtest.hpp> #include <stout/none.hpp> #include <stout/nothing.hpp> #include <stout/option.hpp> +#include <stout/os.hpp> #include <stout/path.hpp> #include <stout/strings.hpp> #include <stout/try.hpp> @@ -94,6 +98,7 @@ #include "master/detector/standalone.hpp" #include "master/detector/zookeeper.hpp" +#include "slave/csi_server.hpp" #include "slave/flags.hpp" #include "slave/gc.hpp" #include "slave/slave.hpp" @@ -124,6 +129,10 @@ using mesos::master::detector::ZooKeeperMasterDetector; using mesos::slave::ContainerTermination; +using net::IP; + +using process::Owned; + #ifndef __WINDOWS__ using process::network::unix::Socket; #endif // __WINDOWS__ @@ -426,6 +435,7 @@ Try<process::Owned<Slave>> Slave::create( const Option<mesos::SecretGenerator*>& secretGenerator, const Option<Authorizer*>& providedAuthorizer, const Option<PendingFutureTracker*>& futureTracker, + const Option<Owned<slave::CSIServer>>& csiServer, bool mock) { process::Owned<Slave> slave(new Slave()); @@ -468,6 +478,82 @@ Try<process::Owned<Slave>> Slave::create( slave->futureTracker.reset(_futureTracker.get()); } + // If the secret generator is not provided, create a default one. + if (secretGenerator.isNone()) { + SecretGenerator* _secretGenerator = nullptr; + +#ifdef USE_SSL_SOCKET + if (flags.jwt_secret_key.isSome()) { + Try<string> jwtSecretKey = os::read(flags.jwt_secret_key.get()); + if (jwtSecretKey.isError()) { + return Error("Failed to read the file specified by --jwt_secret_key"); + } + + // TODO(greggomann): Factor the following code out into a common helper, + // since we also do this when loading credentials. + Try<os::Permissions> permissions = + os::permissions(flags.jwt_secret_key.get()); + if (permissions.isError()) { + LOG(WARNING) << "Failed to stat jwt secret key file '" + << flags.jwt_secret_key.get() + << "': " << permissions.error(); + } else if (permissions->others.rwx) { + LOG(WARNING) << "Permissions on executor secret key file '" + << flags.jwt_secret_key.get() + << "' are too open; it is recommended that your" + << " key file is NOT accessible by others"; + } + + _secretGenerator = new JWTSecretGenerator(jwtSecretKey.get()); + } +#endif // USE_SSL_SOCKET + + slave->secretGenerator.reset(_secretGenerator); + } + + // Create a SecretResolver for use with the CSI server below. + Try<SecretResolver*> secretResolver = + mesos::SecretResolver::create(flags.secret_resolver); + + if (secretResolver.isError()) { + return Error( + "Failed to initialize secret resolver: " + + secretResolver.error()); + } + + const string processId = + id.isSome() ? id.get() : process::ID::generate("slave"); + + if (csiServer.isNone() && flags.csi_plugin_config_dir.isSome()) { + // Initialize the CSI server, which manages any configured CSI plugins. + string scheme = "http"; + +#ifdef USE_SSL_SOCKET + if (process::network::openssl::flags().enabled) { + scheme = "https"; + } +#endif + + const process::http::URL agentUrl( + scheme, + process::address().ip, + flags.port, + processId + "/api/v1"); + + Try<Owned<slave::CSIServer>> _csiServer = slave::CSIServer::create( + flags, + agentUrl, + secretGenerator.getOrElse(slave->secretGenerator.get()), + secretResolver.get()); + + if (_csiServer.isError()) { + return Error( + "Failed to initialize the CSI server: " + _csiServer.error()); + } + + slave->csiServer = std::move(_csiServer.get()); + } + // If the containerizer is not provided, create a default one. if (containerizer.isSome()) { slave->containerizer = containerizer.get(); @@ -483,7 +569,8 @@ Try<process::Owned<Slave>> Slave::create( gc.getOrElse(slave->gc.get()), nullptr, volumeGidManager, - futureTracker.getOrElse(slave->futureTracker.get())); + futureTracker.getOrElse(slave->futureTracker.get()), + (csiServer.getOrElse(slave->csiServer)).get()); if (_containerizer.isError()) { return Error("Failed to create containerizer: " + _containerizer.error()); @@ -583,39 +670,6 @@ Try<process::Owned<Slave>> Slave::create( slave->qosController.reset(_qosController.get()); } - // If the secret generator is not provided, create a default one. - if (secretGenerator.isNone()) { - SecretGenerator* _secretGenerator = nullptr; - -#ifdef USE_SSL_SOCKET - if (flags.jwt_secret_key.isSome()) { - Try<string> jwtSecretKey = os::read(flags.jwt_secret_key.get()); - if (jwtSecretKey.isError()) { - return Error("Failed to read the file specified by --jwt_secret_key"); - } - - // TODO(greggomann): Factor the following code out into a common helper, - // since we also do this when loading credentials. - Try<os::Permissions> permissions = - os::permissions(flags.jwt_secret_key.get()); - if (permissions.isError()) { - LOG(WARNING) << "Failed to stat jwt secret key file '" - << flags.jwt_secret_key.get() - << "': " << permissions.error(); - } else if (permissions->others.rwx) { - LOG(WARNING) << "Permissions on executor secret key file '" - << flags.jwt_secret_key.get() - << "' are too open; it is recommended that your" - << " key file is NOT accessible by others"; - } - - _secretGenerator = new JWTSecretGenerator(jwtSecretKey.get()); - } -#endif // USE_SSL_SOCKET - - slave->secretGenerator.reset(_secretGenerator); - } - #ifndef __WINDOWS__ Option<Socket> executorSocket = None(); if (flags.http_executor_domain_sockets) { @@ -645,7 +699,7 @@ Try<process::Owned<Slave>> Slave::create( // Inject all the dependencies. if (mock) { slave->slave.reset(new MockSlave( - id.isSome() ? id.get() : process::ID::generate("slave"), + processId, flags, detector, slave->containerizer, @@ -657,10 +711,11 @@ Try<process::Owned<Slave>> Slave::create( secretGenerator.getOrElse(slave->secretGenerator.get()), volumeGidManager, futureTracker.getOrElse(slave->futureTracker.get()), + csiServer.getOrElse(slave->csiServer), authorizer)); } else { slave->slave.reset(new slave::Slave( - id.isSome() ? id.get() : process::ID::generate("slave"), + processId, flags, detector, slave->containerizer, @@ -672,6 +727,7 @@ Try<process::Owned<Slave>> Slave::create( secretGenerator.getOrElse(slave->secretGenerator.get()), volumeGidManager, futureTracker.getOrElse(slave->futureTracker.get()), + csiServer.getOrElse(slave->csiServer), #ifndef __WINDOWS__ executorSocket, #endif // __WINDOWS__ diff --git a/src/tests/cluster.hpp b/src/tests/cluster.hpp index 415a60f..b36b94f 100644 --- a/src/tests/cluster.hpp +++ b/src/tests/cluster.hpp @@ -57,6 +57,7 @@ #include "master/master.hpp" #include "slave/constants.hpp" +#include "slave/csi_server.hpp" #include "slave/flags.hpp" #include "slave/gc.hpp" #include "slave/slave.hpp" @@ -171,6 +172,7 @@ public: const Option<mesos::SecretGenerator*>& secretGenerator = None(), const Option<Authorizer*>& authorizer = None(), const Option<PendingFutureTracker*>& futureTracker = None(), + const Option<process::Owned<slave::CSIServer>>& csiServer = None(), bool mock = false); ~Slave(); @@ -241,6 +243,7 @@ private: process::Owned<mesos::slave::ResourceEstimator> resourceEstimator; process::Owned<mesos::SecretGenerator> secretGenerator; process::Owned<slave::TaskStatusUpdateManager> taskStatusUpdateManager; + process::Owned<slave::CSIServer> csiServer; // Indicates whether or not authorization callbacks were set when this agent // was constructed. diff --git a/src/tests/mesos.cpp b/src/tests/mesos.cpp index d6933a6..31b1629 100644 --- a/src/tests/mesos.cpp +++ b/src/tests/mesos.cpp @@ -348,6 +348,7 @@ Try<Owned<cluster::Slave>> MesosTest::StartSlave(const SlaveOptions& options) options.secretGenerator, options.authorizer, options.futureTracker, + options.csiServer, options.mock); if (slave.isSome() && !options.mock) { diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp index 0ad0999..8f89d7c 100644 --- a/src/tests/mesos.hpp +++ b/src/tests/mesos.hpp @@ -19,6 +19,7 @@ #include <memory> #include <string> +#include <utility> #include <vector> #include <gmock/gmock.h> @@ -82,6 +83,7 @@ #include "resource_provider/detector.hpp" #include "slave/constants.hpp" +#include "slave/csi_server.hpp" #include "slave/slave.hpp" #include "slave/containerizer/containerizer.hpp" @@ -193,6 +195,12 @@ struct SlaveOptions return *this; } + SlaveOptions& withCsiServer(const process::Owned<slave::CSIServer>& csiServer) + { + this->csiServer = csiServer; + return *this; + } + mesos::master::detector::MasterDetector* detector; bool mock; Option<slave::Flags> flags; @@ -205,6 +213,7 @@ struct SlaveOptions Option<mesos::SecretGenerator*> secretGenerator; Option<Authorizer*> authorizer; Option<PendingFutureTracker*> futureTracker; + Option<process::Owned<slave::CSIServer>> csiServer; }; diff --git a/src/tests/mock_slave.cpp b/src/tests/mock_slave.cpp index fa2a0f5..1d03b3c 100644 --- a/src/tests/mock_slave.cpp +++ b/src/tests/mock_slave.cpp @@ -14,6 +14,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include <utility> + #include <gmock/gmock.h> #include <mesos/authentication/secret_generator.hpp> @@ -22,10 +24,12 @@ #include <mesos/slave/resource_estimator.hpp> #include <process/future.hpp> +#include <process/owned.hpp> #include <process/pid.hpp> #include <stout/option.hpp> +#include "slave/csi_server.hpp" #include "slave/slave.hpp" #include "slave/task_status_update_manager.hpp" @@ -47,6 +51,7 @@ using std::string; using std::vector; using process::Future; +using process::Owned; using process::UPID; using testing::_; @@ -104,6 +109,7 @@ MockSlave::MockSlave( SecretGenerator* secretGenerator, VolumeGidManager* volumeGidManager, PendingFutureTracker* futureTracker, + Owned<slave::CSIServer>&& csiServer, const Option<Authorizer*>& authorizer) // It is necessary to explicitly call `ProcessBase` constructor here even // though the direct parent `Slave` already does this. This is because @@ -124,6 +130,7 @@ MockSlave::MockSlave( secretGenerator, volumeGidManager, futureTracker, + std::move(csiServer), #ifndef __WINDOWS__ None(), #endif // __WINDOWS__ diff --git a/src/tests/mock_slave.hpp b/src/tests/mock_slave.hpp index 58daefa..cd13be2 100644 --- a/src/tests/mock_slave.hpp +++ b/src/tests/mock_slave.hpp @@ -31,6 +31,7 @@ #include <mesos/slave/resource_estimator.hpp> #include <process/future.hpp> +#include <process/owned.hpp> #include <process/pid.hpp> #include <stout/duration.hpp> @@ -40,6 +41,7 @@ #include "messages/messages.hpp" +#include "slave/csi_server.hpp" #include "slave/slave.hpp" using ::testing::_; @@ -101,6 +103,7 @@ public: SecretGenerator* secretGenerator, slave::VolumeGidManager* volumeGidManager, PendingFutureTracker* futureTracker, + process::Owned<slave::CSIServer>&& csiServer, const Option<Authorizer*>& authorizer); MOCK_METHOD6(___run, void(