This is an automated email from the ASF dual-hosted git repository. chhsiao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 16bcf61231b6d14019b1d703d887c55b01b85aee Author: Chun-Hung Hsiao <chhs...@mesosphere.io> AuthorDate: Fri Oct 12 15:12:41 2018 -0700 Synced SLRP checkpoints to the filesystem. Currently, if a system crashes, SLRP checkpoints might not be synced to the filesystem, so it is possible that an old or empty checkpoint will be read upon recovery. Moreover, if a CSI call has been issued right before the crash, the recovered state may be inconsistent with the actual state reported by the plugin. For example, the plugin might have created a volume that the checkpointed state does not record. To avoid this inconsistency, we always call fsync() when checkpointing SLRP states. Relatedly, the metadata directory of a completed operation is now garbage-collected only after the resource provider state has been checkpointed; a leftover operation directory that is not in the checkpointed state is logged and removed during recovery instead of triggering a CHECK failure, and failures to remove such directories are only logged. Review: https://reviews.apache.org/r/69010 --- src/resource_provider/storage/provider.cpp | 102 ++++++++++++++++++----------- src/slave/state.hpp | 34 ++++++---- 2 files changed, 83 insertions(+), 53 deletions(-) diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp index db783b5..025b13b 100644 --- a/src/resource_provider/storage/provider.cpp +++ b/src/resource_provider/storage/provider.cpp @@ -21,6 +21,7 @@ #include <memory> #include <numeric> #include <utility> +#include <vector> #include <glog/logging.h> @@ -28,6 +29,7 @@ #include <process/collect.hpp> #include <process/defer.hpp> #include <process/delay.hpp> +#include <process/dispatch.hpp> #include <process/id.hpp> #include <process/loop.hpp> #include <process/process.hpp> @@ -88,23 +90,24 @@ using std::shared_ptr; using std::string; using std::vector; +using process::after; +using process::await; using process::Break; +using process::collect; using process::Continue; using process::ControlFlow; +using process::defer; +using process::delay; +using process::dispatch; using process::Failure; using process::Future; +using process::loop; using process::Owned; using process::Process; using process::Promise; using process::Sequence; -using process::Timeout; - -using process::after; -using 
process::await; -using process::collect; -using process::defer; -using process::loop; using process::spawn; +using process::Timeout; using process::http::authentication::Principal; @@ -425,6 +428,8 @@ private: const id::UUID& operationUuid, const Try<vector<ResourceConversion>>& conversions); + void garbageCollectOperationPath(const id::UUID& operationUuid); + void checkpointResourceProviderState(); void checkpointVolumeState(const string& volumeId); @@ -1154,7 +1159,16 @@ StorageLocalResourceProviderProcess::reconcileOperationStatuses() uuid.error()); } - CHECK(operations.contains(uuid.get())); + // NOTE: This could happen if we failed to remove the operation path before. + if (!operations.contains(uuid.get())) { + LOG(WARNING) + << "Ignoring unknown operation (uuid: " << uuid.get() + << ") for resource provider " << info.id(); + + garbageCollectOperationPath(uuid.get()); + continue; + } + operationUuids.emplace_back(std::move(uuid.get())); } @@ -1165,27 +1179,23 @@ StorageLocalResourceProviderProcess::reconcileOperationStatuses() using StreamState = typename OperationStatusUpdateManagerState::StreamState; - // Clean up the operations that are terminated. + // Clean up the operations that are completed. + vector<id::UUID> completedOperations; foreachpair (const id::UUID& uuid, const Option<StreamState>& stream, statusUpdateManagerState.streams) { if (stream.isSome() && stream->terminated) { operations.erase(uuid); - - // Garbage collect the operation metadata. - const string path = slave::paths::getOperationPath( - slave::paths::getResourceProviderPath( - metaDir, slaveId, info.type(), info.name(), info.id()), - uuid); - - Try<Nothing> rmdir = os::rmdir(path); - if (rmdir.isError()) { - return Failure( - "Failed to remove directory '" + path + "': " + rmdir.error()); - } + completedOperations.push_back(uuid); } } + // Garbage collect the operation streams after checkpointing. 
+ checkpointResourceProviderState(); + foreach (const id::UUID& uuid, completedOperations) { + garbageCollectOperationPath(uuid); + } + // Send updates for all missing statuses. foreachpair (const id::UUID& uuid, const Operation& operation, @@ -1790,25 +1800,11 @@ void StorageLocalResourceProviderProcess::acknowledgeOperationStatus( // acknowledgement will be received. In this case, the following call // will fail, so we just leave an error log. statusUpdateManager.acknowledgement(operationUuid.get(), statusUuid.get()) - .then(defer(self(), [=](bool continuation) -> Future<Nothing> { + .then(defer(self(), [=](bool continuation) { if (!continuation) { operations.erase(operationUuid.get()); - - // Garbage collect the operation metadata. - const string path = slave::paths::getOperationPath( - slave::paths::getResourceProviderPath( - metaDir, slaveId, info.type(), info.name(), info.id()), - operationUuid.get()); - - // NOTE: We check if the path exists since we do not checkpoint - // some status updates, such as OPERATION_DROPPED. - if (os::exists(path)) { - Try<Nothing> rmdir = os::rmdir(path); - if (rmdir.isError()) { - return Failure( - "Failed to remove directory '" + path + "': " + rmdir.error()); - } - } + checkpointResourceProviderState(); + garbageCollectOperationPath(operationUuid.get()); } return Nothing(); @@ -3436,6 +3432,28 @@ Try<Nothing> StorageLocalResourceProviderProcess::updateOperationStatus( } +void StorageLocalResourceProviderProcess::garbageCollectOperationPath( + const id::UUID& operationUuid) +{ + CHECK(!operations.contains(operationUuid)); + + const string path = slave::paths::getOperationPath( + slave::paths::getResourceProviderPath( + metaDir, slaveId, info.type(), info.name(), info.id()), + operationUuid); + + // NOTE: We check if the path exists since we do not checkpoint some status + // updates, such as OPERATION_DROPPED. 
+ if (os::exists(path)) { + Try<Nothing> rmdir = os::rmdir(path); + if (rmdir.isError()) { + LOG(ERROR) + << "Failed to remove directory '" << path << "': " << rmdir.error(); + } + } +} + + void StorageLocalResourceProviderProcess::checkpointResourceProviderState() { ResourceProviderState state; @@ -3476,7 +3494,9 @@ void StorageLocalResourceProviderProcess::checkpointResourceProviderState() const string statePath = slave::paths::getResourceProviderStatePath( metaDir, slaveId, info.type(), info.name(), info.id()); - Try<Nothing> checkpoint = slave::state::checkpoint(statePath, state); + // NOTE: We ensure the checkpoint is synced to the filesystem to avoid + // resulting in a stale or empty checkpoint when a system crash happens. + Try<Nothing> checkpoint = slave::state::checkpoint(statePath, state, true); CHECK_SOME(checkpoint) << "Failed to checkpoint resource provider state to '" << statePath << "': " << checkpoint.error(); @@ -3492,8 +3512,10 @@ void StorageLocalResourceProviderProcess::checkpointVolumeState( info.storage().plugin().name(), volumeId); + // NOTE: We ensure the checkpoint is synced to the filesystem to avoid + // resulting in a stale or empty checkpoint when a system crash happens. 
Try<Nothing> checkpoint = - slave::state::checkpoint(statePath, volumes.at(volumeId).state); + slave::state::checkpoint(statePath, volumes.at(volumeId).state, true); CHECK_SOME(checkpoint) << "Failed to checkpoint volume state to '" << statePath << "':" diff --git a/src/slave/state.hpp b/src/slave/state.hpp index 003211e..4f3d4ce 100644 --- a/src/slave/state.hpp +++ b/src/slave/state.hpp @@ -122,9 +122,10 @@ namespace internal { inline Try<Nothing> checkpoint( const std::string& path, - const std::string& message) + const std::string& message, + bool sync) { - return ::os::write(path, message); + return ::os::write(path, message, sync); } @@ -133,7 +134,7 @@ template < typename std::enable_if< std::is_convertible<T*, google::protobuf::Message*>::value, int>::type = 0> -inline Try<Nothing> checkpoint(const std::string& path, T message) +inline Try<Nothing> checkpoint(const std::string& path, T message, bool sync) { // If the `Try` from `downgradeResources` returns an `Error`, we currently // continue to checkpoint the resources in a partially downgraded state. @@ -144,13 +145,14 @@ inline Try<Nothing> checkpoint(const std::string& path, T message) // TODO(mpark): Do something smarter with the result once // something like an agent recovery capability is introduced. downgradeResources(&message); - return ::protobuf::write(path, message); + return ::protobuf::write(path, message, sync); } inline Try<Nothing> checkpoint( const std::string& path, - google::protobuf::RepeatedPtrField<Resource> resources) + google::protobuf::RepeatedPtrField<Resource> resources, + bool sync) { // If the `Try` from `downgradeResources` returns an `Error`, we currently // continue to checkpoint the resources in a partially downgraded state. @@ -161,16 +163,17 @@ inline Try<Nothing> checkpoint( // TODO(mpark): Do something smarter with the result once // something like an agent recovery capability is introduced. 
downgradeResources(&resources); - return ::protobuf::write(path, resources); + return ::protobuf::write(path, resources, sync); } inline Try<Nothing> checkpoint( const std::string& path, - const Resources& resources) + const Resources& resources, + bool sync) { const google::protobuf::RepeatedPtrField<Resource>& messages = resources; - return checkpoint(path, messages); + return checkpoint(path, messages, sync); } } // namespace internal { @@ -187,14 +190,19 @@ inline Try<Nothing> checkpoint( // // NOTE: We provide atomic (all-or-nothing) semantics here by always // writing to a temporary file first then using os::rename to atomically -// move it to the desired path. +// move it to the desired path. If `sync` is set to true, this call succeeds +// only if `fsync` is supported and successfully commits the changes to the +// filesystem for the checkpoint file and each created directory. +// +// TODO(chhsiao): Consider enabling syncing by default after evaluating its +// performance impact. template <typename T> -Try<Nothing> checkpoint(const std::string& path, const T& t) +Try<Nothing> checkpoint(const std::string& path, const T& t, bool sync = false) { // Create the base directory. std::string base = Path(path).dirname(); - Try<Nothing> mkdir = os::mkdir(base); + Try<Nothing> mkdir = os::mkdir(base, true, sync); if (mkdir.isError()) { return Error("Failed to create directory '" + base + "': " + mkdir.error()); } @@ -211,7 +219,7 @@ Try<Nothing> checkpoint(const std::string& path, const T& t) } // Now checkpoint the instance of T to the temporary file. - Try<Nothing> checkpoint = internal::checkpoint(temp.get(), t); + Try<Nothing> checkpoint = internal::checkpoint(temp.get(), t, sync); if (checkpoint.isError()) { // Try removing the temporary file on error. os::rm(temp.get()); @@ -221,7 +229,7 @@ Try<Nothing> checkpoint(const std::string& path, const T& t) } // Rename the temporary file to the path. 
- Try<Nothing> rename = os::rename(temp.get(), path); + Try<Nothing> rename = os::rename(temp.get(), path, sync); if (rename.isError()) { // Try removing the temporary file on error. os::rm(temp.get());