This is an automated email from the ASF dual-hosted git repository.

grag pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 8d545180294ae705f7b8a2fe1578653107ede540
Author: Greg Mann <g...@mesosphere.io>
AuthorDate: Mon Aug 3 10:11:50 2020 -0700

    Enabled pre-provisioned volumes in the volume manager.
    
    This patch makes it possible to publish CSI volumes on an
    agent which were pre-provisioned out of band.
    
    Review: https://reviews.apache.org/r/72681/
---
 src/csi/state.proto                   |  5 ++
 src/csi/v0_volume_manager.cpp         | 95 +++++++++++++++++++++++++++--------
 src/csi/v0_volume_manager.hpp         |  5 +-
 src/csi/v0_volume_manager_process.hpp |  8 ++-
 src/csi/v1_volume_manager.cpp         | 95 +++++++++++++++++++++++++++--------
 src/csi/v1_volume_manager.hpp         |  5 +-
 src/csi/v1_volume_manager_process.hpp |  8 ++-
 src/csi/volume_manager.hpp            | 18 +++++--
 8 files changed, 190 insertions(+), 49 deletions(-)

diff --git a/src/csi/state.proto b/src/csi/state.proto
index 28ad5ef..af0ef1c 100644
--- a/src/csi/state.proto
+++ b/src/csi/state.proto
@@ -70,4 +70,9 @@ message VolumeState {
   // hence needs cleanup. If set, the resource provider MUST transition the
   // volume to `PUBLISHED` state during recovery.
   bool node_publish_required = 7;
+
+  // Indicates that the volume was not created by a CSI plugin, but rather was
+  // pre-provisioned by some other means and then attached to the node using a
+  // CSI plugin.
+  bool pre_provisioned = 8;
 }
diff --git a/src/csi/v0_volume_manager.cpp b/src/csi/v0_volume_manager.cpp
index b383598..5368440 100644
--- a/src/csi/v0_volume_manager.cpp
+++ b/src/csi/v0_volume_manager.cpp
@@ -452,8 +452,29 @@ Future<Nothing> VolumeManagerProcess::detachVolume(const string& volumeId)
 }
 
 
-Future<Nothing> VolumeManagerProcess::publishVolume(const string& volumeId)
+Future<Nothing> VolumeManagerProcess::publishVolume(
+    const string& volumeId,
+    const Option<VolumeState>& volumeState)
 {
+  if (volumeState.isSome()) {
+    if (!volumeState->pre_provisioned()) {
+      return Failure(
+          "Cannot specify volume state when publishing a volume unless that"
+          " volume is pre-provisioned");
+    }
+
+    if (volumeState->state() != VolumeState::VOL_READY &&
+        volumeState->state() != VolumeState::NODE_READY) {
+      return Failure(
+          "Cannot specify volume state when publishing a volume unless that"
+          " volume is in either the VOL_READY or NODE_READY state");
+    }
+
+    // This must be an untracked volume. Track it now before we continue.
+    volumes.put(volumeId, VolumeState(volumeState.get()));
+    checkpointVolumeState(volumeId);
+  }
+
   if (!volumes.contains(volumeId)) {
     return Failure("Cannot publish unknown volume '" + volumeId + "'");
   }
@@ -728,16 +749,7 @@ Future<bool> VolumeManagerProcess::_deleteVolume(const std::string& volumeId)
   // the future returned by the sequence ready as well.
   return __deleteVolume(volumeId)
     .then(process::defer(self(), [this, volumeId](bool deleted) {
-      volumes.erase(volumeId);
-
-      const string volumePath =
-        paths::getVolumePath(rootDir, info.type(), info.name(), volumeId);
-
-      Try<Nothing> rmdir = os::rmdir(volumePath);
-      CHECK_SOME(rmdir) << "Failed to remove checkpointed volume state at '"
-                        << volumePath << "': " << rmdir.error();
-
-      garbageCollectMountPath(volumeId);
+      removeVolume(volumeId);
 
       return deleted;
     }));
@@ -1051,6 +1063,13 @@ Future<Nothing> VolumeManagerProcess::_unpublishVolume(const string& volumeId)
 
   if (volumeState.state() == VolumeState::NODE_READY) {
     CHECK(volumeState.boot_id().empty());
+
+    if (volumeState.pre_provisioned()) {
+      // Since this volume was pre-provisioned, it has reached the end of its
+      // lifecycle. Remove it now.
+      removeVolume(volumeId);
+    }
+
     return Nothing();
   }
 
@@ -1063,9 +1082,16 @@ Future<Nothing> VolumeManagerProcess::_unpublishVolume(const string& volumeId)
   }
 
   if (!nodeCapabilities->stageUnstageVolume) {
-    // Since this is a no-op, no need to checkpoint here.
-    volumeState.set_state(VolumeState::NODE_READY);
-    volumeState.clear_boot_id();
+    if (volumeState.pre_provisioned()) {
+      // Since this volume was pre-provisioned, it has reached the end of its
+      // lifecycle. Remove it now.
+      removeVolume(volumeId);
+    } else {
+      // Since this is a no-op, no need to checkpoint here.
+      volumeState.set_state(VolumeState::NODE_READY);
+      volumeState.clear_boot_id();
+    }
+
     return Nothing();
   }
 
@@ -1091,13 +1117,20 @@ Future<Nothing> VolumeManagerProcess::_unpublishVolume(const string& volumeId)
   request.set_staging_target_path(stagingPath);
 
   return call(NODE_SERVICE, &Client::nodeUnstageVolume, std::move(request))
-    .then(process::defer(self(), [this, volumeId] {
+    .then(process::defer(self(), [this, volumeId, volumeState] {
       CHECK(volumes.contains(volumeId));
-      VolumeState& volumeState = volumes.at(volumeId).state;
-      volumeState.set_state(VolumeState::NODE_READY);
-      volumeState.clear_boot_id();
 
-      checkpointVolumeState(volumeId);
+      if (volumeState.pre_provisioned()) {
+        // Since this volume was pre-provisioned, it has reached the end of its
+        // lifecycle. Remove it now.
+        removeVolume(volumeId);
+      } else {
+        VolumeState& volumeState = volumes.at(volumeId).state;
+        volumeState.set_state(VolumeState::NODE_READY);
+        volumeState.clear_boot_id();
+
+        checkpointVolumeState(volumeId);
+      }
 
       return Nothing();
     }));
@@ -1188,6 +1221,21 @@ void VolumeManagerProcess::garbageCollectMountPath(const string& volumeId)
 }
 
 
+void VolumeManagerProcess::removeVolume(const string& volumeId)
+{
+  volumes.erase(volumeId);
+
+  const string volumePath =
+    paths::getVolumePath(rootDir, info.type(), info.name(), volumeId);
+
+  Try<Nothing> rmdir = os::rmdir(volumePath);
+  CHECK_SOME(rmdir) << "Failed to remove checkpointed volume state at '"
+                    << volumePath << "': " << rmdir.error();
+
+  garbageCollectMountPath(volumeId);
+}
+
+
 VolumeManager::VolumeManager(
     const string& rootDir,
     const CSIPluginInfo& info,
@@ -1297,11 +1345,16 @@ Future<Nothing> VolumeManager::detachVolume(const string& volumeId)
 }
 
 
-Future<Nothing> VolumeManager::publishVolume(const string& volumeId)
+Future<Nothing> VolumeManager::publishVolume(
+    const string& volumeId,
+    const Option<VolumeState>& volumeState)
 {
   return recovered
     .then(process::defer(
-        process.get(), &VolumeManagerProcess::publishVolume, volumeId));
+        process.get(),
+        &VolumeManagerProcess::publishVolume,
+        volumeId,
+        volumeState));
 }
 
 
diff --git a/src/csi/v0_volume_manager.hpp b/src/csi/v0_volume_manager.hpp
index 6920eb8..93183c2 100644
--- a/src/csi/v0_volume_manager.hpp
+++ b/src/csi/v0_volume_manager.hpp
@@ -37,6 +37,7 @@
 
 #include "csi/metrics.hpp"
 #include "csi/service_manager.hpp"
+#include "csi/state.hpp"
 #include "csi/volume_manager.hpp"
 
 namespace mesos {
@@ -95,7 +96,9 @@ public:
 
   process::Future<Nothing> detachVolume(const std::string& volumeId) override;
 
-  process::Future<Nothing> publishVolume(const std::string& volumeId) override;
+  process::Future<Nothing> publishVolume(
+      const std::string& volumeId,
+      const Option<state::VolumeState>& volumeState = None()) override;
 
   process::Future<Nothing> unpublishVolume(
       const std::string& volumeId) override;
diff --git a/src/csi/v0_volume_manager_process.hpp b/src/csi/v0_volume_manager_process.hpp
index d5a5eb2..7548c43 100644
--- a/src/csi/v0_volume_manager_process.hpp
+++ b/src/csi/v0_volume_manager_process.hpp
@@ -89,7 +89,9 @@ public:
 
   process::Future<Nothing> detachVolume(const std::string& volumeId);
 
-  process::Future<Nothing> publishVolume(const std::string& volumeId);
+  process::Future<Nothing> publishVolume(
+      const std::string& volumeId,
+      const Option<state::VolumeState>& volumeState = None());
 
   process::Future<Nothing> unpublishVolume(const std::string& volumeId);
 
@@ -167,6 +169,10 @@ private:
 
   void garbageCollectMountPath(const std::string& volumeId);
 
+  // Removes the metadata associated with a particular volume both
+  // from memory and from disk.
+  void removeVolume(const std::string& volumeId);
+
   const std::string rootDir;
   const CSIPluginInfo info;
   const hashset<Service> services;
diff --git a/src/csi/v1_volume_manager.cpp b/src/csi/v1_volume_manager.cpp
index a9b80d0..7eae638 100644
--- a/src/csi/v1_volume_manager.cpp
+++ b/src/csi/v1_volume_manager.cpp
@@ -473,8 +473,29 @@ Future<Nothing> VolumeManagerProcess::detachVolume(const string& volumeId)
 }
 
 
-Future<Nothing> VolumeManagerProcess::publishVolume(const string& volumeId)
+Future<Nothing> VolumeManagerProcess::publishVolume(
+    const string& volumeId,
+    const Option<VolumeState>& volumeState)
 {
+  if (volumeState.isSome()) {
+    if (!volumeState->pre_provisioned()) {
+      return Failure(
+          "Cannot specify volume state when publishing a volume unless that"
+          " volume is pre-provisioned");
+    }
+
+    if (volumeState->state() != VolumeState::VOL_READY &&
+        volumeState->state() != VolumeState::NODE_READY) {
+      return Failure(
+          "Cannot specify volume state when publishing a volume unless that"
+          " volume is in either the VOL_READY or NODE_READY state");
+    }
+
+    // This must be an untracked volume. Track it now before we continue.
+    volumes.put(volumeId, VolumeState(volumeState.get()));
+    checkpointVolumeState(volumeId);
+  }
+
   if (!volumes.contains(volumeId)) {
     return Failure("Cannot publish unknown volume '" + volumeId + "'");
   }
@@ -750,16 +771,7 @@ Future<bool> VolumeManagerProcess::_deleteVolume(const std::string& volumeId)
   // the future returned by the sequence ready as well.
   return __deleteVolume(volumeId)
     .then(process::defer(self(), [this, volumeId](bool deleted) {
-      volumes.erase(volumeId);
-
-      const string volumePath =
-        paths::getVolumePath(rootDir, info.type(), info.name(), volumeId);
-
-      Try<Nothing> rmdir = os::rmdir(volumePath);
-      CHECK_SOME(rmdir) << "Failed to remove checkpointed volume state at '"
-                        << volumePath << "': " << rmdir.error();
-
-      garbageCollectMountPath(volumeId);
+      removeVolume(volumeId);
 
       return deleted;
     }));
@@ -1082,6 +1094,13 @@ Future<Nothing> VolumeManagerProcess::_unpublishVolume(const string& volumeId)
 
   if (volumeState.state() == VolumeState::NODE_READY) {
     CHECK(volumeState.boot_id().empty());
+
+    if (volumeState.pre_provisioned()) {
+      // Since this volume was pre-provisioned, it has reached the end of its
+      // lifecycle. Remove it now.
+      removeVolume(volumeId);
+    }
+
     return Nothing();
   }
 
@@ -1094,9 +1113,16 @@ Future<Nothing> VolumeManagerProcess::_unpublishVolume(const string& volumeId)
   }
 
   if (!nodeCapabilities->stageUnstageVolume) {
-    // Since this is a no-op, no need to checkpoint here.
-    volumeState.set_state(VolumeState::NODE_READY);
-    volumeState.clear_boot_id();
+    if (volumeState.pre_provisioned()) {
+      // Since this volume was pre-provisioned, it has reached the end of its
+      // lifecycle. Remove it now.
+      removeVolume(volumeId);
+    } else {
+      // Since this is a no-op, no need to checkpoint here.
+      volumeState.set_state(VolumeState::NODE_READY);
+      volumeState.clear_boot_id();
+    }
+
     return Nothing();
   }
 
@@ -1122,13 +1148,20 @@ Future<Nothing> VolumeManagerProcess::_unpublishVolume(const string& volumeId)
   request.set_staging_target_path(stagingPath);
 
   return call(NODE_SERVICE, &Client::nodeUnstageVolume, std::move(request))
-    .then(process::defer(self(), [this, volumeId] {
+    .then(process::defer(self(), [this, volumeId, volumeState] {
       CHECK(volumes.contains(volumeId));
-      VolumeState& volumeState = volumes.at(volumeId).state;
-      volumeState.set_state(VolumeState::NODE_READY);
-      volumeState.clear_boot_id();
 
-      checkpointVolumeState(volumeId);
+      if (volumeState.pre_provisioned()) {
+        // Since this volume was pre-provisioned, it has reached the end of its
+        // lifecycle. Remove it now.
+        removeVolume(volumeId);
+      } else {
+        VolumeState& volumeState = volumes.at(volumeId).state;
+        volumeState.set_state(VolumeState::NODE_READY);
+        volumeState.clear_boot_id();
+
+        checkpointVolumeState(volumeId);
+      }
 
       return Nothing();
     }));
@@ -1222,6 +1255,21 @@ void VolumeManagerProcess::garbageCollectMountPath(const string& volumeId)
 }
 
 
+void VolumeManagerProcess::removeVolume(const string& volumeId)
+{
+  volumes.erase(volumeId);
+
+  const string volumePath =
+    paths::getVolumePath(rootDir, info.type(), info.name(), volumeId);
+
+  Try<Nothing> rmdir = os::rmdir(volumePath);
+  CHECK_SOME(rmdir) << "Failed to remove checkpointed volume state at '"
+                    << volumePath << "': " << rmdir.error();
+
+  garbageCollectMountPath(volumeId);
+}
+
+
 VolumeManager::VolumeManager(
     const string& rootDir,
     const CSIPluginInfo& info,
@@ -1331,11 +1379,16 @@ Future<Nothing> VolumeManager::detachVolume(const string& volumeId)
 }
 
 
-Future<Nothing> VolumeManager::publishVolume(const string& volumeId)
+Future<Nothing> VolumeManager::publishVolume(
+    const string& volumeId,
+    const Option<VolumeState>& volumeState)
 {
   return recovered
     .then(process::defer(
-        process.get(), &VolumeManagerProcess::publishVolume, volumeId));
+        process.get(),
+        &VolumeManagerProcess::publishVolume,
+        volumeId,
+        volumeState));
 }
 
 
diff --git a/src/csi/v1_volume_manager.hpp b/src/csi/v1_volume_manager.hpp
index 4db11b5..2f7897d 100644
--- a/src/csi/v1_volume_manager.hpp
+++ b/src/csi/v1_volume_manager.hpp
@@ -37,6 +37,7 @@
 
 #include "csi/metrics.hpp"
 #include "csi/service_manager.hpp"
+#include "csi/state.hpp"
 #include "csi/volume_manager.hpp"
 
 namespace mesos {
@@ -95,7 +96,9 @@ public:
 
   process::Future<Nothing> detachVolume(const std::string& volumeId) override;
 
-  process::Future<Nothing> publishVolume(const std::string& volumeId) override;
+  process::Future<Nothing> publishVolume(
+      const std::string& volumeId,
+      const Option<state::VolumeState>& volumeState = None()) override;
 
   process::Future<Nothing> unpublishVolume(
       const std::string& volumeId) override;
diff --git a/src/csi/v1_volume_manager_process.hpp b/src/csi/v1_volume_manager_process.hpp
index df52c3d..b8a1ef7 100644
--- a/src/csi/v1_volume_manager_process.hpp
+++ b/src/csi/v1_volume_manager_process.hpp
@@ -89,7 +89,9 @@ public:
 
   process::Future<Nothing> detachVolume(const std::string& volumeId);
 
-  process::Future<Nothing> publishVolume(const std::string& volumeId);
+  process::Future<Nothing> publishVolume(
+      const std::string& volumeId,
+      const Option<state::VolumeState>& volumeState = None());
 
   process::Future<Nothing> unpublishVolume(const std::string& volumeId);
 
@@ -167,6 +169,10 @@ private:
 
   void garbageCollectMountPath(const std::string& volumeId);
 
+  // Removes the metadata associated with a particular volume both
+  // from memory and from disk.
+  void removeVolume(const std::string& volumeId);
+
   const std::string rootDir;
   const CSIPluginInfo info;
   const hashset<Service> services;
diff --git a/src/csi/volume_manager.hpp b/src/csi/volume_manager.hpp
index 199cf44..57e7c51 100644
--- a/src/csi/volume_manager.hpp
+++ b/src/csi/volume_manager.hpp
@@ -38,6 +38,7 @@
 
 #include "csi/metrics.hpp"
 #include "csi/service_manager.hpp"
+#include "csi/state.hpp"
 
 namespace mesos {
 namespace csi {
@@ -126,11 +127,22 @@ public:
   virtual process::Future<Nothing> detachVolume(
       const std::string& volumeId) = 0;
 
-  // Transitions a tracked volume to `PUBLISHED` state from any state above.
+  // Transitions a volume to `PUBLISHED` state. This method may be called on
+  // tracked or untracked volumes:
+  // * If `volumeState` is NONE, then `volumeId` must correspond to a tracked
+  //   volume, and this method will transition the volume to `PUBLISHED` from
+  //   any state above.
+  // * If `volumeState` is SOME, then `volumeId` must correspond to an untracked
+  //   volume, and thus the ID should be unknown to the volume manager. The
+  //   volume will be tracked by the manager and will become useable on the
+  //   agent if the publish attempt succeeds.
   virtual process::Future<Nothing> publishVolume(
-      const std::string& volumeId) = 0;
+      const std::string& volumeId,
+      const Option<state::VolumeState>& volumeState = None()) = 0;
 
-  // Transitions a tracked volume to `NODE_READY` state from any state below.
+  // Transitions a tracked volume to `NODE_READY` state from any other state.
+  // If the volume was untracked when it was published (a pre-provisioned
+  // volume), then the volume's metadata is removed from the volume manager.
   virtual process::Future<Nothing> unpublishVolume(
       const std::string& volumeId) = 0;
 };

Reply via email to