This is an automated email from the ASF dual-hosted git repository.

apitrou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 27900a6cb7 GH-45371: [C++] Fix data race in `SimpleRecordBatch::columns` (#45372)
27900a6cb7 is described below

commit 27900a6cb790e7e5573118d2d877295edb2cb17e
Author: Colin <[email protected]>
AuthorDate: Wed Feb 5 04:11:45 2025 -0800

    GH-45371: [C++] Fix data race in `SimpleRecordBatch::columns` (#45372)
    
    ### Rationale for this change
    
    GH-45371
    
    ### What changes are included in this PR?
    
    Use `std::atomic_compare_exchange_strong` to initialize `boxed_columns_[i]`, so that each entry is written only once. As a result, a reference to `boxed_columns_` is safe to read once every element has been initialized.
    
    ### Are these changes tested?
    
    Yes, a new test case `TestRecordBatch.ColumnsThreadSafety` is added; it passes under TSAN.
    
    ### Are there any user-facing changes?
    
    No
    
    **This PR contains a "Critical Fix".**
    
    Without this fix, concurrent calls to `SimpleRecordBatch::columns` could lead to an invalid memory access and crash.
    * GitHub Issue: #45371
    
    Lead-authored-by: Colin Schultz <[email protected]>
    Co-authored-by: Colin <[email protected]>
    Co-authored-by: Antoine Pitrou <[email protected]>
    Co-authored-by: Antoine Pitrou <[email protected]>
    Signed-off-by: Antoine Pitrou <[email protected]>
---
 cpp/src/arrow/record_batch.cc      | 10 ++++++++--
 cpp/src/arrow/record_batch_test.cc | 38 ++++++++++++++++++++++++++++++++++++++
 2 files changed, 46 insertions(+), 2 deletions(-)

diff --git a/cpp/src/arrow/record_batch.cc b/cpp/src/arrow/record_batch.cc
index 5ce33a3731..9622fe8322 100644
--- a/cpp/src/arrow/record_batch.cc
+++ b/cpp/src/arrow/record_batch.cc
@@ -18,6 +18,7 @@
 #include "arrow/record_batch.h"
 
 #include <algorithm>
+#include <atomic>
 #include <cmath>
 #include <cstdlib>
 #include <memory>
@@ -102,8 +103,13 @@ class SimpleRecordBatch : public RecordBatch {
   std::shared_ptr<Array> column(int i) const override {
     std::shared_ptr<Array> result = std::atomic_load(&boxed_columns_[i]);
     if (!result) {
-      result = MakeArray(columns_[i]);
-      std::atomic_store(&boxed_columns_[i], result);
+      auto new_array = MakeArray(columns_[i]);
+      // Be careful not to overwrite existing entry if another thread has been 
calling
+      // `column(i)` at the same time, since the `boxed_columns_` contents are 
exposed
+      // by `columns()` (see GH-45371).
+      if (std::atomic_compare_exchange_strong(&boxed_columns_[i], &result, 
new_array)) {
+        return new_array;
+      }
     }
     return result;
   }
diff --git a/cpp/src/arrow/record_batch_test.cc 
b/cpp/src/arrow/record_batch_test.cc
index 21d51ae506..d41a67f35e 100644
--- a/cpp/src/arrow/record_batch_test.cc
+++ b/cpp/src/arrow/record_batch_test.cc
@@ -20,8 +20,13 @@
 #include <gmock/gmock.h>
 #include <gtest/gtest.h>
 
+#include <cstddef>
 #include <cstdint>
 #include <memory>
+#include <mutex>
+#include <string>
+#include <thread>
+#include <utility>
 #include <vector>
 
 #include "arrow/array/array_base.h"
@@ -31,6 +36,7 @@
 #include "arrow/array/util.h"
 #include "arrow/c/abi.h"
 #include "arrow/chunked_array.h"
+#include "arrow/config.h"
 #include "arrow/status.h"
 #include "arrow/table.h"
 #include "arrow/tensor.h"
@@ -38,6 +44,7 @@
 #include "arrow/testing/gtest_util.h"
 #include "arrow/testing/random.h"
 #include "arrow/type.h"
+#include "arrow/type_fwd.h"
 #include "arrow/util/float16.h"
 #include "arrow/util/iterator.h"
 #include "arrow/util/key_value_metadata.h"
@@ -393,6 +400,37 @@ TEST_F(TestRecordBatch, RemoveColumnEmpty) {
   AssertBatchesEqual(*added, *batch1);
 }
 
+TEST_F(TestRecordBatch, ColumnsThreadSafety) {
+#ifndef ARROW_ENABLE_THREADING
+  GTEST_SKIP() << "Test requires threading support";
+#endif
+  constexpr int kNumThreads = 10;
+  constexpr int kLength = 10;
+
+  random::RandomArrayGenerator gen(42);
+  std::shared_ptr<ArrayData> array_data = gen.ArrayOf(utf8(), kLength)->data();
+  auto schema = ::arrow::schema({field("f1", utf8())});
+  auto record_batch = RecordBatch::Make(schema, kLength, {array_data});
+  std::mutex mutex;
+  std::vector<std::thread> threads;
+  std::vector<Array*> all_columns;
+  for (int i = 0; i < kNumThreads; i++) {
+    threads.emplace_back([&]() {
+      auto columns = record_batch->columns();
+      mutex.lock();
+      all_columns.push_back(columns[0].get());
+      mutex.unlock();
+    });
+  }
+  for (auto& thread : threads) {
+    thread.join();
+  }
+  // assert that all calls to columns() return the same arrays
+  for (const auto& col : all_columns) {
+    ASSERT_EQ(col, all_columns[0]);
+  }
+}
+
 TEST_F(TestRecordBatch, ToFromEmptyStructArray) {
   auto batch1 =
       RecordBatch::Make(::arrow::schema({}), 10, 
std::vector<std::shared_ptr<Array>>{});

Reply via email to