Repository: mesos Updated Branches: refs/heads/master f4345c1f1 -> 7b768a912
Checkpointed profiles in storage local resource provider. SLRP now checkpoints profiles associated with storage pools, and does not depend on the `DiskProfileAdaptor` module to return the set of previously-known profiles during recovery. Review: https://reviews.apache.org/r/65594/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/7b768a91 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/7b768a91 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/7b768a91 Branch: refs/heads/master Commit: 7b768a9120bc7e6f8dd9095d3f965d5f9b7352dd Parents: fd49c4a Author: Chun-Hung Hsiao <chhs...@apache.org> Authored: Mon Apr 30 14:20:10 2018 -0700 Committer: Chun-Hung Hsiao <chhs...@mesosphere.io> Committed: Mon Apr 30 14:59:40 2018 -0700 ---------------------------------------------------------------------- src/resource_provider/state.proto | 22 ++++- src/resource_provider/storage/provider.cpp | 104 ++++++++++++------------ 2 files changed, 70 insertions(+), 56 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/7b768a91/src/resource_provider/state.proto ---------------------------------------------------------------------- diff --git a/src/resource_provider/state.proto b/src/resource_provider/state.proto index 8577b58..eb56a91 100644 --- a/src/resource_provider/state.proto +++ b/src/resource_provider/state.proto @@ -16,6 +16,8 @@ syntax = "proto2"; +import "csi.proto"; + import "mesos/mesos.proto"; package mesos.resource_provider; @@ -25,11 +27,25 @@ option java_outer_classname = "Protos"; message ResourceProviderState { - // This includes only pending operations. Operations that have - // unacknowledged statuses should be recovered through the status - // update manager. + // The list of operations that have unacknowledged terminal statuses. repeated Operation operations = 1; // The total resources provided by this resource provider. repeated Resource resources = 2; + + // Storage resource provider related states. + message Storage { + // Describes a manifest of CSI properties associated with a profile. + // See DiskProfileAdaptor::ProfileInfo. + message ProfileInfo { + required .csi.v0.VolumeCapability capability = 1; + map<string, string> parameters = 2; + } + + // The set of profiles associated with any storage pool in the total + // resources. + map<string, ProfileInfo> profiles = 1; + } + + optional Storage storage = 3; } http://git-wip-us.apache.org/repos/asf/mesos/blob/7b768a91/src/resource_provider/storage/provider.cpp ---------------------------------------------------------------------- diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp index 8001ade..d1267cf 100644 --- a/src/resource_provider/storage/provider.cpp +++ b/src/resource_provider/storage/provider.cpp @@ -345,7 +345,6 @@ private: Future<Nothing> recoverServices(); Future<Nothing> recoverVolumes(); Future<Nothing> recoverResourceProviderState(); - Future<Nothing> recoverProfiles(); void doReliableRegistration(); @@ -637,7 +636,6 @@ Future<Nothing> StorageLocalResourceProviderProcess::recover() return recoverServices() .then(defer(self(), &Self::recoverVolumes)) .then(defer(self(), &Self::recoverResourceProviderState)) - .then(defer(self(), &Self::recoverProfiles)) .then(defer(self(), [=]() -> Future<Nothing> { LOG(INFO) << "Finished recovery for resource provider with type '" << info.type() @@ -998,63 +996,36 @@ StorageLocalResourceProviderProcess::recoverResourceProviderState() } totalResources = resourceProviderState->resources(); - } - } - - return Nothing(); -} + const ResourceProviderState::Storage& storage = + resourceProviderState->storage(); -// NOTE: Currently we need to recover profiles for replaying pending -// `CREATE_VOLUME` or `CREATE_BLOCK` operations after failover. Consider -// either checkpointing the required profiles for these calls, or -// checkpointing CSI volume states by volume names instead of IDs. -Future<Nothing> StorageLocalResourceProviderProcess::recoverProfiles() -{ - // Rebuild the set of required profiles from the checkpointed storage - // pools (i.e., RAW resources that have no volume ID). We do not need - // to resolve profiles for resources that have volume IDs, since their - // volume capabilities are already checkpointed. - hashset<string> requiredProfiles; - foreach (const Resource& resource, totalResources) { - if (!resource.disk().source().has_id()) { - requiredProfiles.insert(resource.disk().source().profile()); - } - } + using ProfileEntry = google::protobuf::MapPair< + string, ResourceProviderState::Storage::ProfileInfo>; - // If no pending operation uses any profile, there is no need to - // recover any profile. Watching the DiskProfileAdaptor will be - // initiated later. - if (requiredProfiles.empty()) { - return Nothing(); - } + foreach (const ProfileEntry& entry, storage.profiles()) { + profileInfos.put( + entry.first, + {entry.second.capability(), entry.second.parameters()}); + } - LOG(INFO) - << "Waiting for DiskProfileAdaptor to recover profiles: " - << stringify(requiredProfiles); - - // The DiskProfileAdapter module must at least have knowledge of - // the required profiles. Because the module is initialized separately - // from this resource provider, we must watch the module until all - // required profiles have been recovered. - return loop( - self(), - [=] { return diskProfileAdaptor->watch(knownProfiles, info); }, - [=](const hashset<string>& profiles) -> ControlFlow<Nothing> { - // Save the returned set of profiles so that we can watch the - // module for changes to it, both in this loop and after - // recovery completes. - knownProfiles = profiles; - - foreach (const string& profile, requiredProfiles) { - if (!knownProfiles.contains(profile)) { - return Continue(); - } + // We only checkpoint profiles associated with storage pools (i.e., + // resources without IDs) in `checkpointResourceProviderState` as only + // these profiles might be used by pending operations, so we validate here + // that all such profiles exist. + foreach (const Resource& resource, totalResources) { + if (!resource.disk().source().has_id() && + resource.disk().source().has_profile() && + !profileInfos.contains(resource.disk().source().profile())) { + return Failure( + "Cannot recover profile for storage pool '" + + stringify(resource) + "' from '" + statePath + "'"); } + } + } + } - return Break(); - }) - .then(defer(self(), &Self::updateProfiles)); + return Nothing(); } @@ -3281,6 +3252,33 @@ void StorageLocalResourceProviderProcess::checkpointResourceProviderState() state.mutable_resources()->CopyFrom(totalResources); + ResourceProviderState::Storage* storage = state.mutable_storage(); + + // NOTE: We only checkpoint profiles associated with any storage + // pool (i.e., resource that has no volume ID) in the total resources. + // We do not need to checkpoint profiles for resources that have + // volume IDs, as their volume capabilities are already checkpointed. + hashset<string> requiredProfiles; + foreach (const Resource& resource, totalResources) { + if (!resource.disk().source().has_id()) { + CHECK(resource.disk().source().has_profile()); + requiredProfiles.insert(resource.disk().source().profile()); + } + } + + foreach (const string& profile, requiredProfiles) { + CHECK(profileInfos.contains(profile)); + + const DiskProfileAdaptor::ProfileInfo& profileInfo = + profileInfos.at(profile); + + ResourceProviderState::Storage::ProfileInfo& profileInfo_ = + (*storage->mutable_profiles())[profile]; + + *profileInfo_.mutable_capability() = profileInfo.capability; + *profileInfo_.mutable_parameters() = profileInfo.parameters; + } + const string statePath = slave::paths::getResourceProviderStatePath( metaDir, slaveId, info.type(), info.name(), info.id());