This is an automated email from the ASF dual-hosted git repository.

apitrou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new ebaaf07adb GH-49146: [C++] Add option to disable atfork handlers 
(#49148)
ebaaf07adb is described below

commit ebaaf07adbd302e95e393b5b77d78c1c97ea3b70
Author: Antoine Pitrou <[email protected]>
AuthorDate: Tue Feb 17 17:22:44 2026 +0100

    GH-49146: [C++] Add option to disable atfork handlers (#49148)
    
    ### Rationale for this change
    
    The atfork handlers we register in Arrow C++ are generally useful if the 
Arrow APIs are meant to be used from the child process, but they also have the 
unfortunate effect of executing non-async-signal-safe code in the child process 
even if Arrow is not used there. That is [not allowed by 
POSIX](https://pubs.opengroup.org/onlinepubs/9699919799/functions/pthread_atfork.html)
 if the parent process is multi-threaded.
    
    There are situations where fork() is only called just before exec(), and 
therefore it is not necessary to run any atfork handler.
    
    ### What changes are included in this PR?
    
    1. Add a `GetEnvVarInteger` utility function to automate parsing of a 
numeric environment variable
    2. Remove hard-coded size limitations for environment variable values on 
Windows
    3. Add basic unit tests for our APIs for getting and setting environment 
variables
    4. Add an environment variable `ARROW_REGISTER_ATFORK` to disable the 
registration of atfork handlers at runtime
    
    ### Are these changes tested?
    
    The new environment variable cannot be easily tested automatically, so I've 
checked it manually.
    
    ### Are there any user-facing changes?
    
    No breaking changes, only a new feature (the `ARROW_REGISTER_ATFORK` environment variable).
    
    * GitHub Issue: #49146
    
    Authored-by: Antoine Pitrou <[email protected]>
    Signed-off-by: Antoine Pitrou <[email protected]>
---
 cpp/src/arrow/filesystem/s3fs.cc      |  8 ++--
 cpp/src/arrow/io/interfaces.cc        | 24 ++++--------
 cpp/src/arrow/testing/gtest_util.cc   | 19 ++++++----
 cpp/src/arrow/testing/gtest_util.h    | 21 +++++++----
 cpp/src/arrow/util/atfork_internal.cc | 22 ++++++++++-
 cpp/src/arrow/util/atfork_test.cc     |  3 ++
 cpp/src/arrow/util/fuzz_internal.cc   | 17 ++++-----
 cpp/src/arrow/util/io_util.cc         | 69 +++++++++++++++++++++++++----------
 cpp/src/arrow/util/io_util.h          |  6 +++
 cpp/src/arrow/util/io_util_test.cc    | 39 ++++++++++++++++++++
 docs/source/cpp/env_vars.rst          | 12 ++++++
 python/pyarrow/tests/test_misc.py     |  3 +-
 12 files changed, 176 insertions(+), 67 deletions(-)

diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc
index f75fd970a1..0c15f6f184 100644
--- a/cpp/src/arrow/filesystem/s3fs.cc
+++ b/cpp/src/arrow/filesystem/s3fs.cc
@@ -119,7 +119,6 @@
 #include "arrow/util/string.h"
 #include "arrow/util/task_group.h"
 #include "arrow/util/thread_pool.h"
-#include "arrow/util/value_parsing.h"
 
 namespace arrow::fs {
 
@@ -3579,9 +3578,10 @@ S3GlobalOptions S3GlobalOptions::Defaults() {
     log_level = S3LogLevel::Off;
   }
 
-  value = arrow::internal::GetEnvVar("ARROW_S3_THREADS").ValueOr("1");
-  if (uint32_t u; ::arrow::internal::ParseUnsigned(value.data(), value.size(), 
&u)) {
-    num_event_loop_threads = u;
+  auto maybe_num_threads =
+      arrow::internal::GetEnvVarInteger("ARROW_S3_THREADS", /*min_value=*/1);
+  if (maybe_num_threads.ok()) {
+    num_event_loop_threads = static_cast<int>(*maybe_num_threads);
   }
   return S3GlobalOptions{log_level, num_event_loop_threads};
 }
diff --git a/cpp/src/arrow/io/interfaces.cc b/cpp/src/arrow/io/interfaces.cc
index 12c124ce21..cdd2470b62 100644
--- a/cpp/src/arrow/io/interfaces.cc
+++ b/cpp/src/arrow/io/interfaces.cc
@@ -390,23 +390,15 @@ namespace {
 constexpr int kDefaultNumIoThreads = 8;
 
 std::shared_ptr<ThreadPool> MakeIOThreadPool() {
-  int threads = 0;
-  auto maybe_env_var = ::arrow::internal::GetEnvVar("ARROW_IO_THREADS");
-  if (maybe_env_var.ok()) {
-    auto str = *std::move(maybe_env_var);
-    if (!str.empty()) {
-      try {
-        threads = std::stoi(str);
-      } catch (...) {
-      }
-      if (threads <= 0) {
-        ARROW_LOG(WARNING)
-            << "ARROW_IO_THREADS does not contain a valid number of threads "
-               "(should be an integer > 0)";
-      }
-    }
+  int threads = kDefaultNumIoThreads;
+  auto maybe_num_threads = ::arrow::internal::GetEnvVarInteger(
+      "ARROW_IO_THREADS", /*min_value=*/1, 
/*max_value=*/std::numeric_limits<int>::max());
+  if (maybe_num_threads.ok()) {
+    threads = static_cast<int>(*maybe_num_threads);
+  } else if (!maybe_num_threads.status().IsKeyError()) {
+    maybe_num_threads.status().Warn();
   }
-  auto maybe_pool = ThreadPool::MakeEternal(threads > 0 ? threads : 
kDefaultNumIoThreads);
+  auto maybe_pool = ThreadPool::MakeEternal(threads);
   if (!maybe_pool.ok()) {
     maybe_pool.status().Abort("Failed to create global IO thread pool");
   }
diff --git a/cpp/src/arrow/testing/gtest_util.cc 
b/cpp/src/arrow/testing/gtest_util.cc
index 1acc47a99d..0e2cbdb644 100644
--- a/cpp/src/arrow/testing/gtest_util.cc
+++ b/cpp/src/arrow/testing/gtest_util.cc
@@ -660,21 +660,24 @@ LocaleGuard::LocaleGuard(const char* new_locale) : 
impl_(new Impl(new_locale)) {
 
 LocaleGuard::~LocaleGuard() {}
 
-EnvVarGuard::EnvVarGuard(const std::string& name, const std::string& value)
-    : name_(name) {
-  auto maybe_value = arrow::internal::GetEnvVar(name);
+EnvVarGuard::EnvVarGuard(std::string name, std::optional<std::string> value)
+    : name_(std::move(name)) {
+  auto maybe_value = arrow::internal::GetEnvVar(name_);
   if (maybe_value.ok()) {
-    was_set_ = true;
     old_value_ = *std::move(maybe_value);
   } else {
-    was_set_ = false;
+    old_value_ = std::nullopt;
+  }
+  if (value.has_value()) {
+    ARROW_CHECK_OK(arrow::internal::SetEnvVar(name_, *value));
+  } else {
+    ARROW_CHECK_OK(arrow::internal::DelEnvVar(name_));
   }
-  ARROW_CHECK_OK(arrow::internal::SetEnvVar(name, value));
 }
 
 EnvVarGuard::~EnvVarGuard() {
-  if (was_set_) {
-    ARROW_CHECK_OK(arrow::internal::SetEnvVar(name_, old_value_));
+  if (old_value_.has_value()) {
+    ARROW_CHECK_OK(arrow::internal::SetEnvVar(name_, *old_value_));
   } else {
     ARROW_CHECK_OK(arrow::internal::DelEnvVar(name_));
   }
diff --git a/cpp/src/arrow/testing/gtest_util.h 
b/cpp/src/arrow/testing/gtest_util.h
index 62bf907a2d..b84d253a89 100644
--- a/cpp/src/arrow/testing/gtest_util.h
+++ b/cpp/src/arrow/testing/gtest_util.h
@@ -418,9 +418,11 @@ ARROW_TESTING_EXPORT
 void AssertChildExit(int child_pid, int expected_exit_status = 0);
 #endif
 
-// A RAII-style object that switches to a new locale, and switches back
-// to the old locale when going out of scope.  Doesn't do anything if the
-// new locale doesn't exist on the local machine.
+// A RAII-style object that temporarily switches to a new locale
+//
+// The guard switches back to the old locale when going out of scope.
+// It doesn't do anything if the new locale doesn't exist on the local machine.
+//
 // ATTENTION: may crash with an assertion failure on Windows debug builds.
 // See ARROW-6108, also https://gerrit.libreoffice.org/#/c/54110/
 class ARROW_TESTING_EXPORT LocaleGuard {
@@ -433,15 +435,20 @@ class ARROW_TESTING_EXPORT LocaleGuard {
   std::unique_ptr<Impl> impl_;
 };
 
+// A RAII-style object that temporarily sets an environment variable
+//
+// The guard restores the variable's previous value when going out of scope,
+// or deletes the variable if it was not initially set.
+// The environment variable can also be temporarily deleted if std::nullopt
+// is passed instead of a string value.
 class ARROW_TESTING_EXPORT EnvVarGuard {
  public:
-  EnvVarGuard(const std::string& name, const std::string& value);
+  EnvVarGuard(std::string name, std::optional<std::string> value);
   ~EnvVarGuard();
 
  protected:
-  const std::string name_;
-  std::string old_value_;
-  bool was_set_;
+  std::string name_;
+  std::optional<std::string> old_value_;
 };
 
 namespace internal {
diff --git a/cpp/src/arrow/util/atfork_internal.cc 
b/cpp/src/arrow/util/atfork_internal.cc
index 7772f1c62b..fa3a09d0a2 100644
--- a/cpp/src/arrow/util/atfork_internal.cc
+++ b/cpp/src/arrow/util/atfork_internal.cc
@@ -34,6 +34,22 @@ namespace internal {
 
 namespace {
 
+bool IsAtForkEnabled() {
+  static bool is_enabled = [] {
+    auto maybe_value =
+        GetEnvVarInteger("ARROW_REGISTER_ATFORK", /*min_value=*/0, 
/*max_value=*/1);
+    if (maybe_value.ok()) {
+      return *maybe_value != 0;
+    }
+    if (!maybe_value.status().IsKeyError()) {
+      maybe_value.status().Warn();
+    }
+    // Enabled by default
+    return true;
+  }();
+  return is_enabled;
+}
+
 // Singleton state for at-fork management.
 // We do not use global variables because of initialization order issues 
(ARROW-18383).
 // Instead, a function-local static ensures the state is initialized
@@ -147,7 +163,11 @@ AtForkState* GetAtForkState() {
 };  // namespace
 
 void RegisterAtFork(std::weak_ptr<AtForkHandler> weak_handler) {
-  GetAtForkState()->RegisterAtFork(std::move(weak_handler));
+  // Only fetch the atfork state (and thus lazily call pthread_atfork) if 
enabled at all,
+  // to minimize potential nastiness with fork and threads.
+  if (IsAtForkEnabled()) {
+    GetAtForkState()->RegisterAtFork(std::move(weak_handler));
+  }
 }
 
 }  // namespace internal
diff --git a/cpp/src/arrow/util/atfork_test.cc 
b/cpp/src/arrow/util/atfork_test.cc
index 97910f9539..ea9bdca536 100644
--- a/cpp/src/arrow/util/atfork_test.cc
+++ b/cpp/src/arrow/util/atfork_test.cc
@@ -190,6 +190,9 @@ TEST_F(TestAtFork, SingleThread) {
   ASSERT_THAT(child_after_, ElementsAre());
 }
 
+// XXX we would like to test the ARROW_REGISTER_ATFORK environment variable,
+// but that would require spawning a test subprocess
+
 #  if !(defined(ARROW_VALGRIND) || defined(ADDRESS_SANITIZER) || \
         defined(THREAD_SANITIZER))
 
diff --git a/cpp/src/arrow/util/fuzz_internal.cc 
b/cpp/src/arrow/util/fuzz_internal.cc
index 935089b2bc..28d210333d 100644
--- a/cpp/src/arrow/util/fuzz_internal.cc
+++ b/cpp/src/arrow/util/fuzz_internal.cc
@@ -36,17 +36,16 @@ MemoryPool* fuzzing_memory_pool() {
 
 void LogFuzzStatus(const Status& st, const uint8_t* data, int64_t size) {
   static const int kVerbosity = []() {
-    auto maybe_env_value = GetEnvVar("ARROW_FUZZING_VERBOSITY");
-    if (maybe_env_value.status().IsKeyError()) {
-      return 0;
+    auto maybe_env_value =
+        GetEnvVarInteger("ARROW_FUZZING_VERBOSITY", /*min_value=*/0, 
/*max_value=*/1);
+    if (maybe_env_value.ok()) {
+      return static_cast<int>(*maybe_env_value);
     }
-    auto env_value = std::move(maybe_env_value).ValueOrDie();
-    int32_t value;
-    if (!ParseValue<Int32Type>(env_value.data(), env_value.length(), &value)) {
-      Status::Invalid("Invalid value for ARROW_FUZZING_VERBOSITY: '", 
env_value, "'")
-          .Abort();
+    if (!maybe_env_value.status().IsKeyError()) {
+      maybe_env_value.status().Abort();
     }
-    return value;
+    // Quiet by default
+    return 0;
   }();
 
   if (!st.ok() && kVerbosity >= 1) {
diff --git a/cpp/src/arrow/util/io_util.cc b/cpp/src/arrow/util/io_util.cc
index b3ef48d296..03acd8297d 100644
--- a/cpp/src/arrow/util/io_util.cc
+++ b/cpp/src/arrow/util/io_util.cc
@@ -99,6 +99,7 @@
 #include "arrow/util/io_util.h"
 #include "arrow/util/logging_internal.h"
 #include "arrow/util/mutex.h"
+#include "arrow/util/value_parsing.h"
 
 // For filename conversion
 #if defined(_WIN32)
@@ -1762,19 +1763,28 @@ Result<std::string> GetEnvVar(std::string_view name) {
 #ifdef _WIN32
   // On Windows, getenv() reads an early copy of the process' environment
   // which doesn't get updated when SetEnvironmentVariable() is called.
-  constexpr int32_t bufsize = 2000;
-  char c_str[bufsize];
-  auto res = GetEnvironmentVariableA(name.data(), c_str, bufsize);
-  if (res >= bufsize) {
-    return Status::CapacityError("environment variable value too long");
-  } else if (res == 0) {
-    return Status::KeyError("environment variable '", name, "'undefined");
-  }
-  return std::string(c_str);
+  std::string value(100, '\0');
+
+  uint32_t res = GetEnvironmentVariableA(name.data(), value.data(),
+                                         static_cast<uint32_t>(value.size()));
+  if (res >= value.size()) {
+    // Value buffer too small, need to upsize
+    // (`res` includes the null-terminating character in this case)
+    value.resize(res);
+    res = GetEnvironmentVariableA(name.data(), value.data(),
+                                  static_cast<uint32_t>(value.size()));
+  }
+  if (res == 0) {
+    return Status::KeyError("environment variable '", name, "' undefined");
+  }
+  // On success, `res` does not include the null-terminating character
+  DCHECK_EQ(value.data()[res], 0);
+  value.resize(res);
+  return value;
 #else
   char* c_str = getenv(name.data());
   if (c_str == nullptr) {
-    return Status::KeyError("environment variable '", name, "'undefined");
+    return Status::KeyError("environment variable '", name, "' undefined");
   }
   return std::string(c_str);
 #endif
@@ -1782,18 +1792,25 @@ Result<std::string> GetEnvVar(std::string_view name) {
 
 #ifdef _WIN32
 Result<NativePathString> GetEnvVarNative(std::string_view name) {
-  NativePathString w_name;
-  constexpr int32_t bufsize = 2000;
-  wchar_t w_str[bufsize];
+  ARROW_ASSIGN_OR_RAISE(std::wstring w_name, StringToNative(name));
+  std::wstring value(100, '\0');
 
-  ARROW_ASSIGN_OR_RAISE(w_name, StringToNative(name));
-  auto res = GetEnvironmentVariableW(w_name.c_str(), w_str, bufsize);
-  if (res >= bufsize) {
-    return Status::CapacityError("environment variable value too long");
-  } else if (res == 0) {
-    return Status::KeyError("environment variable '", name, "'undefined");
+  uint32_t res = GetEnvironmentVariableW(w_name.data(), value.data(),
+                                         static_cast<uint32_t>(value.size()));
+  if (res >= value.size()) {
+    // Value buffer too small, need to upsize
+    // (`res` includes the null-terminating character in this case)
+    value.resize(res);
+    res = GetEnvironmentVariableW(w_name.data(), value.data(),
+                                  static_cast<uint32_t>(value.size()));
+  }
+  if (res == 0) {
+    return Status::KeyError("environment variable '", name, "' undefined");
   }
-  return NativePathString(w_str);
+  // On success, `res` does not include the null-terminating character
+  DCHECK_EQ(value.data()[res], 0);
+  value.resize(res);
+  return value;
 }
 
 #else
@@ -1804,6 +1821,18 @@ Result<NativePathString> 
GetEnvVarNative(std::string_view name) {
 
 #endif
 
+Result<int64_t> GetEnvVarInteger(std::string_view name, std::optional<int64_t> 
min_value,
+                                 std::optional<int64_t> max_value) {
+  ARROW_ASSIGN_OR_RAISE(auto env_string, GetEnvVar(name));
+  int64_t value;
+  if (!ParseValue<Int64Type>(env_string.data(), env_string.length(), &value) ||
+      (min_value.has_value() && value < *min_value) ||
+      (max_value.has_value() && value > *max_value)) {
+    return Status::Invalid("Invalid value for ", name, ": '", env_string, "'");
+  }
+  return value;
+}
+
 Status SetEnvVar(std::string_view name, std::string_view value) {
 #ifdef _WIN32
   if (SetEnvironmentVariableA(name.data(), value.data())) {
diff --git a/cpp/src/arrow/util/io_util.h b/cpp/src/arrow/util/io_util.h
index 56bd4eff3d..fa53c0dc67 100644
--- a/cpp/src/arrow/util/io_util.h
+++ b/cpp/src/arrow/util/io_util.h
@@ -244,6 +244,12 @@ ARROW_EXPORT
 Result<std::string> GetEnvVar(std::string_view name);
 ARROW_EXPORT
 Result<NativePathString> GetEnvVarNative(std::string_view name);
+// Returns KeyError if the environment variable doesn't exist,
+// Invalid if it's not a valid integer in the given range.
+ARROW_EXPORT
+Result<int64_t> GetEnvVarInteger(std::string_view name,
+                                 std::optional<int64_t> min_value = {},
+                                 std::optional<int64_t> max_value = {});
 
 ARROW_EXPORT
 Status SetEnvVar(std::string_view name, std::string_view value);
diff --git a/cpp/src/arrow/util/io_util_test.cc 
b/cpp/src/arrow/util/io_util_test.cc
index de8458dc11..44188b3f2e 100644
--- a/cpp/src/arrow/util/io_util_test.cc
+++ b/cpp/src/arrow/util/io_util_test.cc
@@ -1134,5 +1134,44 @@ TEST(CpuAffinity, NumberOfCores) {
 #endif
 }
 
+TEST(Environment, GetEnvVar) {
+  // An environment variable that should exist on roughly all platforms
+  ASSERT_OK_AND_ASSIGN(auto v, GetEnvVar("PATH"));
+  ASSERT_FALSE(v.empty());
+  ASSERT_OK_AND_ASSIGN(auto w, GetEnvVarNative("PATH"));
+  ASSERT_FALSE(w.empty());
+  // An environment variable that most probably does not exist
+  ASSERT_RAISES(KeyError, GetEnvVar("BZZT_NONEXISTENT_VAR"));
+  ASSERT_RAISES(KeyError, GetEnvVarNative("BZZT_NONEXISTENT_VAR"));
+  // (we try not to rely on EnvVarGuard here as that would be circular)
+}
+
+TEST(Environment, GetEnvVarInteger) {
+  {
+    EnvVarGuard guard("FOOBAR", "5");
+    ASSERT_OK_AND_EQ(5, GetEnvVarInteger("FOOBAR"));
+    ASSERT_OK_AND_EQ(5, GetEnvVarInteger("FOOBAR", /*min_value=*/5, 
/*max_value=*/7));
+    ASSERT_RAISES(Invalid, GetEnvVarInteger("FOOBAR", /*min_value=*/6, 
/*max_value=*/7));
+    ASSERT_RAISES(Invalid, GetEnvVarInteger("FOOBAR", /*min_value=*/3, 
/*max_value=*/4));
+  }
+  {
+    EnvVarGuard guard("FOOBAR", "BAZ");
+    ASSERT_RAISES(Invalid, GetEnvVarInteger("FOOBAR"));
+  }
+  {
+    EnvVarGuard guard("FOOBAR", std::nullopt);
+    ASSERT_RAISES(KeyError, GetEnvVarInteger("FOOBAR"));
+  }
+}
+
+TEST(Environment, SetEnvVar) {
+  EnvVarGuard guard("FOOBAR", "one");
+  ASSERT_OK_AND_EQ("one", GetEnvVar("FOOBAR"));
+  ASSERT_OK(SetEnvVar("FOOBAR", "two"));
+  ASSERT_OK_AND_EQ("two", GetEnvVar("FOOBAR"));
+  ASSERT_OK(DelEnvVar("FOOBAR"));
+  ASSERT_RAISES(KeyError, GetEnvVar("FOOBAR"));
+}
+
 }  // namespace internal
 }  // namespace arrow
diff --git a/docs/source/cpp/env_vars.rst b/docs/source/cpp/env_vars.rst
index 20df98c5ec..6ee6993e2b 100644
--- a/docs/source/cpp/env_vars.rst
+++ b/docs/source/cpp/env_vars.rst
@@ -87,6 +87,18 @@ that changing their value later will have an effect.
    ``libhdfs.dylib`` on macOS, ``libhdfs.so`` on other platforms).
    Alternatively, one can set :envvar:`HADOOP_HOME`.
 
+.. envvar:: ARROW_REGISTER_ATFORK
+
+   **Experimental**. An integer value to enable or disable the registration
+   of at-fork handlers. These are enabled by default or explicitly using the
+   value "1"; use "0" to disable.
+
+   If enabled, at-fork handlers make Arrow C++ compatible with the use of the
+   ``fork()`` system call, such as by Python's :python:mod:`multiprocessing`,
+   but at the expense of executing
+   `potentially unsafe code 
<https://pubs.opengroup.org/onlinepubs/9699919799/functions/pthread_atfork.html>`__
+   in a forked child process if the parent process is multi-threaded.
+
 .. envvar:: ARROW_S3_LOG_LEVEL
 
    Controls the verbosity of logging produced by S3 calls. Defaults to 
``FATAL``
diff --git a/python/pyarrow/tests/test_misc.py 
b/python/pyarrow/tests/test_misc.py
index 64f45d8bed..d6a2fe6a27 100644
--- a/python/pyarrow/tests/test_misc.py
+++ b/python/pyarrow/tests/test_misc.py
@@ -80,8 +80,7 @@ def test_env_var_io_thread_count():
     for v in ('-1', 'z'):
         out, err = run_with_env_var(v)
         assert out.strip() == '8'  # default value
-        assert ("ARROW_IO_THREADS does not contain a valid number of threads"
-                in err.strip())
+        assert "Invalid value for ARROW_IO_THREADS" in err.strip()
 
 
 def test_build_info():

Reply via email to