This is an automated email from the ASF dual-hosted git repository. chhsiao pushed a commit to branch 1.7.x in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 2877a0070b108033644817ec1b8f99085f73ab4b Author: Chun-Hung Hsiao <chhs...@mesosphere.io> AuthorDate: Wed Jan 23 21:44:10 2019 -0800 Exposed `StorageLocalResourceProviderProcess` for testing purpose. This patch moves the declaration of the SLRP process into an internal header file and add a `__call` function, so a follow-up test could use `FUTURE_DISPATCH` to capture a dispatch on an RPC retry. To simplify the declarations, it also internalizes `RPCTraits` and introduce new type aliases, and moves `DEFAULT_CSI_RETRY_BACKOFF_FACTOR` and `DEFAULT_CSI_RETRY_INTERVAL_MAX` to the new header for testing. Review: https://reviews.apache.org/r/69827 --- src/Makefile.am | 3 +- src/csi/client.hpp | 5 +- src/csi/rpc.hpp | 11 + src/resource_provider/storage/provider.cpp | 602 ++++++--------------- src/resource_provider/storage/provider.hpp | 19 +- src/resource_provider/storage/provider_process.hpp | 420 ++++++++++++++ src/tests/csi_client_tests.cpp | 2 +- 7 files changed, 608 insertions(+), 454 deletions(-) diff --git a/src/Makefile.am b/src/Makefile.am index eb50fc7..8238c4d 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -1469,7 +1469,8 @@ libmesos_no_3rdparty_la_SOURCES += \ resource_provider/storage/disk_profile_utils.cpp \ resource_provider/storage/disk_profile_utils.hpp \ resource_provider/storage/provider.cpp \ - resource_provider/storage/provider.hpp + resource_provider/storage/provider.hpp \ + resource_provider/storage/provider_process.hpp libmesos_no_3rdparty_la_CPPFLAGS = $(MESOS_CPPFLAGS) diff --git a/src/csi/client.hpp b/src/csi/client.hpp index a0dfedf..c2583cf 100644 --- a/src/csi/client.hpp +++ b/src/csi/client.hpp @@ -35,9 +35,8 @@ public: : connection(_connection), runtime(_runtime) {} template <RPC rpc> - process::Future< - Try<typename RPCTraits<rpc>::response_type, process::grpc::StatusError>> - call(typename RPCTraits<rpc>::request_type request); + process::Future<Try<Response<rpc>, process::grpc::StatusError>> call( + Request<rpc> request); private: process::grpc::client::Connection connection; diff --git a/src/csi/rpc.hpp b/src/csi/rpc.hpp index c30a509..b2502ce 100644 --- a/src/csi/rpc.hpp +++ b/src/csi/rpc.hpp @@ -52,6 +52,8 @@ enum RPC }; +namespace internal { + template <RPC> struct RPCTraits; @@ -191,6 +193,15 @@ struct RPCTraits<NODE_GET_CAPABILITIES> typedef NodeGetCapabilitiesResponse response_type; }; +} // namespace internal { + + +template <RPC rpc> +using Request = typename internal::RPCTraits<rpc>::request_type; + +template <RPC rpc> +using Response = typename internal::RPCTraits<rpc>::response_type; + std::ostream& operator<<(std::ostream& stream, const RPC& rpc); diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp index 1abb9c8..c4068a5 100644 --- a/src/resource_provider/storage/provider.cpp +++ b/src/resource_provider/storage/provider.cpp @@ -19,8 +19,12 @@ #include <algorithm> #include <cctype> #include <cstdlib> +#include <functional> +#include <list> #include <memory> #include <numeric> +#include <queue> +#include <type_traits> #include <utility> #include <vector> @@ -31,6 +35,8 @@ #include <process/defer.hpp> #include <process/delay.hpp> #include <process/dispatch.hpp> +#include <process/future.hpp> +#include <process/grpc.hpp> #include <process/id.hpp> #include <process/loop.hpp> #include <process/process.hpp> @@ -41,23 +47,28 @@ #include <process/metrics/metrics.hpp> #include <process/metrics/push_gauge.hpp> +#include <mesos/http.hpp> #include <mesos/resources.hpp> #include <mesos/type_utils.hpp> #include <mesos/resource_provider/resource_provider.hpp> + #include <mesos/resource_provider/storage/disk_profile_adaptor.hpp> #include <mesos/v1/resource_provider.hpp> +#include <stout/bytes.hpp> #include <stout/duration.hpp> #include <stout/foreach.hpp> #include <stout/hashmap.hpp> #include <stout/hashset.hpp> #include <stout/linkedhashmap.hpp> +#include <stout/nothing.hpp> #include <stout/os.hpp> #include <stout/path.hpp> #include <stout/strings.hpp> #include <stout/unreachable.hpp> +#include <stout/uuid.hpp> #include <stout/os/realpath.hpp> @@ -77,6 +88,8 @@ #include "resource_provider/detector.hpp" #include "resource_provider/state.hpp" +#include "resource_provider/storage/provider_process.hpp" + #include "slave/container_daemon.hpp" #include "slave/paths.hpp" #include "slave/state.hpp" @@ -106,7 +119,7 @@ using process::Failure; using process::Future; using process::loop; using process::Owned; -using process::Process; +using process::ProcessBase; using process::Promise; using process::Sequence; using process::spawn; @@ -140,17 +153,6 @@ namespace internal { // TODO(chhsiao): Make the timeout configurable. constexpr Duration CSI_ENDPOINT_CREATION_TIMEOUT = Minutes(1); -// Storage local resource provider initially picks a random amount of time -// between `[0, b]`, where `b = DEFAULT_CSI_RETRY_BACKOFF_FACTOR`, to retry CSI -// calls related to `CREATE_DISK` or `DESTROY_DISK` operations. Subsequent -// retries are exponentially backed off based on this interval (e.g., 2nd retry -// uses a random value between `[0, b * 2^1]`, 3rd retry between `[0, b * 2^2]`, -// etc) up to a maximum of `DEFAULT_CSI_RETRY_INTERVAL_MAX`. -// -// TODO(chhsiao): Make the retry parameters configurable. -constexpr Duration DEFAULT_CSI_RETRY_BACKOFF_FACTOR = Seconds(10); -constexpr Duration DEFAULT_CSI_RETRY_INTERVAL_MAX = Minutes(10); - // Returns true if the string is a valid Java identifier. static bool isValidName(const string& s) @@ -304,273 +306,32 @@ static inline Resource createRawDiskResource( } -class StorageLocalResourceProviderProcess - : public Process<StorageLocalResourceProviderProcess> -{ -public: - explicit StorageLocalResourceProviderProcess( - const http::URL& _url, - const string& _workDir, - const ResourceProviderInfo& _info, - const SlaveID& _slaveId, - const Option<string>& _authToken, - bool _strict) - : ProcessBase(process::ID::generate("storage-local-resource-provider")), - state(RECOVERING), - url(_url), - workDir(_workDir), - metaDir(slave::paths::getMetaRootDir(_workDir)), - contentType(ContentType::PROTOBUF), - info(_info), - vendor( - info.storage().plugin().type() + "." + - info.storage().plugin().name()), - slaveId(_slaveId), - authToken(_authToken), - strict(_strict), - resourceVersion(id::UUID::random()), - sequence("storage-local-resource-provider-sequence"), - metrics("resource_providers/" + info.type() + "." + info.name() + "/") - { - diskProfileAdaptor = DiskProfileAdaptor::getAdaptor(); - CHECK_NOTNULL(diskProfileAdaptor.get()); - } - - StorageLocalResourceProviderProcess( - const StorageLocalResourceProviderProcess& other) = delete; - - StorageLocalResourceProviderProcess& operator=( - const StorageLocalResourceProviderProcess& other) = delete; - - void connected(); - void disconnected(); - void received(const Event& event); - -private: - struct VolumeData - { - VolumeData(VolumeState&& _state) - : state(_state), sequence(new Sequence("volume-sequence")) {} - - VolumeState state; - - // We run all CSI operations for the same volume on a sequence to - // ensure that they are processed in a sequential order. - Owned<Sequence> sequence; - }; - - void initialize() override; - void fatal(); - - // The recover functions are responsible to recover the state of the - // resource provider and CSI volumes from checkpointed data. - Future<Nothing> recover(); - Future<Nothing> recoverServices(); - Future<Nothing> recoverVolumes(); - Future<Nothing> recoverResourceProviderState(); - - void doReliableRegistration(); - - // The reconcile functions are responsible to reconcile the state of - // the resource provider from the recovered state and other sources of - // truth, such as CSI plugin responses or the status update manager. - Future<Nothing> reconcileResourceProviderState(); - Future<Nothing> reconcileOperationStatuses(); - ResourceConversion reconcileResources( - const Resources& checkpointed, - const Resources& discovered); - - // Spawns a loop to watch for changes in the set of known profiles and update - // the profile mapping and storage pools accordingly. - void watchProfiles(); - - // Update the profile mapping when the set of known profiles changes. - // NOTE: This function never fails. If it fails to translate a new - // profile, the resource provider will continue to operate with the - // set of profiles it knows about. - Future<Nothing> updateProfiles(const hashset<string>& profiles); - - // Reconcile the storage pools when the set of known profiles changes, - // or a volume with an unknown profile is destroyed. - Future<Nothing> reconcileStoragePools(); - - // Returns true if the storage pools are allowed to be reconciled when - // the operation is being applied. - static bool allowsReconciliation(const Offer::Operation& operation); - - // Functions for received events. - void subscribed(const Event::Subscribed& subscribed); - void applyOperation(const Event::ApplyOperation& operation); - void publishResources(const Event::PublishResources& publish); - void acknowledgeOperationStatus( - const Event::AcknowledgeOperationStatus& acknowledge); - void reconcileOperations( - const Event::ReconcileOperations& reconcile); - - // Wrapper functions to make CSI calls and update RPC metrics. - // - // The call is made asynchronously and thus no guarantee is provided on the - // order in which calls are sent. Callers need to either ensure to not have - // multiple conflicting calls in flight, or treat results idempotently. - // - // NOTE: We currently ensure this by 1) resource locking to forbid concurrent - // calls on the same volume, and 2) no profile update while there are ongoing - // `CREATE_DISK` or `DESTROY_DISK` operations. - // - // NOTE: Since this function uses `getService` to obtain the latest service - // future, which depends on probe results, it is disabled for making probe - // calls; `_call` should be used directly instead. - template < - csi::v0::RPC rpc, - typename std::enable_if<rpc != csi::v0::PROBE, int>::type = 0> - Future<typename csi::v0::RPCTraits<rpc>::response_type> call( - const ContainerID& containerId, - const typename csi::v0::RPCTraits<rpc>::request_type& request, - bool retry = false); - - template <csi::v0::RPC rpc> - Future<Try<typename csi::v0::RPCTraits<rpc>::response_type, StatusError>> - _call( - csi::v0::Client client, - const typename csi::v0::RPCTraits<rpc>::request_type& request); - - Future<csi::v0::Client> waitService(const string& endpoint); - Future<csi::v0::Client> getService(const ContainerID& containerId); - Future<hashmap<ContainerID, Option<ContainerStatus>>> getContainers(); - Future<Nothing> waitContainer(const ContainerID& containerId); - Future<Nothing> killContainer(const ContainerID& containerId); - - Future<Nothing> prepareIdentityService(); - Future<Nothing> prepareControllerService(); - Future<Nothing> prepareNodeService(); - Future<Nothing> controllerPublish(const string& volumeId); - Future<Nothing> controllerUnpublish(const string& volumeId); - Future<Nothing> nodeStage(const string& volumeId); - Future<Nothing> nodeUnstage(const string& volumeId); - Future<Nothing> nodePublish(const string& volumeId); - Future<Nothing> nodeUnpublish(const string& volumeId); - Future<string> createVolume( - const string& name, - const Bytes& capacity, - const DiskProfileAdaptor::ProfileInfo& profileInfo); - Future<bool> deleteVolume(const string& volumeId); - Future<Nothing> validateVolume( - const string& volumeId, - const Option<Labels>& metadata, - const DiskProfileAdaptor::ProfileInfo& profileInfo); - Future<Resources> listVolumes(); - Future<Resources> getCapacities(); - - Future<Nothing> _applyOperation(const id::UUID& operationUuid); - void dropOperation( - const id::UUID& operationUuid, - const Option<FrameworkID>& frameworkId, - const Option<Offer::Operation>& operation, - const string& message); - - Future<vector<ResourceConversion>> applyCreateDisk( - const Resource& resource, - const id::UUID& operationUuid, - const Resource::DiskInfo::Source::Type& targetType, - const Option<string>& targetProfile); - Future<vector<ResourceConversion>> applyDestroyDisk( - const Resource& resource); - - Try<Nothing> updateOperationStatus( - const id::UUID& operationUuid, - const Try<vector<ResourceConversion>>& conversions); - - void garbageCollectOperationPath(const id::UUID& operationUuid); - - void checkpointResourceProviderState(); - void checkpointVolumeState(const string& volumeId); - - void sendResourceProviderStateUpdate(); - - // NOTE: This is a callback for the status update manager and should - // not be called directly. - void sendOperationStatusUpdate( - const UpdateOperationStatusMessage& update); - - enum State - { - RECOVERING, - DISCONNECTED, - CONNECTED, - SUBSCRIBED, - READY - } state; - - const http::URL url; - const string workDir; - const string metaDir; - const ContentType contentType; - ResourceProviderInfo info; - const string vendor; - const SlaveID slaveId; - const Option<string> authToken; - const bool strict; - - shared_ptr<DiskProfileAdaptor> diskProfileAdaptor; - - string bootId; - process::grpc::client::Runtime runtime; - Owned<v1::resource_provider::Driver> driver; - OperationStatusUpdateManager statusUpdateManager; - - // The mapping of known profiles fetched from the DiskProfileAdaptor. - hashmap<string, DiskProfileAdaptor::ProfileInfo> profileInfos; - - hashmap<ContainerID, Owned<ContainerDaemon>> daemons; - hashmap<ContainerID, Owned<Promise<csi::v0::Client>>> services; - - Option<ContainerID> nodeContainerId; - Option<ContainerID> controllerContainerId; - Option<csi::v0::GetPluginInfoResponse> pluginInfo; - csi::v0::PluginCapabilities pluginCapabilities; - csi::v0::ControllerCapabilities controllerCapabilities; - csi::v0::NodeCapabilities nodeCapabilities; - Option<string> nodeId; - - // We maintain the following invariant: if one operation depends on - // another, they cannot be in PENDING state at the same time, i.e., - // the result of the preceding operation must have been reflected in - // the total resources. - // NOTE: We store the list of operations in a `LinkedHashMap` to - // preserve the order we receive the operations in case we need it. - LinkedHashMap<id::UUID, Operation> operations; - Resources totalResources; - id::UUID resourceVersion; - hashmap<string, VolumeData> volumes; - - // If pending, it means that the storage pools are being reconciled, and all - // incoming operations that disallow reconciliation will be dropped. - Future<Nothing> reconciled; - - // We maintain a sequence to coordinate reconciliations of storage pools. It - // keeps track of pending operations that disallow reconciliation, and ensures - // that any reconciliation waits for these operations to finish. - Sequence sequence; - - struct Metrics - { - explicit Metrics(const string& prefix); - ~Metrics(); - - // CSI plugin metrics. - Counter csi_plugin_container_terminations; - hashmap<csi::v0::RPC, PushGauge> csi_plugin_rpcs_pending; - hashmap<csi::v0::RPC, Counter> csi_plugin_rpcs_successes; - hashmap<csi::v0::RPC, Counter> csi_plugin_rpcs_errors; - hashmap<csi::v0::RPC, Counter> csi_plugin_rpcs_cancelled; - - // Operation state metrics. - hashmap<Offer::Operation::Type, PushGauge> operations_pending; - hashmap<Offer::Operation::Type, Counter> operations_finished; - hashmap<Offer::Operation::Type, Counter> operations_failed; - hashmap<Offer::Operation::Type, Counter> operations_dropped; - } metrics; -}; +StorageLocalResourceProviderProcess::StorageLocalResourceProviderProcess( + const http::URL& _url, + const string& _workDir, + const ResourceProviderInfo& _info, + const SlaveID& _slaveId, + const Option<string>& _authToken, + bool _strict) + : ProcessBase(process::ID::generate("storage-local-resource-provider")), + state(RECOVERING), + url(_url), + workDir(_workDir), + metaDir(slave::paths::getMetaRootDir(_workDir)), + contentType(ContentType::PROTOBUF), + info(_info), + vendor( + info.storage().plugin().type() + "." + info.storage().plugin().name()), + slaveId(_slaveId), + authToken(_authToken), + strict(_strict), + resourceVersion(id::UUID::random()), + sequence("storage-local-resource-provider-sequence"), + metrics("resource_providers/" + info.type() + "." + info.name() + "/") +{ + diskProfileAdaptor = DiskProfileAdaptor::getAdaptor(); + CHECK_NOTNULL(diskProfileAdaptor.get()); +} void StorageLocalResourceProviderProcess::connected() @@ -635,6 +396,121 @@ void StorageLocalResourceProviderProcess::received(const Event& event) } +template < + csi::v0::RPC rpc, + typename std::enable_if<rpc != csi::v0::PROBE, int>::type> +Future<csi::v0::Response<rpc>> StorageLocalResourceProviderProcess::call( + const ContainerID& containerId, + const csi::v0::Request<rpc>& request, + const bool retry) +{ + Duration maxBackoff = DEFAULT_CSI_RETRY_BACKOFF_FACTOR; + + return loop( + self(), + [=] { + // Perform the call with the latest service future. + return getService(containerId) + .then(defer( + self(), + &StorageLocalResourceProviderProcess::_call<rpc>, + lambda::_1, + request)); + }, + [=](const Try<csi::v0::Response<rpc>, StatusError>& result) mutable + -> Future<ControlFlow<csi::v0::Response<rpc>>> { + Option<Duration> backoff = retry + ? maxBackoff * (static_cast<double>(os::random()) / RAND_MAX) + : Option<Duration>::none(); + + maxBackoff = std::min(maxBackoff * 2, DEFAULT_CSI_RETRY_INTERVAL_MAX); + + // We dispatch `__call` for testing purpose. + return dispatch( + self(), + &StorageLocalResourceProviderProcess::__call<rpc>, + result, + backoff); + }); +} + + +template <csi::v0::RPC rpc> +Future<Try<csi::v0::Response<rpc>, StatusError>> +StorageLocalResourceProviderProcess::_call( + csi::v0::Client client, const csi::v0::Request<rpc>& request) +{ + ++metrics.csi_plugin_rpcs_pending.at(rpc); + + return client.call<rpc>(request) + .onAny(defer(self(), [=]( + const Future<Try<csi::v0::Response<rpc>, StatusError>>& future) { + --metrics.csi_plugin_rpcs_pending.at(rpc); + if (future.isReady() && future->isSome()) { + ++metrics.csi_plugin_rpcs_successes.at(rpc); + } else if (future.isDiscarded()) { + ++metrics.csi_plugin_rpcs_cancelled.at(rpc); + } else { + ++metrics.csi_plugin_rpcs_errors.at(rpc); + } + })); +} + + +template <csi::v0::RPC rpc> +Future<ControlFlow<csi::v0::Response<rpc>>> +StorageLocalResourceProviderProcess::__call( + const Try<csi::v0::Response<rpc>, StatusError>& result, + const Option<Duration>& backoff) +{ + if (result.isSome()) { + return Break(result.get()); + } + + if (backoff.isNone()) { + return Failure(result.error()); + } + + // See the link below for retryable status codes: + // https://grpc.io/grpc/cpp/namespacegrpc.html#aff1730578c90160528f6a8d67ef5c43b // NOLINT + switch (result.error().status.error_code()) { + case grpc::DEADLINE_EXCEEDED: + case grpc::UNAVAILABLE: { + LOG(ERROR) + << "Received '" << result.error() << "' while calling " << rpc + << ". Retrying in " << backoff.get(); + + return after(backoff.get()) + .then([]() -> Future<ControlFlow<csi::v0::Response<rpc>>> { + return Continue(); + }); + } + case grpc::CANCELLED: + case grpc::UNKNOWN: + case grpc::INVALID_ARGUMENT: + case grpc::NOT_FOUND: + case grpc::ALREADY_EXISTS: + case grpc::PERMISSION_DENIED: + case grpc::UNAUTHENTICATED: + case grpc::RESOURCE_EXHAUSTED: + case grpc::FAILED_PRECONDITION: + case grpc::ABORTED: + case grpc::OUT_OF_RANGE: + case grpc::UNIMPLEMENTED: + case grpc::INTERNAL: + case grpc::DATA_LOSS: { + return Failure(result.error()); + } + case grpc::OK: + case grpc::DO_NOT_USE: { + UNREACHABLE(); + } + } + + UNREACHABLE(); +} + + void StorageLocalResourceProviderProcess::initialize() { Try<string> _bootId = os::bootId(); @@ -1886,112 +1762,6 @@ void StorageLocalResourceProviderProcess::reconcileOperations( } -template < - csi::v0::RPC rpc, - typename std::enable_if<rpc != csi::v0::PROBE, int>::type> -Future<typename csi::v0::RPCTraits<rpc>::response_type> -StorageLocalResourceProviderProcess::call( - const ContainerID& containerId, - const typename csi::v0::RPCTraits<rpc>::request_type& request, - bool retry) -{ - using Response = typename csi::v0::RPCTraits<rpc>::response_type; - - Duration maxBackoff = DEFAULT_CSI_RETRY_BACKOFF_FACTOR; - - return loop( - self(), - [=] { - // Perform the call with the latest service future. - return getService(containerId) - .then(defer( - self(), - &StorageLocalResourceProviderProcess::_call<rpc>, - lambda::_1, - request)); - }, - [=](const Try<Response, StatusError>& result) mutable - -> Future<ControlFlow<Response>> { - if (result.isSome()) { - return Break(result.get()); - } - - if (retry) { - // See the link below for retryable status codes: - // https://grpc.io/grpc/cpp/namespacegrpc.html#aff1730578c90160528f6a8d67ef5c43b // NOLINT - switch (result.error().status.error_code()) { - case grpc::DEADLINE_EXCEEDED: - case grpc::UNAVAILABLE: { - Duration delay = - maxBackoff * (static_cast<double>(os::random()) / RAND_MAX); - - maxBackoff = - std::min(maxBackoff * 2, DEFAULT_CSI_RETRY_INTERVAL_MAX); - - LOG(ERROR) - << "Received '" << result.error() << "' while calling " << rpc - << ". Retrying in " << delay; - - return after(delay) - .then([]() -> Future<ControlFlow<Response>> { - return Continue(); - }); - } - case grpc::CANCELLED: - case grpc::UNKNOWN: - case grpc::INVALID_ARGUMENT: - case grpc::NOT_FOUND: - case grpc::ALREADY_EXISTS: - case grpc::PERMISSION_DENIED: - case grpc::UNAUTHENTICATED: - case grpc::RESOURCE_EXHAUSTED: - case grpc::FAILED_PRECONDITION: - case grpc::ABORTED: - case grpc::OUT_OF_RANGE: - case grpc::UNIMPLEMENTED: - case grpc::INTERNAL: - case grpc::DATA_LOSS: { - break; - } - case grpc::OK: - case grpc::DO_NOT_USE: { - UNREACHABLE(); - } - } - } - - return Failure(result.error()); - }); -} - - -template <csi::v0::RPC rpc> -Future<Try<typename csi::v0::RPCTraits<rpc>::response_type, StatusError>> -StorageLocalResourceProviderProcess::_call( - csi::v0::Client client, - const typename csi::v0::RPCTraits<rpc>::request_type& request) -{ - using Response = typename csi::v0::RPCTraits<rpc>::response_type; - - ++metrics.csi_plugin_rpcs_pending.at(rpc); - - return client.call<rpc>(request) - .onAny(defer(self(), [=](const Future<Try<Response, StatusError>>& future) { - --metrics.csi_plugin_rpcs_pending.at(rpc); - if (future.isReady() && future->isSome()) { - ++metrics.csi_plugin_rpcs_successes.at(rpc); - } else if (future.isDiscarded()) { - ++metrics.csi_plugin_rpcs_cancelled.at(rpc); - } else { - ++metrics.csi_plugin_rpcs_errors.at(rpc); - } - })); -} - - -// Returns a future of a CSI client that waits for the endpoint socket -// to appear if necessary, then connects to the socket and check its -// readiness. Future<csi::v0::Client> StorageLocalResourceProviderProcess::waitService( const string& endpoint) { @@ -2036,9 +1806,6 @@ Future<csi::v0::Client> StorageLocalResourceProviderProcess::waitService( } -// Returns a future of the latest CSI client for the specified plugin -// container. If the container is not already running, this method will -// start a new a new container daemon. Future<csi::v0::Client> StorageLocalResourceProviderProcess::getService( const ContainerID& containerId) { @@ -2189,9 +1956,6 @@ Future<csi::v0::Client> StorageLocalResourceProviderProcess::getService( } -// Lists all running plugin containers for this resource provider. -// NOTE: This might return containers that are not actually running, -// e.g., if they are being destroyed. Future<hashmap<ContainerID, Option<ContainerStatus>>> StorageLocalResourceProviderProcess::getContainers() { @@ -2244,7 +2008,6 @@ StorageLocalResourceProviderProcess::getContainers() } -// Waits for the specified plugin container to be terminated. Future<Nothing> StorageLocalResourceProviderProcess::waitContainer( const ContainerID& containerId) { @@ -2271,7 +2034,6 @@ Future<Nothing> StorageLocalResourceProviderProcess::waitContainer( } -// Kills the specified plugin container. Future<Nothing> StorageLocalResourceProviderProcess::killContainer( const ContainerID& containerId) { @@ -2323,7 +2085,6 @@ Future<Nothing> StorageLocalResourceProviderProcess::prepareIdentityService() } -// NOTE: This can only be called after `prepareIdentityService`. Future<Nothing> StorageLocalResourceProviderProcess::prepareControllerService() { CHECK_SOME(pluginInfo); @@ -2364,8 +2125,6 @@ Future<Nothing> StorageLocalResourceProviderProcess::prepareControllerService() } -// NOTE: This can only be called after `prepareIdentityService` and -// `prepareControllerService`. Future<Nothing> StorageLocalResourceProviderProcess::prepareNodeService() { CHECK_SOME(nodeContainerId); @@ -2394,11 +2153,6 @@ Future<Nothing> StorageLocalResourceProviderProcess::prepareNodeService() } -// Transitions the state of the specified volume from `CREATED` or -// `CONTROLLER_PUBLISH` to `NODE_READY`. -// -// NOTE: This can only be called after `prepareControllerService` and -// `prepareNodeService`. Future<Nothing> StorageLocalResourceProviderProcess::controllerPublish( const string& volumeId) { @@ -2447,11 +2201,6 @@ Future<Nothing> StorageLocalResourceProviderProcess::controllerPublish( } -// Transitions the state of the specified volume from `NODE_READY`, -// `CONTROLLER_PUBLISH` or `CONTROLLER_UNPUBLISH` to `CREATED`. -// -// NOTE: This can only be called after `prepareControllerService` and -// `prepareNodeService`. Future<Nothing> StorageLocalResourceProviderProcess::controllerUnpublish( const string& volumeId) { @@ -2500,10 +2249,6 @@ Future<Nothing> StorageLocalResourceProviderProcess::controllerUnpublish( } -// Transitions the state of the specified volume from `NODE_READY` or -// `NODE_STAGE` to `VOL_READY`. -// -// NOTE: This can only be called after `prepareNodeService`. Future<Nothing> StorageLocalResourceProviderProcess::nodeStage( const string& volumeId) { @@ -2564,10 +2309,6 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodeStage( } -// Transitions the state of the specified volume from `VOL_READY`, `NODE_STAGE` -// or `NODE_UNSTAGE` to `NODE_READY`. -// -// NOTE: This can only be called after `prepareNodeService`. Future<Nothing> StorageLocalResourceProviderProcess::nodeUnstage( const string& volumeId) { @@ -2624,10 +2365,6 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodeUnstage( } -// Transitions the state of the specified volume from `VOL_READY` or -// `NODE_PUBLISH` to `PUBLISHED`. -// -// NOTE: This can only be called after `prepareNodeService`. Future<Nothing> StorageLocalResourceProviderProcess::nodePublish( const string& volumeId) { @@ -2691,10 +2428,6 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodePublish( } -// Transitions the state of the specified volume from `PUBLISHED`, -// `NODE_PUBLISH` or `NODE_UNPUBLISH` to `VOL_READY`. -// -// NOTE: This can only be called after `prepareNodeService`. Future<Nothing> StorageLocalResourceProviderProcess::nodeUnpublish( const string& volumeId) { @@ -2747,9 +2480,6 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodeUnpublish( } -// Returns a CSI volume ID. -// -// NOTE: This can only be called after `prepareControllerService`. Future<string> StorageLocalResourceProviderProcess::createVolume( const string& name, const Bytes& capacity, @@ -2795,10 +2525,6 @@ Future<string> StorageLocalResourceProviderProcess::createVolume( } -// Returns true if the volume has been deprovisioned. -// -// NOTE: This can only be called after `prepareControllerService` and -// `prepareNodeService` (since it may require `NodeUnpublishVolume`). Future<bool> StorageLocalResourceProviderProcess::deleteVolume( const string& volumeId) { @@ -2898,12 +2624,6 @@ Future<bool> StorageLocalResourceProviderProcess::deleteVolume( } -// Validates if a volume supports the capability of the specified profile. -// -// NOTE: This can only be called after `prepareIdentityService`. -// -// TODO(chhsiao): Validate the volume against the parameters of the profile once -// we get CSI v1. Future<Nothing> StorageLocalResourceProviderProcess::validateVolume( const string& volumeId, const Option<Labels>& metadata, @@ -2937,6 +2657,8 @@ Future<Nothing> StorageLocalResourceProviderProcess::validateVolume( volumeAttributes = CHECK_NOTERROR(convertLabelsToStringMap(metadata.get())); } + // TODO(chhsiao): Validate the volume against the parameters of the profile + // once we get CSI v1. csi::v0::ValidateVolumeCapabilitiesRequest request; request.set_volume_id(volumeId); *request.add_volume_capabilities() = profileInfo.capability; @@ -2969,8 +2691,6 @@ Future<Nothing> StorageLocalResourceProviderProcess::validateVolume( } -// NOTE: This can only be called after `prepareControllerService` and -// the resource provider ID has been obtained. Future<Resources> StorageLocalResourceProviderProcess::listVolumes() { CHECK(info.has_id()); @@ -3019,8 +2739,6 @@ Future<Resources> StorageLocalResourceProviderProcess::listVolumes() } -// NOTE: This can only be called after `prepareControllerService` and -// the resource provider ID has been obtained. Future<Resources> StorageLocalResourceProviderProcess::getCapacities() { CHECK(info.has_id()); @@ -3062,8 +2780,6 @@ Future<Resources> StorageLocalResourceProviderProcess::getCapacities() } -// Applies the operation. Speculative operations will be synchronously -// applied. Do nothing if the operation is already in a terminal state. Future<Nothing> StorageLocalResourceProviderProcess::_applyOperation( const id::UUID& operationUuid) { @@ -3154,8 +2870,6 @@ Future<Nothing> StorageLocalResourceProviderProcess::_applyOperation( } -// Sends `OPERATION_DROPPED` without checkpointing the status of -// the operation. void StorageLocalResourceProviderProcess::dropOperation( const id::UUID& operationUuid, const Option<FrameworkID>& frameworkId, @@ -3382,8 +3096,6 @@ StorageLocalResourceProviderProcess::applyDestroyDisk( } -// Synchronously updates `totalResources` and the operation status and -// then asks the status update manager to send status updates. Try<Nothing> StorageLocalResourceProviderProcess::updateOperationStatus( const id::UUID& operationUuid, const Try<vector<ResourceConversion>>& conversions) diff --git a/src/resource_provider/storage/provider.hpp b/src/resource_provider/storage/provider.hpp index 331f7b7..ccd09df 100644 --- a/src/resource_provider/storage/provider.hpp +++ b/src/resource_provider/storage/provider.hpp @@ -17,6 +17,17 @@ #ifndef __RESOURCE_PROVIDER_STORAGE_PROVIDER_HPP__ #define __RESOURCE_PROVIDER_STORAGE_PROVIDER_HPP__ +#include <string> + +#include <mesos/mesos.hpp> + +#include <process/http.hpp> +#include <process/owned.hpp> + +#include <stout/error.hpp> +#include <stout/option.hpp> +#include <stout/try.hpp> + #include "resource_provider/local.hpp" namespace mesos { @@ -32,15 +43,15 @@ public: static Try<process::Owned<LocalResourceProvider>> create( const process::http::URL& url, const std::string& workDir, - const mesos::ResourceProviderInfo& info, + const ResourceProviderInfo& info, const SlaveID& slaveId, const Option<std::string>& authToken, bool strict); static Try<process::http::authentication::Principal> principal( - const mesos::ResourceProviderInfo& info); + const ResourceProviderInfo& info); - static Option<Error> validate(const mesos::ResourceProviderInfo& info); + static Option<Error> validate(const ResourceProviderInfo& info); ~StorageLocalResourceProvider() override; @@ -54,7 +65,7 @@ private: explicit StorageLocalResourceProvider( const process::http::URL& url, const std::string& workDir, - const mesos::ResourceProviderInfo& info, + const ResourceProviderInfo& info, const SlaveID& slaveId, const Option<std::string>& authToken, bool strict); diff --git a/src/resource_provider/storage/provider_process.hpp b/src/resource_provider/storage/provider_process.hpp new file mode 100644 index 0000000..36187fb --- /dev/null +++ b/src/resource_provider/storage/provider_process.hpp @@ -0,0 +1,420 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef __RESOURCE_PROVIDER_STORAGE_PROVIDER_PROCESS_HPP__ +#define __RESOURCE_PROVIDER_STORAGE_PROVIDER_PROCESS_HPP__ + +#include <functional> +#include <memory> +#include <string> +#include <type_traits> +#include <vector> + +#include <mesos/http.hpp> +#include <mesos/mesos.hpp> +#include <mesos/resources.hpp> + +#include <mesos/resource_provider/resource_provider.hpp> + +#include <mesos/resource_provider/storage/disk_profile_adaptor.hpp> + +#include <mesos/v1/resource_provider.hpp> + +#include <process/future.hpp> +#include <process/grpc.hpp> +#include <process/http.hpp> +#include <process/loop.hpp> +#include <process/owned.hpp> +#include <process/process.hpp> +#include <process/sequence.hpp> + +#include <process/metrics/counter.hpp> +#include <process/metrics/push_gauge.hpp> + +#include <stout/bytes.hpp> +#include <stout/duration.hpp> +#include <stout/hashset.hpp> +#include <stout/linkedhashmap.hpp> +#include <stout/nothing.hpp> +#include <stout/option.hpp> +#include <stout/try.hpp> +#include <stout/uuid.hpp> + +#include "csi/client.hpp" +#include "csi/rpc.hpp" +#include "csi/state.hpp" +#include "csi/utils.hpp" + +#include "slave/container_daemon.hpp" + +#include "status_update_manager/operation.hpp" + +namespace mesos { +namespace internal { + +// Storage local resource provider initially picks a random amount of time +// between `[0, b]`, where `b = DEFAULT_CSI_RETRY_BACKOFF_FACTOR`, to retry CSI +// calls related to `CREATE_DISK` or `DESTROY_DISK` operations. Subsequent +// retries are exponentially backed off based on this interval (e.g., 2nd retry +// uses a random value between `[0, b * 2^1]`, 3rd retry between `[0, b * 2^2]`, +// etc) up to a maximum of `DEFAULT_CSI_RETRY_INTERVAL_MAX`. +// +// TODO(chhsiao): Make the retry parameters configurable. +constexpr Duration DEFAULT_CSI_RETRY_BACKOFF_FACTOR = Seconds(10); +constexpr Duration DEFAULT_CSI_RETRY_INTERVAL_MAX = Minutes(10); + + +class StorageLocalResourceProviderProcess + : public process::Process<StorageLocalResourceProviderProcess> +{ +public: + explicit StorageLocalResourceProviderProcess( + const process::http::URL& _url, + const std::string& _workDir, + const ResourceProviderInfo& _info, + const SlaveID& _slaveId, + const Option<std::string>& _authToken, + bool _strict); + + StorageLocalResourceProviderProcess( + const StorageLocalResourceProviderProcess& other) = delete; + + StorageLocalResourceProviderProcess& operator=( + const StorageLocalResourceProviderProcess& other) = delete; + + void connected(); + void disconnected(); + void received(const resource_provider::Event& event); + + // Wrapper functions to make CSI calls and update RPC metrics. Made public for + // testing purpose. + // + // The call is made asynchronously and thus no guarantee is provided on the + // order in which calls are sent. Callers need to either ensure to not have + // multiple conflicting calls in flight, or treat results idempotently. + // + // NOTE: We currently ensure this by 1) resource locking to forbid concurrent + // calls on the same volume, and 2) no profile update while there are ongoing + // `CREATE_DISK` or `DESTROY_DISK` operations. + // + // NOTE: Since this function uses `getService` to obtain the latest service + // future, which depends on probe results, it is disabled for making probe + // calls; `_call` should be used directly instead. + template < + csi::v0::RPC rpc, + typename std::enable_if<rpc != csi::v0::PROBE, int>::type = 0> + process::Future<csi::v0::Response<rpc>> call( + const ContainerID& containerId, + const csi::v0::Request<rpc>& request, + const bool retry = false); // remains const in a mutable lambda. + + template <csi::v0::RPC rpc> + process::Future<Try<csi::v0::Response<rpc>, process::grpc::StatusError>> + _call(csi::v0::Client client, const csi::v0::Request<rpc>& request); + + template <csi::v0::RPC rpc> + process::Future<process::ControlFlow<csi::v0::Response<rpc>>> __call( + const Try<csi::v0::Response<rpc>, process::grpc::StatusError>& result, + const Option<Duration>& backoff); + +private: + struct VolumeData + { + VolumeData(csi::state::VolumeState&& _state) + : state(_state), sequence(new process::Sequence("volume-sequence")) {} + + csi::state::VolumeState state; + + // We run all CSI operations for the same volume on a sequence to + // ensure that they are processed in a sequential order. + process::Owned<process::Sequence> sequence; + }; + + void initialize() override; + void fatal(); + + // The recover functions are responsible to recover the state of the + // resource provider and CSI volumes from checkpointed data. + process::Future<Nothing> recover(); + process::Future<Nothing> recoverServices(); + process::Future<Nothing> recoverVolumes(); + process::Future<Nothing> recoverResourceProviderState(); + + void doReliableRegistration(); + + // The reconcile functions are responsible to reconcile the state of + // the resource provider from the recovered state and other sources of + // truth, such as CSI plugin responses or the status update manager. + process::Future<Nothing> reconcileResourceProviderState(); + process::Future<Nothing> reconcileOperationStatuses(); + ResourceConversion reconcileResources( + const Resources& checkpointed, + const Resources& discovered); + + // Spawns a loop to watch for changes in the set of known profiles and update + // the profile mapping and storage pools accordingly. + void watchProfiles(); + + // Update the profile mapping when the set of known profiles changes. + // NOTE: This function never fails. If it fails to translate a new + // profile, the resource provider will continue to operate with the + // set of profiles it knows about. + process::Future<Nothing> updateProfiles(const hashset<std::string>& profiles); + + // Reconcile the storage pools when the set of known profiles changes, + // or a volume with an unknown profile is destroyed. + process::Future<Nothing> reconcileStoragePools(); + + // Returns true if the storage pools are allowed to be reconciled when + // the operation is being applied. + static bool allowsReconciliation(const Offer::Operation& operation); + + // Functions for received events. + void subscribed(const resource_provider::Event::Subscribed& subscribed); + void applyOperation( + const resource_provider::Event::ApplyOperation& operation); + void publishResources( + const resource_provider::Event::PublishResources& publish); + void acknowledgeOperationStatus( + const resource_provider::Event::AcknowledgeOperationStatus& acknowledge); + void reconcileOperations( + const resource_provider::Event::ReconcileOperations& reconcile); + + // Returns a future of a CSI client that waits for the endpoint socket to + // appear if necessary, then connects to the socket and check its readiness. + process::Future<csi::v0::Client> waitService(const std::string& endpoint); + + // Returns a future of the latest CSI client for the specified plugin + // container. If the container is not already running, this method will start + // a new a new container daemon. + process::Future<csi::v0::Client> getService(const ContainerID& containerId); + + // Lists all running plugin containers for this resource provider. + // NOTE: This might return containers that are not actually running, e.g., if + // they are being destroyed. + process::Future<hashmap<ContainerID, Option<ContainerStatus>>> + getContainers(); + + // Waits for the specified plugin container to be terminated. + process::Future<Nothing> waitContainer(const ContainerID& containerId); + + // Kills the specified plugin container. + process::Future<Nothing> killContainer(const ContainerID& containerId); + + process::Future<Nothing> prepareIdentityService(); + + // NOTE: This can only be called after `prepareIdentityService`. + process::Future<Nothing> prepareControllerService(); + + // NOTE: This can only be called after `prepareIdentityService` and + // `prepareControllerService`. + process::Future<Nothing> prepareNodeService(); + + // Transitions the state of the specified volume from `CREATED` or + // `CONTROLLER_PUBLISH` to `NODE_READY`. + // + // NOTE: This can only be called after `prepareControllerService` and + // `prepareNodeService`. + process::Future<Nothing> controllerPublish(const std::string& volumeId); + + // Transitions the state of the specified volume from `NODE_READY`, + // `CONTROLLER_PUBLISH` or `CONTROLLER_UNPUBLISH` to `CREATED`. + // + // NOTE: This can only be called after `prepareControllerService` and + // `prepareNodeService`. + process::Future<Nothing> controllerUnpublish(const std::string& volumeId); + + // Transitions the state of the specified volume from `NODE_READY` or + // `NODE_STAGE` to `VOL_READY`. + // + // NOTE: This can only be called after `prepareNodeService`. + process::Future<Nothing> nodeStage(const std::string& volumeId); + + // Transitions the state of the specified volume from `VOL_READY`, + // `NODE_STAGE` or `NODE_UNSTAGE` to `NODE_READY`. + // + // NOTE: This can only be called after `prepareNodeService`. + process::Future<Nothing> nodeUnstage(const std::string& volumeId); + + // Transitions the state of the specified volume from `VOL_READY` or + // `NODE_PUBLISH` to `PUBLISHED`. + // + // NOTE: This can only be called after `prepareNodeService`. + process::Future<Nothing> nodePublish(const std::string& volumeId); + + // Transitions the state of the specified volume from `PUBLISHED`, + // `NODE_PUBLISH` or `NODE_UNPUBLISH` to `VOL_READY`. + // + // NOTE: This can only be called after `prepareNodeService`. + process::Future<Nothing> nodeUnpublish(const std::string& volumeId); + + // Returns a CSI volume ID. + // + // NOTE: This can only be called after `prepareControllerService`. + process::Future<std::string> createVolume( + const std::string& name, + const Bytes& capacity, + const DiskProfileAdaptor::ProfileInfo& profileInfo); + + // Returns true if the volume has been deprovisioned. + // + // NOTE: This can only be called after `prepareControllerService` and + // `prepareNodeService` (since it may require `NodeUnpublishVolume`). + process::Future<bool> deleteVolume(const std::string& volumeId); + + // Validates if a volume supports the capability of the specified profile. + // + // NOTE: This can only be called after `prepareIdentityService`. + process::Future<Nothing> validateVolume( + const std::string& volumeId, + const Option<Labels>& metadata, + const DiskProfileAdaptor::ProfileInfo& profileInfo); + + // NOTE: This can only be called after `prepareControllerService` and the + // resource provider ID has been obtained. + process::Future<Resources> listVolumes(); + + // NOTE: This can only be called after `prepareControllerService` and the + // resource provider ID has been obtained. + process::Future<Resources> getCapacities(); + + // Applies the operation. Speculative operations will be synchronously + // applied. Do nothing if the operation is already in a terminal state. + process::Future<Nothing> _applyOperation(const id::UUID& operationUuid); + + // Sends `OPERATION_DROPPED` without checkpointing the operation status. + void dropOperation( + const id::UUID& operationUuid, + const Option<FrameworkID>& frameworkId, + const Option<Offer::Operation>& operation, + const std::string& message); + + process::Future<std::vector<ResourceConversion>> applyCreateDisk( + const Resource& resource, + const id::UUID& operationUuid, + const Resource::DiskInfo::Source::Type& targetType, + const Option<std::string>& targetProfile); + process::Future<std::vector<ResourceConversion>> applyDestroyDisk( + const Resource& resource); + + // Synchronously updates `totalResources` and the operation status and + // then asks the status update manager to send status updates. + Try<Nothing> updateOperationStatus( + const id::UUID& operationUuid, + const Try<std::vector<ResourceConversion>>& conversions); + + void garbageCollectOperationPath(const id::UUID& operationUuid); + + void checkpointResourceProviderState(); + void checkpointVolumeState(const std::string& volumeId); + + void sendResourceProviderStateUpdate(); + + // NOTE: This is a callback for the status update manager and should + // not be called directly. + void sendOperationStatusUpdate( + const UpdateOperationStatusMessage& update); + + enum State + { + RECOVERING, + DISCONNECTED, + CONNECTED, + SUBSCRIBED, + READY + } state; + + const process::http::URL url; + const std::string workDir; + const std::string metaDir; + const ContentType contentType; + ResourceProviderInfo info; + const std::string vendor; + const SlaveID slaveId; + const Option<std::string> authToken; + const bool strict; + + std::shared_ptr<DiskProfileAdaptor> diskProfileAdaptor; + + std::string bootId; + process::grpc::client::Runtime runtime; + process::Owned<v1::resource_provider::Driver> driver; + OperationStatusUpdateManager statusUpdateManager; + + // The mapping of known profiles fetched from the DiskProfileAdaptor. + hashmap<std::string, DiskProfileAdaptor::ProfileInfo> profileInfos; + + hashmap<ContainerID, process::Owned<slave::ContainerDaemon>> daemons; + hashmap<ContainerID, process::Owned<process::Promise<csi::v0::Client>>> + services; + + Option<ContainerID> nodeContainerId; + Option<ContainerID> controllerContainerId; + Option<csi::v0::GetPluginInfoResponse> pluginInfo; + csi::v0::PluginCapabilities pluginCapabilities; + csi::v0::ControllerCapabilities controllerCapabilities; + csi::v0::NodeCapabilities nodeCapabilities; + Option<std::string> nodeId; + + // We maintain the following invariant: if one operation depends on + // another, they cannot be in PENDING state at the same time, i.e., + // the result of the preceding operation must have been reflected in + // the total resources. + // + // NOTE: We store the list of operations in a `LinkedHashMap` to + // preserve the order we receive the operations in case we need it. + LinkedHashMap<id::UUID, Operation> operations; + Resources totalResources; + id::UUID resourceVersion; + hashmap<std::string, VolumeData> volumes; + + // If pending, it means that the storage pools are being reconciled, and all + // incoming operations that disallow reconciliation will be dropped. + process::Future<Nothing> reconciled; + + // We maintain a sequence to coordinate reconciliations of storage pools. It + // keeps track of pending operations that disallow reconciliation, and ensures + // that any reconciliation waits for these operations to finish. + process::Sequence sequence; + + struct Metrics + { + explicit Metrics(const std::string& prefix); + ~Metrics(); + + // CSI plugin metrics. + process::metrics::Counter csi_plugin_container_terminations; + hashmap<csi::v0::RPC, process::metrics::PushGauge> csi_plugin_rpcs_pending; + hashmap<csi::v0::RPC, process::metrics::Counter> csi_plugin_rpcs_successes; + hashmap<csi::v0::RPC, process::metrics::Counter> csi_plugin_rpcs_errors; + hashmap<csi::v0::RPC, process::metrics::Counter> csi_plugin_rpcs_cancelled; + + // Operation state metrics. + hashmap<Offer::Operation::Type, process::metrics::PushGauge> + operations_pending; + hashmap<Offer::Operation::Type, process::metrics::Counter> + operations_finished; + hashmap<Offer::Operation::Type, process::metrics::Counter> + operations_failed; + hashmap<Offer::Operation::Type, process::metrics::Counter> + operations_dropped; + } metrics; +}; + +} // namespace internal { +} // namespace mesos { + +#endif // __RESOURCE_PROVIDER_STORAGE_PROVIDER_PROCESS_HPP__ diff --git a/src/tests/csi_client_tests.cpp b/src/tests/csi_client_tests.cpp index 7751636..c8f3f04 100644 --- a/src/tests/csi_client_tests.cpp +++ b/src/tests/csi_client_tests.cpp @@ -63,7 +63,7 @@ struct RPCParam rpc, [](csi::v0::Client client) { return client - .call<rpc>(typename csi::v0::RPCTraits<rpc>::request_type()) + .call<rpc>(csi::v0::Request<rpc>()) .then([] { return Nothing(); }); } };