This is an automated email from the ASF dual-hosted git repository.
apitrou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 27900a6cb7 GH-45371: [C++] Fix data race in `SimpleRecordBatch::columns` (#45372)
27900a6cb7 is described below
commit 27900a6cb790e7e5573118d2d877295edb2cb17e
Author: Colin <[email protected]>
AuthorDate: Wed Feb 5 04:11:45 2025 -0800
GH-45371: [C++] Fix data race in `SimpleRecordBatch::columns` (#45372)
### Rationale for this change
GH-45371
### What changes are included in this PR?
Use `std::atomic_compare_exchange_strong` to initialize each
`boxed_columns_[i]` entry so that it is written only once. This means that a
reference to `boxed_columns_` is safe to read after each element has been
initialized.
### Are these changes tested?
Yes, there is a test case `TestRecordBatch.ColumnsThreadSafety` which
passes under TSAN.
### Are there any user-facing changes?
No
**This PR contains a "Critical Fix".**
Without this fix, concurrent calls to `SimpleRecordBatch::columns` could
lead to an invalid memory access and crash.
* GitHub Issue: #45371
Lead-authored-by: Colin Schultz <[email protected]>
Co-authored-by: Colin <[email protected]>
Co-authored-by: Antoine Pitrou <[email protected]>
Co-authored-by: Antoine Pitrou <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
---
cpp/src/arrow/record_batch.cc | 10 ++++++++--
cpp/src/arrow/record_batch_test.cc | 38 ++++++++++++++++++++++++++++++++++++++
2 files changed, 46 insertions(+), 2 deletions(-)
diff --git a/cpp/src/arrow/record_batch.cc b/cpp/src/arrow/record_batch.cc
index 5ce33a3731..9622fe8322 100644
--- a/cpp/src/arrow/record_batch.cc
+++ b/cpp/src/arrow/record_batch.cc
@@ -18,6 +18,7 @@
#include "arrow/record_batch.h"
#include <algorithm>
+#include <atomic>
#include <cmath>
#include <cstdlib>
#include <memory>
@@ -102,8 +103,13 @@ class SimpleRecordBatch : public RecordBatch {
std::shared_ptr<Array> column(int i) const override {
std::shared_ptr<Array> result = std::atomic_load(&boxed_columns_[i]);
if (!result) {
- result = MakeArray(columns_[i]);
- std::atomic_store(&boxed_columns_[i], result);
+ auto new_array = MakeArray(columns_[i]);
+ // Be careful not to overwrite existing entry if another thread has been calling
+ // `column(i)` at the same time, since the `boxed_columns_` contents are exposed
+ // by `columns()` (see GH-45371).
+ if (std::atomic_compare_exchange_strong(&boxed_columns_[i], &result, new_array)) {
+ return new_array;
+ }
}
return result;
}
diff --git a/cpp/src/arrow/record_batch_test.cc b/cpp/src/arrow/record_batch_test.cc
index 21d51ae506..d41a67f35e 100644
--- a/cpp/src/arrow/record_batch_test.cc
+++ b/cpp/src/arrow/record_batch_test.cc
@@ -20,8 +20,13 @@
#include <gmock/gmock.h>
#include <gtest/gtest.h>
+#include <cstddef>
#include <cstdint>
#include <memory>
+#include <mutex>
+#include <string>
+#include <thread>
+#include <utility>
#include <vector>
#include "arrow/array/array_base.h"
@@ -31,6 +36,7 @@
#include "arrow/array/util.h"
#include "arrow/c/abi.h"
#include "arrow/chunked_array.h"
+#include "arrow/config.h"
#include "arrow/status.h"
#include "arrow/table.h"
#include "arrow/tensor.h"
@@ -38,6 +44,7 @@
#include "arrow/testing/gtest_util.h"
#include "arrow/testing/random.h"
#include "arrow/type.h"
+#include "arrow/type_fwd.h"
#include "arrow/util/float16.h"
#include "arrow/util/iterator.h"
#include "arrow/util/key_value_metadata.h"
@@ -393,6 +400,37 @@ TEST_F(TestRecordBatch, RemoveColumnEmpty) {
AssertBatchesEqual(*added, *batch1);
}
+TEST_F(TestRecordBatch, ColumnsThreadSafety) {
+#ifndef ARROW_ENABLE_THREADING
+ GTEST_SKIP() << "Test requires threading support";
+#endif
+ constexpr int kNumThreads = 10;
+ constexpr int kLength = 10;
+
+ random::RandomArrayGenerator gen(42);
+ std::shared_ptr<ArrayData> array_data = gen.ArrayOf(utf8(), kLength)->data();
+ auto schema = ::arrow::schema({field("f1", utf8())});
+ auto record_batch = RecordBatch::Make(schema, kLength, {array_data});
+ std::mutex mutex;
+ std::vector<std::thread> threads;
+ std::vector<Array*> all_columns;
+ for (int i = 0; i < kNumThreads; i++) {
+ threads.emplace_back([&]() {
+ auto columns = record_batch->columns();
+ mutex.lock();
+ all_columns.push_back(columns[0].get());
+ mutex.unlock();
+ });
+ }
+ for (auto& thread : threads) {
+ thread.join();
+ }
+ // assert that all calls to columns() return the same arrays
+ for (const auto& col : all_columns) {
+ ASSERT_EQ(col, all_columns[0]);
+ }
+}
+
TEST_F(TestRecordBatch, ToFromEmptyStructArray) {
auto batch1 =
RecordBatch::Make(::arrow::schema({}), 10,
std::vector<std::shared_ptr<Array>>{});