This is an automated email from the ASF dual-hosted git repository.

apitrou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 8eacd8c3a9 GH-36523: [C++] Fix TSan-detected lock ordering issues in 
S3 (#36536)
8eacd8c3a9 is described below

commit 8eacd8c3a98c6de5b026c2a9e71aece760295ea2
Author: Antoine Pitrou <[email protected]>
AuthorDate: Mon Jul 10 09:42:09 2023 +0200

    GH-36523: [C++] Fix TSan-detected lock ordering issues in S3 (#36536)
    
    It is counter-intuitive, but lock ordering issues can happen even with a 
shared mutex locked in shared mode.
    The reason is that locking again in shared mode can block while there are 
threads waiting to take the lock in exclusive mode. For this reason, we must 
avoid keeping the S3ClientLock held when it is taken again.
    
    * Closes: #36523
    
    Authored-by: Antoine Pitrou <[email protected]>
    Signed-off-by: Antoine Pitrou <[email protected]>
---
 cpp/src/arrow/filesystem/s3_test_util.h |  11 +++
 cpp/src/arrow/filesystem/s3fs.cc        | 139 +++++++++++++++++++-------------
 cpp/src/arrow/filesystem/s3fs_test.cc   |   7 --
 cpp/src/arrow/util/io_util.cc           |   4 +-
 4 files changed, 95 insertions(+), 66 deletions(-)

diff --git a/cpp/src/arrow/filesystem/s3_test_util.h 
b/cpp/src/arrow/filesystem/s3_test_util.h
index 17245e0a89..e270a6e1c4 100644
--- a/cpp/src/arrow/filesystem/s3_test_util.h
+++ b/cpp/src/arrow/filesystem/s3_test_util.h
@@ -26,6 +26,7 @@
 #include "arrow/filesystem/s3fs.h"
 #include "arrow/status.h"
 #include "arrow/testing/gtest_util.h"
+#include "arrow/testing/util.h"
 #include "arrow/util/checked_cast.h"
 #include "arrow/util/macros.h"
 
@@ -76,6 +77,13 @@ class MinioTestEnvironment : public ::testing::Environment {
 
 class S3Environment : public ::testing::Environment {
  public:
+  // We set this environment variable to speed up tests by ensuring
+  // DefaultAWSCredentialsProviderChain does not query the (inaccessible)
+  // EC2 metadata endpoint.
+  // This must be done before spawning any Minio child process to avoid any 
race
+  // condition when accessing environment variables.
+  S3Environment() : ec2_metadata_disabled_guard_("AWS_EC2_METADATA_DISABLED", 
"true") {}
+
   void SetUp() override {
     // Change this to increase logging during tests
     S3GlobalOptions options;
@@ -84,6 +92,9 @@ class S3Environment : public ::testing::Environment {
   }
 
   void TearDown() override { ASSERT_OK(FinalizeS3()); }
+
+ private:
+  EnvVarGuard ec2_metadata_disabled_guard_;
 };
 
 }  // namespace fs
diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc
index f05d89963c..c57fc7f291 100644
--- a/cpp/src/arrow/filesystem/s3fs.cc
+++ b/cpp/src/arrow/filesystem/s3fs.cc
@@ -712,6 +712,7 @@ void DisableRedirects(Aws::Client::ClientConfiguration* c) {
 // To prevent such issues, we wrap all S3Client instances in a special
 // structure (S3ClientHolder) that prevents usage of S3Client after
 // S3 was finalized.
+// Please make sure you read the comments in S3ClientLock::Move below.
 //
 // See: GH-36346, GH-15054.
 
@@ -722,6 +723,18 @@ class S3ClientLock {
   S3Client* get() { return client_.get(); }
   S3Client* operator->() { return client_.get(); }
 
+  // Move this S3ClientLock into a temporary instance
+  //
+  // It is counter-intuitive, but lock ordering issues can happen even
+  // with a shared mutex locked in shared mode.
+  // The reason is that locking again in shared mode can block while
+  // there are threads waiting to take the lock in exclusive mode.
+  // Therefore, we should avoid keeping the S3ClientLock held
+  // when it is taken again. This method helps with that.
+  //
+  // (see GH-36523)
+  S3ClientLock Move() { return std::move(*this); }
+
  protected:
   friend class S3ClientHolder;
 
@@ -800,11 +813,28 @@ class S3ClientFinalizer : public 
std::enable_shared_from_this<S3ClientFinalizer>
 };
 
 Result<S3ClientLock> S3ClientHolder::Lock() {
-  std::lock_guard lock(mutex_);
-  auto finalizer = finalizer_.lock();
+  std::shared_ptr<S3ClientFinalizer> finalizer;
+  std::shared_ptr<S3Client> client;
+  {
+    std::unique_lock lock(mutex_);
+    finalizer = finalizer_.lock();
+    client = client_;
+  }
+  // Do not hold mutex while taking finalizer lock below.
+  //
+  // Acquiring a shared_mutex in shared mode may block even if not already
+  // acquired in exclusive mode, because of pending writers:
+  // https://github.com/google/sanitizers/issues/1668#issuecomment-1624985664
+  // """It is implementation-defined whether the calling thread acquires
+  // the lock when a writer does not hold the lock and there are writers
+  // blocked on the lock""".
+  //
+  // Therefore, we want to avoid potential lock ordering issues
+  // even when a shared lock is involved (GH-36523).
   if (!finalizer) {
     return ErrorS3Finalized();
   }
+
   S3ClientLock client_lock;
   // Lock the finalizer before examining it
   client_lock.lock_ = finalizer->LockShared();
@@ -812,14 +842,18 @@ Result<S3ClientLock> S3ClientHolder::Lock() {
     return ErrorS3Finalized();
   }
   // (the client can be cleared only if finalizer->finalized_ is true)
-  DCHECK(client_) << "inconsistent S3ClientHolder";
-  client_lock.client_ = client_;
+  DCHECK(client) << "inconsistent S3ClientHolder";
+  client_lock.client_ = std::move(client);
   return client_lock;
 }
 
 void S3ClientHolder::Finalize() {
-  std::lock_guard lock(mutex_);
-  client_.reset();
+  std::shared_ptr<S3Client> client;
+  {
+    std::unique_lock lock(mutex_);
+    client = std::move(client_);
+  }
+  // Do not hold mutex while ~S3Client potentially runs
 }
 
 std::shared_ptr<S3ClientFinalizer> GetClientFinalizer() {
@@ -1158,7 +1192,7 @@ class ObjectInputFile final : public io::RandomAccessFile 
{
     req.SetKey(ToAwsString(path_.key));
 
     ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());
-    auto outcome = client_lock->HeadObject(req);
+    auto outcome = client_lock.Move()->HeadObject(req);
     if (!outcome.IsSuccess()) {
       if (IsNotFound(outcome.GetError())) {
         return PathNotFound(path_);
@@ -1343,7 +1377,7 @@ class ObjectOutputStream final : public io::OutputStream {
       req.SetContentType("application/octet-stream");
     }
 
-    auto outcome = client_lock->CreateMultipartUpload(req);
+    auto outcome = client_lock.Move()->CreateMultipartUpload(req);
     if (!outcome.IsSuccess()) {
       return ErrorToStatus(
           std::forward_as_tuple("When initiating multiple part upload for key 
'",
@@ -1368,7 +1402,7 @@ class ObjectOutputStream final : public io::OutputStream {
     req.SetKey(ToAwsString(path_.key));
     req.SetUploadId(upload_id_);
 
-    auto outcome = client_lock->AbortMultipartUpload(req);
+    auto outcome = client_lock.Move()->AbortMultipartUpload(req);
     if (!outcome.IsSuccess()) {
       return ErrorToStatus(
           std::forward_as_tuple("When aborting multiple part upload for key 
'", path_.key,
@@ -1418,7 +1452,8 @@ class ObjectOutputStream final : public io::OutputStream {
       req.SetUploadId(upload_id_);
       req.SetMultipartUpload(std::move(completed_upload));
 
-      auto outcome = 
client_lock->CompleteMultipartUploadWithErrorFixup(std::move(req));
+      auto outcome =
+          
client_lock.Move()->CompleteMultipartUploadWithErrorFixup(std::move(req));
       if (!outcome.IsSuccess()) {
         return ErrorToStatus(
             std::forward_as_tuple("When completing multiple part upload for 
key '",
@@ -1527,8 +1562,6 @@ class ObjectOutputStream final : public io::OutputStream {
 
   Status UploadPart(const void* data, int64_t nbytes,
                     std::shared_ptr<Buffer> owned_buffer = nullptr) {
-    ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());
-
     S3Model::UploadPartRequest req;
     req.SetBucket(ToAwsString(path_.bucket));
     req.SetKey(ToAwsString(path_.key));
@@ -1538,7 +1571,8 @@ class ObjectOutputStream final : public io::OutputStream {
 
     if (!background_writes_) {
       req.SetBody(std::make_shared<StringViewStream>(data, nbytes));
-      auto outcome = client_lock->UploadPart(req);
+      ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());
+      auto outcome = client_lock.Move()->UploadPart(req);
       if (!outcome.IsSuccess()) {
         return UploadPartError(req, outcome);
       } else {
@@ -1562,21 +1596,17 @@ class ObjectOutputStream final : public 
io::OutputStream {
           upload_state_->pending_parts_completed = Future<>::Make();
         }
       }
-      // XXX This callback returns Aws::Utils::Outcome, it cannot easily call
-      // `holder->Lock()` which returns arrow::Result.
-      ARROW_ASSIGN_OR_RAISE(
-          auto fut,
-          SubmitIO(io_context_, [client_lock = std::move(client_lock), req]() 
mutable {
-            return client_lock->UploadPart(req);
-          }));
+
       // The closure keeps the buffer and the upload state alive
-      auto state = upload_state_;
-      auto part_number = part_number_;
-      auto handler = [owned_buffer, state, part_number,
-                      req](const Result<S3Model::UploadPartOutcome>& result) 
-> void {
-        HandleUploadOutcome(state, part_number, req, result);
+      auto deferred = [owned_buffer, holder = holder_, req = std::move(req),
+                       state = upload_state_,
+                       part_number = part_number_]() mutable -> Status {
+        ARROW_ASSIGN_OR_RAISE(auto client_lock, holder->Lock());
+        auto outcome = client_lock.Move()->UploadPart(req);
+        HandleUploadOutcome(state, part_number, req, outcome);
+        return Status::OK();
       };
-      fut.AddCallback(std::move(handler));
+      RETURN_NOT_OK(SubmitIO(io_context_, std::move(deferred)));
     }
 
     ++part_number_;
@@ -1749,8 +1779,9 @@ struct TreeWalker : public 
std::enable_shared_from_this<TreeWalker> {
       auto cb = *this;
       walker->task_group_->Append([cb]() mutable {
         ARROW_ASSIGN_OR_RAISE(auto client_lock, cb.walker->holder_->Lock());
-        Result<S3Model::ListObjectsV2Outcome> result = 
client_lock->ListObjectsV2(cb.req);
-        return cb(result);
+        Result<S3Model::ListObjectsV2Outcome> result =
+            client_lock.Move()->ListObjectsV2(cb.req);
+        return cb(std::move(result));
       });
     }
 
@@ -1850,7 +1881,7 @@ class S3FileSystem::Impl : public 
std::enable_shared_from_this<S3FileSystem::Imp
     S3Model::HeadBucketRequest req;
     req.SetBucket(ToAwsString(bucket));
 
-    auto outcome = client_lock->HeadBucket(req);
+    auto outcome = client_lock.Move()->HeadBucket(req);
     if (!outcome.IsSuccess()) {
       if (!IsNotFound(outcome.GetError())) {
         return ErrorToStatus(std::forward_as_tuple(
@@ -1864,13 +1895,12 @@ class S3FileSystem::Impl : public 
std::enable_shared_from_this<S3FileSystem::Imp
 
   // Create a bucket.  Successful if bucket already exists.
   Status CreateBucket(const std::string& bucket) {
-    ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());
-
     // Check bucket exists first.
     {
       S3Model::HeadBucketRequest req;
       req.SetBucket(ToAwsString(bucket));
-      auto outcome = client_lock->HeadBucket(req);
+      ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());
+      auto outcome = client_lock.Move()->HeadBucket(req);
 
       if (outcome.IsSuccess()) {
         return Status::OK();
@@ -1900,7 +1930,8 @@ class S3FileSystem::Impl : public 
std::enable_shared_from_this<S3FileSystem::Imp
     req.SetBucket(ToAwsString(bucket));
     req.SetCreateBucketConfiguration(config);
 
-    auto outcome = client_lock->CreateBucket(req);
+    ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());
+    auto outcome = client_lock.Move()->CreateBucket(req);
     if (!outcome.IsSuccess() && !IsAlreadyExists(outcome.GetError())) {
       return ErrorToStatus(std::forward_as_tuple("When creating bucket '", 
bucket, "': "),
                            "CreateBucket", outcome.GetError());
@@ -1918,7 +1949,7 @@ class S3FileSystem::Impl : public 
std::enable_shared_from_this<S3FileSystem::Imp
     req.SetBody(std::make_shared<std::stringstream>(""));
     return OutcomeToStatus(
         std::forward_as_tuple("When creating key '", key, "' in bucket '", 
bucket, "': "),
-        "PutObject", client_lock->PutObject(req));
+        "PutObject", client_lock.Move()->PutObject(req));
   }
 
   Status CreateEmptyDir(const std::string& bucket, const std::string& key) {
@@ -1934,7 +1965,7 @@ class S3FileSystem::Impl : public 
std::enable_shared_from_this<S3FileSystem::Imp
     req.SetKey(ToAwsString(key));
     return OutcomeToStatus(
         std::forward_as_tuple("When delete key '", key, "' in bucket '", 
bucket, "': "),
-        "DeleteObject", client_lock->DeleteObject(req));
+        "DeleteObject", client_lock.Move()->DeleteObject(req));
   }
 
   Status CopyObject(const S3Path& src_path, const S3Path& dest_path) {
@@ -1950,7 +1981,7 @@ class S3FileSystem::Impl : public 
std::enable_shared_from_this<S3FileSystem::Imp
         std::forward_as_tuple("When copying key '", src_path.key, "' in bucket 
'",
                               src_path.bucket, "' to key '", dest_path.key,
                               "' in bucket '", dest_path.bucket, "': "),
-        "CopyObject", client_lock->CopyObject(req));
+        "CopyObject", client_lock.Move()->CopyObject(req));
   }
 
   // On Minio, an empty "directory" doesn't satisfy the same API requests as
@@ -1989,7 +2020,7 @@ class S3FileSystem::Impl : public 
std::enable_shared_from_this<S3FileSystem::Imp
       req.SetKey(ToAwsString(key));
     }
 
-    auto outcome = client_lock->HeadObject(req);
+    auto outcome = client_lock.Move()->HeadObject(req);
     if (outcome.IsSuccess()) {
       return true;
     }
@@ -2022,7 +2053,7 @@ class S3FileSystem::Impl : public 
std::enable_shared_from_this<S3FileSystem::Imp
     req.SetPrefix(ToAwsString(path.key) + kSep);
     req.SetDelimiter(Aws::String() + kSep);
     req.SetMaxKeys(1);
-    auto outcome = client_lock->ListObjectsV2(req);
+    auto outcome = client_lock.Move()->ListObjectsV2(req);
     if (outcome.IsSuccess()) {
       const S3Model::ListObjectsV2Result& r = outcome.GetResult();
       // In some cases, there may be 0 keys but some prefixes
@@ -2233,8 +2264,6 @@ class S3FileSystem::Impl : public 
std::enable_shared_from_this<S3FileSystem::Imp
   // Delete multiple objects at once
   Future<> DeleteObjectsAsync(const std::string& bucket,
                               const std::vector<std::string>& keys) {
-    ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());
-
     struct DeleteCallback {
       std::string bucket;
 
@@ -2278,7 +2307,7 @@ class S3FileSystem::Impl : public 
std::enable_shared_from_this<S3FileSystem::Imp
           SubmitIO(io_context_,
                    [holder = holder_, req = std::move(req), delete_cb]() -> 
Status {
                      ARROW_ASSIGN_OR_RAISE(auto client_lock, holder->Lock());
-                     return delete_cb(client_lock->DeleteObjects(req));
+                     return delete_cb(client_lock.Move()->DeleteObjects(req));
                    }));
       futures.push_back(std::move(fut));
     }
@@ -2344,22 +2373,16 @@ class S3FileSystem::Impl : public 
std::enable_shared_from_this<S3FileSystem::Imp
 
   Result<std::vector<std::string>> ListBuckets() {
     ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());
-
-    auto outcome = client_lock->ListBuckets();
-    return ProcessListBuckets(outcome);
+    return ProcessListBuckets(client_lock.Move()->ListBuckets());
   }
 
   Future<std::vector<std::string>> ListBucketsAsync(io::IOContext ctx) {
-    ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());
-
-    return DeferNotOk(SubmitIO(ctx,
-                               [client_lock = std::move(client_lock)]() 
mutable {
-                                 return client_lock->ListBuckets();
-                               }))
-        // TODO(ARROW-12655) Change to Then(Impl::ProcessListBuckets)
-        .Then([](const Aws::S3::Model::ListBucketsOutcome& outcome) {
-          return Impl::ProcessListBuckets(outcome);
-        });
+    auto deferred =
+        [self = shared_from_this()]() mutable -> 
Result<std::vector<std::string>> {
+      ARROW_ASSIGN_OR_RAISE(auto client_lock, self->holder_->Lock());
+      return self->ProcessListBuckets(client_lock.Move()->ListBuckets());
+    };
+    return DeferNotOk(SubmitIO(ctx, std::move(deferred)));
   }
 
   Result<std::shared_ptr<ObjectInputFile>> OpenInputFile(const std::string& s,
@@ -2449,7 +2472,7 @@ Result<FileInfo> S3FileSystem::GetFileInfo(const 
std::string& s) {
     S3Model::HeadBucketRequest req;
     req.SetBucket(ToAwsString(path.bucket));
 
-    auto outcome = client_lock->HeadBucket(req);
+    auto outcome = client_lock.Move()->HeadBucket(req);
     if (!outcome.IsSuccess()) {
       if (!IsNotFound(outcome.GetError())) {
         const auto msg = "When getting information for bucket '" + path.bucket 
+ "': ";
@@ -2469,7 +2492,7 @@ Result<FileInfo> S3FileSystem::GetFileInfo(const 
std::string& s) {
     req.SetBucket(ToAwsString(path.bucket));
     req.SetKey(ToAwsString(path.key));
 
-    auto outcome = client_lock->HeadObject(req);
+    auto outcome = client_lock.Move()->HeadObject(req);
     if (outcome.IsSuccess()) {
       // "File" object found
       FileObjectToInfo(outcome.GetResult(), &info);
@@ -2624,7 +2647,7 @@ Status S3FileSystem::DeleteDir(const std::string& s) {
     req.SetBucket(ToAwsString(path.bucket));
     return OutcomeToStatus(
         std::forward_as_tuple("When deleting bucket '", path.bucket, "': "),
-        "DeleteBucket", client_lock->DeleteBucket(req));
+        "DeleteBucket", client_lock.Move()->DeleteBucket(req));
   } else if (path.key.empty()) {
     return Status::IOError("Would delete bucket '", path.bucket, "'. ",
                            "To delete buckets, enable the 
allow_bucket_deletion option.");
@@ -2676,7 +2699,7 @@ Status S3FileSystem::DeleteFile(const std::string& s) {
   req.SetBucket(ToAwsString(path.bucket));
   req.SetKey(ToAwsString(path.key));
 
-  auto outcome = client_lock->HeadObject(req);
+  auto outcome = client_lock.Move()->HeadObject(req);
   if (!outcome.IsSuccess()) {
     if (IsNotFound(outcome.GetError())) {
       return PathNotFound(path);
diff --git a/cpp/src/arrow/filesystem/s3fs_test.cc 
b/cpp/src/arrow/filesystem/s3fs_test.cc
index 5b0287d997..1426fe324b 100644
--- a/cpp/src/arrow/filesystem/s3fs_test.cc
+++ b/cpp/src/arrow/filesystem/s3fs_test.cc
@@ -59,7 +59,6 @@
 #include "arrow/testing/future_util.h"
 #include "arrow/testing/gtest_util.h"
 #include "arrow/testing/matchers.h"
-#include "arrow/testing/util.h"
 #include "arrow/util/async_generator.h"
 #include "arrow/util/checked_cast.h"
 #include "arrow/util/future.h"
@@ -146,11 +145,6 @@ class ShortRetryStrategy : public S3RetryStrategy {
 
 class AwsTestMixin : public ::testing::Test {
  public:
-  // We set this environment variable to speed up tests by ensuring
-  // DefaultAWSCredentialsProviderChain does not query (inaccessible)
-  // EC2 metadata endpoint
-  AwsTestMixin() : ec2_metadata_disabled_guard_("AWS_EC2_METADATA_DISABLED", 
"true") {}
-
   void SetUp() override {
 #ifdef AWS_CPP_SDK_S3_NOT_SHARED
     auto aws_log_level = Aws::Utils::Logging::LogLevel::Fatal;
@@ -169,7 +163,6 @@ class AwsTestMixin : public ::testing::Test {
   }
 
  private:
-  EnvVarGuard ec2_metadata_disabled_guard_;
 #ifdef AWS_CPP_SDK_S3_NOT_SHARED
   Aws::SDKOptions aws_options_;
 #endif
diff --git a/cpp/src/arrow/util/io_util.cc b/cpp/src/arrow/util/io_util.cc
index 18cac9ae11..ac92618ff6 100644
--- a/cpp/src/arrow/util/io_util.cc
+++ b/cpp/src/arrow/util/io_util.cc
@@ -45,6 +45,7 @@
 #include <random>
 #include <sstream>
 #include <string>
+#include <string_view>
 #include <thread>
 #include <utility>
 #include <vector>
@@ -1896,7 +1897,8 @@ std::vector<NativePathString> GetPlatformTemporaryDirs() {
 }
 
 std::string MakeRandomName(int num_chars) {
-  static const std::string chars = "0123456789abcdefghijklmnopqrstuvwxyz";
+  constexpr std::string_view chars = "0123456789abcdefghijklmnopqrstuvwxyz";
+
   std::default_random_engine gen(
       static_cast<std::default_random_engine::result_type>(GetRandomSeed()));
   std::uniform_int_distribution<int> dist(0, static_cast<int>(chars.length() - 
1));

Reply via email to