This is an automated email from the ASF dual-hosted git repository.
apitrou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new ebaaf07adb GH-49146: [C++] Add option to disable atfork handlers
(#49148)
ebaaf07adb is described below
commit ebaaf07adbd302e95e393b5b77d78c1c97ea3b70
Author: Antoine Pitrou <[email protected]>
AuthorDate: Tue Feb 17 17:22:44 2026 +0100
GH-49146: [C++] Add option to disable atfork handlers (#49148)
### Rationale for this change
The atfork handlers we register in Arrow C++ are generally useful if the
Arrow APIs are meant to be used from the child process, but they also have the
unfortunate effect of executing non-async-signal-safe code in the child process
even if Arrow is not used there. That is [not allowed by
POSIX](https://pubs.opengroup.org/onlinepubs/9699919799/functions/pthread_atfork.html)
if the parent process is multi-threaded.
There are situations where fork() is only called just before exec(), and
therefore it is not necessary to run any atfork handler.
### What changes are included in this PR?
1. Add a `GetEnvVarInteger` utility function to automate parsing of a
numeric environment variable
2. Remove hard-coded size limitations for environment variable values on
Windows
3. Add basic unit tests for our APIs for getting and setting environment
variables
4. Add an environment variable `ARROW_REGISTER_ATFORK` to disable the
registration of atfork handlers at runtime
### Are these changes tested?
The new environment variable cannot be easily tested automatically, so I've
checked it manually.
### Are there any user-facing changes?
No, only a new feature.
* GitHub Issue: #49146
Authored-by: Antoine Pitrou <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
---
cpp/src/arrow/filesystem/s3fs.cc | 8 ++--
cpp/src/arrow/io/interfaces.cc | 24 ++++--------
cpp/src/arrow/testing/gtest_util.cc | 19 ++++++----
cpp/src/arrow/testing/gtest_util.h | 21 +++++++----
cpp/src/arrow/util/atfork_internal.cc | 22 ++++++++++-
cpp/src/arrow/util/atfork_test.cc | 3 ++
cpp/src/arrow/util/fuzz_internal.cc | 17 ++++-----
cpp/src/arrow/util/io_util.cc | 69 +++++++++++++++++++++++++----------
cpp/src/arrow/util/io_util.h | 6 +++
cpp/src/arrow/util/io_util_test.cc | 39 ++++++++++++++++++++
docs/source/cpp/env_vars.rst | 12 ++++++
python/pyarrow/tests/test_misc.py | 3 +-
12 files changed, 176 insertions(+), 67 deletions(-)
diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc
index f75fd970a1..0c15f6f184 100644
--- a/cpp/src/arrow/filesystem/s3fs.cc
+++ b/cpp/src/arrow/filesystem/s3fs.cc
@@ -119,7 +119,6 @@
#include "arrow/util/string.h"
#include "arrow/util/task_group.h"
#include "arrow/util/thread_pool.h"
-#include "arrow/util/value_parsing.h"
namespace arrow::fs {
@@ -3579,9 +3578,10 @@ S3GlobalOptions S3GlobalOptions::Defaults() {
log_level = S3LogLevel::Off;
}
- value = arrow::internal::GetEnvVar("ARROW_S3_THREADS").ValueOr("1");
- if (uint32_t u; ::arrow::internal::ParseUnsigned(value.data(), value.size(),
&u)) {
- num_event_loop_threads = u;
+ auto maybe_num_threads =
+ arrow::internal::GetEnvVarInteger("ARROW_S3_THREADS", /*min_value=*/1);
+ if (maybe_num_threads.ok()) {
+ num_event_loop_threads = static_cast<int>(*maybe_num_threads);
}
return S3GlobalOptions{log_level, num_event_loop_threads};
}
diff --git a/cpp/src/arrow/io/interfaces.cc b/cpp/src/arrow/io/interfaces.cc
index 12c124ce21..cdd2470b62 100644
--- a/cpp/src/arrow/io/interfaces.cc
+++ b/cpp/src/arrow/io/interfaces.cc
@@ -390,23 +390,15 @@ namespace {
constexpr int kDefaultNumIoThreads = 8;
std::shared_ptr<ThreadPool> MakeIOThreadPool() {
- int threads = 0;
- auto maybe_env_var = ::arrow::internal::GetEnvVar("ARROW_IO_THREADS");
- if (maybe_env_var.ok()) {
- auto str = *std::move(maybe_env_var);
- if (!str.empty()) {
- try {
- threads = std::stoi(str);
- } catch (...) {
- }
- if (threads <= 0) {
- ARROW_LOG(WARNING)
- << "ARROW_IO_THREADS does not contain a valid number of threads "
- "(should be an integer > 0)";
- }
- }
+ int threads = kDefaultNumIoThreads;
+ auto maybe_num_threads = ::arrow::internal::GetEnvVarInteger(
+ "ARROW_IO_THREADS", /*min_value=*/1,
/*max_value=*/std::numeric_limits<int>::max());
+ if (maybe_num_threads.ok()) {
+ threads = static_cast<int>(*maybe_num_threads);
+ } else if (!maybe_num_threads.status().IsKeyError()) {
+ maybe_num_threads.status().Warn();
}
- auto maybe_pool = ThreadPool::MakeEternal(threads > 0 ? threads :
kDefaultNumIoThreads);
+ auto maybe_pool = ThreadPool::MakeEternal(threads);
if (!maybe_pool.ok()) {
maybe_pool.status().Abort("Failed to create global IO thread pool");
}
diff --git a/cpp/src/arrow/testing/gtest_util.cc
b/cpp/src/arrow/testing/gtest_util.cc
index 1acc47a99d..0e2cbdb644 100644
--- a/cpp/src/arrow/testing/gtest_util.cc
+++ b/cpp/src/arrow/testing/gtest_util.cc
@@ -660,21 +660,24 @@ LocaleGuard::LocaleGuard(const char* new_locale) :
impl_(new Impl(new_locale)) {
LocaleGuard::~LocaleGuard() {}
-EnvVarGuard::EnvVarGuard(const std::string& name, const std::string& value)
- : name_(name) {
- auto maybe_value = arrow::internal::GetEnvVar(name);
+EnvVarGuard::EnvVarGuard(std::string name, std::optional<std::string> value)
+ : name_(std::move(name)) {
+ auto maybe_value = arrow::internal::GetEnvVar(name_);
if (maybe_value.ok()) {
- was_set_ = true;
old_value_ = *std::move(maybe_value);
} else {
- was_set_ = false;
+ old_value_ = std::nullopt;
+ }
+ if (value.has_value()) {
+ ARROW_CHECK_OK(arrow::internal::SetEnvVar(name_, *value));
+ } else {
+ ARROW_CHECK_OK(arrow::internal::DelEnvVar(name_));
}
- ARROW_CHECK_OK(arrow::internal::SetEnvVar(name, value));
}
EnvVarGuard::~EnvVarGuard() {
- if (was_set_) {
- ARROW_CHECK_OK(arrow::internal::SetEnvVar(name_, old_value_));
+ if (old_value_.has_value()) {
+ ARROW_CHECK_OK(arrow::internal::SetEnvVar(name_, *old_value_));
} else {
ARROW_CHECK_OK(arrow::internal::DelEnvVar(name_));
}
diff --git a/cpp/src/arrow/testing/gtest_util.h
b/cpp/src/arrow/testing/gtest_util.h
index 62bf907a2d..b84d253a89 100644
--- a/cpp/src/arrow/testing/gtest_util.h
+++ b/cpp/src/arrow/testing/gtest_util.h
@@ -418,9 +418,11 @@ ARROW_TESTING_EXPORT
void AssertChildExit(int child_pid, int expected_exit_status = 0);
#endif
-// A RAII-style object that switches to a new locale, and switches back
-// to the old locale when going out of scope. Doesn't do anything if the
-// new locale doesn't exist on the local machine.
+// A RAII-style object that temporarily switches to a new locale
+//
+// The guard switches back to the old locale when going out of scope.
+// It doesn't do anything if the new locale doesn't exist on the local machine.
+//
// ATTENTION: may crash with an assertion failure on Windows debug builds.
// See ARROW-6108, also https://gerrit.libreoffice.org/#/c/54110/
class ARROW_TESTING_EXPORT LocaleGuard {
@@ -433,15 +435,20 @@ class ARROW_TESTING_EXPORT LocaleGuard {
std::unique_ptr<Impl> impl_;
};
+// A RAII-style object that temporarily sets an environment variable
+//
+// The guard restores the variable's previous value when going out of scope,
+// or deletes the variable if it was not initially set.
+// The environment variable can also be temporarily deleted if std::nullopt
+// is passed instead of a string value.
class ARROW_TESTING_EXPORT EnvVarGuard {
public:
- EnvVarGuard(const std::string& name, const std::string& value);
+ EnvVarGuard(std::string name, std::optional<std::string> value);
~EnvVarGuard();
protected:
- const std::string name_;
- std::string old_value_;
- bool was_set_;
+ std::string name_;
+ std::optional<std::string> old_value_;
};
namespace internal {
diff --git a/cpp/src/arrow/util/atfork_internal.cc
b/cpp/src/arrow/util/atfork_internal.cc
index 7772f1c62b..fa3a09d0a2 100644
--- a/cpp/src/arrow/util/atfork_internal.cc
+++ b/cpp/src/arrow/util/atfork_internal.cc
@@ -34,6 +34,22 @@ namespace internal {
namespace {
+bool IsAtForkEnabled() {
+ static bool is_enabled = [] {
+ auto maybe_value =
+ GetEnvVarInteger("ARROW_REGISTER_ATFORK", /*min_value=*/0,
/*max_value=*/1);
+ if (maybe_value.ok()) {
+ return *maybe_value != 0;
+ }
+ if (!maybe_value.status().IsKeyError()) {
+ maybe_value.status().Warn();
+ }
+ // Enabled by default
+ return true;
+ }();
+ return is_enabled;
+}
+
// Singleton state for at-fork management.
// We do not use global variables because of initialization order issues
(ARROW-18383).
// Instead, a function-local static ensures the state is initialized
@@ -147,7 +163,11 @@ AtForkState* GetAtForkState() {
}; // namespace
void RegisterAtFork(std::weak_ptr<AtForkHandler> weak_handler) {
- GetAtForkState()->RegisterAtFork(std::move(weak_handler));
+ // Only fetch the atfork state (and thus lazily call pthread_atfork) if
enabled at all,
+ // to minimize potential nastiness with fork and threads.
+ if (IsAtForkEnabled()) {
+ GetAtForkState()->RegisterAtFork(std::move(weak_handler));
+ }
}
} // namespace internal
diff --git a/cpp/src/arrow/util/atfork_test.cc
b/cpp/src/arrow/util/atfork_test.cc
index 97910f9539..ea9bdca536 100644
--- a/cpp/src/arrow/util/atfork_test.cc
+++ b/cpp/src/arrow/util/atfork_test.cc
@@ -190,6 +190,9 @@ TEST_F(TestAtFork, SingleThread) {
ASSERT_THAT(child_after_, ElementsAre());
}
+// XXX we would like to test the ARROW_REGISTER_ATFORK environment variable,
+// but that would require spawning a test subprocess
+
# if !(defined(ARROW_VALGRIND) || defined(ADDRESS_SANITIZER) || \
defined(THREAD_SANITIZER))
diff --git a/cpp/src/arrow/util/fuzz_internal.cc
b/cpp/src/arrow/util/fuzz_internal.cc
index 935089b2bc..28d210333d 100644
--- a/cpp/src/arrow/util/fuzz_internal.cc
+++ b/cpp/src/arrow/util/fuzz_internal.cc
@@ -36,17 +36,16 @@ MemoryPool* fuzzing_memory_pool() {
void LogFuzzStatus(const Status& st, const uint8_t* data, int64_t size) {
static const int kVerbosity = []() {
- auto maybe_env_value = GetEnvVar("ARROW_FUZZING_VERBOSITY");
- if (maybe_env_value.status().IsKeyError()) {
- return 0;
+ auto maybe_env_value =
+ GetEnvVarInteger("ARROW_FUZZING_VERBOSITY", /*min_value=*/0,
/*max_value=*/1);
+ if (maybe_env_value.ok()) {
+ return static_cast<int>(*maybe_env_value);
}
- auto env_value = std::move(maybe_env_value).ValueOrDie();
- int32_t value;
- if (!ParseValue<Int32Type>(env_value.data(), env_value.length(), &value)) {
- Status::Invalid("Invalid value for ARROW_FUZZING_VERBOSITY: '",
env_value, "'")
- .Abort();
+ if (!maybe_env_value.status().IsKeyError()) {
+ maybe_env_value.status().Abort();
}
- return value;
+ // Quiet by default
+ return 0;
}();
if (!st.ok() && kVerbosity >= 1) {
diff --git a/cpp/src/arrow/util/io_util.cc b/cpp/src/arrow/util/io_util.cc
index b3ef48d296..03acd8297d 100644
--- a/cpp/src/arrow/util/io_util.cc
+++ b/cpp/src/arrow/util/io_util.cc
@@ -99,6 +99,7 @@
#include "arrow/util/io_util.h"
#include "arrow/util/logging_internal.h"
#include "arrow/util/mutex.h"
+#include "arrow/util/value_parsing.h"
// For filename conversion
#if defined(_WIN32)
@@ -1762,19 +1763,28 @@ Result<std::string> GetEnvVar(std::string_view name) {
#ifdef _WIN32
// On Windows, getenv() reads an early copy of the process' environment
// which doesn't get updated when SetEnvironmentVariable() is called.
- constexpr int32_t bufsize = 2000;
- char c_str[bufsize];
- auto res = GetEnvironmentVariableA(name.data(), c_str, bufsize);
- if (res >= bufsize) {
- return Status::CapacityError("environment variable value too long");
- } else if (res == 0) {
- return Status::KeyError("environment variable '", name, "'undefined");
- }
- return std::string(c_str);
+ std::string value(100, '\0');
+
+ uint32_t res = GetEnvironmentVariableA(name.data(), value.data(),
+ static_cast<uint32_t>(value.size()));
+ if (res >= value.size()) {
+ // Value buffer too small, need to upsize
+ // (`res` includes the null-terminating character in this case)
+ value.resize(res);
+ res = GetEnvironmentVariableA(name.data(), value.data(),
+ static_cast<uint32_t>(value.size()));
+ }
+ if (res == 0) {
+ return Status::KeyError("environment variable '", name, "' undefined");
+ }
+ // On success, `res` does not include the null-terminating character
+ DCHECK_EQ(value.data()[res], 0);
+ value.resize(res);
+ return value;
#else
char* c_str = getenv(name.data());
if (c_str == nullptr) {
- return Status::KeyError("environment variable '", name, "'undefined");
+ return Status::KeyError("environment variable '", name, "' undefined");
}
return std::string(c_str);
#endif
@@ -1782,18 +1792,25 @@ Result<std::string> GetEnvVar(std::string_view name) {
#ifdef _WIN32
Result<NativePathString> GetEnvVarNative(std::string_view name) {
- NativePathString w_name;
- constexpr int32_t bufsize = 2000;
- wchar_t w_str[bufsize];
+ ARROW_ASSIGN_OR_RAISE(std::wstring w_name, StringToNative(name));
+ std::wstring value(100, '\0');
- ARROW_ASSIGN_OR_RAISE(w_name, StringToNative(name));
- auto res = GetEnvironmentVariableW(w_name.c_str(), w_str, bufsize);
- if (res >= bufsize) {
- return Status::CapacityError("environment variable value too long");
- } else if (res == 0) {
- return Status::KeyError("environment variable '", name, "'undefined");
+ uint32_t res = GetEnvironmentVariableW(w_name.data(), value.data(),
+ static_cast<uint32_t>(value.size()));
+ if (res >= value.size()) {
+ // Value buffer too small, need to upsize
+ // (`res` includes the null-terminating character in this case)
+ value.resize(res);
+ res = GetEnvironmentVariableW(w_name.data(), value.data(),
+ static_cast<uint32_t>(value.size()));
+ }
+ if (res == 0) {
+ return Status::KeyError("environment variable '", name, "' undefined");
}
- return NativePathString(w_str);
+ // On success, `res` does not include the null-terminating character
+ DCHECK_EQ(value.data()[res], 0);
+ value.resize(res);
+ return value;
}
#else
@@ -1804,6 +1821,18 @@ Result<NativePathString>
GetEnvVarNative(std::string_view name) {
#endif
+Result<int64_t> GetEnvVarInteger(std::string_view name, std::optional<int64_t>
min_value,
+ std::optional<int64_t> max_value) {
+ ARROW_ASSIGN_OR_RAISE(auto env_string, GetEnvVar(name));
+ int64_t value;
+ if (!ParseValue<Int64Type>(env_string.data(), env_string.length(), &value) ||
+ (min_value.has_value() && value < *min_value) ||
+ (max_value.has_value() && value > *max_value)) {
+ return Status::Invalid("Invalid value for ", name, ": '", env_string, "'");
+ }
+ return value;
+}
+
Status SetEnvVar(std::string_view name, std::string_view value) {
#ifdef _WIN32
if (SetEnvironmentVariableA(name.data(), value.data())) {
diff --git a/cpp/src/arrow/util/io_util.h b/cpp/src/arrow/util/io_util.h
index 56bd4eff3d..fa53c0dc67 100644
--- a/cpp/src/arrow/util/io_util.h
+++ b/cpp/src/arrow/util/io_util.h
@@ -244,6 +244,12 @@ ARROW_EXPORT
Result<std::string> GetEnvVar(std::string_view name);
ARROW_EXPORT
Result<NativePathString> GetEnvVarNative(std::string_view name);
+// Returns KeyError if the environment variable doesn't exist,
+// Invalid if it's not a valid integer in the given range.
+ARROW_EXPORT
+Result<int64_t> GetEnvVarInteger(std::string_view name,
+ std::optional<int64_t> min_value = {},
+ std::optional<int64_t> max_value = {});
ARROW_EXPORT
Status SetEnvVar(std::string_view name, std::string_view value);
diff --git a/cpp/src/arrow/util/io_util_test.cc
b/cpp/src/arrow/util/io_util_test.cc
index de8458dc11..44188b3f2e 100644
--- a/cpp/src/arrow/util/io_util_test.cc
+++ b/cpp/src/arrow/util/io_util_test.cc
@@ -1134,5 +1134,44 @@ TEST(CpuAffinity, NumberOfCores) {
#endif
}
+TEST(Environment, GetEnvVar) {
+ // An environment variable that should exist on roughly all platforms
+ ASSERT_OK_AND_ASSIGN(auto v, GetEnvVar("PATH"));
+ ASSERT_FALSE(v.empty());
+ ASSERT_OK_AND_ASSIGN(auto w, GetEnvVarNative("PATH"));
+ ASSERT_FALSE(w.empty());
+ // An environment variable that most probably does not exist
+ ASSERT_RAISES(KeyError, GetEnvVar("BZZT_NONEXISTENT_VAR"));
+ ASSERT_RAISES(KeyError, GetEnvVarNative("BZZT_NONEXISTENT_VAR"));
+ // (we try not to rely on EnvVarGuard here as that would be circular)
+}
+
+TEST(Environment, GetEnvVarInteger) {
+ {
+ EnvVarGuard guard("FOOBAR", "5");
+ ASSERT_OK_AND_EQ(5, GetEnvVarInteger("FOOBAR"));
+ ASSERT_OK_AND_EQ(5, GetEnvVarInteger("FOOBAR", /*min_value=*/5,
/*max_value=*/7));
+ ASSERT_RAISES(Invalid, GetEnvVarInteger("FOOBAR", /*min_value=*/6,
/*max_value=*/7));
+ ASSERT_RAISES(Invalid, GetEnvVarInteger("FOOBAR", /*min_value=*/3,
/*max_value=*/4));
+ }
+ {
+ EnvVarGuard guard("FOOBAR", "BAZ");
+ ASSERT_RAISES(Invalid, GetEnvVarInteger("FOOBAR"));
+ }
+ {
+ EnvVarGuard guard("FOOBAR", std::nullopt);
+ ASSERT_RAISES(KeyError, GetEnvVarInteger("FOOBAR"));
+ }
+}
+
+TEST(Environment, SetEnvVar) {
+ EnvVarGuard guard("FOOBAR", "one");
+ ASSERT_OK_AND_EQ("one", GetEnvVar("FOOBAR"));
+ ASSERT_OK(SetEnvVar("FOOBAR", "two"));
+ ASSERT_OK_AND_EQ("two", GetEnvVar("FOOBAR"));
+ ASSERT_OK(DelEnvVar("FOOBAR"));
+ ASSERT_RAISES(KeyError, GetEnvVar("FOOBAR"));
+}
+
} // namespace internal
} // namespace arrow
diff --git a/docs/source/cpp/env_vars.rst b/docs/source/cpp/env_vars.rst
index 20df98c5ec..6ee6993e2b 100644
--- a/docs/source/cpp/env_vars.rst
+++ b/docs/source/cpp/env_vars.rst
@@ -87,6 +87,18 @@ that changing their value later will have an effect.
``libhdfs.dylib`` on macOS, ``libhdfs.so`` on other platforms).
Alternatively, one can set :envvar:`HADOOP_HOME`.
+.. envvar:: ARROW_REGISTER_ATFORK
+
+ **Experimental**. An integer value to enable or disable the registration
+ of at-fork handlers. These are enabled by default or explicitly using the
+ value "1"; use "0" to disable.
+
+ If enabled, at-fork handlers make Arrow C++ compatible with the use of the
+ ``fork()`` system call, such as by Python's :python:mod:`multiprocessing`,
+ but at the expense of executing
+ `potentially unsafe code
<https://pubs.opengroup.org/onlinepubs/9699919799/functions/pthread_atfork.html>`__
+ in a forked child process if the parent process is multi-threaded.
+
.. envvar:: ARROW_S3_LOG_LEVEL
Controls the verbosity of logging produced by S3 calls. Defaults to
``FATAL``
diff --git a/python/pyarrow/tests/test_misc.py
b/python/pyarrow/tests/test_misc.py
index 64f45d8bed..d6a2fe6a27 100644
--- a/python/pyarrow/tests/test_misc.py
+++ b/python/pyarrow/tests/test_misc.py
@@ -80,8 +80,7 @@ def test_env_var_io_thread_count():
for v in ('-1', 'z'):
out, err = run_with_env_var(v)
assert out.strip() == '8' # default value
- assert ("ARROW_IO_THREADS does not contain a valid number of threads"
- in err.strip())
+ assert "Invalid value for ARROW_IO_THREADS" in err.strip()
def test_build_info():