This is an automated email from the ASF dual-hosted git repository. chhsiao pushed a commit to branch 1.7.x in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 7c7a874121413453294a4f0b4f06a08115b1b202 Author: Chun-Hung Hsiao <chhs...@mesosphere.io> AuthorDate: Tue Jan 22 23:00:43 2019 -0800 Implemented the RPC retry logic for SLRP. For CSI calls triggered by offer operations, i.e., `CreateVolume` and `DeleteVolume`, if the plugin returns retryable errors (`UNAVAILABLE` or `DEADLINE_EXCEEDED`), SLRP will now retry indefinitely with a random exponential backoff. With this, frameworks will know that the operations are terminated with deterministic volume states when getting `OPERATION_FAILED`. Review: https://reviews.apache.org/r/69812 --- src/csi/client.cpp | 292 ++++++++++------------------- src/csi/client.hpp | 85 ++++----- src/resource_provider/storage/provider.cpp | 134 ++++++++++--- 3 files changed, 254 insertions(+), 257 deletions(-) diff --git a/src/csi/client.cpp b/src/csi/client.cpp index 61ed410..9e17f5b 100644 --- a/src/csi/client.cpp +++ b/src/csi/client.cpp @@ -30,308 +30,222 @@ namespace csi { namespace v0 { template <> -Future<GetPluginInfoResponse> -Client::call<GET_PLUGIN_INFO>( - GetPluginInfoRequest request) +Future<Try<GetPluginInfoResponse, process::grpc::StatusError>> +Client::call<GET_PLUGIN_INFO>(GetPluginInfoRequest request) { - return runtime - .call( - connection, - GRPC_CLIENT_METHOD(Identity, GetPluginInfo), - std::move(request), - CallOptions()) - .then([](const Try<GetPluginInfoResponse, StatusError>& result) - -> Future<GetPluginInfoResponse> { - return result; - }); + return runtime.call( + connection, + GRPC_CLIENT_METHOD(Identity, GetPluginInfo), + std::move(request), + CallOptions()); } template <> -Future<GetPluginCapabilitiesResponse> +Future<Try<GetPluginCapabilitiesResponse, process::grpc::StatusError>> Client::call<GET_PLUGIN_CAPABILITIES>( GetPluginCapabilitiesRequest request) { - return runtime - .call( - connection, - GRPC_CLIENT_METHOD(Identity, GetPluginCapabilities), - std::move(request), - CallOptions()) - .then([](const Try<GetPluginCapabilitiesResponse, StatusError>& result) - -> Future<GetPluginCapabilitiesResponse> { - return result; - }); + return runtime.call( + connection, + GRPC_CLIENT_METHOD(Identity, GetPluginCapabilities), + std::move(request), + CallOptions()); } template <> -Future<ProbeResponse> +Future<Try<ProbeResponse, process::grpc::StatusError>> Client::call<PROBE>( ProbeRequest request) { - return runtime - .call( - connection, - GRPC_CLIENT_METHOD(Identity, Probe), - std::move(request), - CallOptions()) - .then([](const Try<ProbeResponse, StatusError>& result) - -> Future<ProbeResponse> { - return result; - }); + return runtime.call( + connection, + GRPC_CLIENT_METHOD(Identity, Probe), + std::move(request), + CallOptions()); } template <> -Future<CreateVolumeResponse> +Future<Try<CreateVolumeResponse, process::grpc::StatusError>> Client::call<CREATE_VOLUME>( CreateVolumeRequest request) { - return runtime - .call( - connection, - GRPC_CLIENT_METHOD(Controller, CreateVolume), - std::move(request), - CallOptions()) - .then([](const Try<CreateVolumeResponse, StatusError>& result) - -> Future<CreateVolumeResponse> { - return result; - }); + return runtime.call( + connection, + GRPC_CLIENT_METHOD(Controller, CreateVolume), + std::move(request), + CallOptions()); } template <> -Future<DeleteVolumeResponse> +Future<Try<DeleteVolumeResponse, process::grpc::StatusError>> Client::call<DELETE_VOLUME>( DeleteVolumeRequest request) { - return runtime - .call( - connection, - GRPC_CLIENT_METHOD(Controller, DeleteVolume), - std::move(request), - CallOptions()) - .then([](const Try<DeleteVolumeResponse, StatusError>& result) - -> Future<DeleteVolumeResponse> { - return result; - }); + return runtime.call( + connection, + GRPC_CLIENT_METHOD(Controller, DeleteVolume), + std::move(request), + CallOptions()); } template <> -Future<ControllerPublishVolumeResponse> +Future<Try<ControllerPublishVolumeResponse, process::grpc::StatusError>> Client::call<CONTROLLER_PUBLISH_VOLUME>( ControllerPublishVolumeRequest request) { - return runtime - .call( - connection, - GRPC_CLIENT_METHOD(Controller, ControllerPublishVolume), - std::move(request), - CallOptions()) - .then([](const Try<ControllerPublishVolumeResponse, StatusError>& result) - -> Future<ControllerPublishVolumeResponse> { - return result; - }); + return runtime.call( + connection, + GRPC_CLIENT_METHOD(Controller, ControllerPublishVolume), + std::move(request), + CallOptions()); } template <> -Future<ControllerUnpublishVolumeResponse> +Future<Try<ControllerUnpublishVolumeResponse, process::grpc::StatusError>> Client::call<CONTROLLER_UNPUBLISH_VOLUME>( ControllerUnpublishVolumeRequest request) { - return runtime - .call( - connection, - GRPC_CLIENT_METHOD(Controller, ControllerUnpublishVolume), - std::move(request), - CallOptions()) - .then([](const Try<ControllerUnpublishVolumeResponse, StatusError>& result) - -> Future<ControllerUnpublishVolumeResponse> { - return result; - }); + return runtime.call( + connection, + GRPC_CLIENT_METHOD(Controller, ControllerUnpublishVolume), + std::move(request), + CallOptions()); } template <> -Future<ValidateVolumeCapabilitiesResponse> +Future<Try<ValidateVolumeCapabilitiesResponse, process::grpc::StatusError>> Client::call<VALIDATE_VOLUME_CAPABILITIES>( ValidateVolumeCapabilitiesRequest request) { - return runtime - .call( - connection, - GRPC_CLIENT_METHOD(Controller, ValidateVolumeCapabilities), - std::move(request), - CallOptions()) - .then([](const Try<ValidateVolumeCapabilitiesResponse, StatusError>& result) - -> Future<ValidateVolumeCapabilitiesResponse> { - return result; - }); + return runtime.call( + connection, + GRPC_CLIENT_METHOD(Controller, ValidateVolumeCapabilities), + std::move(request), + CallOptions()); } template <> -Future<ListVolumesResponse> +Future<Try<ListVolumesResponse, process::grpc::StatusError>> Client::call<LIST_VOLUMES>( ListVolumesRequest request) { - return runtime - .call( - connection, - GRPC_CLIENT_METHOD(Controller, ListVolumes), - std::move(request), - CallOptions()) - .then([](const Try<ListVolumesResponse, StatusError>& result) - -> Future<ListVolumesResponse> { - return result; - }); + return runtime.call( + connection, + GRPC_CLIENT_METHOD(Controller, ListVolumes), + std::move(request), + CallOptions()); } template <> -Future<GetCapacityResponse> +Future<Try<GetCapacityResponse, process::grpc::StatusError>> Client::call<GET_CAPACITY>( GetCapacityRequest request) { - return runtime - .call( - connection, - GRPC_CLIENT_METHOD(Controller, GetCapacity), - std::move(request), - CallOptions()) - .then([](const Try<GetCapacityResponse, StatusError>& result) - -> Future<GetCapacityResponse> { - return result; - }); + return runtime.call( + connection, + GRPC_CLIENT_METHOD(Controller, GetCapacity), + std::move(request), + CallOptions()); } template <> -Future<ControllerGetCapabilitiesResponse> +Future<Try<ControllerGetCapabilitiesResponse, process::grpc::StatusError>> Client::call<CONTROLLER_GET_CAPABILITIES>( ControllerGetCapabilitiesRequest request) { - return runtime - .call( - connection, - GRPC_CLIENT_METHOD(Controller, ControllerGetCapabilities), - std::move(request), - CallOptions()) - .then([](const Try<ControllerGetCapabilitiesResponse, StatusError>& result) - -> Future<ControllerGetCapabilitiesResponse> { - return result; - }); + return runtime.call( + connection, + GRPC_CLIENT_METHOD(Controller, ControllerGetCapabilities), + std::move(request), + CallOptions()); } template <> -Future<NodeStageVolumeResponse> +Future<Try<NodeStageVolumeResponse, process::grpc::StatusError>> Client::call<NODE_STAGE_VOLUME>( NodeStageVolumeRequest request) { - return runtime - .call( - connection, - GRPC_CLIENT_METHOD(Node, NodeStageVolume), - std::move(request), - CallOptions()) - .then([](const Try<NodeStageVolumeResponse, StatusError>& result) - -> Future<NodeStageVolumeResponse> { - return result; - }); + return runtime.call( + connection, + GRPC_CLIENT_METHOD(Node, NodeStageVolume), + std::move(request), + CallOptions()); } template <> -Future<NodeUnstageVolumeResponse> +Future<Try<NodeUnstageVolumeResponse, process::grpc::StatusError>> Client::call<NODE_UNSTAGE_VOLUME>( NodeUnstageVolumeRequest request) { - return runtime - .call( - connection, - GRPC_CLIENT_METHOD(Node, NodeUnstageVolume), - std::move(request), - CallOptions()) - .then([](const Try<NodeUnstageVolumeResponse, StatusError>& result) - -> Future<NodeUnstageVolumeResponse> { - return result; - }); + return runtime.call( + connection, + GRPC_CLIENT_METHOD(Node, NodeUnstageVolume), + std::move(request), + CallOptions()); } template <> -Future<NodePublishVolumeResponse> +Future<Try<NodePublishVolumeResponse, process::grpc::StatusError>> Client::call<NODE_PUBLISH_VOLUME>( NodePublishVolumeRequest request) { - return runtime - .call( - connection, - GRPC_CLIENT_METHOD(Node, NodePublishVolume), - std::move(request), - CallOptions()) - .then([](const Try<NodePublishVolumeResponse, StatusError>& result) - -> Future<NodePublishVolumeResponse> { - return result; - }); + return runtime.call( + connection, + GRPC_CLIENT_METHOD(Node, NodePublishVolume), + std::move(request), + CallOptions()); } template <> -Future<NodeUnpublishVolumeResponse> +Future<Try<NodeUnpublishVolumeResponse, process::grpc::StatusError>> Client::call<NODE_UNPUBLISH_VOLUME>( NodeUnpublishVolumeRequest request) { - return runtime - .call( - connection, - GRPC_CLIENT_METHOD(Node, NodeUnpublishVolume), - std::move(request), - CallOptions()) - .then([](const Try<NodeUnpublishVolumeResponse, StatusError>& result) - -> Future<NodeUnpublishVolumeResponse> { - return result; - }); + return runtime.call( + connection, + GRPC_CLIENT_METHOD(Node, NodeUnpublishVolume), + std::move(request), + CallOptions()); } template <> -Future<NodeGetIdResponse> +Future<Try<NodeGetIdResponse, process::grpc::StatusError>> Client::call<NODE_GET_ID>( NodeGetIdRequest request) { - return runtime - .call( - connection, - GRPC_CLIENT_METHOD(Node, NodeGetId), - std::move(request), - CallOptions()) - .then([](const Try<NodeGetIdResponse, StatusError>& result) - -> Future<NodeGetIdResponse> { - return result; - }); + return runtime.call( + connection, + GRPC_CLIENT_METHOD(Node, NodeGetId), + std::move(request), + CallOptions()); } template <> -Future<NodeGetCapabilitiesResponse> +Future<Try<NodeGetCapabilitiesResponse, process::grpc::StatusError>> Client::call<NODE_GET_CAPABILITIES>( NodeGetCapabilitiesRequest request) { - return runtime - .call( - connection, - GRPC_CLIENT_METHOD(Node, NodeGetCapabilities), - std::move(request), - CallOptions()) - .then([](const Try<NodeGetCapabilitiesResponse, StatusError>& result) - -> Future<NodeGetCapabilitiesResponse> { - return result; - }); + return runtime.call( + connection, + GRPC_CLIENT_METHOD(Node, NodeGetCapabilities), + std::move(request), + CallOptions()); } } // namespace v0 { diff --git a/src/csi/client.hpp b/src/csi/client.hpp index 5d40d54..a0dfedf 100644 --- a/src/csi/client.hpp +++ b/src/csi/client.hpp @@ -35,8 +35,9 @@ public: : connection(_connection), runtime(_runtime) {} template <RPC rpc> - process::Future<typename RPCTraits<rpc>::response_type> call( - typename RPCTraits<rpc>::request_type request); + process::Future< + Try<typename RPCTraits<rpc>::response_type, process::grpc::StatusError>> + call(typename RPCTraits<rpc>::request_type request); private: process::grpc::client::Connection connection; @@ -45,105 +46,95 @@ private: template <> -process::Future<GetPluginInfoResponse> -Client::call<GET_PLUGIN_INFO>( - GetPluginInfoRequest request); +process::Future<Try<GetPluginInfoResponse, process::grpc::StatusError>> +Client::call<GET_PLUGIN_INFO>(GetPluginInfoRequest request); template <> -process::Future<GetPluginCapabilitiesResponse> -Client::call<GET_PLUGIN_CAPABILITIES>( - GetPluginCapabilitiesRequest request); +process::Future<Try<GetPluginCapabilitiesResponse, process::grpc::StatusError>> +Client::call<GET_PLUGIN_CAPABILITIES>(GetPluginCapabilitiesRequest request); template <> -process::Future<ProbeResponse> -Client::call<PROBE>( - ProbeRequest request); +process::Future<Try<ProbeResponse, process::grpc::StatusError>> +Client::call<PROBE>(ProbeRequest request); template <> -process::Future<CreateVolumeResponse> -Client::call<CREATE_VOLUME>( - CreateVolumeRequest request); +process::Future<Try<CreateVolumeResponse, process::grpc::StatusError>> +Client::call<CREATE_VOLUME>(CreateVolumeRequest request); template <> -process::Future<DeleteVolumeResponse> -Client::call<DELETE_VOLUME>( - DeleteVolumeRequest request); +process::Future<Try<DeleteVolumeResponse, process::grpc::StatusError>> +Client::call<DELETE_VOLUME>(DeleteVolumeRequest request); template <> -process::Future<ControllerPublishVolumeResponse> -Client::call<CONTROLLER_PUBLISH_VOLUME>( - ControllerPublishVolumeRequest request); +process::Future< + Try<ControllerPublishVolumeResponse, process::grpc::StatusError>> +Client::call<CONTROLLER_PUBLISH_VOLUME>(ControllerPublishVolumeRequest request); template <> -process::Future<ControllerUnpublishVolumeResponse> +process::Future< + Try<ControllerUnpublishVolumeResponse, process::grpc::StatusError>> Client::call<CONTROLLER_UNPUBLISH_VOLUME>( ControllerUnpublishVolumeRequest request); template <> -process::Future<ValidateVolumeCapabilitiesResponse> +process::Future< + Try<ValidateVolumeCapabilitiesResponse, process::grpc::StatusError>> Client::call<VALIDATE_VOLUME_CAPABILITIES>( ValidateVolumeCapabilitiesRequest request); template <> -process::Future<ListVolumesResponse> -Client::call<LIST_VOLUMES>( - ListVolumesRequest request); +process::Future<Try<ListVolumesResponse, process::grpc::StatusError>> +Client::call<LIST_VOLUMES>(ListVolumesRequest request); template <> -process::Future<GetCapacityResponse> -Client::call<GET_CAPACITY>( - GetCapacityRequest request); +process::Future<Try<GetCapacityResponse, process::grpc::StatusError>> +Client::call<GET_CAPACITY>(GetCapacityRequest request); template <> -process::Future<ControllerGetCapabilitiesResponse> +process::Future< + Try<ControllerGetCapabilitiesResponse, process::grpc::StatusError>> Client::call<CONTROLLER_GET_CAPABILITIES>( ControllerGetCapabilitiesRequest request); template <> -process::Future<NodeStageVolumeResponse> -Client::call<NODE_STAGE_VOLUME>( - NodeStageVolumeRequest request); +process::Future<Try<NodeStageVolumeResponse, process::grpc::StatusError>> +Client::call<NODE_STAGE_VOLUME>(NodeStageVolumeRequest request); template <> -process::Future<NodeUnstageVolumeResponse> -Client::call<NODE_UNSTAGE_VOLUME>( - NodeUnstageVolumeRequest request); +process::Future<Try<NodeUnstageVolumeResponse, process::grpc::StatusError>> +Client::call<NODE_UNSTAGE_VOLUME>(NodeUnstageVolumeRequest request); template <> -process::Future<NodePublishVolumeResponse> -Client::call<NODE_PUBLISH_VOLUME>( - NodePublishVolumeRequest request); +process::Future<Try<NodePublishVolumeResponse, process::grpc::StatusError>> +Client::call<NODE_PUBLISH_VOLUME>(NodePublishVolumeRequest request); template <> -process::Future<NodeUnpublishVolumeResponse> -Client::call<NODE_UNPUBLISH_VOLUME>( - NodeUnpublishVolumeRequest request); +process::Future<Try<NodeUnpublishVolumeResponse, process::grpc::StatusError>> +Client::call<NODE_UNPUBLISH_VOLUME>(NodeUnpublishVolumeRequest request); template <> -process::Future<NodeGetIdResponse> -Client::call<NODE_GET_ID>( - NodeGetIdRequest request); +process::Future<Try<NodeGetIdResponse, process::grpc::StatusError>> +Client::call<NODE_GET_ID>(NodeGetIdRequest request); template <> -process::Future<NodeGetCapabilitiesResponse> -Client::call<NODE_GET_CAPABILITIES>( - NodeGetCapabilitiesRequest request); +process::Future<Try<NodeGetCapabilitiesResponse, process::grpc::StatusError>> +Client::call<NODE_GET_CAPABILITIES>(NodeGetCapabilitiesRequest request); } // namespace v0 { } // namespace csi { diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp index f9f9312..1abb9c8 100644 --- a/src/resource_provider/storage/provider.cpp +++ b/src/resource_provider/storage/provider.cpp @@ -18,6 +18,7 @@ #include <algorithm> #include <cctype> +#include <cstdlib> #include <memory> #include <numeric> #include <utility> @@ -48,6 +49,7 @@ #include <mesos/v1/resource_provider.hpp> +#include <stout/duration.hpp> #include <stout/foreach.hpp> #include <stout/hashmap.hpp> #include <stout/hashset.hpp> @@ -55,6 +57,7 @@ #include <stout/os.hpp> #include <stout/path.hpp> #include <stout/strings.hpp> +#include <stout/unreachable.hpp> #include <stout/os/realpath.hpp> @@ -109,6 +112,8 @@ using process::Sequence; using process::spawn; using process::Timeout; +using process::grpc::StatusError; + using process::http::authentication::Principal; using process::metrics::Counter; @@ -130,6 +135,23 @@ using mesos::v1::resource_provider::Driver; namespace mesos { namespace internal { +// Timeout for a CSI plugin component to create its endpoint socket. +// +// TODO(chhsiao): Make the timeout configurable. +constexpr Duration CSI_ENDPOINT_CREATION_TIMEOUT = Minutes(1); + +// Storage local resource provider initially picks a random amount of time +// between `[0, b]`, where `b = DEFAULT_CSI_RETRY_BACKOFF_FACTOR`, to retry CSI +// calls related to `CREATE_DISK` or `DESTROY_DISK` operations. Subsequent +// retries are exponentially backed off based on this interval (e.g., 2nd retry +// uses a random value between `[0, b * 2^1]`, 3rd retry between `[0, b * 2^2]`, +// etc) up to a maximum of `DEFAULT_CSI_RETRY_INTERVAL_MAX`. +// +// TODO(chhsiao): Make the retry parameters configurable. +constexpr Duration DEFAULT_CSI_RETRY_BACKOFF_FACTOR = Seconds(10); +constexpr Duration DEFAULT_CSI_RETRY_INTERVAL_MAX = Minutes(10); + + // Returns true if the string is a valid Java identifier. static bool isValidName(const string& s) { @@ -164,11 +186,6 @@ static bool isValidType(const string& s) } -// Timeout for a CSI plugin component to create its endpoint socket. -// TODO(chhsiao): Make the timeout configurable. -static const Duration CSI_ENDPOINT_CREATION_TIMEOUT = Minutes(1); - - // Returns the container ID of the standalone container to run a CSI plugin // component. The container ID is of the following format: // <cid_prefix><csi_type>-<csi_name>--<list_of_services> @@ -408,10 +425,12 @@ private: typename std::enable_if<rpc != csi::v0::PROBE, int>::type = 0> Future<typename csi::v0::RPCTraits<rpc>::response_type> call( const ContainerID& containerId, - const typename csi::v0::RPCTraits<rpc>::request_type& request); + const typename csi::v0::RPCTraits<rpc>::request_type& request, + bool retry = false); template <csi::v0::RPC rpc> - Future<typename csi::v0::RPCTraits<rpc>::response_type> _call( + Future<Try<typename csi::v0::RPCTraits<rpc>::response_type, StatusError>> + _call( csi::v0::Client client, const typename csi::v0::RPCTraits<rpc>::request_type& request); @@ -1873,32 +1892,98 @@ template < Future<typename csi::v0::RPCTraits<rpc>::response_type> StorageLocalResourceProviderProcess::call( const ContainerID& containerId, - const typename csi::v0::RPCTraits<rpc>::request_type& request) + const typename csi::v0::RPCTraits<rpc>::request_type& request, + bool retry) { - // Get the latest service future before making the call. - return getService(containerId) - .then(defer(self(), &Self::_call<rpc>, lambda::_1, request)); + using Response = typename csi::v0::RPCTraits<rpc>::response_type; + + Duration maxBackoff = DEFAULT_CSI_RETRY_BACKOFF_FACTOR; + + return loop( + self(), + [=] { + // Perform the call with the latest service future. + return getService(containerId) + .then(defer( + self(), + &StorageLocalResourceProviderProcess::_call<rpc>, + lambda::_1, + request)); + }, + [=](const Try<Response, StatusError>& result) mutable + -> Future<ControlFlow<Response>> { + if (result.isSome()) { + return Break(result.get()); + } + + if (retry) { + // See the link below for retryable status codes: + // https://grpc.io/grpc/cpp/namespacegrpc.html#aff1730578c90160528f6a8d67ef5c43b // NOLINT + switch (result.error().status.error_code()) { + case grpc::DEADLINE_EXCEEDED: + case grpc::UNAVAILABLE: { + Duration delay = + maxBackoff * (static_cast<double>(os::random()) / RAND_MAX); + + maxBackoff = + std::min(maxBackoff * 2, DEFAULT_CSI_RETRY_INTERVAL_MAX); + + LOG(ERROR) + << "Received '" << result.error() << "' while calling " << rpc + << ". Retrying in " << delay; + + return after(delay) + .then([]() -> Future<ControlFlow<Response>> { + return Continue(); + }); + } + case grpc::CANCELLED: + case grpc::UNKNOWN: + case grpc::INVALID_ARGUMENT: + case grpc::NOT_FOUND: + case grpc::ALREADY_EXISTS: + case grpc::PERMISSION_DENIED: + case grpc::UNAUTHENTICATED: + case grpc::RESOURCE_EXHAUSTED: + case grpc::FAILED_PRECONDITION: + case grpc::ABORTED: + case grpc::OUT_OF_RANGE: + case grpc::UNIMPLEMENTED: + case grpc::INTERNAL: + case grpc::DATA_LOSS: { + break; + } + case grpc::OK: + case grpc::DO_NOT_USE: { + UNREACHABLE(); + } + } + } + + return Failure(result.error()); + }); } template <csi::v0::RPC rpc> -Future<typename csi::v0::RPCTraits<rpc>::response_type> +Future<Try<typename csi::v0::RPCTraits<rpc>::response_type, StatusError>> StorageLocalResourceProviderProcess::_call( csi::v0::Client client, const typename csi::v0::RPCTraits<rpc>::request_type& request) { + using Response = typename csi::v0::RPCTraits<rpc>::response_type; + ++metrics.csi_plugin_rpcs_pending.at(rpc); return client.call<rpc>(request) - .onAny(defer(self(), [=]( - const Future<typename csi::v0::RPCTraits<rpc>::response_type>& future) { + .onAny(defer(self(), [=](const Future<Try<Response, StatusError>>& future) { --metrics.csi_plugin_rpcs_pending.at(rpc); - if (future.isReady()) { + if (future.isReady() && future->isSome()) { ++metrics.csi_plugin_rpcs_successes.at(rpc); - } else if (future.isFailed()) { - ++metrics.csi_plugin_rpcs_errors.at(rpc); - } else { + } else if (future.isDiscarded()) { ++metrics.csi_plugin_rpcs_cancelled.at(rpc); + } else { + ++metrics.csi_plugin_rpcs_errors.at(rpc); } })); } @@ -1939,7 +2024,14 @@ Future<csi::v0::Client> StorageLocalResourceProviderProcess::waitService( return service .then(defer(self(), [=](csi::v0::Client client) { return _call<csi::v0::PROBE>(client, csi::v0::ProbeRequest()) - .then([=]() -> csi::v0::Client { return client; }); + .then([=](const Try<csi::v0::ProbeResponse, StatusError>& result) + -> Future<csi::v0::Client> { + if (result.isError()) { + return Failure(result.error()); + } + + return client; + }); })); } @@ -2678,7 +2770,7 @@ Future<string> StorageLocalResourceProviderProcess::createVolume( CHECK_SOME(controllerContainerId); return call<csi::v0::CREATE_VOLUME>( - controllerContainerId.get(), std::move(request)) + controllerContainerId.get(), std::move(request), true) // Retry. .then(defer(self(), [=]( const csi::v0::CreateVolumeResponse& response) -> string { const csi::v0::Volume& volume = response.volume(); @@ -2770,7 +2862,7 @@ Future<bool> StorageLocalResourceProviderProcess::deleteVolume( CHECK_SOME(controllerContainerId); return call<csi::v0::DELETE_VOLUME>( - controllerContainerId.get(), std::move(request)) + controllerContainerId.get(), std::move(request), true) // Retry. .then([] { return Nothing(); }); })); }