This is an automated email from the ASF dual-hosted git repository. grag pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git
commit fc22984de558302029a8cad0655e375653208448 Author: Greg Mann <g...@mesosphere.io> AuthorDate: Thu Sep 3 12:06:38 2020 -0700 Added tests for the 'volume/csi' isolator. Review: https://reviews.apache.org/r/72728/ --- src/Makefile.am | 1 + src/tests/CMakeLists.txt | 1 + src/tests/cluster.cpp | 2 +- .../containerizer/volume_csi_isolator_tests.cpp | 1122 ++++++++++++++++++++ 4 files changed, 1125 insertions(+), 1 deletion(-) diff --git a/src/Makefile.am b/src/Makefile.am index 673ea6c..c2da4e9 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -2873,6 +2873,7 @@ mesos_tests_SOURCES += \ tests/containerizer/runtime_isolator_tests.cpp \ tests/containerizer/sched_tests.cpp \ tests/containerizer/setns_test_helper.cpp \ + tests/containerizer/volume_csi_isolator_tests.cpp \ tests/containerizer/volume_host_path_isolator_tests.cpp \ tests/containerizer/volume_image_isolator_tests.cpp \ tests/containerizer/volume_secret_isolator_tests.cpp diff --git a/src/tests/CMakeLists.txt b/src/tests/CMakeLists.txt index 6b420d0..6beb74e 100644 --- a/src/tests/CMakeLists.txt +++ b/src/tests/CMakeLists.txt @@ -247,6 +247,7 @@ if (LINUX) containerizer/rootfs.cpp containerizer/runtime_isolator_tests.cpp containerizer/sched_tests.cpp + containerizer/volume_csi_isolator_tests.cpp containerizer/volume_host_path_isolator_tests.cpp containerizer/volume_image_isolator_tests.cpp containerizer/volume_secret_isolator_tests.cpp) diff --git a/src/tests/cluster.cpp b/src/tests/cluster.cpp index 3c86855..d547cbb 100644 --- a/src/tests/cluster.cpp +++ b/src/tests/cluster.cpp @@ -537,7 +537,7 @@ Try<process::Owned<Slave>> Slave::create( const process::http::URL agentUrl( scheme, process::address().ip, - flags.port, + process::address().port, processId + "/api/v1"); Try<Owned<slave::CSIServer>> _csiServer = slave::CSIServer::create( diff --git a/src/tests/containerizer/volume_csi_isolator_tests.cpp b/src/tests/containerizer/volume_csi_isolator_tests.cpp new file mode 100644 index 0000000..dafb0b7 --- /dev/null +++ b/src/tests/containerizer/volume_csi_isolator_tests.cpp @@ -0,0 +1,1122 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include <list> +#include <string> +#include <vector> + +#include <mesos/mesos.hpp> + +#include <mesos/authentication/secret_generator.hpp> + +#include <mesos/csi/v0.hpp> +#include <mesos/csi/v1.hpp> + +#include <mesos/master/detector.hpp> + +#include <process/clock.hpp> +#include <process/future.hpp> +#include <process/grpc.hpp> +#include <process/owned.hpp> +#include <process/reap.hpp> +#include <process/subprocess.hpp> + +#include <stout/bytes.hpp> +#include <stout/hashmap.hpp> +#include <stout/path.hpp> +#include <stout/option.hpp> +#include <stout/os.hpp> +#include <stout/stringify.hpp> + +#include <stout/tests/utils.hpp> + +#ifdef USE_SSL_SOCKET +#include "authentication/executor/jwt_secret_generator.hpp" +#endif // USE_SSL_SOCKET + +#include "csi/paths.hpp" + +#include "master/flags.hpp" + +#include "slave/csi_server.hpp" +#include "slave/flags.hpp" +#include "slave/paths.hpp" + +#include "slave/containerizer/fetcher.hpp" + +#include "slave/containerizer/mesos/containerizer.hpp" +#include "slave/containerizer/mesos/paths.hpp" + +#include "tests/environment.hpp" +#include "tests/mesos.hpp" + +#ifdef USE_SSL_SOCKET +using mesos::authentication::executor::JWTSecretGenerator; +#endif // USE_SSL_SOCKET + +using mesos::internal::slave::CSIServer; +using mesos::internal::slave::Fetcher; +using mesos::internal::slave::MesosContainerizer; + +using mesos::internal::slave::containerizer::paths::getContainerPid; + +using mesos::master::detector::MasterDetector; + +using process::Clock; +using process::Future; +using process::Owned; + +using std::list; +using std::string; +using std::vector; + +using testing::AllOf; +using testing::AnyOf; +using testing::DoAll; + +namespace mesos { +namespace internal { +namespace tests { + +const string TEST_CONTAINER_PATH = "volume-container-path/"; +const string TEST_CSI_PLUGIN_TYPE = "org.apache.mesos.csi.test"; +const string TEST_OUTPUT_FILE = "output.txt"; +const string TEST_OUTPUT_STRING = "hello world"; +const string TEST_VOLUME_ID = "test-volume"; + + +class VolumeCSIIsolatorTest + : public MesosTest, + public testing::WithParamInterface<string> +{ +public: + void SetUp() + { + MesosTest::SetUp(); + + csiPluginConfigDir = path::join(sandbox.get(), "csi_plugin_configs"); + + ASSERT_SOME(os::mkdir(csiPluginConfigDir)); + } + + master::Flags CreateMasterFlags() override + { + master::Flags flags = MesosTest::CreateMasterFlags(); + + // This extended allocation interval helps us avoid unwanted allocations + // when running the clock. + flags.allocation_interval = Seconds(99); + + return flags; + } + + slave::Flags CreateSlaveFlags() override + { + slave::Flags flags = MesosTest::CreateSlaveFlags(); + + flags.csi_plugin_config_dir = csiPluginConfigDir; + flags.image_providers = "DOCKER"; + flags.isolation = + flags.isolation + ",filesystem/linux,volume/csi,docker/runtime"; + + agentWorkDir = flags.work_dir; + + return flags; + } + + void TearDown() + { + const string csiRootDir = slave::paths::getCsiRootDir(agentWorkDir); + + Try<list<string>> csiContainerPaths = + csi::paths::getContainerPaths(csiRootDir, "*", "*"); + + ASSERT_SOME(csiContainerPaths); + + foreach (const string& path, csiContainerPaths.get()) { + Try<csi::paths::ContainerPath> containerPath = + csi::paths::parseContainerPath(csiRootDir, path); + + ASSERT_SOME(containerPath); + + Result<string> endpointDir = + os::realpath(csi::paths::getEndpointDirSymlinkPath( + csiRootDir, + containerPath->type, + containerPath->name, + containerPath->containerId)); + + if (endpointDir.isSome()) { + ASSERT_SOME(os::rmdir(endpointDir.get())); + } + } + } + + void createCsiPluginConfig( + const Bytes& capacity, + const Option<string>& volumes = None(), + const string& name = TEST_CSI_PLUGIN_TYPE) + { + Try<string> mkdtemp = environment->mkdtemp(); + ASSERT_SOME(mkdtemp); + + const string& testCsiPluginWorkDir = mkdtemp.get(); + + const string testCsiPluginPath = + path::join(getTestHelperDir(), "test-csi-plugin"); + + // Note that the `--endpoint` flag required by the test CSI plugin is + // injected by the ServiceManager. + Try<string> csiPluginConfig = strings::format( + R"~( + { + "type": "%s", + "containers": [ + { + "services": [ + "NODE_SERVICE" + ], + "command": { + "shell": false, + "value": "%s", + "arguments": [ + "%s", + "--work_dir=%s", + "--available_capacity=%s", + "%s", + "--volume_id_path=false", + "--api_version=%s" + ] + }, + "resources": [ + { + "name": "cpus", + "type": "SCALAR", + "scalar": { + "value": 0.1 + } + }, + { + "name": "mem", + "type": "SCALAR", + "scalar": { + "value": 1024 + } + } + ] + } + ] + } + )~", + name, + testCsiPluginPath, + testCsiPluginPath, + testCsiPluginWorkDir, + stringify(capacity), + volumes.isSome() ? "--volumes=" + volumes.get() : "", + GetParam()); + + ASSERT_SOME(csiPluginConfig); + ASSERT_SOME(os::write( + path::join(csiPluginConfigDir, name + ".json"), + csiPluginConfig.get())); + } + + SecretGenerator* createSecretGenerator(const slave::Flags& agentFlags) + { + SecretGenerator* secretGenerator = nullptr; + +#ifdef USE_SSL_SOCKET + CHECK_SOME(agentFlags.jwt_secret_key); + + Try<string> jwtSecretKey = os::read(agentFlags.jwt_secret_key.get()); + if (jwtSecretKey.isError()) { + EXIT(EXIT_FAILURE) << "Failed to read the file specified by " + << "--jwt_secret_key"; + } + + Try<os::Permissions> permissions = + os::permissions(agentFlags.jwt_secret_key.get()); + if (permissions.isError()) { + LOG(WARNING) << "Failed to stat jwt secret key file '" + << agentFlags.jwt_secret_key.get() + << "': " << permissions.error(); + } else if (permissions->others.rwx) { + LOG(WARNING) << "Permissions on executor secret key file '" + << agentFlags.jwt_secret_key.get() + << "' are too open; it is recommended that your" + << " key file is NOT accessible by others"; + } + + secretGenerator = new JWTSecretGenerator(jwtSecretKey.get()); +#endif // USE_SSL_SOCKET + + return secretGenerator; + } + + string csiPluginConfigDir; + string agentWorkDir; +}; + + +INSTANTIATE_TEST_CASE_P( + CSIVersion, + VolumeCSIIsolatorTest, + testing::Values(csi::v0::API_VERSION, csi::v1::API_VERSION), + [](const testing::TestParamInfo<string>& info) { return info.param; }); + + +// When one or more invalid plugin configurations exist in the config +// directory, the CSI server's `start()` method should return a failure. +TEST_P(VolumeCSIIsolatorTest, ROOT_InvalidPluginConfig) +{ + // Write an invalid configuration file to the config directory. + ASSERT_SOME(os::write( + path::join(csiPluginConfigDir, "config.json"), + "this is not valid JSON")); + + process::http::URL agentURL( + "http", + process::address().ip, + process::address().port, + "slave/api/v1"); + + Try<Owned<CSIServer>> server = + CSIServer::create(CreateSlaveFlags(), agentURL, nullptr, nullptr); + + ASSERT_SOME(server); + + SlaveID agentId; + + agentId.set_value("0123456789"); + + Future<Nothing> started = server.get()->start(agentId); + + AWAIT_FAILED(started); + + ASSERT_TRUE(strings::contains( + started.failure(), + "CSI server failed to initialize CSI plugins: JSON parse")); +} + + +// To verify the basic functionality of CSI volumes, we launch one task which +// mounts a preprovisioned volume and writes to it. Then, we launch a second +// task which reads and verifies the output from the same volume. +TEST_P(VolumeCSIIsolatorTest, ROOT_INTERNET_CURL_CommandTaskWithVolume) +{ + createCsiPluginConfig(Bytes(0), TEST_VOLUME_ID + ":1MB"); + + master::Flags masterFlags = CreateMasterFlags(); + + Try<Owned<cluster::Master>> master = StartMaster(masterFlags); + ASSERT_SOME(master); + + Future<SlaveRegisteredMessage> slaveRegisteredMessage = + FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _); + + Owned<MasterDetector> detector = master.get()->createDetector(); + + slave::Flags agentFlags = CreateSlaveFlags(); + + const SlaveOptions agentOptions = + SlaveOptions(detector.get()) + .withFlags(agentFlags); + + Try<Owned<cluster::Slave>> agent = StartSlave(agentOptions); + ASSERT_SOME(agent); + + AWAIT_READY(slaveRegisteredMessage); + + Clock::pause(); + + auto scheduler = std::make_shared<v1::MockHTTPScheduler>(); + + EXPECT_CALL(*scheduler, connected(_)) + .WillOnce(v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO)); + + Future<v1::scheduler::Event::Subscribed> subscribed; + EXPECT_CALL(*scheduler, subscribed(_, _)) + .WillOnce(FutureArg<1>(&subscribed)); + + Future<v1::scheduler::Event::Offers> offers; + EXPECT_CALL(*scheduler, offers(_, _)) + .WillOnce(FutureArg<1>(&offers)); + + EXPECT_CALL(*scheduler, heartbeat(_)) + .WillRepeatedly(Return()); // Ignore heartbeats. + + v1::scheduler::TestMesos mesos( + master.get()->pid, + ContentType::PROTOBUF, + scheduler); + + AWAIT_READY(subscribed); + v1::FrameworkID frameworkId(subscribed->framework_id()); + + v1::Resources resources = + v1::Resources::parse("cpus:0.1;mem:32;disk:32").get(); + + v1::ExecutorInfo executorInfo = v1::createExecutorInfo( + v1::DEFAULT_EXECUTOR_ID, + None(), + resources, + v1::ExecutorInfo::DEFAULT, + frameworkId); + + AWAIT_READY(offers); + ASSERT_FALSE(offers->offers().empty()); + + v1::Offer offer = offers->offers(0); + const v1::AgentID& agentId = offer.agent_id(); + + Try<string> taskCommand = strings::format( + "echo '%s' > %s", + TEST_OUTPUT_STRING, + TEST_CONTAINER_PATH + TEST_OUTPUT_FILE); + + v1::TaskInfo taskInfo = v1::createTask(agentId, resources, taskCommand.get()); + + taskInfo.mutable_container()->CopyFrom(v1::createContainerInfo( + "alpine", + {v1::createVolumeCsi( + TEST_CSI_PLUGIN_TYPE, + TEST_VOLUME_ID, + TEST_CONTAINER_PATH, + mesos::v1::Volume::Source::CSIVolume::VolumeCapability + ::AccessMode::SINGLE_NODE_WRITER, + false)})); + + Future<v1::scheduler::Event::Update> startingUpdate; + Future<v1::scheduler::Event::Update> runningUpdate; + Future<v1::scheduler::Event::Update> finishedUpdate; + + testing::Sequence taskSequence; + EXPECT_CALL( + *scheduler, + update(_, TaskStatusUpdateStateEq(v1::TASK_STARTING))) + .InSequence(taskSequence) + .WillOnce(DoAll( + FutureArg<1>(&startingUpdate), + v1::scheduler::SendAcknowledge(frameworkId, agentId))) + .WillRepeatedly(v1::scheduler::SendAcknowledge(frameworkId, agentId)); + + EXPECT_CALL( + *scheduler, + update(_, TaskStatusUpdateStateEq(v1::TASK_RUNNING))) + .InSequence(taskSequence) + .WillOnce(DoAll( + FutureArg<1>(&runningUpdate), + v1::scheduler::SendAcknowledge(frameworkId, agentId))) + .WillRepeatedly(v1::scheduler::SendAcknowledge(frameworkId, agentId)); + + EXPECT_CALL( + *scheduler, + update(_, TaskStatusUpdateStateEq(v1::TASK_FINISHED))) + .InSequence(taskSequence) + .WillOnce(DoAll( + FutureArg<1>(&finishedUpdate), + v1::scheduler::SendAcknowledge(frameworkId, agentId))) + .WillRepeatedly(v1::scheduler::SendAcknowledge(frameworkId, agentId)); + + Clock::resume(); + + mesos.send( + v1::createCallAccept( + frameworkId, + offer, + {v1::LAUNCH({taskInfo})})); + + AWAIT_READY(startingUpdate); + AWAIT_READY(runningUpdate); + AWAIT_READY(finishedUpdate); + + // To avoid pausing the clock before the acknowledgement is sent, we await on + // this future. + Future<StatusUpdateAcknowledgementMessage> acknowledgement = + FUTURE_PROTOBUF(StatusUpdateAcknowledgementMessage(), _, _); + + AWAIT_READY(acknowledgement); + + Clock::pause(); + + // Run another task which mounts the same external volume and reads the + // output of the previous task. + EXPECT_CALL(*scheduler, offers(_, _)) + .WillOnce(FutureArg<1>(&offers)); + + Clock::advance(masterFlags.allocation_interval); + + AWAIT_READY(offers); + ASSERT_FALSE(offers->offers().empty()); + + offer = offers->offers(0); + + taskCommand = strings::format( + "if [ \"`cat %s`\" = \"%s\" ]; then exit 0; else exit 1; fi", + TEST_CONTAINER_PATH + TEST_OUTPUT_FILE, + TEST_OUTPUT_STRING); + + CHECK_SOME(taskCommand); + + taskInfo = v1::createTask(agentId, resources, taskCommand.get()); + + taskInfo.mutable_container()->CopyFrom(v1::createContainerInfo( + None(), + {v1::createVolumeCsi( + TEST_CSI_PLUGIN_TYPE, + TEST_VOLUME_ID, + TEST_CONTAINER_PATH, + mesos::v1::Volume::Source::CSIVolume::VolumeCapability + ::AccessMode::SINGLE_NODE_WRITER, + false)})); + + EXPECT_CALL( + *scheduler, + update(_, TaskStatusUpdateStateEq(v1::TASK_STARTING))) + .InSequence(taskSequence) + .WillOnce(DoAll( + FutureArg<1>(&startingUpdate), + v1::scheduler::SendAcknowledge(frameworkId, agentId))) + .WillRepeatedly(v1::scheduler::SendAcknowledge(frameworkId, agentId)); + + EXPECT_CALL( + *scheduler, + update(_, TaskStatusUpdateStateEq(v1::TASK_RUNNING))) + .InSequence(taskSequence) + .WillOnce(DoAll( + FutureArg<1>(&runningUpdate), + v1::scheduler::SendAcknowledge(frameworkId, agentId))) + .WillRepeatedly(v1::scheduler::SendAcknowledge(frameworkId, agentId)); + + EXPECT_CALL( + *scheduler, + update(_, TaskStatusUpdateStateEq(v1::TASK_FINISHED))) + .InSequence(taskSequence) + .WillOnce(DoAll( + FutureArg<1>(&finishedUpdate), + v1::scheduler::SendAcknowledge(frameworkId, agentId))) + .WillRepeatedly(v1::scheduler::SendAcknowledge(frameworkId, agentId)); + + Clock::resume(); + + mesos.send( + v1::createCallAccept( + frameworkId, + offer, + {v1::LAUNCH({taskInfo})})); + + AWAIT_READY(startingUpdate); + AWAIT_READY(runningUpdate); + AWAIT_READY(finishedUpdate); +} + + +// Two tasks in a task group should both be able to mount the same CSI volume. +TEST_P(VolumeCSIIsolatorTest, ROOT_INTERNET_CURL_TaskGroupWithVolume) +{ + createCsiPluginConfig(Bytes(0), TEST_VOLUME_ID + ":1MB"); + + master::Flags masterFlags = CreateMasterFlags(); + + Try<Owned<cluster::Master>> master = StartMaster(masterFlags); + ASSERT_SOME(master); + + Future<SlaveRegisteredMessage> slaveRegisteredMessage = + FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _); + + Owned<MasterDetector> detector = master.get()->createDetector(); + + slave::Flags agentFlags = CreateSlaveFlags(); + + const SlaveOptions agentOptions = + SlaveOptions(detector.get()) + .withFlags(agentFlags); + + Try<Owned<cluster::Slave>> agent = StartSlave(agentOptions); + ASSERT_SOME(agent); + + AWAIT_READY(slaveRegisteredMessage); + + auto scheduler = std::make_shared<v1::MockHTTPScheduler>(); + + EXPECT_CALL(*scheduler, connected(_)) + .WillOnce(v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO)); + + Future<v1::scheduler::Event::Subscribed> subscribed; + EXPECT_CALL(*scheduler, subscribed(_, _)) + .WillOnce(FutureArg<1>(&subscribed)); + + Future<v1::scheduler::Event::Offers> offers; + EXPECT_CALL(*scheduler, offers(_, _)) + .WillOnce(FutureArg<1>(&offers)); + + EXPECT_CALL(*scheduler, heartbeat(_)) + .WillRepeatedly(Return()); // Ignore heartbeats. + + v1::scheduler::TestMesos mesos( + master.get()->pid, + ContentType::PROTOBUF, + scheduler); + + AWAIT_READY(subscribed); + v1::FrameworkID frameworkId(subscribed->framework_id()); + + v1::Resources resources = + v1::Resources::parse("cpus:0.1;mem:32;disk:32").get(); + + v1::ExecutorInfo executorInfo = v1::createExecutorInfo( + v1::DEFAULT_EXECUTOR_ID, + None(), + resources, + v1::ExecutorInfo::DEFAULT, + frameworkId); + + AWAIT_READY(offers); + ASSERT_FALSE(offers->offers().empty()); + + v1::Offer offer = offers->offers(0); + const v1::AgentID& agentId = offer.agent_id(); + + Try<string> taskCommand1 = strings::format( + "echo '%s' > %s && exit 0", + TEST_OUTPUT_STRING, + TEST_CONTAINER_PATH + TEST_OUTPUT_FILE); + + Try<string> taskCommand2 = strings::format( + "while [ \"`cat %s 2>/dev/null`\" != \"%s\" ]; do : sleep 0.1 ; done; exit 0", + TEST_CONTAINER_PATH + TEST_OUTPUT_FILE, + TEST_OUTPUT_STRING); + + v1::TaskInfo taskInfo1 = + v1::createTask(agentId, resources, taskCommand1.get()); + + v1::TaskInfo taskInfo2 = + v1::createTask(agentId, resources, taskCommand2.get()); + + mesos::v1::Volume volume = v1::createVolumeCsi( + TEST_CSI_PLUGIN_TYPE, + TEST_VOLUME_ID, + TEST_CONTAINER_PATH, + mesos::v1::Volume::Source::CSIVolume::VolumeCapability + ::AccessMode::SINGLE_NODE_WRITER, + false); + + taskInfo1.mutable_container()->CopyFrom( + v1::createContainerInfo("alpine", {volume})); + + taskInfo2.mutable_container()->CopyFrom( + v1::createContainerInfo(None(), {volume})); + + Future<v1::scheduler::Event::Update> startingUpdate1; + Future<v1::scheduler::Event::Update> runningUpdate1; + Future<v1::scheduler::Event::Update> finishedUpdate1; + + testing::Sequence task1; + EXPECT_CALL( + *scheduler, + update(_, AllOf( + TaskStatusUpdateTaskIdEq(taskInfo1.task_id()), + TaskStatusUpdateStateEq(v1::TASK_STARTING)))) + .InSequence(task1) + .WillOnce(DoAll( + FutureArg<1>(&startingUpdate1), + v1::scheduler::SendAcknowledge(frameworkId, agentId))) + .WillRepeatedly(v1::scheduler::SendAcknowledge(frameworkId, agentId)); + + EXPECT_CALL( + *scheduler, + update(_, AllOf( + TaskStatusUpdateTaskIdEq(taskInfo1.task_id()), + TaskStatusUpdateStateEq(v1::TASK_RUNNING)))) + .InSequence(task1) + .WillOnce(DoAll( + FutureArg<1>(&runningUpdate1), + v1::scheduler::SendAcknowledge(frameworkId, agentId))) + .WillRepeatedly(v1::scheduler::SendAcknowledge(frameworkId, agentId)); + + EXPECT_CALL( + *scheduler, + update(_, AllOf( + TaskStatusUpdateTaskIdEq(taskInfo1.task_id()), + TaskStatusUpdateStateEq(v1::TASK_FINISHED)))) + .InSequence(task1) + .WillOnce(DoAll( + FutureArg<1>(&finishedUpdate1), + v1::scheduler::SendAcknowledge(frameworkId, agentId))) + .WillRepeatedly(v1::scheduler::SendAcknowledge(frameworkId, agentId)); + + Future<v1::scheduler::Event::Update> startingUpdate2; + Future<v1::scheduler::Event::Update> runningUpdate2; + Future<v1::scheduler::Event::Update> finishedUpdate2; + + testing::Sequence task2; + EXPECT_CALL( + *scheduler, + update(_, AllOf( + TaskStatusUpdateTaskIdEq(taskInfo2.task_id()), + TaskStatusUpdateStateEq(v1::TASK_STARTING)))) + .InSequence(task2) + .WillOnce(DoAll( + FutureArg<1>(&startingUpdate2), + v1::scheduler::SendAcknowledge(frameworkId, agentId))) + .WillRepeatedly(v1::scheduler::SendAcknowledge(frameworkId, agentId)); + + EXPECT_CALL( + *scheduler, + update(_, AllOf( + TaskStatusUpdateTaskIdEq(taskInfo2.task_id()), + TaskStatusUpdateStateEq(v1::TASK_RUNNING)))) + .InSequence(task2) + .WillOnce(DoAll( + FutureArg<1>(&runningUpdate2), + v1::scheduler::SendAcknowledge(frameworkId, agentId))) + .WillRepeatedly(v1::scheduler::SendAcknowledge(frameworkId, agentId)); + + EXPECT_CALL( + *scheduler, + update(_, AllOf( + TaskStatusUpdateTaskIdEq(taskInfo2.task_id()), + TaskStatusUpdateStateEq(v1::TASK_FINISHED)))) + .InSequence(task2) + .WillOnce(DoAll( + FutureArg<1>(&finishedUpdate2), + v1::scheduler::SendAcknowledge(frameworkId, agentId))) + .WillRepeatedly(v1::scheduler::SendAcknowledge(frameworkId, agentId)); + + mesos.send( + v1::createCallAccept( + frameworkId, + offer, + {v1::LAUNCH_GROUP( + executorInfo, v1::createTaskGroupInfo({taskInfo1, taskInfo2}))})); + + AWAIT_READY(startingUpdate1); + AWAIT_READY(startingUpdate2); + + AWAIT_READY(runningUpdate1); + AWAIT_READY(runningUpdate2); + + AWAIT_READY(finishedUpdate1); + AWAIT_READY(finishedUpdate2); +} + + +// A task which is executed as a non-root user should be able to mount a CSI +// volume and write to it. +TEST_P(VolumeCSIIsolatorTest, UNPRIVILEGED_USER_NonRootTaskUser) +{ + createCsiPluginConfig(Bytes(0), TEST_VOLUME_ID + ":1MB"); + + master::Flags masterFlags = CreateMasterFlags(); + + Try<Owned<cluster::Master>> master = StartMaster(masterFlags); + ASSERT_SOME(master); + + Future<SlaveRegisteredMessage> slaveRegisteredMessage = + FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _); + + Owned<MasterDetector> detector = master.get()->createDetector(); + + slave::Flags agentFlags = CreateSlaveFlags(); + + const SlaveOptions agentOptions = + SlaveOptions(detector.get()) + .withFlags(agentFlags); + + Try<Owned<cluster::Slave>> agent = StartSlave(agentOptions); + ASSERT_SOME(agent); + + AWAIT_READY(slaveRegisteredMessage); + + auto scheduler = std::make_shared<v1::MockHTTPScheduler>(); + + EXPECT_CALL(*scheduler, connected(_)) + .WillOnce(v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO)); + + Future<v1::scheduler::Event::Subscribed> subscribed; + EXPECT_CALL(*scheduler, subscribed(_, _)) + .WillOnce(FutureArg<1>(&subscribed)); + + Future<v1::scheduler::Event::Offers> offers; + EXPECT_CALL(*scheduler, offers(_, _)) + .WillOnce(FutureArg<1>(&offers)); + + EXPECT_CALL(*scheduler, heartbeat(_)) + .WillRepeatedly(Return()); // Ignore heartbeats. + + v1::scheduler::TestMesos mesos( + master.get()->pid, + ContentType::PROTOBUF, + scheduler); + + AWAIT_READY(subscribed); + v1::FrameworkID frameworkId(subscribed->framework_id()); + + v1::Resources resources = + v1::Resources::parse("cpus:0.1;mem:32;disk:32").get(); + + v1::ExecutorInfo executorInfo = v1::createExecutorInfo( + v1::DEFAULT_EXECUTOR_ID, + None(), + resources, + v1::ExecutorInfo::DEFAULT, + frameworkId); + + AWAIT_READY(offers); + ASSERT_FALSE(offers->offers().empty()); + + v1::Offer offer = offers->offers(0); + const v1::AgentID& agentId = offer.agent_id(); + + v1::CommandInfo taskCommand = v1::createCommandInfo( + strings::format( + "echo '%s' > %s && " + "if [ \"`cat %s`\" = \"%s\" ] ; then exit 0 ; else exit 1 ; fi", + TEST_OUTPUT_STRING, + TEST_CONTAINER_PATH + TEST_OUTPUT_FILE, + TEST_CONTAINER_PATH + TEST_OUTPUT_FILE, + TEST_OUTPUT_STRING).get()); + + Option<string> user = os::getenv("SUDO_USER"); + ASSERT_SOME(user); + + taskCommand.set_user(user.get()); + + v1::TaskInfo taskInfo = v1::createTask(agentId, resources, taskCommand); + + taskInfo.mutable_container()->CopyFrom(v1::createContainerInfo( + None(), + {v1::createVolumeCsi( + TEST_CSI_PLUGIN_TYPE, + TEST_VOLUME_ID, + TEST_CONTAINER_PATH, + mesos::v1::Volume::Source::CSIVolume::VolumeCapability + ::AccessMode::SINGLE_NODE_WRITER, + false)})); + + Future<v1::scheduler::Event::Update> startingUpdate; + Future<v1::scheduler::Event::Update> runningUpdate; + Future<v1::scheduler::Event::Update> finishedUpdate; + + testing::Sequence taskSequence; + EXPECT_CALL(*scheduler, update(_, TaskStatusUpdateStateEq(v1::TASK_STARTING))) + .InSequence(taskSequence) + .WillOnce(DoAll( + FutureArg<1>(&startingUpdate), + v1::scheduler::SendAcknowledge(frameworkId, agentId))) + .WillRepeatedly(v1::scheduler::SendAcknowledge(frameworkId, agentId)); + + EXPECT_CALL(*scheduler, update(_, TaskStatusUpdateStateEq(v1::TASK_RUNNING))) + .InSequence(taskSequence) + .WillOnce(DoAll( + FutureArg<1>(&runningUpdate), + v1::scheduler::SendAcknowledge(frameworkId, agentId))) + .WillRepeatedly(v1::scheduler::SendAcknowledge(frameworkId, agentId)); + + EXPECT_CALL(*scheduler, update(_, TaskStatusUpdateStateEq(v1::TASK_FINISHED))) + .InSequence(taskSequence) + .WillOnce(DoAll( + FutureArg<1>(&finishedUpdate), + v1::scheduler::SendAcknowledge(frameworkId, agentId))) + .WillRepeatedly(v1::scheduler::SendAcknowledge(frameworkId, agentId)); + + mesos.send(v1::createCallAccept( + frameworkId, + offer, + {v1::LAUNCH({taskInfo})})); + + AWAIT_READY(startingUpdate); + AWAIT_READY(runningUpdate); + AWAIT_READY(finishedUpdate); +} + + +// If a publish call is made against a plugin whose configuration file is not +// present, the call should fail. If a valid configuration is added for the +// plugin later, it should be initialized and the call should be handled. +TEST_P(VolumeCSIIsolatorTest, ROOT_PluginConfigAddedAtRuntime) +{ + // Write a default plugin configuration to disk so that the CSI server will + // initialize successfully. + createCsiPluginConfig(Bytes(0), TEST_VOLUME_ID + ":1MB"); + + Try<Owned<cluster::Master>> master = StartMaster(); + ASSERT_SOME(master); + + Future<SlaveRegisteredMessage> slaveRegisteredMessage = + FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _); + + Owned<MasterDetector> detector = master.get()->createDetector(); + + const slave::Flags agentFlags = CreateSlaveFlags(); + + const string processId = process::ID::generate("slave"); + + const process::http::URL agentUrl( + "http", + process::address().ip, + process::address().port, + processId + "/api/v1"); + + Try<Owned<CSIServer>> csiServer = CSIServer::create( + agentFlags, agentUrl, createSecretGenerator(agentFlags), nullptr); + + ASSERT_SOME(csiServer); + + auto agentOptions = + SlaveOptions(detector.get()) + .withId(processId) + .withFlags(agentFlags) + .withCsiServer(csiServer.get()); + + Try<Owned<cluster::Slave>> agent = StartSlave(agentOptions); + ASSERT_SOME(agent); + + AWAIT_READY(slaveRegisteredMessage); + + SlaveID agentId = slaveRegisteredMessage->slave_id(); + + Future<Nothing> started = csiServer.get()->start(agentId); + + AWAIT_READY(started); + + Volume::Source::CSIVolume::VolumeCapability capability; + capability.mutable_mount(); + capability.mutable_access_mode()->set_mode( + Volume::Source::CSIVolume::VolumeCapability + ::AccessMode::SINGLE_NODE_WRITER); + + Volume::Source::CSIVolume::StaticProvisioning staticVol; + staticVol.set_volume_id(TEST_VOLUME_ID); + staticVol.mutable_volume_capability()->CopyFrom(capability); + + const string pluginName = "org.apache.mesos.csi.added-at-runtime"; + + Volume::Source::CSIVolume volume; + volume.set_plugin_name(pluginName); + volume.mutable_static_provisioning()->CopyFrom(staticVol); + + // First, perform publish/unpublish calls before we have written the + // configuration to disk. + Future<string> nodePublishResult = csiServer.get()->publishVolume(volume); + + AWAIT_FAILED(nodePublishResult); + + ASSERT_TRUE(strings::contains( + nodePublishResult.failure(), + "Failed to initialize CSI plugin '" + pluginName + + "': No valid CSI plugin configurations found")); + + // Now write the configuration to disk and try the calls again. + createCsiPluginConfig(Bytes(0), TEST_VOLUME_ID + ":1MB", pluginName); + + nodePublishResult = csiServer.get()->publishVolume(volume); + + AWAIT_READY(nodePublishResult); + + Future<Nothing> nodeUnpublishResult = + csiServer.get()->unpublishVolume(pluginName, TEST_VOLUME_ID); + + AWAIT_READY(nodeUnpublishResult); +} + + +// This test verifies basic functionality of a CSI volume when it is published +// by an unmanaged CSI plugin. +TEST_P(VolumeCSIIsolatorTest, ROOT_UnmanagedPlugin) +{ + const string pluginBinary = "test-csi-plugin"; + + // Launch the test CSI plugin manually in a subprocess. + Try<string> mkdtemp = environment->mkdtemp(); + ASSERT_SOME(mkdtemp); + + const string& testCsiPluginWorkDir = mkdtemp.get(); + + const string testCsiPluginPath = path::join(getTestHelperDir(), pluginBinary); + + const string testCsiSocketPath = + path::join("unix:///", testCsiPluginWorkDir, "endpoint.sock"); + + vector<string> argv { + pluginBinary, + "--work_dir=" + testCsiPluginWorkDir, + "--available_capacity=" + stringify(Bytes(0)), + "--volumes=" + TEST_VOLUME_ID + ":1MB", + "--volume_id_path=false", + "--endpoint=" + testCsiSocketPath, + "--api_version=" + GetParam() + }; + + Result<string> path = os::realpath(getLauncherDir()); + ASSERT_SOME(path); + + Try<process::Subprocess> csiPlugin = process::subprocess( + path::join(path.get(), pluginBinary), + argv, + process::Subprocess::FD(STDIN_FILENO), + process::Subprocess::FD(STDOUT_FILENO), + process::Subprocess::FD(STDERR_FILENO), + nullptr, // Don't pass flags. + None(), // No environment. + None(), // Use default clone. + {}, // No parent hooks. + { process::Subprocess::ChildHook::SETSID() }); + + ASSERT_SOME(csiPlugin); + + Try<string> csiPluginConfig = strings::format( + R"~( + { + "type": "%s", + "endpoints": [ + { + "csi_service": "NODE_SERVICE", + "endpoint": "%s" + } + ] + } + )~", + TEST_CSI_PLUGIN_TYPE, + testCsiSocketPath); + + ASSERT_SOME(csiPluginConfig); + ASSERT_SOME(os::write( + path::join(csiPluginConfigDir, "plugin_config.json"), + csiPluginConfig.get())); + + master::Flags masterFlags = CreateMasterFlags(); + + Try<Owned<cluster::Master>> master = StartMaster(masterFlags); + ASSERT_SOME(master); + + Future<SlaveRegisteredMessage> slaveRegisteredMessage = + FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _); + + Owned<MasterDetector> detector = master.get()->createDetector(); + + slave::Flags agentFlags = CreateSlaveFlags(); + + const SlaveOptions agentOptions = + SlaveOptions(detector.get()) + .withFlags(agentFlags); + + Try<Owned<cluster::Slave>> agent = StartSlave(agentOptions); + ASSERT_SOME(agent); + + AWAIT_READY(slaveRegisteredMessage); + + auto scheduler = std::make_shared<v1::MockHTTPScheduler>(); + + EXPECT_CALL(*scheduler, connected(_)) + .WillOnce(v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO)); + + Future<v1::scheduler::Event::Subscribed> subscribed; + EXPECT_CALL(*scheduler, subscribed(_, _)) + .WillOnce(FutureArg<1>(&subscribed)); + + Future<v1::scheduler::Event::Offers> offers; + EXPECT_CALL(*scheduler, offers(_, _)) + .WillOnce(FutureArg<1>(&offers)); + + EXPECT_CALL(*scheduler, heartbeat(_)) + .WillRepeatedly(Return()); // Ignore heartbeats. + + v1::scheduler::TestMesos mesos( + master.get()->pid, + ContentType::PROTOBUF, + scheduler); + + AWAIT_READY(subscribed); + v1::FrameworkID frameworkId(subscribed->framework_id()); + + v1::Resources resources = + v1::Resources::parse("cpus:0.1;mem:32;disk:32").get(); + + v1::ExecutorInfo executorInfo = v1::createExecutorInfo( + v1::DEFAULT_EXECUTOR_ID, + None(), + resources, + v1::ExecutorInfo::DEFAULT, + frameworkId); + + AWAIT_READY(offers); + ASSERT_FALSE(offers->offers().empty()); + + v1::Offer offer = offers->offers(0); + const v1::AgentID& agentId = offer.agent_id(); + + v1::CommandInfo taskCommand = v1::createCommandInfo( + strings::format( + "echo '%s' > %s && " + "if [ \"`cat %s`\" = \"%s\" ] ; then exit 0 ; else exit 1 ; fi", + TEST_OUTPUT_STRING, + TEST_CONTAINER_PATH + TEST_OUTPUT_FILE, + TEST_CONTAINER_PATH + TEST_OUTPUT_FILE, + TEST_OUTPUT_STRING).get()); + + v1::TaskInfo taskInfo = v1::createTask(agentId, resources, taskCommand); + + taskInfo.mutable_container()->CopyFrom(v1::createContainerInfo( + None(), + {v1::createVolumeCsi( + TEST_CSI_PLUGIN_TYPE, + TEST_VOLUME_ID, + TEST_CONTAINER_PATH, + mesos::v1::Volume::Source::CSIVolume::VolumeCapability + ::AccessMode::SINGLE_NODE_WRITER, + false)})); + + Future<v1::scheduler::Event::Update> startingUpdate; + Future<v1::scheduler::Event::Update> runningUpdate; + Future<v1::scheduler::Event::Update> finishedUpdate; + + testing::Sequence taskSequence; + EXPECT_CALL(*scheduler, update(_, TaskStatusUpdateStateEq(v1::TASK_STARTING))) + .InSequence(taskSequence) + .WillOnce(DoAll( + FutureArg<1>(&startingUpdate), + v1::scheduler::SendAcknowledge(frameworkId, agentId))) + .WillRepeatedly(v1::scheduler::SendAcknowledge(frameworkId, agentId)); + + EXPECT_CALL(*scheduler, update(_, TaskStatusUpdateStateEq(v1::TASK_RUNNING))) + .InSequence(taskSequence) + .WillOnce(DoAll( + FutureArg<1>(&runningUpdate), + v1::scheduler::SendAcknowledge(frameworkId, agentId))) + .WillRepeatedly(v1::scheduler::SendAcknowledge(frameworkId, agentId)); + + EXPECT_CALL(*scheduler, update(_, TaskStatusUpdateStateEq(v1::TASK_FINISHED))) + .InSequence(taskSequence) + .WillOnce(DoAll( + FutureArg<1>(&finishedUpdate), + v1::scheduler::SendAcknowledge(frameworkId, agentId))) + .WillRepeatedly(v1::scheduler::SendAcknowledge(frameworkId, agentId)); + + mesos.send(v1::createCallAccept( + frameworkId, + offer, + {v1::LAUNCH({taskInfo})})); + + AWAIT_READY(startingUpdate); + AWAIT_READY(runningUpdate); + AWAIT_READY(finishedUpdate); +} + +} // namespace tests { +} // namespace internal { +} // namespace mesos {