Repository: mesos
Updated Branches:
  refs/heads/master f4345c1f1 -> 7b768a912


Checkpointed profiles in storage local resource provider.

The storage local resource provider (SLRP) now checkpoints the profiles
associated with its storage pools, and no longer depends on the
`DiskProfileAdaptor` module to return the set of previously-known
profiles during recovery.

Review: https://reviews.apache.org/r/65594/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/7b768a91
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/7b768a91
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/7b768a91

Branch: refs/heads/master
Commit: 7b768a9120bc7e6f8dd9095d3f965d5f9b7352dd
Parents: fd49c4a
Author: Chun-Hung Hsiao <chhs...@apache.org>
Authored: Mon Apr 30 14:20:10 2018 -0700
Committer: Chun-Hung Hsiao <chhs...@mesosphere.io>
Committed: Mon Apr 30 14:59:40 2018 -0700

----------------------------------------------------------------------
 src/resource_provider/state.proto          |  22 ++++-
 src/resource_provider/storage/provider.cpp | 104 ++++++++++++------------
 2 files changed, 70 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/7b768a91/src/resource_provider/state.proto
----------------------------------------------------------------------
diff --git a/src/resource_provider/state.proto b/src/resource_provider/state.proto
index 8577b58..eb56a91 100644
--- a/src/resource_provider/state.proto
+++ b/src/resource_provider/state.proto
@@ -16,6 +16,8 @@
 
 syntax = "proto2";
 
+import "csi.proto";
+
 import "mesos/mesos.proto";
 
 package mesos.resource_provider;
@@ -25,11 +27,25 @@ option java_outer_classname = "Protos";
 
 
 message ResourceProviderState {
-  // This includes only pending operations. Operations that have
-  // unacknowledged statuses should be recovered through the status
-  // update manager.
+  // The list of operations that have unacknowledged terminal statuses.
   repeated Operation operations = 1;
 
   // The total resources provided by this resource provider.
   repeated Resource resources = 2;
+
+  // Storage resource provider related states.
+  message Storage {
+    // Describes a manifest of CSI properties associated with a profile.
+    // See DiskProfileAdaptor::ProfileInfo.
+    message ProfileInfo {
+      required .csi.v0.VolumeCapability capability = 1;
+      map<string, string> parameters = 2;
+    }
+
+    // The set of profiles associated with any storage pool in the total
+    // resources.
+    map<string, ProfileInfo> profiles = 1;
+  }
+
+  optional Storage storage = 3;
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/7b768a91/src/resource_provider/storage/provider.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index 8001ade..d1267cf 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -345,7 +345,6 @@ private:
   Future<Nothing> recoverServices();
   Future<Nothing> recoverVolumes();
   Future<Nothing> recoverResourceProviderState();
-  Future<Nothing> recoverProfiles();
 
   void doReliableRegistration();
 
@@ -637,7 +636,6 @@ Future<Nothing> StorageLocalResourceProviderProcess::recover()
   return recoverServices()
     .then(defer(self(), &Self::recoverVolumes))
     .then(defer(self(), &Self::recoverResourceProviderState))
-    .then(defer(self(), &Self::recoverProfiles))
     .then(defer(self(), [=]() -> Future<Nothing> {
       LOG(INFO)
         << "Finished recovery for resource provider with type '" << info.type()
@@ -998,63 +996,36 @@ StorageLocalResourceProviderProcess::recoverResourceProviderState()
       }
 
       totalResources = resourceProviderState->resources();
-    }
-  }
-
-  return Nothing();
-}
 
+      const ResourceProviderState::Storage& storage =
+        resourceProviderState->storage();
 
-// NOTE: Currently we need to recover profiles for replaying pending
-// `CREATE_VOLUME` or `CREATE_BLOCK` operations after failover. Consider
-// either checkpointing the required profiles for these calls, or
-// checkpointing CSI volume states by volume names instead of IDs.
-Future<Nothing> StorageLocalResourceProviderProcess::recoverProfiles()
-{
-  // Rebuild the set of required profiles from the checkpointed storage
-  // pools (i.e., RAW resources that have no volume ID). We do not need
-  // to resolve profiles for resources that have volume IDs, since their
-  // volume capabilities are already checkpointed.
-  hashset<string> requiredProfiles;
-  foreach (const Resource& resource, totalResources) {
-    if (!resource.disk().source().has_id()) {
-      requiredProfiles.insert(resource.disk().source().profile());
-    }
-  }
+      using ProfileEntry = google::protobuf::MapPair<
+          string, ResourceProviderState::Storage::ProfileInfo>;
 
-  // If no pending operation uses any profile, there is no need to
-  // recover any profile. Watching the DiskProfileAdaptor will be
-  // initiated later.
-  if (requiredProfiles.empty()) {
-    return Nothing();
-  }
+      foreach (const ProfileEntry& entry, storage.profiles()) {
+        profileInfos.put(
+            entry.first,
+            {entry.second.capability(), entry.second.parameters()});
+      }
 
-  LOG(INFO)
-    << "Waiting for DiskProfileAdaptor to recover profiles: "
-    << stringify(requiredProfiles);
-
-  // The DiskProfileAdapter module must at least have knowledge of
-  // the required profiles. Because the module is initialized separately
-  // from this resource provider, we must watch the module until all
-  // required profiles have been recovered.
-  return loop(
-      self(),
-      [=] { return diskProfileAdaptor->watch(knownProfiles, info); },
-      [=](const hashset<string>& profiles) -> ControlFlow<Nothing> {
-        // Save the returned set of profiles so that we can watch the
-        // module for changes to it, both in this loop and after
-        // recovery completes.
-        knownProfiles = profiles;
-
-        foreach (const string& profile, requiredProfiles) {
-          if (!knownProfiles.contains(profile)) {
-            return Continue();
-          }
+      // We only checkpoint profiles associated with storage pools (i.e.,
+      // resources without IDs) in `checkpointResourceProviderState` as only
+      // these profiles might be used by pending operations, so we validate here
+      // that all such profiles exist.
+      foreach (const Resource& resource, totalResources) {
+        if (!resource.disk().source().has_id() &&
+            resource.disk().source().has_profile() &&
+            !profileInfos.contains(resource.disk().source().profile())) {
+          return Failure(
+              "Cannot recover profile for storage pool '" +
+              stringify(resource) + "' from '" + statePath + "'");
         }
+      }
+    }
+  }
 
-        return Break();
-      })
-    .then(defer(self(), &Self::updateProfiles));
+  return Nothing();
 }
 
 
@@ -3281,6 +3252,33 @@ void StorageLocalResourceProviderProcess::checkpointResourceProviderState()
 
   state.mutable_resources()->CopyFrom(totalResources);
 
+  ResourceProviderState::Storage* storage = state.mutable_storage();
+
+  // NOTE: We only checkpoint profiles associated with any storage
+  // pool (i.e., resource that has no volume ID) in the total resources.
+  // We do not need to checkpoint profiles for resources that have
+  // volume IDs, as their volume capabilities are already checkpointed.
+  hashset<string> requiredProfiles;
+  foreach (const Resource& resource, totalResources) {
+    if (!resource.disk().source().has_id()) {
+      CHECK(resource.disk().source().has_profile());
+      requiredProfiles.insert(resource.disk().source().profile());
+    }
+  }
+
+  foreach (const string& profile, requiredProfiles) {
+    CHECK(profileInfos.contains(profile));
+
+    const DiskProfileAdaptor::ProfileInfo& profileInfo =
+      profileInfos.at(profile);
+
+    ResourceProviderState::Storage::ProfileInfo& profileInfo_ =
+      (*storage->mutable_profiles())[profile];
+
+    *profileInfo_.mutable_capability() = profileInfo.capability;
+    *profileInfo_.mutable_parameters() = profileInfo.parameters;
+  }
+
   const string statePath = slave::paths::getResourceProviderStatePath(
       metaDir, slaveId, info.type(), info.name(), info.id());
 

Reply via email to