pitrou commented on a change in pull request #11594:
URL: https://github.com/apache/arrow/pull/11594#discussion_r741752805



##########
File path: cpp/src/arrow/filesystem/s3fs.cc
##########
@@ -563,6 +566,97 @@ class S3Client : public Aws::S3::S3Client {
     req.SetBucket(ToAwsString(bucket));
     return GetBucketRegion(req);
   }
+
+  S3Model::CompleteMultipartUploadOutcome 
CompleteMultipartUploadWithErrorFixup(
+      S3Model::CompleteMultipartUploadRequest&& request) const {
+    // CompletedMultipartUpload can return a 200 OK response with an error
+    // encoded in the response body, in which case we should either retry
+    // or propagate the error to the user (see
+    // 
https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html).
+    //
+    // Unfortunately the AWS SDK doesn't detect such situations but lets them
+    // return successfully (see https://github.com/aws/aws-sdk-cpp/issues/658).
+    //
+    // We work around the issue by registering a DataReceivedEventHandler
+    // which parses the XML response for embedded errors.
+
+    util::optional<AWSError<Aws::Client::CoreErrors>> aws_error;
+
+    auto handler = [&](const Aws::Http::HttpRequest* http_req,
+                       Aws::Http::HttpResponse* http_resp,
+                       long long) {  // NOLINT runtime/int
+      auto& stream = http_resp->GetResponseBody();
+      const auto pos = stream.tellg();
+      const auto doc = 
Aws::Utils::Xml::XmlDocument::CreateFromXmlStream(stream);
+      // Rewind stream for later
+      stream.clear();
+      stream.seekg(pos);
+
+      if (doc.WasParseSuccessful()) {
+        auto root = doc.GetRootElement();
+        if (!root.IsNull()) {
+          ARROW_LOG(INFO) << "... CompleteMultipartUpload XML response root 
node: "
+                          << root.GetName();
+          if (root.GetName() != "CompleteMultipartUploadResult") {
+            // It's not the expected XML response root node, so it should be 
an error.
+            ARROW_LOG(INFO) << "CompletedMultipartUpload got error: "
+                            << doc.ConvertToString();
+            http_resp->SetResponseCode(
+                Aws::Http::HttpResponseCode::INTERNAL_SERVER_ERROR);
+            aws_error = GetErrorMarshaller()->Marshall(*http_resp);
+            // Rewind stream for later
+            stream.clear();
+            stream.seekg(pos);

Review comment:
       `GetErrorMarshaller()->Marshall` will parse the XML from the 
response stream again.

##########
File path: cpp/src/arrow/filesystem/s3fs.cc
##########
@@ -563,6 +566,97 @@ class S3Client : public Aws::S3::S3Client {
     req.SetBucket(ToAwsString(bucket));
     return GetBucketRegion(req);
   }
+
+  S3Model::CompleteMultipartUploadOutcome 
CompleteMultipartUploadWithErrorFixup(
+      S3Model::CompleteMultipartUploadRequest&& request) const {
+    // CompletedMultipartUpload can return a 200 OK response with an error
+    // encoded in the response body, in which case we should either retry
+    // or propagate the error to the user (see
+    // 
https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html).
+    //
+    // Unfortunately the AWS SDK doesn't detect such situations but lets them
+    // return successfully (see https://github.com/aws/aws-sdk-cpp/issues/658).
+    //
+    // We work around the issue by registering a DataReceivedEventHandler
+    // which parses the XML response for embedded errors.
+
+    util::optional<AWSError<Aws::Client::CoreErrors>> aws_error;
+
+    auto handler = [&](const Aws::Http::HttpRequest* http_req,
+                       Aws::Http::HttpResponse* http_resp,
+                       long long) {  // NOLINT runtime/int
+      auto& stream = http_resp->GetResponseBody();
+      const auto pos = stream.tellg();
+      const auto doc = 
Aws::Utils::Xml::XmlDocument::CreateFromXmlStream(stream);
+      // Rewind stream for later
+      stream.clear();
+      stream.seekg(pos);
+
+      if (doc.WasParseSuccessful()) {
+        auto root = doc.GetRootElement();
+        if (!root.IsNull()) {
+          ARROW_LOG(INFO) << "... CompleteMultipartUpload XML response root 
node: "
+                          << root.GetName();
+          if (root.GetName() != "CompleteMultipartUploadResult") {
+            // It's not the expected XML response root node, so it should be 
an error.
+            ARROW_LOG(INFO) << "CompletedMultipartUpload got error: "
+                            << doc.ConvertToString();
+            http_resp->SetResponseCode(
+                Aws::Http::HttpResponseCode::INTERNAL_SERVER_ERROR);
+            aws_error = GetErrorMarshaller()->Marshall(*http_resp);
+            // Rewind stream for later
+            stream.clear();
+            stream.seekg(pos);
+          }
+        }
+      }
+    };
+
+    request.SetDataReceivedEventHandler(std::move(handler));
+
+    for (int64_t retries = 0;; retries++) {
+      aws_error.reset();
+      auto outcome = 
Aws::S3::S3Client::S3Client::CompleteMultipartUpload(request);
+      if (!outcome.IsSuccess()) {
+        // Error returned in HTTP headers (or client failure)
+        return outcome;
+      }
+      if (!aws_error.has_value()) {
+        // Genuinely successful outcome
+        return outcome;
+      }
+
+      // An error was returned in the XML body despite the 200 OK status code.
+      // We don't have access to the AWS retry strategy (m_retryStrategy
+      // is a private member of AwsClient), so don't use that.  Instead,
+      // call our own if available; otherwise fall back on 
AWSError::ShouldRetry(),
+      // but put a hardcoded cap on the number of retries.
+      const auto detail = WrappedRetryStrategy::ErrorToDetail(*aws_error);
+      const bool should_retry = s3_retry_strategy_
+                                    ? s3_retry_strategy_->ShouldRetry(detail, 
retries)
+                                    : (detail.should_retry && retries < 20);

Review comment:
       Hmm, why not indeed.

##########
File path: cpp/src/arrow/filesystem/s3fs.cc
##########
@@ -505,7 +509,6 @@ class WrappedRetryStrategy : public 
Aws::Client::RetryStrategy {
             detail, static_cast<int64_t>(attempted_retries)));
   }
 
- private:

Review comment:
       Will do.

##########
File path: cpp/src/arrow/filesystem/s3fs.cc
##########
@@ -563,6 +566,97 @@ class S3Client : public Aws::S3::S3Client {
     req.SetBucket(ToAwsString(bucket));
     return GetBucketRegion(req);
   }
+
+  S3Model::CompleteMultipartUploadOutcome 
CompleteMultipartUploadWithErrorFixup(
+      S3Model::CompleteMultipartUploadRequest&& request) const {
+    // CompletedMultipartUpload can return a 200 OK response with an error
+    // encoded in the response body, in which case we should either retry
+    // or propagate the error to the user (see
+    // 
https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html).
+    //
+    // Unfortunately the AWS SDK doesn't detect such situations but lets them
+    // return successfully (see https://github.com/aws/aws-sdk-cpp/issues/658).
+    //
+    // We work around the issue by registering a DataReceivedEventHandler
+    // which parses the XML response for embedded errors.
+
+    util::optional<AWSError<Aws::Client::CoreErrors>> aws_error;
+
+    auto handler = [&](const Aws::Http::HttpRequest* http_req,
+                       Aws::Http::HttpResponse* http_resp,
+                       long long) {  // NOLINT runtime/int
+      auto& stream = http_resp->GetResponseBody();
+      const auto pos = stream.tellg();
+      const auto doc = 
Aws::Utils::Xml::XmlDocument::CreateFromXmlStream(stream);

Review comment:
       This should normally be detected as an error by the AWS SDK, so 
(hopefully) there is no need to special-case it ourselves.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to