This is an automated email from the ASF dual-hosted git repository.

qianzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 83ae449fa396d92e708e2ba8bb6f1312eb0fd5dd
Author: Qian Zhang <zhq527...@gmail.com>
AuthorDate: Tue Aug 4 15:44:24 2020 +0800

    Implemented the `prepare` method of the `volume/csi` isolator.
    
    Review: https://reviews.apache.org/r/72733
---
 include/mesos/mesos.proto                          |   8 +-
 include/mesos/v1/mesos.proto                       |   8 +-
 src/CMakeLists.txt                                 |   1 +
 src/Makefile.am                                    |   4 +
 src/common/validation.cpp                          |  12 ++
 .../mesos/isolators/volume/csi/isolator.cpp        | 214 ++++++++++++++++++++-
 .../mesos/isolators/volume/csi/isolator.hpp        |  24 +++
 .../mesos/isolators/volume/csi/state.hpp           |  61 ++++++
 .../mesos/isolators/volume/csi/state.proto         |  29 +++
 9 files changed, 352 insertions(+), 9 deletions(-)

diff --git a/include/mesos/mesos.proto b/include/mesos/mesos.proto
index 0f91d88..661f746 100644
--- a/include/mesos/mesos.proto
+++ b/include/mesos/mesos.proto
@@ -3055,10 +3055,10 @@ message Volume {
   // TODO(gyliu513): Make this as `optional` after deprecation cycle of 1.0.
   required Mode mode = 3;
 
-  // Path pointing to a directory or file in the container. If the
-  // path is a relative path, it is relative to the container work
-  // directory. If the path is an absolute path, that path must
-  // already exist.
+  // Path pointing to a directory or file in the container. If the path
+  // is a relative path, it is relative to the container work directory.
+  // If the path is an absolute path and the container does not have its
+  // own rootfs, that path must already exist in the agent host rootfs.
   required string container_path = 1;
 
   // The following specifies the source of this volume. At most one of
diff --git a/include/mesos/v1/mesos.proto b/include/mesos/v1/mesos.proto
index f25db8a..ffe45c3 100644
--- a/include/mesos/v1/mesos.proto
+++ b/include/mesos/v1/mesos.proto
@@ -3044,10 +3044,10 @@ message Volume {
   // TODO(gyliu513): Make this as `optional` after deprecation cycle of 1.0.
   required Mode mode = 3;
 
-  // Path pointing to a directory or file in the container. If the
-  // path is a relative path, it is relative to the container work
-  // directory. If the path is an absolute path, that path must
-  // already exist.
+  // Path pointing to a directory or file in the container. If the path
+  // is a relative path, it is relative to the container work directory.
+  // If the path is an absolute path and the container does not have its
+  // own rootfs, that path must already exist in the agent host rootfs.
   required string container_path = 1;
 
   // The following specifies the source of this volume. At most one of
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index f3abdbf..a976dc1 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -96,6 +96,7 @@ if (NOT WIN32)
   PROTOC_GENERATE(INTERNAL TARGET csi/state)
   PROTOC_GENERATE(INTERNAL TARGET resource_provider/storage/disk_profile)
   PROTOC_GENERATE(INTERNAL TARGET 
slave/containerizer/mesos/isolators/docker/volume/state)
+  PROTOC_GENERATE(INTERNAL TARGET 
slave/containerizer/mesos/isolators/volume/csi/state)
   PROTOC_GENERATE(INTERNAL TARGET 
slave/containerizer/mesos/provisioner/docker/message)
   PROTOC_GENERATE(INTERNAL TARGET slave/volume_gid_manager/state)
 endif ()
diff --git a/src/Makefile.am b/src/Makefile.am
index 70e844d..6d68ed0 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -414,6 +414,8 @@ CXX_PROTOS +=                                               
                \
   slave/containerizer/mesos/isolators/docker/volume/state.pb.h         \
   slave/containerizer/mesos/isolators/network/cni/spec.pb.cc           \
   slave/containerizer/mesos/isolators/network/cni/spec.pb.h            \
+  slave/containerizer/mesos/isolators/volume/csi/state.pb.cc           \
+  slave/containerizer/mesos/isolators/volume/csi/state.pb.h            \
   slave/volume_gid_manager/state.pb.cc                                 \
   slave/volume_gid_manager/state.pb.h
 
@@ -1027,6 +1029,7 @@ libmesos_no_3rdparty_la_SOURCES =                         
        \
   slave/containerizer/mesos/provisioner/docker/message.proto           \
   slave/containerizer/mesos/isolators/docker/volume/state.proto                
\
   slave/containerizer/mesos/isolators/network/cni/spec.proto           \
+  slave/containerizer/mesos/isolators/volume/csi/state.proto           \
   slave/volume_gid_manager/state.proto
 
 # TODO(tillt): Remove authentication/cram_md5/* which will enable us to
@@ -1253,6 +1256,7 @@ libmesos_no_3rdparty_la_SOURCES +=                        
                \
   slave/containerizer/mesos/isolators/volume/sandbox_path.hpp          \
   slave/containerizer/mesos/isolators/volume/csi/paths.cpp             \
   slave/containerizer/mesos/isolators/volume/csi/paths.hpp             \
+  slave/containerizer/mesos/isolators/volume/csi/state.hpp             \
   slave/containerizer/mesos/isolators/windows/cpu.hpp                  \
   slave/containerizer/mesos/isolators/windows/mem.hpp                  \
   slave/containerizer/mesos/launch.cpp                                 \
diff --git a/src/common/validation.cpp b/src/common/validation.cpp
index 14a8c7b..e22d564 100644
--- a/src/common/validation.cpp
+++ b/src/common/validation.cpp
@@ -255,6 +255,18 @@ Option<Error> validateVolume(const Volume& volume)
               "'source.secret' is not set for SECRET volume");
         }
         break;
+      case Volume::Source::CSI_VOLUME:
+        if (!volume.source().has_csi_volume()) {
+          return Error(
+              "'source.csi_volume' is not set for CSI volume");
+        }
+
+        if (!volume.source().csi_volume().has_static_provisioning()) {
+          return Error(
+              "'source.csi_volume.static_provisioning' "
+              "is not set for CSI volume");
+        }
+        break;
       default:
         return Error("'source.type' is unknown");
     }
diff --git a/src/slave/containerizer/mesos/isolators/volume/csi/isolator.cpp 
b/src/slave/containerizer/mesos/isolators/volume/csi/isolator.cpp
index 7ec3a4e..90a526f 100644
--- a/src/slave/containerizer/mesos/isolators/volume/csi/isolator.cpp
+++ b/src/slave/containerizer/mesos/isolators/volume/csi/isolator.cpp
@@ -17,6 +17,7 @@
 #include <string>
 #include <vector>
 
+#include <process/collect.hpp>
 #include <process/future.hpp>
 #include <process/owned.hpp>
 
@@ -24,14 +25,23 @@
 
 #include <stout/os/realpath.hpp>
 
+#include "common/protobuf_utils.hpp"
+
+#include "linux/fs.hpp"
+#include "linux/ns.hpp"
+
+#include "slave/state.hpp"
+
 #include "slave/containerizer/mesos/isolators/volume/csi/isolator.hpp"
 #include "slave/containerizer/mesos/isolators/volume/csi/paths.hpp"
 
 using std::string;
 using std::vector;
 
+using process::Failure;
 using process::Future;
 using process::Owned;
+using process::PID;
 
 using mesos::slave::ContainerConfig;
 using mesos::slave::ContainerLaunchInfo;
@@ -42,6 +52,9 @@ namespace mesos {
 namespace internal {
 namespace slave {
 
+using AccessMode = Volume::Source::CSIVolume::VolumeCapability::AccessMode;
+
+
 Try<Isolator*> VolumeCSIIsolatorProcess::create(
     const Flags& flags,
     CSIServer* csiServer)
@@ -99,7 +112,206 @@ Future<Option<ContainerLaunchInfo>> 
VolumeCSIIsolatorProcess::prepare(
     const ContainerID& containerId,
     const ContainerConfig& containerConfig)
 {
-  return None();
+  // Sets up the CSI volumes requested in `containerConfig.container_info()`:
+  // validates each one, computes (and where needed creates) its mount target,
+  // checkpoints the set of volumes under `rootDir`, then asks the CSI server
+  // to publish them and defers to `_prepare()` to build the bind mounts.
+  // Returns `None()` if the container has no `ContainerInfo` or requests no
+  // CSI volumes.
+  if (!containerConfig.has_container_info()) {
+    return None();
+  }
+
+  if (containerConfig.container_info().type() != ContainerInfo::MESOS) {
+    return Failure("Can only prepare CSI volumes for a MESOS container");
+  }
+
+  // The hashset is used to check if there are duplicated CSI volumes for the
+  // same container.
+  hashset<CSIVolume> volumeSet;
+
+  // Represents the CSI volume mounts that we want to do for the container.
+  // Entries are appended in the same order as the publish futures below.
+  vector<Mount> mounts;
+
+  foreach (const Volume& _volume, containerConfig.container_info().volumes()) {
+    if (!_volume.has_source() ||
+        !_volume.source().has_type() ||
+        _volume.source().type() != Volume::Source::CSI_VOLUME) {
+      continue;
+    }
+
+    // NOTE(review): these CHECKs rely on the volume having already passed
+    // `validateVolume()` (common/validation.cpp); confirm every launch path
+    // validates volumes before `prepare()` is called.
+    CHECK(_volume.source().has_csi_volume());
+    CHECK(_volume.source().csi_volume().has_static_provisioning());
+
+    const Volume::Source::CSIVolume& csiVolume = _volume.source().csi_volume();
+    const string& pluginName = csiVolume.plugin_name();
+    const string& volumeId = csiVolume.static_provisioning().volume_id();
+    const AccessMode& accessMode =
+      csiVolume.static_provisioning().volume_capability().access_mode();
+
+    // A volume published read-only cannot be used read-write.
+    if (csiVolume.static_provisioning().readonly() &&
+        _volume.mode() == Volume::RW) {
+      return Failure(
+          "Cannot publish the volume '" + volumeId +
+          "' in read-only mode but use it in read-write mode");
+    }
+
+    // Neither can a volume whose capability only allows readers.
+    if ((accessMode.mode() == AccessMode::SINGLE_NODE_READER_ONLY ||
+         accessMode.mode() == AccessMode::MULTI_NODE_READER_ONLY) &&
+        _volume.mode() == Volume::RW) {
+      return Failure(
+          "Cannot use the read-only volume '" +
+          volumeId + "' in read-write mode");
+    }
+
+    // Identity of the volume (plugin name + volume ID), used both for the
+    // duplicate check and for the checkpoint.
+    CSIVolume volume;
+    volume.set_plugin_name(pluginName);
+    volume.set_id(volumeId);
+
+    if (volumeSet.contains(volume)) {
+      return Failure(
+          "Found duplicate CSI volume with plugin '" +
+          pluginName + "' and volume ID '" + volumeId + "'");
+    }
+
+    // Determine the target of the mount.
+    string target;
+
+    // The logic to determine a volume mount target is identical to Linux
+    // filesystem isolator, because this isolator has a dependency on that
+    // isolator, and it assumes that if the container specifies a rootfs
+    // the sandbox is already bind mounted into the container.
+    if (path::is_absolute(_volume.container_path())) {
+      // To specify a CSI volume for a container, frameworks should be allowed
+      // to define the `container_path` either as an absolute path or a
+      // relative path. Please see Linux filesystem isolator for details.
+      if (containerConfig.has_rootfs()) {
+        target = path::join(
+            containerConfig.rootfs(),
+            _volume.container_path());
+
+        Try<Nothing> mkdir = os::mkdir(target);
+        if (mkdir.isError()) {
+          return Failure(
+              "Failed to create the target of the mount at '" +
+              target + "': " + mkdir.error());
+        }
+      } else {
+        // Without a rootfs the absolute path refers to the host filesystem
+        // and must already exist there.
+        target = _volume.container_path();
+
+        if (!os::exists(target)) {
+          return Failure("Absolute container path '" + target + "' "
+                         "does not exist");
+        }
+      }
+    } else {
+      if (containerConfig.has_rootfs()) {
+        target = path::join(containerConfig.rootfs(),
+                            flags.sandbox_directory,
+                            _volume.container_path());
+      } else {
+        target = path::join(containerConfig.directory(),
+                            _volume.container_path());
+      }
+
+      // NOTE: We cannot create the mount point at `target` if
+      // container has rootfs defined. The bind mount of the sandbox
+      // will hide what's inside `target`. So we should always create
+      // the mount point in `directory`.
+      string mountPoint = path::join(
+          containerConfig.directory(),
+          _volume.container_path());
+
+      Try<Nothing> mkdir = os::mkdir(mountPoint);
+      if (mkdir.isError()) {
+        return Failure(
+            "Failed to create the target of the mount at '" +
+            mountPoint + "': " + mkdir.error());
+      }
+    }
+
+    Mount mount;
+    mount.csiVolume = csiVolume;
+    mount.target = target;
+    mount.volumeMode = _volume.mode();
+
+    mounts.push_back(mount);
+    volumeSet.insert(volume);
+  }
+
+  if (volumeSet.empty()) {
+    return None();
+  }
+
+  // Create the `CSIVolumes` protobuf message to checkpoint.
+  CSIVolumes checkpointedVolumes;
+  foreach (const CSIVolume& volume, volumeSet) {
+    checkpointedVolumes.add_volumes()->CopyFrom(volume);
+  }
+
+  // The checkpoint is written before any volume is published; presumably so
+  // the volumes can be found again (e.g. for cleanup) after an agent
+  // restart. TODO(review): confirm against recover()/cleanup().
+  const string volumesPath = csi::paths::getVolumesPath(rootDir, containerId);
+  Try<Nothing> checkpoint =
+    state::checkpoint(volumesPath, checkpointedVolumes);
+  if (checkpoint.isError()) {
+    return Failure(
+        "Failed to checkpoint CSI volumes at '" +
+        volumesPath + "': " + checkpoint.error());
+  }
+
+  VLOG(1) << "Successfully created checkpoint at '" << volumesPath << "'";
+
+  infos.put(containerId, Owned<Info>(new Info(volumeSet)));
+
+  // Invoke CSI server to publish the volumes, one future per entry of
+  // `mounts` and in the same order.
+  vector<Future<string>> futures;
+  futures.reserve(mounts.size());
+  foreach (const Mount& mount, mounts) {
+    futures.push_back(csiServer->publishVolume(mount.csiVolume));
+  }
+
+  // `await()` (rather than `collect()`) waits for every publish call to
+  // finish even if some fail, so `_prepare()` can report all failures.
+  return await(futures)
+    .then(defer(
+        PID<VolumeCSIIsolatorProcess>(this),
+        &VolumeCSIIsolatorProcess::_prepare,
+        containerId,
+        mounts,
+        lambda::_1));
+}
+
+
+// Continuation of `prepare()`, run after every `CSIServer::publishVolume()`
+// call has completed. `futures[i]` carries the result for `mounts[i]` (both
+// vectors are built in the same order); on success its (trimmed) value is
+// used as the source of a bind mount onto `mounts[i].target`. Fails if any
+// publish call failed or was discarded.
+Future<Option<ContainerLaunchInfo>> VolumeCSIIsolatorProcess::_prepare(
+    const ContainerID& containerId,
+    const vector<Mount>& mounts,
+    const vector<Future<string>>& futures)
+{
+  // Collect every publish error (rather than stopping at the first) so the
+  // failure reports all volumes that could not be published.
+  vector<string> messages;
+  vector<string> sources;
+  sources.reserve(futures.size());
+
+  foreach (const Future<string>& future, futures) {
+    if (!future.isReady()) {
+      messages.push_back(future.isFailed() ? future.failure() : "discarded");
+      continue;
+    }
+
+    sources.push_back(strings::trim(future.get()));
+  }
+
+  if (!messages.empty()) {
+    return Failure(
+        "Failed to publish CSI volumes: " + strings::join("\n", messages));
+  }
+
+  CHECK_EQ(sources.size(), mounts.size());
+
+  ContainerLaunchInfo launchInfo;
+
+  // Request a new mount namespace for the container; the bind mounts below
+  // are performed in it.
+  launchInfo.add_clone_namespaces(CLONE_NEWNS);
+
+  for (size_t i = 0; i < sources.size(); i++) {
+    const string& source = sources[i];
+    const Mount& mount = mounts[i];
+
+    LOG(INFO) << "Mounting CSI volume mount point '" << source
+              << "' to '" << mount.target << "' for container " << containerId;
+
+    // A `Volume::RO` volume gets `MS_RDONLY` in addition to the bind flags.
+    *launchInfo.add_mounts() = protobuf::slave::createContainerMount(
+        source,
+        mount.target,
+        MS_BIND | MS_REC | (mount.volumeMode == Volume::RO ? MS_RDONLY : 0));
+  }
+
+  return launchInfo;
 }
 
 
diff --git a/src/slave/containerizer/mesos/isolators/volume/csi/isolator.hpp 
b/src/slave/containerizer/mesos/isolators/volume/csi/isolator.hpp
index f943766..a70da4f 100644
--- a/src/slave/containerizer/mesos/isolators/volume/csi/isolator.hpp
+++ b/src/slave/containerizer/mesos/isolators/volume/csi/isolator.hpp
@@ -36,6 +36,8 @@
 
 #include "slave/containerizer/mesos/isolator.hpp"
 
+#include "slave/containerizer/mesos/isolators/volume/csi/state.hpp"
+
 namespace mesos {
 namespace internal {
 namespace slave {
@@ -63,6 +65,21 @@ public:
       const ContainerID& containerId) override;
 
 private:
+  // A CSI volume mount to be set up for a container in `prepare()`.
+  struct Mount
+  {
+    // The CSI volume as specified in the container's `Volume` source.
+    Volume::Source::CSIVolume csiVolume;
+
+    // The mount target path computed in `prepare()`.
+    std::string target;
+
+    // The mode requested by the container's `Volume`; `Volume::RO` makes
+    // the bind mount read-only.
+    Volume::Mode volumeMode;
+  };
+
+  // Per-container state: the set of CSI volumes used by the container.
+  struct Info
+  {
+    explicit Info(const hashset<CSIVolume>& _volumes)
+      : volumes(_volumes) {}
+
+    hashset<CSIVolume> volumes;
+  };
+
   VolumeCSIIsolatorProcess(
       const Flags& _flags,
       CSIServer* _csiServer,
@@ -72,11 +89,18 @@ private:
     csiServer(_csiServer),
     rootDir(_rootDir) {}
 
+  // Continuation of `prepare()`: turns the results of publishing the CSI
+  // volumes (`futures`, one per entry of `mounts`, in the same order) into
+  // bind mounts in the returned `ContainerLaunchInfo`.
+  process::Future<Option<mesos::slave::ContainerLaunchInfo>> _prepare(
+      const ContainerID& containerId,
+      const std::vector<Mount>& mounts,
+      const std::vector<process::Future<std::string>>& futures);
+
   const Flags flags;
   CSIServer* csiServer;
 
   // CSI volume information root directory.
   const std::string rootDir;
+
+  // Per-container info keyed by container ID; an entry is added in
+  // `prepare()` once the container's CSI volumes have been checkpointed.
+  hashmap<ContainerID, process::Owned<Info>> infos;
 };
 
 } // namespace slave {
diff --git a/src/slave/containerizer/mesos/isolators/volume/csi/state.hpp 
b/src/slave/containerizer/mesos/isolators/volume/csi/state.hpp
new file mode 100644
index 0000000..6a547d0
--- /dev/null
+++ b/src/slave/containerizer/mesos/isolators/volume/csi/state.hpp
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef __VOLUME_CSI_ISOLATOR_STATE_HPP__
+#define __VOLUME_CSI_ISOLATOR_STATE_HPP__
+
+#include <string>
+
+#include <boost/functional/hash.hpp>
+
+// ONLY USEFUL AFTER RUNNING PROTOC.
+#include "slave/containerizer/mesos/isolators/volume/csi/state.pb.h"
+
+namespace mesos {
+namespace internal {
+namespace slave {
+
+// Two `CSIVolume`s are equal when both their plugin name and volume ID are
+// equal. Paired with the `std::hash<CSIVolume>` specialization in this
+// header so that `CSIVolume` can be used as a `hashset` key.
+inline bool operator==(const CSIVolume& left, const CSIVolume& right)
+{
+  return left.plugin_name() == right.plugin_name() &&
+         left.id() == right.id();
+}
+
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
+
+
+namespace std {
+
+// Hashes a `CSIVolume` on the same fields its `operator==` compares (plugin
+// name, then volume ID), so it can serve as a `hashset` key.
+template <>
+struct hash<mesos::internal::slave::CSIVolume>
+{
+  using result_type = size_t;
+  using argument_type = mesos::internal::slave::CSIVolume;
+
+  result_type operator()(const argument_type& volume) const
+  {
+    const size_t pluginNameHash =
+      std::hash<std::string>()(volume.plugin_name());
+    const size_t idHash = std::hash<std::string>()(volume.id());
+
+    size_t combined = 0;
+    boost::hash_combine(combined, pluginNameHash);
+    boost::hash_combine(combined, idHash);
+    return combined;
+  }
+};
+
+} // namespace std {
+
+#endif // __VOLUME_CSI_ISOLATOR_STATE_HPP__
diff --git a/src/slave/containerizer/mesos/isolators/volume/csi/state.proto 
b/src/slave/containerizer/mesos/isolators/volume/csi/state.proto
new file mode 100644
index 0000000..483fe59
--- /dev/null
+++ b/src/slave/containerizer/mesos/isolators/volume/csi/state.proto
@@ -0,0 +1,29 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+
+package mesos.internal.slave;
+
+// Identity of a CSI volume used by a container: the CSI plugin serving it
+// plus the volume ID.
+message CSIVolume {
+  // Name of the CSI plugin, taken from the container's
+  // `Volume.source.csi_volume.plugin_name`.
+  string plugin_name = 1;
+
+  // Volume ID, taken from the container's
+  // `Volume.source.csi_volume.static_provisioning.volume_id`.
+  string id = 2;
+}
+
+
+// The set of CSI volumes used by one container, as checkpointed by the
+// `volume/csi` isolator in `prepare()`.
+message CSIVolumes {
+  repeated CSIVolume volumes = 1;
+}

Reply via email to