This is an automated email from the ASF dual-hosted git repository.
hello-stephen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 3ece7b5ddc6 [Enhancement](pyudf) Enhance Python process pool init and
repair (#64039)
3ece7b5ddc6 is described below
commit 3ece7b5ddc6b59700ea584a98538c97962c16d23
Author: linrrarity <[email protected]>
AuthorDate: Tue Jun 9 14:16:42 2026 +0800
[Enhancement](pyudf) Enhance Python process pool init and repair (#64039)
Problem Summary:
Python UDF process pool initialization previously required the whole
pool to finish initialization before BE could continue serving the
query.
In abnormal environments, Python process startup may hang or take a very
long time in paths such as:
- `fork` / child process creation
- waiting for the Python Flight socket to appear
- terminating and waiting for a failed child process to exit
When one process slot gets stuck, the whole process pool initialization
can be blocked. As a result, FE may hit the send-fragments RPC timeout
(`RpcException, msg: timeout when waiting for send fragments rpc, query
timeout:900, left timeout for this operation:30`) before BE returns a
meaningful Python UDF error.
be.log:
```text
Initializing Python process pool for version 3.8.19 with 8 processes
Python process pool initialization progress for version 3.8.19:
waiting_slot=4/8, success=3, failed=0, elapsed_ms=20508
Python process pool initialization progress for version 3.8.19:
waiting_slot=4/8, success=3, failed=0, elapsed_ms=40508
Python process pool initialization progress for version 3.8.19:
waiting_slot=4/8, success=3, failed=0, elapsed_ms=60508
Python process pool initialization progress for version 3.8.19:
waiting_slot=4/8, success=3, failed=0, elapsed_ms=80508
Python process pool initialization progress for version 3.8.19:
waiting_slot=4/8, success=3, failed=0, elapsed_ms=100508
Python process pool initialization progress for version 3.8.19:
waiting_slot=4/8, success=3, failed=0, elapsed_ms=120508
```
### Solution
Change Python process pool initialization from "wait until all processes
are created" to "return once at least one usable process is available".
The pool no longer treats full-size initialization as a prerequisite for
serving queries. Once one Python process is alive, the current query can
proceed. Missing or failed process slots are repaired asynchronously by
the existing health check / repair path.
- Bound process pool initialization time, so BE can return
`SERVICE_UNAVAILABLE` before FE send fragments RPC timeout.
- Allow partial pool availability: initialization succeeds as long as
one usable Python process exists.
- Mark the first initialization round as completed after success or
timeout, then rely on health check / repair to fill missing slots.
- Add bounded wait/reap logic for Python child shutdown to avoid
blocking indefinitely in `wait`.
- Protect late init / repair workers from writing back after shutdown,
and discard late duplicate processes safely.
- Share repair guarding between foreground repair and health check to
avoid duplicate repair pressure.
---
be/src/udf/python/python_server.cpp | 570 ++++++++++++++++++-------
be/src/udf/python/python_server.h | 50 ++-
be/src/udf/python/python_udf_runtime.cpp | 220 +++++++++-
be/src/udf/python/python_udf_runtime.h | 15 +-
be/test/udf/python/python_server_test.cpp | 364 +++++++++++++++-
be/test/udf/python/python_udf_runtime_test.cpp | 89 ++++
6 files changed, 1104 insertions(+), 204 deletions(-)
diff --git a/be/src/udf/python/python_server.cpp
b/be/src/udf/python/python_server.cpp
index 5ea1ef41409..cf78591a2ef 100644
--- a/be/src/udf/python/python_server.cpp
+++ b/be/src/udf/python/python_server.cpp
@@ -21,19 +21,22 @@
#include <butil/fd_utility.h>
#include <dirent.h>
#include <fmt/core.h>
+#include <signal.h>
#include <sys/poll.h>
#include <sys/stat.h>
+#include <unistd.h>
+#include <algorithm>
#include <boost/asio.hpp>
#include <boost/process.hpp>
#include <chrono>
#include <fstream>
-#include <future>
#include <thread>
#include "arrow/flight/client.h"
#include "common/config.h"
#include "common/status.h"
+#include "runtime/thread_context.h"
#include "udf/python/python_udaf_client.h"
#include "udf/python/python_udf_client.h"
#include "udf/python/python_udtf_client.h"
@@ -41,9 +44,15 @@
namespace doris {
-std::shared_ptr<PythonServerManager::VersionedProcessPool>
+Result<std::shared_ptr<PythonServerManager::VersionedProcessPool>>
PythonServerManager::_get_or_create_process_pool(const PythonVersion& version)
{
std::lock_guard<std::mutex> lock(_pools_mutex);
+ // shutdown() owns the manager lifecycle. Once it starts, creating a new
pool would let detached
+ // init workers publish Python processes that the manager no longer tracks.
+ if (_shutdown_flag.load(std::memory_order_acquire)) {
+ return ResultError(Status::Error<ErrorCode::SERVICE_UNAVAILABLE>(
+ "Python server manager is shutting down"));
+ }
auto& pool = _process_pools[version];
if (!pool) {
pool = std::make_shared<VersionedProcessPool>();
@@ -66,18 +75,53 @@ PythonServerManager::_snapshot_process_pools() {
void PythonServerManager::set_process_pool_for_test(const PythonVersion&
version,
std::vector<ProcessPtr>
processes,
bool initialized) {
- auto versioned_pool = _get_or_create_process_pool(version);
+ auto versioned_pool = _get_or_create_process_pool(version).value();
std::lock_guard<std::mutex> lock(versioned_pool->mutex);
versioned_pool->processes = std::move(processes);
- versioned_pool->initialized = initialized;
+ versioned_pool->state = initialized ? PoolState::INITIALIZED :
PoolState::UNINITIALIZED;
+ versioned_pool->has_available_process =
+ std::any_of(versioned_pool->processes.begin(),
versioned_pool->processes.end(),
+ [](const ProcessPtr& process) { return process &&
process->is_alive(); });
}
-std::vector<ProcessPtr>& PythonServerManager::process_pool_for_test(const
PythonVersion& version) {
- auto versioned_pool = _get_or_create_process_pool(version);
+std::vector<ProcessPtr> PythonServerManager::process_pool_snapshot_for_test(
+ const PythonVersion& version) {
+ auto versioned_pool = _get_or_create_process_pool(version).value();
+ std::lock_guard<std::mutex> lock(versioned_pool->mutex);
return versioned_pool->processes;
}
+
+bool PythonServerManager::process_pool_is_initializing_for_test(const
PythonVersion& version) {
+ auto versioned_pool = _get_or_create_process_pool(version).value();
+ std::lock_guard<std::mutex> lock(versioned_pool->mutex);
+ return versioned_pool->state == PoolState::INITIALIZING;
+}
+
+bool PythonServerManager::process_pool_is_initialized_for_test(const
PythonVersion& version) {
+ auto versioned_pool = _get_or_create_process_pool(version).value();
+ std::lock_guard<std::mutex> lock(versioned_pool->mutex);
+ return versioned_pool->state == PoolState::INITIALIZED;
+}
#endif
+bool PythonServerManager::_select_alive_process_from_pool(const
std::vector<ProcessPtr>& pool,
+ ProcessPtr* process)
{
+ auto alive_iter = std::min_element(pool.begin(), pool.end(),
+ [](const ProcessPtr& a, const
ProcessPtr& b) {
+ const bool a_alive = a &&
a->is_alive();
+ const bool b_alive = b &&
b->is_alive();
+ if (a_alive != b_alive) {
+ return a_alive > b_alive;
+ }
+ return a.use_count() <
b.use_count();
+ });
+ if (alive_iter == pool.end() || !*alive_iter ||
!(*alive_iter)->is_alive()) {
+ return false;
+ }
+ *process = *alive_iter;
+ return true;
+}
+
template <typename ClientType>
Status PythonServerManager::get_client(const PythonUDFMeta& func_meta, const
PythonVersion& version,
std::shared_ptr<ClientType>* client,
@@ -99,130 +143,199 @@ Status PythonServerManager::get_client(const
PythonUDFMeta& func_meta, const Pyt
Result<std::shared_ptr<PythonServerManager::VersionedProcessPool>>
PythonServerManager::_ensure_pool_initialized(const PythonVersion& version) {
- auto versioned_pool = _get_or_create_process_pool(version);
- std::lock_guard<std::mutex> lock(versioned_pool->mutex);
-
- // Check if already initialized
- if (versioned_pool->initialized) return versioned_pool;
-
- // 0 means use CPU core count as default, otherwise use the specified value
- int max_pool_size = config::max_python_process_num > 0 ?
config::max_python_process_num
- :
CpuInfo::num_cores();
-
- LOG(INFO) << "Initializing Python process pool for version " <<
version.to_string() << " with "
- << max_pool_size
- << " processes (config::max_python_process_num=" <<
config::max_python_process_num
- << ", CPU cores=" << CpuInfo::num_cores() << ")";
-
- std::vector<std::future<Status>> futures;
- std::vector<ProcessPtr> temp_processes(max_pool_size);
+ auto versioned_pool_result = _get_or_create_process_pool(version);
+ if (!versioned_pool_result.has_value()) {
+ return ResultError(versioned_pool_result.error());
+ }
+ auto versioned_pool = versioned_pool_result.value();
+ const int max_pool_size = config::max_python_process_num > 0 ?
config::max_python_process_num
+ :
CpuInfo::num_cores();
- for (int i = 0; i < max_pool_size; i++) {
- futures.push_back(std::async(std::launch::async, [this, &version, i,
&temp_processes]() {
- ProcessPtr process;
- Status s = fork(version, &process);
- if (s.ok()) {
- temp_processes[i] = std::move(process);
- }
- return s;
- }));
+ std::unique_lock<std::mutex> lock(versioned_pool->mutex);
+ if (versioned_pool->state == PoolState::INITIALIZED) {
+ return versioned_pool;
}
- int success_count = 0;
- int failure_count = 0;
- const auto init_start_time = std::chrono::steady_clock::now();
-#ifdef BE_TEST
- constexpr auto progress_log_interval = std::chrono::milliseconds(50);
-#else
- constexpr auto progress_log_interval = std::chrono::seconds(20);
-#endif
- for (int i = 0; i < max_pool_size; i++) {
- // Print init log every 20s until the current slot is ready.
- while (futures[i].wait_for(progress_log_interval) !=
std::future_status::ready) {
- const auto now = std::chrono::steady_clock::now();
- const auto total_elapsed_ms =
- std::chrono::duration_cast<std::chrono::milliseconds>(now
- init_start_time)
- .count();
- LOG(INFO) << "Python process pool initialization progress for
version "
- << version.to_string() << ": waiting_slot=" << (i + 1)
<< "/" << max_pool_size
- << ", success=" << success_count << ", failed=" <<
failure_count
- << ", elapsed_ms=" << total_elapsed_ms;
+ if (versioned_pool->state != PoolState::STOPPED) {
+ if (versioned_pool->state != PoolState::INITIALIZING) {
+ versioned_pool->state = PoolState::INITIALIZING;
+ versioned_pool->has_available_process = false;
+ versioned_pool->processes.resize(max_pool_size);
+ auto init_finished_count = std::make_shared<std::atomic<int>>(0);
+
+ LOG(INFO) << "Initializing Python process pool for version " <<
version.to_string()
+ << " with " << max_pool_size << " processes
(config::max_python_process_num="
+ << config::max_python_process_num << ", CPU cores=" <<
CpuInfo::num_cores()
+ << ")";
+
+ std::thread([this, versioned_pool, init_finished_count,
max_pool_size]() {
+ SCOPED_INIT_THREAD_CONTEXT();
+ std::unique_lock<std::mutex> lock(versioned_pool->mutex);
+ versioned_pool->cv.wait_for(
+ lock, PROCESS_POOL_INIT_TIMEOUT,
+ [&versioned_pool, init_finished_count,
max_pool_size]() {
+ return versioned_pool->state !=
PoolState::INITIALIZING ||
+
init_finished_count->load(std::memory_order_acquire) >=
+ max_pool_size;
+ });
+ if (versioned_pool->state == PoolState::INITIALIZING) {
+ if (versioned_pool->has_available_process) {
+ // Keep this under the pool lock. shutdown() must
acquire the same lock
+ // before it can destroy manager-owned health-check
state.
+ _start_health_check_thread();
+ versioned_pool->state = PoolState::INITIALIZED;
+ } else {
+ versioned_pool->state = PoolState::UNINITIALIZED;
+ }
+ }
+ versioned_pool->cv.notify_all();
+ }).detach();
+
+ for (int i = 0; i < max_pool_size; ++i) {
+ std::thread([version, versioned_pool, i, max_pool_size,
init_finished_count]() {
+ SCOPED_INIT_THREAD_CONTEXT();
+ ProcessPtr process;
+ Status status = PythonServerManager::fork(version,
&process);
+ const bool ok = status.ok() && process;
+ ProcessPtr process_to_shutdown;
+ {
+ std::lock_guard<std::mutex>
lock(versioned_pool->mutex);
+ // shutdown() and repair can race with detached init
workers after timeout.
+ // Late successful forks only fill slots that are
still empty or dead.
+ if (ok &&
+ (versioned_pool->state == PoolState::INITIALIZING
||
+ versioned_pool->state == PoolState::INITIALIZED)
&&
+ i < versioned_pool->processes.size() &&
+ (!versioned_pool->processes[i] ||
+ !versioned_pool->processes[i]->is_alive())) {
+ versioned_pool->processes[i] = std::move(process);
+ versioned_pool->has_available_process = true;
+ } else if (ok) {
+ process_to_shutdown = std::move(process);
+ } else [[unlikely]] {
+ LOG(WARNING) << "Failed to create Python process "
<< (i + 1) << "/"
+ << max_pool_size << " for version "
<< version.to_string()
+ << ": " << status.to_string();
+ }
+ }
+ init_finished_count->fetch_add(1,
std::memory_order_acq_rel);
+ versioned_pool->cv.notify_all();
+ if (process_to_shutdown) {
+ process_to_shutdown->shutdown();
+ }
+ }).detach();
+ }
}
- Status s = futures[i].get();
- if (s.ok() && temp_processes[i]) {
-
versioned_pool->processes.emplace_back(std::move(temp_processes[i]));
- success_count++;
- } else {
- failure_count++;
- LOG(WARNING) << "Failed to create Python process " << (i + 1) <<
"/" << max_pool_size
- << ": " << s.to_string();
+ // Wait only for the first usable process. INITIALIZED is set later by
the last init worker
+ // after every slot has attempted initialization.
+ versioned_pool->cv.wait_for(lock, PROCESS_POOL_INIT_TIMEOUT,
[&versioned_pool]() {
+ return versioned_pool->has_available_process ||
+ versioned_pool->state == PoolState::STOPPED ||
+ versioned_pool->state != PoolState::INITIALIZING;
+ });
+ if (versioned_pool->has_available_process) {
+ return versioned_pool;
}
+ versioned_pool->cv.notify_all();
}
- if (versioned_pool->processes.empty()) {
- return ResultError(Status::Error<ErrorCode::SERVICE_UNAVAILABLE>(
- "Failed to initialize Python process pool: all {} process
creation attempts failed",
- max_pool_size));
- }
-
- const auto total_elapsed_ms =
std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now() -
init_start_time)
- .count();
- LOG(INFO) << "Python process pool initialized for version " <<
version.to_string()
- << ": created " << success_count << " processes"
- << (failure_count > 0 ? fmt::format(" ({} failed)",
failure_count) : "")
- << ", elapsed_ms=" << total_elapsed_ms;
-
- versioned_pool->initialized = true;
- _start_health_check_thread();
-
- return versioned_pool;
+ return ResultError(Status::Error<ErrorCode::SERVICE_UNAVAILABLE>(
+ "Failed to initialize Python process pool for version {}: no
process became available "
+ "within {} ms",
+ version.to_string(), PROCESS_POOL_INIT_TIMEOUT.count()));
}
Status PythonServerManager::_get_process(
const PythonVersion& version, const
std::shared_ptr<VersionedProcessPool>& versioned_pool,
ProcessPtr* process) {
- std::lock_guard<std::mutex> lock(versioned_pool->mutex);
- std::vector<ProcessPtr>& pool = versioned_pool->processes;
+ {
+ std::unique_lock<std::mutex> lock(versioned_pool->mutex);
+ std::vector<ProcessPtr>& pool = versioned_pool->processes;
- if (UNLIKELY(pool.empty())) {
- return Status::InternalError("Python process pool is empty for version
{}",
- version.to_string());
- }
+ if (versioned_pool->state == PoolState::STOPPED) {
+ versioned_pool->has_available_process = false;
+ return Status::Error<ErrorCode::SERVICE_UNAVAILABLE>(
+ "Python process pool has stopped for version {}",
version.to_string());
+ }
- // Prefer an already-alive process and only use load balancing inside that
alive subset.
- // keep dead entries stay in the pool for the background health checker
- // unless there is no alive process left for the current request.
- auto min_alive_iter = std::min_element(pool.begin(), pool.end(),
- [](const ProcessPtr& a, const
ProcessPtr& b) {
- const bool a_alive = a &&
a->is_alive();
- const bool b_alive = b &&
b->is_alive();
- if (a_alive != b_alive) {
- return a_alive > b_alive;
- }
- return a.use_count() <
b.use_count();
- });
-
- if (min_alive_iter != pool.end() && *min_alive_iter &&
(*min_alive_iter)->is_alive()) {
- *process = *min_alive_iter;
- return Status::OK();
- }
+ if (_select_alive_process_from_pool(pool, process)) [[likely]] {
+ versioned_pool->has_available_process = true;
+ return Status::OK();
+ }
+ versioned_pool->has_available_process = false;
+
+ if (versioned_pool->state == PoolState::INITIALIZING) {
+ versioned_pool->cv.wait_for(lock, PROCESS_REPAIR_WAIT_TIMEOUT,
[&versioned_pool]() {
+ return std::any_of(versioned_pool->processes.begin(),
+ versioned_pool->processes.end(),
+ [](const ProcessPtr& p) { return p &&
p->is_alive(); }) ||
+ versioned_pool->state != PoolState::INITIALIZING;
+ });
+ if (versioned_pool->state == PoolState::STOPPED) {
+ return Status::Error<ErrorCode::SERVICE_UNAVAILABLE>(
+ "Python process pool has stopped for version {}",
version.to_string());
+ }
+ if (_select_alive_process_from_pool(pool, process)) {
+ versioned_pool->has_available_process = true;
+ return Status::OK();
+ }
+ versioned_pool->has_available_process = false;
+ return Status::Error<ErrorCode::SERVICE_UNAVAILABLE>(
+ "Python process pool is initializing but has no available
process for version "
+ "{} after waiting {} ms",
+ version.to_string(), PROCESS_REPAIR_WAIT_TIMEOUT.count());
+ }
+
+ if (versioned_pool->state != PoolState::INITIALIZED) {
+ return Status::Error<ErrorCode::SERVICE_UNAVAILABLE>(
+ "Python process pool is not initialized for version {}",
version.to_string());
+ }
+
+ if (!versioned_pool->repairing) {
+ versioned_pool->repairing = true;
+ // Repair is done in the background because fork can be slow. The
current request still
+ // waits briefly below so a transient all-dead pool can recover
without failing.
+ std::thread([version, versioned_pool]() {
+ SCOPED_INIT_THREAD_CONTEXT();
+ int recreated =
PythonServerManager::_repair_process_pool(version, versioned_pool);
+ {
+ std::lock_guard<std::mutex> lock(versioned_pool->mutex);
+ versioned_pool->repairing = false;
+ }
+ versioned_pool->cv.notify_all();
+ if (recreated > 0) {
+ LOG(INFO) << "Repaired Python process pool for version "
<< version.to_string()
+ << ": recreated=" << recreated;
+ }
+ }).detach();
+ }
+
+ // Keep the request recoverable in the common case where the Python
runtime can fork
+ // normally and only the existing pool entries died. The wait is short
so a wedged fork path
+ // still returns SERVICE_UNAVAILABLE promptly.
+ versioned_pool->cv.wait_for(lock, PROCESS_REPAIR_WAIT_TIMEOUT,
[&versioned_pool]() {
+ return std::any_of(versioned_pool->processes.begin(),
versioned_pool->processes.end(),
+ [](const ProcessPtr& p) { return p &&
p->is_alive(); }) ||
+ versioned_pool->state == PoolState::STOPPED;
+ });
+ if (versioned_pool->state == PoolState::STOPPED) {
+ versioned_pool->has_available_process = false;
+ return Status::Error<ErrorCode::SERVICE_UNAVAILABLE>(
+ "Python process pool has stopped for version {}",
version.to_string());
+ }
- // Only reach here when the pool has no alive process at all. Try one
foreground
- // recovery so the caller has a chance to proceed; leave batch repair to
health check.
- auto& candidate = pool.front();
- ProcessPtr replacement;
- Status status = fork(version, &replacement);
- if (!status.ok()) {
- return Status::Error<ErrorCode::SERVICE_UNAVAILABLE>(
- "Python process pool has no available process for version {},
reason: {}",
- version.to_string(), status.to_string());
+ if (_select_alive_process_from_pool(pool, process)) {
+ versioned_pool->has_available_process = true;
+ return Status::OK();
+ }
+ versioned_pool->has_available_process = false;
}
- candidate = std::move(replacement);
- *process = candidate;
- return Status::OK();
+ return Status::Error<ErrorCode::SERVICE_UNAVAILABLE>(
+ "Python process pool has no available process for version {} after
waiting repair for "
+ "{} ms",
+ version.to_string(), PROCESS_REPAIR_WAIT_TIMEOUT.count());
}
Status PythonServerManager::fork(const PythonVersion& version, ProcessPtr*
process) {
@@ -243,13 +356,14 @@ Status PythonServerManager::fork(const PythonVersion&
version, ProcessPtr* proce
}
}));
- // Wait for socket file to be created (indicates server is ready)
- std::string expected_socket_path = get_unix_socket_file_path(c.id());
+ // Bound socket readiness: a child process can start but never create
the Flight socket.
+ // Without this, pool initialization can block until FE reports
send-fragments RPC timeout.
+ pid_t child_pid = c.id();
+ std::string expected_socket_path =
get_unix_socket_file_path(child_pid);
bool started_successfully = false;
std::chrono::steady_clock::time_point start =
std::chrono::steady_clock::now();
- const auto timeout = std::chrono::milliseconds(5000);
- while (std::chrono::steady_clock::now() - start < timeout) {
+ while (std::chrono::steady_clock::now() - start <
PROCESS_START_TIMEOUT) {
struct stat buffer;
if (stat(expected_socket_path.c_str(), &buffer) == 0) {
started_successfully = true;
@@ -259,14 +373,43 @@ Status PythonServerManager::fork(const PythonVersion&
version, ProcessPtr* proce
if (!c.running()) {
break;
}
- std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
if (!started_successfully) {
+ int exit_status = 0;
if (c.running()) {
- c.terminate();
- c.wait();
+ // Don't use the `wait` of boost::process, but use the
operating system signal and waitpid with timeout instead.
+ // Because boost::process may block the initialization/repair
thread for a long time,
+ // exceeding the timeout limit expected by the process pool.
+ ::kill(child_pid, SIGTERM);
+ auto wait_result = PythonUDFProcess::wait_child_exit(
+ child_pid, PROCESS_TERMINATE_TIMEOUT, &exit_status);
+ if (wait_result ==
PythonUDFProcess::ChildExitWaitResult::TIMEOUT ||
+ wait_result ==
PythonUDFProcess::ChildExitWaitResult::ERROR) {
+ LOG(WARNING) << "Python server start timeout and terminate
timeout exceeded,"
+ << " sending SIGKILL to pid=" << child_pid;
+ ::kill(child_pid, SIGKILL);
+ wait_result = PythonUDFProcess::wait_child_exit(
+ child_pid, PROCESS_TERMINATE_TIMEOUT,
&exit_status);
+ if (wait_result ==
PythonUDFProcess::ChildExitWaitResult::TIMEOUT ||
+ wait_result ==
PythonUDFProcess::ChildExitWaitResult::ERROR) [[unlikely]] {
+ // The child was SIGKILLed but not reaped within the
bounded wait. Do not
+ // drop waitpid ownership after detach; otherwise a
later exit can leave a
+ // zombie Python process under BE.
+ PythonUDFProcess::enqueue_child_for_reap(child_pid);
+ c.detach();
+ return Status::InternalError(
+ "Python server start failed: process did not
exit after SIGKILL, "
+ "pid={}",
+ child_pid);
+ }
+ }
+ } else {
+ PythonUDFProcess::wait_child_exit(child_pid,
std::chrono::milliseconds(0),
+ &exit_status);
}
+ c.detach();
return Status::InternalError("Python server start failed: socket
file not found at {}",
expected_socket_path);
}
@@ -282,7 +425,7 @@ Status PythonServerManager::fork(const PythonVersion&
version, ProcessPtr* proce
void PythonServerManager::_start_health_check_thread() {
std::lock_guard<std::mutex> lock(_health_check_mutex);
- if (_health_check_thread) return;
+ if (_health_check_thread ||
_shutdown_flag.load(std::memory_order_acquire)) return;
LOG(INFO) << "Starting Python process health check thread (interval: 30
seconds)";
@@ -308,72 +451,181 @@ void PythonServerManager::_start_health_check_thread() {
}
void PythonServerManager::_check_and_recreate_processes() {
- int total_checked = 0;
- int total_dead = 0;
int total_recreated = 0;
-
for (auto& [version, versioned_pool] : _snapshot_process_pools()) {
+ {
+ std::lock_guard<std::mutex> lock(versioned_pool->mutex);
+ if (versioned_pool->state != PoolState::INITIALIZED ||
versioned_pool->repairing) {
+ continue;
+ }
+ // Share the same repair guard with foreground requests. Otherwise
health check and
+ // _get_process() can fork the same empty/dead slots at the same
time under failures.
+ versioned_pool->repairing = true;
+ }
+ int recreated = _repair_process_pool(version, versioned_pool);
+ {
+ std::lock_guard<std::mutex> lock(versioned_pool->mutex);
+ versioned_pool->repairing = false;
+ }
+ versioned_pool->cv.notify_all();
+ total_recreated += recreated;
+ }
+
+ if (total_recreated > 0) {
+ LOG(INFO) << "Health check completed: recreated=" << total_recreated;
+ }
+}
+
+int PythonServerManager::_repair_process_pool(
+ const PythonVersion& version, const
std::shared_ptr<VersionedProcessPool>& versioned_pool) {
+ const int max_pool_size = config::max_python_process_num > 0 ?
config::max_python_process_num
+ :
CpuInfo::num_cores();
+ std::vector<size_t> died_process_indices;
+ {
std::lock_guard<std::mutex> lock(versioned_pool->mutex);
+ if (versioned_pool->state != PoolState::INITIALIZED) {
+ return 0;
+ }
+
auto& pool = versioned_pool->processes;
+ died_process_indices.reserve(std::max<size_t>(pool.size(),
max_pool_size));
+ // Need to fix the following two cases
+ // 1. Existing processes have died
+ // 2. Process pool not filled due to fork timeouts and other issues
for (size_t i = 0; i < pool.size(); ++i) {
- auto& process = pool[i];
- if (!process) continue;
-
- total_checked++;
- if (!process->is_alive()) {
- total_dead++;
- LOG(WARNING) << "Detected dead Python process (pid=" <<
process->get_child_pid()
- << ", version=" << version.to_string() << "),
recreating...";
-
- ProcessPtr new_process;
- Status s = fork(version, &new_process);
- if (s.ok()) {
- pool[i] = std::move(new_process);
- total_recreated++;
- LOG(INFO) << "Successfully recreated Python process for
version "
- << version.to_string();
+ const auto& process = pool[i];
+ if (!process || !process->is_alive()) {
+ died_process_indices.push_back(i);
+ }
+ }
+ for (size_t i = pool.size(); i < static_cast<size_t>(max_pool_size);
++i) {
+ died_process_indices.push_back(i);
+ }
+ }
+
+ if (died_process_indices.empty()) [[likely]] {
+ return 0;
+ }
+
+ int recreated = 0;
+ std::vector<ProcessPtr> processes_to_shutdown;
+ for (size_t index : died_process_indices) {
+ {
+ std::lock_guard<std::mutex> lock(versioned_pool->mutex);
+ if (versioned_pool->state != PoolState::INITIALIZED) {
+ break;
+ }
+ }
+
+ ProcessPtr new_process;
+ Status status = fork(version, &new_process);
+ if (status.ok() && new_process) {
+ bool published = false;
+ {
+ std::lock_guard<std::mutex> lock(versioned_pool->mutex);
+ auto& pool = versioned_pool->processes;
+ if (versioned_pool->state != PoolState::INITIALIZED) {
+ processes_to_shutdown.emplace_back(std::move(new_process));
+ } else if (index < pool.size()) {
+ if (!pool[index] || !pool[index]->is_alive()) [[likely]] {
+ pool[index] = std::move(new_process);
+ versioned_pool->has_available_process = true;
+ recreated++;
+ published = true;
+ } else {
+
processes_to_shutdown.emplace_back(std::move(new_process));
+ }
+ } else if (pool.size() < static_cast<size_t>(max_pool_size)) {
+ pool.emplace_back(std::move(new_process));
+ versioned_pool->has_available_process = true;
+ recreated++;
+ published = true;
} else {
- LOG(ERROR) << "Failed to recreate Python process for
version "
- << version.to_string() << ": " << s.to_string();
- pool.erase(pool.begin() + i);
- --i;
+ processes_to_shutdown.emplace_back(std::move(new_process));
}
}
+ if (published) {
+ versioned_pool->cv.notify_all();
+ }
+ } else {
+ LOG(ERROR) << "Failed to recreate Python process for version " <<
version.to_string()
+ << ": " << status.to_string();
}
}
- if (total_dead > 0) {
- LOG(INFO) << "Health check completed: checked=" << total_checked << ",
dead=" << total_dead
- << ", recreated=" << total_recreated;
+ {
+ std::lock_guard<std::mutex> lock(versioned_pool->mutex);
+ auto& pool = versioned_pool->processes;
+
+ if (versioned_pool->state != PoolState::INITIALIZED) {
+ versioned_pool->has_available_process = false;
+ } else {
+ // Keep empty/dead slots instead of shrinking the vector. Init
workers are detached and
+ // publish by original slot index; shrinking here would make a
late successful init look
+ // out-of-range and discard a usable process.
+ versioned_pool->has_available_process = std::any_of(
+ pool.begin(), pool.end(),
+ [](const ProcessPtr& process) { return process &&
process->is_alive(); });
+ }
+ }
+ versioned_pool->cv.notify_all();
+ for (auto& process : processes_to_shutdown) {
+ process->shutdown();
}
+ return recreated;
}
void PythonServerManager::shutdown() {
- // Signal health check thread to stop
- _shutdown_flag.store(true, std::memory_order_release);
- _health_check_cv.notify_one();
+ std::vector<std::pair<PythonVersion,
std::shared_ptr<VersionedProcessPool>>> pools_to_shutdown;
+ {
+ std::lock_guard<std::mutex> lock(_pools_mutex);
+ _shutdown_flag.store(true, std::memory_order_release);
+ pools_to_shutdown.reserve(_process_pools.size());
+ for (auto& [version, versioned_pool] : _process_pools) {
+ pools_to_shutdown.emplace_back(version, std::move(versioned_pool));
+ }
+ _process_pools.clear();
+ }
- if (_health_check_thread && _health_check_thread->joinable()) {
- _health_check_thread->join();
- _health_check_thread.reset();
+ for (auto& [version, versioned_pool] : pools_to_shutdown) {
+ if (!versioned_pool) {
+ continue;
+ }
+ std::lock_guard<std::mutex> lock(versioned_pool->mutex);
+ versioned_pool->state = PoolState::STOPPED;
+ versioned_pool->has_available_process = false;
+ versioned_pool->repairing = false;
+ versioned_pool->cv.notify_all();
+ }
+
+ std::unique_ptr<std::thread> health_check_thread;
+ {
+ std::lock_guard<std::mutex> lock(_health_check_mutex);
+ health_check_thread = std::move(_health_check_thread);
+ }
+ _health_check_cv.notify_one();
+ if (health_check_thread && health_check_thread->joinable()) {
+ health_check_thread->join();
}
// Shutdown all processes
- for (auto& [version, versioned_pool] : _snapshot_process_pools()) {
+ std::vector<ProcessPtr> processes_to_shutdown;
+ for (auto& [version, versioned_pool] : pools_to_shutdown) {
+ if (!versioned_pool) {
+ continue;
+ }
std::lock_guard<std::mutex> lock(versioned_pool->mutex);
auto& pool = versioned_pool->processes;
for (auto& process : pool) {
if (process) {
- process->shutdown();
+ processes_to_shutdown.emplace_back(std::move(process));
}
}
pool.clear();
- versioned_pool->initialized = false;
+ versioned_pool->cv.notify_all();
}
-
- {
- std::lock_guard<std::mutex> lock(_pools_mutex);
- _process_pools.clear();
+ for (auto& process : processes_to_shutdown) {
+ process->shutdown();
}
}
diff --git a/be/src/udf/python/python_server.h
b/be/src/udf/python/python_server.h
index 78cef72e06d..5ee763d5f52 100644
--- a/be/src/udf/python/python_server.h
+++ b/be/src/udf/python/python_server.h
@@ -18,6 +18,7 @@
#pragma once
#include <atomic>
+#include <chrono>
#include <condition_variable>
#include <memory>
#include <mutex>
@@ -47,7 +48,7 @@ public:
std::shared_ptr<T>* client,
const std::shared_ptr<arrow::Schema>& data_schema =
nullptr);
- Status fork(const PythonVersion& version, ProcessPtr* process);
+ static Status fork(const PythonVersion& version, ProcessPtr* process);
// Clear Python module cache for a specific UDF location across all
processes
Status clear_module_cache(const std::string& location);
@@ -64,7 +65,11 @@ public:
void set_process_pool_for_test(const PythonVersion& version,
std::vector<ProcessPtr> processes,
bool initialized = true);
- std::vector<ProcessPtr>& process_pool_for_test(const PythonVersion&
version);
+ std::vector<ProcessPtr> process_pool_snapshot_for_test(const
PythonVersion& version);
+
+ bool process_pool_is_initializing_for_test(const PythonVersion& version);
+
+ bool process_pool_is_initialized_for_test(const PythonVersion& version);
Status broadcast_action_to_processes_for_test(const std::string&
action_type,
const std::string& body,
@@ -74,10 +79,28 @@ public:
#endif
private:
+ enum class PoolState { // Lifecycle state stored in VersionedProcessPool::state.
+ UNINITIALIZED, // Default value of VersionedProcessPool::state.
+ INITIALIZING, // NOTE(review): presumably set while slot initialization is still in flight - confirm in the .cpp.
+ // `Initialized` means:
+ // 1. All process slots have attempted initialization.
+ // 2. At least one live process is available for requests.
+ // 3. The health-check thread has been started.
+ INITIALIZED,
+ // Prevent the pool from being incorrectly reused after `shutdown()`
+ STOPPED,
+ };
+
struct VersionedProcessPool {
std::mutex mutex;
+ // Coordinates initialization and repair workers with foreground
requests.
+ std::condition_variable cv;
std::vector<ProcessPtr> processes;
- bool initialized = false;
+ PoolState state = PoolState::UNINITIALIZED;
+ // True when at least one process in the pool can serve requests.
+ bool has_available_process = false;
+ // True while a background repair is recreating dead or missing
processes.
+ bool repairing = false;
};
/**
@@ -104,6 +127,22 @@ private:
*/
void _check_and_recreate_processes();
+#ifdef BE_TEST
+    // BE_TEST builds use much smaller bounds than production builds.
+    static constexpr auto PROCESS_START_TIMEOUT = std::chrono::milliseconds(500);
+    static constexpr auto PROCESS_TERMINATE_TIMEOUT = std::chrono::milliseconds(100);
+    static constexpr auto PROCESS_POOL_INIT_TIMEOUT = std::chrono::milliseconds(1000);
+    static constexpr auto PROCESS_REPAIR_WAIT_TIMEOUT = std::chrono::milliseconds(200);
+#else
+    static constexpr auto PROCESS_START_TIMEOUT = std::chrono::milliseconds(5000);
+    static constexpr auto PROCESS_TERMINATE_TIMEOUT = std::chrono::milliseconds(1000);
+    // The pool-init wait (20s) is deliberately shorter than FE's default send-fragments RPC
+    // timeout (30s), so the caller gets SERVICE_UNAVAILABLE with Python context rather than a
+    // generic RPC deadline error.
+    static constexpr auto PROCESS_POOL_INIT_TIMEOUT = std::chrono::milliseconds(20000);
+    static constexpr auto PROCESS_REPAIR_WAIT_TIMEOUT = std::chrono::milliseconds(1000);
+#endif
+ static int _repair_process_pool(const PythonVersion& version,
+ const
std::shared_ptr<VersionedProcessPool>& versioned_pool);
+
/**
* Read resident set size (RSS) for a single process from /proc/{pid}/statm
*/
@@ -114,11 +153,14 @@ private:
*/
void _refresh_memory_stats();
- std::shared_ptr<VersionedProcessPool> _get_or_create_process_pool(const
PythonVersion& version);
+ Result<std::shared_ptr<VersionedProcessPool>> _get_or_create_process_pool(
+ const PythonVersion& version);
std::vector<std::pair<PythonVersion,
std::shared_ptr<VersionedProcessPool>>>
_snapshot_process_pools();
Status _broadcast_action_to_processes(const std::string& action_type,
const std::string& body,
const std::string& log_name);
+ static bool _select_alive_process_from_pool(const std::vector<ProcessPtr>&
pool,
+ ProcessPtr* process);
std::unordered_map<PythonVersion, std::shared_ptr<VersionedProcessPool>>
_process_pools;
// Protects the version -> pool handle map only. Per-version process
operations are guarded
diff --git a/be/src/udf/python/python_udf_runtime.cpp
b/be/src/udf/python/python_udf_runtime.cpp
index 9d687c43f40..aa87360cbc3 100644
--- a/be/src/udf/python/python_udf_runtime.cpp
+++ b/be/src/udf/python/python_udf_runtime.cpp
@@ -18,15 +18,177 @@
#include "udf/python/python_udf_runtime.h"
#include <butil/fd_utility.h>
+#include <signal.h>
+#include <string.h>
#include <sys/wait.h>
#include <unistd.h>
+#include <algorithm>
#include <boost/process.hpp>
+#include <cerrno>
+#include <chrono>
+#include <condition_variable>
+#include <deque>
+#include <mutex>
+#include <thread>
+#ifdef BE_TEST
+#include <atomic>
+#endif
#include "common/logging.h"
+#include "runtime/thread_context.h"
namespace doris {
+// Upper bound for each wait on a signalled child; BE_TEST builds use a shorter bound.
+#ifdef BE_TEST
+static constexpr std::chrono::milliseconds PROCESS_TERMINATE_TIMEOUT(100);
+#else
+static constexpr std::chrono::milliseconds PROCESS_TERMINATE_TIMEOUT(1000);
+#endif
+// Timed wait used by the background reaper thread while it still holds unreaped pids.
+static constexpr std::chrono::milliseconds BACKGROUND_REAP_INTERVAL(1000);
+
+#ifdef BE_TEST
+static std::atomic<int> FORCED_CHILD_EXIT_TIMEOUTS {0};
+
+// Test-only: atomically takes one unit from FORCED_CHILD_EXIT_TIMEOUTS if the counter is positive.
+// Returns true when a unit was consumed, false when the counter was already <= 0.
+static bool consume_forced_child_exit_timeout() {
+    for (int observed = FORCED_CHILD_EXIT_TIMEOUTS.load(std::memory_order_relaxed); observed > 0;) {
+        const int decremented = observed - 1;
+        // On failure compare_exchange_weak reloads `observed`, so the loop condition re-checks
+        // the fresh value before trying again.
+        if (FORCED_CHILD_EXIT_TIMEOUTS.compare_exchange_weak(observed, decremented,
+                                                             std::memory_order_relaxed)) {
+            return true;
+        }
+    }
+    return false;
+}
+#endif
+
+struct BackgroundChildReaper { // Shared state between enqueue_child_for_reap() callers and the reaper thread.
+ std::mutex mutex; // Held whenever `pids` (and, in BE_TEST builds, `reaped_pids`) is read or modified.
+ std::condition_variable cv; // Notified after a pid is enqueued; BE_TEST builds also notify it after a pid is reaped.
+ std::deque<pid_t> pids; // Pids queued by enqueue_child_for_reap() that the reaper thread has not picked up yet.
+#ifdef BE_TEST
+ std::deque<pid_t> reaped_pids; // Test-only record of pids the reaper thread has finished with.
+#endif
+ std::thread thread; // Reaper thread; started by enqueue_child_for_reap() when not yet joinable.
+};
+
+// Returns the process-wide reaper state, created on first use. The object is heap-allocated
+// and never deleted here.
+// NOTE(review): presumably leaked on purpose so the never-joined reaper thread cannot outlive
+// it during static destruction - confirm.
+static BackgroundChildReaper& background_child_reaper() {
+    static BackgroundChildReaper* const instance = new BackgroundChildReaper();
+    return *instance;
+}
+
+// Hands `pid` to the shared background reaper so a later exit is still collected by waitpid().
+//
+// - Non-positive pids are ignored.
+// - A pid that is already waiting in the reaper queue is not queued twice.
+// - The reaper thread is started on the first successful enqueue and is never joined here.
+void PythonUDFProcess::enqueue_child_for_reap(pid_t pid) {
+    if (pid <= 0) [[unlikely]] {
+        return;
+    }
+
+    auto& reaper = background_child_reaper();
+    {
+        std::lock_guard<std::mutex> lock(reaper.mutex);
+        if (std::find(reaper.pids.begin(), reaper.pids.end(), pid) != reaper.pids.end()) {
+            return;
+        }
+        reaper.pids.push_back(pid);
+        if (!reaper.thread.joinable()) {
+            // This thread only owns pids that were already SIGKILLed but could not be reaped within
+            // the bounded shutdown wait. Such processes may be stuck in uninterruptible I/O; if they
+            // exit later and nobody calls waitpid(), they stay as zombies under BE. Keep reaping them
+            // asynchronously so foreground shutdown remains bounded without dropping wait ownership.
+            reaper.thread = std::thread([]() {
+                SCOPED_INIT_THREAD_CONTEXT();
+                // Pids that were polled at least once and had not exited yet.
+                std::deque<pid_t> pending_pids;
+                while (true) {
+                    auto& reaper_ref = background_child_reaper();
+                    std::unique_lock<std::mutex> lock(reaper_ref.mutex);
+                    if (pending_pids.empty()) {
+                        // Nothing to re-poll: sleep until a new pid is queued.
+                        reaper_ref.cv.wait(lock,
+                                           [&reaper_ref]() { return !reaper_ref.pids.empty(); });
+                    } else {
+                        // Still-running children exist: re-poll them after the interval, or
+                        // earlier if the condition variable is notified.
+                        reaper_ref.cv.wait_for(lock, BACKGROUND_REAP_INTERVAL);
+                    }
+                    pending_pids.insert(pending_pids.end(), reaper_ref.pids.begin(),
+                                        reaper_ref.pids.end());
+                    reaper_ref.pids.clear();
+                    // Move the work list into a local so the polling below runs without the lock
+                    // and `pending_pids` can be refilled with the pids that are still alive.
+                    std::deque<pid_t> pids;
+                    pids.swap(pending_pids);
+                    lock.unlock();
+
+                    for (pid_t pending_pid : pids) {
+                        int exit_status = 0;
+                        // Zero timeout: a single non-blocking poll per pid per round.
+                        auto wait_result = PythonUDFProcess::wait_child_exit(
+                                pending_pid, std::chrono::milliseconds(0), &exit_status);
+                        if (wait_result == ChildExitWaitResult::EXITED ||
+                            wait_result == ChildExitWaitResult::ALREADY_REAPED) {
+                            LOG(INFO) << "Background reaped Python process pid=" << pending_pid;
+#ifdef BE_TEST
+                            {
+                                std::lock_guard<std::mutex> reaped_lock(reaper_ref.mutex);
+                                reaper_ref.reaped_pids.push_back(pending_pid);
+                            }
+                            reaper_ref.cv.notify_all();
+#endif
+                        } else if (wait_result == ChildExitWaitResult::TIMEOUT) {
+                            pending_pids.push_back(pending_pid);
+                        } else {
+                            // ERROR: the pid is dropped and not retried.
+                            LOG(WARNING) << "Background failed to reap Python process pid="
+                                         << pending_pid;
+                        }
+                    }
+                }
+            });
+        }
+    }
+    // notify_all, not notify_one: in BE_TEST builds wait_background_reaped_for_test() waits on
+    // the same condition variable, and notify_one could wake that waiter instead of the reaper
+    // thread, leaving the newly queued pid unnoticed.
+    reaper.cv.notify_all();
+}
+
+#ifdef BE_TEST
+// Test-only: blocks until the background reaper has recorded `pid` in `reaped_pids`, or until
+// `timeout` elapses. Returns true if the pid was recorded in time.
+bool PythonUDFProcess::wait_background_reaped_for_test(pid_t pid,
+                                                       std::chrono::milliseconds timeout) {
+    auto& reaper = background_child_reaper();
+    const auto already_reaped = [&reaper, pid]() {
+        for (const pid_t reaped : reaper.reaped_pids) {
+            if (reaped == pid) {
+                return true;
+            }
+        }
+        return false;
+    };
+    std::unique_lock<std::mutex> lock(reaper.mutex);
+    return reaper.cv.wait_for(lock, timeout, already_reaped);
+}
+
+void PythonUDFProcess::force_child_exit_timeouts_for_test(int count) { // Test-only: the next `count` wait_child_exit() calls return TIMEOUT; pass 0 to clear.
+ FORCED_CHILD_EXIT_TIMEOUTS.store(count, std::memory_order_relaxed);
+}
+#endif
+
+// Waits for child `pid` to exit by polling waitpid(WNOHANG) every 10ms, for at most `timeout`.
+//
+// pid:         child to wait for; must be > 0.
+// timeout:     maximum time to keep polling; 0 means a single non-blocking poll.
+// exit_status: passed through to waitpid(); holds the raw wait status when EXITED is returned.
+//
+// Returns:
+//   EXITED         - this call reaped the child.
+//   ALREADY_REAPED - waitpid() failed with ECHILD (someone else collected the child).
+//   TIMEOUT        - the child was still running at the deadline (or a test forced a timeout).
+//   ERROR          - invalid pid, or waitpid() failed with any other errno.
+PythonUDFProcess::ChildExitWaitResult PythonUDFProcess::wait_child_exit(
+        pid_t pid, std::chrono::milliseconds timeout, int* exit_status) {
+    // waitpid() gives pid <= 0 a special meaning (any child / any child of a process group).
+    // Passing such a value through would reap an unrelated child and then never match `pid`.
+    if (pid <= 0) [[unlikely]] {
+        LOG(WARNING) << "Refusing to wait for invalid Python process pid=" << pid;
+        return ChildExitWaitResult::ERROR;
+    }
+#ifdef BE_TEST
+    if (consume_forced_child_exit_timeout()) {
+        return ChildExitWaitResult::TIMEOUT;
+    }
+#endif
+    const auto deadline = std::chrono::steady_clock::now() + timeout;
+    while (true) {
+        pid_t ret = waitpid(pid, exit_status, WNOHANG);
+        if (ret == pid) {
+            return ChildExitWaitResult::EXITED;
+        }
+        if (ret < 0) {
+            // Capture errno right away: building the log stream below may overwrite it.
+            const int wait_errno = errno;
+            if (wait_errno == EINTR) {
+                // retry if interrupted
+                continue;
+            }
+            // Another owner may already have observed the child exit through boost::process.
+            if (wait_errno == ECHILD) {
+                return ChildExitWaitResult::ALREADY_REAPED;
+            }
+            LOG(WARNING) << "Failed to wait Python process pid=" << pid << ": "
+                         << strerror(wait_errno);
+            return ChildExitWaitResult::ERROR;
+        }
+        // ret == 0: the child has not changed state yet.
+        if (std::chrono::steady_clock::now() >= deadline) {
+            return ChildExitWaitResult::TIMEOUT;
+        }
+        std::this_thread::sleep_for(std::chrono::milliseconds(10));
+    }
+}
+
void PythonUDFProcess::remove_unix_socket() {
if (_uri.empty() || _unix_socket_file_path.empty()) return;
@@ -47,29 +209,51 @@ void PythonUDFProcess::remove_unix_socket() {
void PythonUDFProcess::shutdown() {
if (!_child.valid() || _is_shutdown) return;
- _child.terminate();
- bool graceful = false;
- constexpr std::chrono::milliseconds retry_interval(100); // 100ms
-
- for (int i = 0; i < TERMINATE_RETRY_TIMES; ++i) {
- if (!_child.running()) {
- graceful = true;
- break;
- }
- std::this_thread::sleep_for(retry_interval);
+ int exit_status = 0;
+ bool exited = !_child.running();
+ bool status_available = false;
+ bool already_reaped = false;
+ if (!exited) {
+ ::kill(_child_pid, SIGTERM);
+ auto wait_result = wait_child_exit(_child_pid,
PROCESS_TERMINATE_TIMEOUT, &exit_status);
+ exited = wait_result == ChildExitWaitResult::EXITED ||
+ wait_result == ChildExitWaitResult::ALREADY_REAPED;
+ status_available = wait_result == ChildExitWaitResult::EXITED;
+ already_reaped = wait_result == ChildExitWaitResult::ALREADY_REAPED;
+ } else {
+ auto wait_result = wait_child_exit(_child_pid,
std::chrono::milliseconds(0), &exit_status);
+ status_available = wait_result == ChildExitWaitResult::EXITED;
+ already_reaped = wait_result == ChildExitWaitResult::ALREADY_REAPED;
}
- if (!graceful) {
- LOG(WARNING) << "Python process did not terminate gracefully, sending
SIGKILL";
+ if (!exited) {
+ LOG(WARNING) << "Python process did not terminate gracefully, sending
SIGKILL, pid="
+ << _child_pid;
::kill(_child_pid, SIGKILL);
- _child.wait();
+ auto wait_result = wait_child_exit(_child_pid,
PROCESS_TERMINATE_TIMEOUT, &exit_status);
+ exited = wait_result == ChildExitWaitResult::EXITED ||
+ wait_result == ChildExitWaitResult::ALREADY_REAPED;
+ status_available = wait_result == ChildExitWaitResult::EXITED;
+ already_reaped = wait_result == ChildExitWaitResult::ALREADY_REAPED;
}
+ _child.detach();
- if (int exit_code = _child.exit_code(); exit_code > 128 && exit_code <=
255) {
- int signal = exit_code - 128;
- LOG(INFO) << "Python process was killed by signal " << signal;
+ if (!exited) [[unlikely]] {
+ LOG(WARNING) << "Python process did not exit after SIGKILL, enqueue
background reap, pid="
+ << _child_pid;
+ enqueue_child_for_reap(_child_pid);
+ } else if (already_reaped) {
+ LOG(INFO) << "Python process already reaped by another owner, pid=" <<
_child_pid;
+ } else if (!status_available) {
+ LOG(INFO) << "Python process exited but exit status is unavailable,
pid=" << _child_pid;
} else {
- LOG(INFO) << "Python process exited normally with code: " << exit_code;
+ if (WIFSIGNALED(exit_status)) {
+ LOG(INFO) << "Python process was killed by signal " <<
WTERMSIG(exit_status);
+ } else if (WIFEXITED(exit_status)) {
+ LOG(INFO) << "Python process exited normally with code: " <<
WEXITSTATUS(exit_status);
+ } else {
+ LOG(INFO) << "Python process exited";
+ }
}
_output_stream.close();
@@ -84,4 +268,4 @@ std::string PythonUDFProcess::to_string() const {
_child_pid, _uri, _unix_socket_file_path, _is_shutdown);
}
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/udf/python/python_udf_runtime.h
b/be/src/udf/python/python_udf_runtime.h
index 269b6d1a98e..2d635dc11a1 100644
--- a/be/src/udf/python/python_udf_runtime.h
+++ b/be/src/udf/python/python_udf_runtime.h
@@ -18,6 +18,7 @@
#pragma once
#include <boost/process.hpp>
+#include <chrono>
#include "udf/python/python_env.h"
@@ -75,6 +76,15 @@ public:
void shutdown();
+ enum class ChildExitWaitResult { EXITED, ALREADY_REAPED, TIMEOUT, ERROR };
+
+ static ChildExitWaitResult wait_child_exit(pid_t pid,
std::chrono::milliseconds timeout,
+ int* exit_status);
+
+ // Hand off a killed child that could not be reaped synchronously. The
background reaper keeps
+ // waitpid ownership so a later child exit will not become a zombie under
BE.
+ static void enqueue_child_for_reap(pid_t pid);
+
std::string to_string() const;
pid_t get_child_pid() const { return _child_pid; }
@@ -85,10 +95,13 @@ public:
#ifdef BE_TEST
void set_uri_for_test(std::string uri) { _uri = std::move(uri); }
+
+ static bool wait_background_reaped_for_test(pid_t pid,
std::chrono::milliseconds timeout);
+
+ static void force_child_exit_timeouts_for_test(int count);
#endif
private:
- constexpr static int TERMINATE_RETRY_TIMES = 10;
constexpr static size_t MAX_ACCUMULATED_LOG_SIZE = 65536;
bool _is_shutdown {false};
diff --git a/be/test/udf/python/python_server_test.cpp
b/be/test/udf/python/python_server_test.cpp
index f21e52e0730..9091d04971f 100644
--- a/be/test/udf/python/python_server_test.cpp
+++ b/be/test/udf/python/python_server_test.cpp
@@ -22,8 +22,10 @@
#include <sys/un.h>
#include <boost/process.hpp>
+#include <chrono>
#include <filesystem>
#include <fstream>
+#include <future>
#include <string>
#include <vector>
@@ -127,6 +129,55 @@ protected:
return python_path;
}
+    // Writes an executable bash script to <test_dir_>/bin/<binary_name> that answers `--version`
+    // with "Python <version>" and otherwise ignores SIGTERM and sleeps forever without ever
+    // creating a socket file. Returns the script path.
+    std::string create_fake_python_without_socket_creation(const std::string& binary_name,
+                                                           const std::string& version) {
+        const std::string bin_dir = test_dir_ + "/bin";
+        fs::create_directories(bin_dir);
+        std::string python_path = bin_dir + "/" + binary_name;
+
+        {
+            std::ofstream script(python_path);
+            script << "#!/bin/bash\n"
+                   << "if [ \"$1\" = \"--version\" ]; then\n"
+                   << " echo 'Python " << version << "'\n"
+                   << " exit 0\n"
+                   << "fi\n"
+                   << "trap '' TERM\n"
+                   << "while true; do sleep 1; done\n";
+        } // stream is flushed and closed here, before the permission change
+        fs::permissions(python_path, fs::perms::owner_all);
+
+        return python_path;
+    }
+
+    // Writes an executable bash script to <test_dir_>/bin/<binary_name>. Besides answering
+    // `--version`, the script uses an atomic `mkdir` of <test_dir_>/first_python_start as a
+    // first-start marker: the invocation that creates the directory ignores SIGTERM and hangs
+    // without a socket, every other invocation touches its socket file and exits on TERM/INT.
+    // Returns the script path.
+    std::string create_fake_python_with_one_stuck_and_others_socket(const std::string& binary_name,
+                                                                    const std::string& version) {
+        const std::string bin_dir = test_dir_ + "/bin";
+        const std::string first_start_dir = test_dir_ + "/first_python_start";
+        fs::create_directories(bin_dir);
+        std::string python_path = bin_dir + "/" + binary_name;
+
+        {
+            std::ofstream script(python_path);
+            script << "#!/bin/bash\n"
+                   << "if [ \"$1\" = \"--version\" ]; then\n"
+                   << " echo 'Python " << version << "'\n"
+                   << " exit 0\n"
+                   << "fi\n"
+                   << "if mkdir \"" << first_start_dir << "\" 2>/dev/null; then\n"
+                   << " trap '' TERM\n"
+                   << " while true; do sleep 1; done\n"
+                   << "fi\n"
+                   << "SOCKET_PREFIX=\"$3\"\n"
+                   << "SOCKET_BASE=\"${SOCKET_PREFIX#grpc+unix://}\"\n"
+                   << "SOCKET_FILE=\"${SOCKET_BASE}_$$.sock\"\n"
+                   << "touch \"$SOCKET_FILE\"\n"
+                   << "trap 'rm -f \"$SOCKET_FILE\"; exit 0' TERM INT\n"
+                   << "while true; do sleep 1; done\n";
+        } // stream is flushed and closed here, before the permission change
+        fs::permissions(python_path, fs::perms::owner_all);
+
+        return python_path;
+    }
+
// Set DORIS_HOME and create flight server script directory
void setup_doris_home() {
setenv("DORIS_HOME", test_dir_.c_str(), 1);
@@ -162,10 +213,14 @@ TEST_F(PythonServerTest, SingletonReturnsSameInstance) {
// PythonServerManager::_get_process() - process retrieval test
// ============================================================================
-TEST_F(PythonServerTest, GetProcessFromEmptyPoolReturnsError) {
+TEST_F(PythonServerTest, EnsurePoolInitializedCanInitializeEmptyPoolForTest) {
PythonServerManager mgr;
- PythonVersion version("3.9.16", "/fake/path", "/fake/python");
+ setup_doris_home();
+ std::string python_path =
create_fake_python_with_socket_creation("3.9.16");
+ PythonVersion version("3.9.16", test_dir_, python_path);
+ config::max_python_process_num = 1;
+
mgr.set_process_pool_for_test(version, {});
auto pool_result = mgr._ensure_pool_initialized(version);
ASSERT_TRUE(pool_result.has_value()) << pool_result.error().to_string();
@@ -173,10 +228,11 @@ TEST_F(PythonServerTest,
GetProcessFromEmptyPoolReturnsError) {
ProcessPtr process;
Status status = mgr._get_process(version, pool_result.value(), &process);
- // Verify: empty pool should return an error before touching process slots.
- EXPECT_FALSE(status.ok());
- EXPECT_TRUE(status.to_string().find("pool is empty") != std::string::npos);
- EXPECT_EQ(process, nullptr);
+ EXPECT_TRUE(status.ok()) << status.to_string();
+ ASSERT_NE(process, nullptr);
+ EXPECT_TRUE(process->is_alive());
+
+ mgr.shutdown();
}
// ============================================================================
@@ -260,12 +316,61 @@ TEST_F(PythonServerTest,
ForkWithProcessThatExitsImmediatelyReturnsError) {
err_msg.find("start") != std::string::npos);
}
+// fork() against an interpreter that never creates its socket must fail with a
+// "socket file not found" error and return in under 2 seconds.
+TEST_F(PythonServerTest, ForkWithoutSocketCreationReturnsAfterBoundedTerminate) {
+    using Clock = std::chrono::steady_clock;
+
+    setup_doris_home();
+    const std::string interpreter =
+            create_fake_python_without_socket_creation("python3.no_socket_direct", "3.9.16");
+
+    PythonServerManager mgr;
+    PythonVersion version("3.9.16", test_dir_, interpreter);
+
+    const auto began = Clock::now();
+    ProcessPtr forked;
+    Status fork_status = mgr.fork(version, &forked);
+    const auto spent_ms =
+            std::chrono::duration_cast<std::chrono::milliseconds>(Clock::now() - began).count();
+
+    EXPECT_FALSE(fork_status.ok());
+    EXPECT_EQ(forked, nullptr);
+    EXPECT_NE(fork_status.to_string().find("socket file not found"), std::string::npos);
+    EXPECT_LT(spent_ms, 2000);
+}
+
+// When the child cannot be reaped inside the bounded waits, fork() must report
+// "process did not exit after SIGKILL" and the background reaper must collect the pid later.
+TEST_F(PythonServerTest, ForkEnqueuesBackgroundReapWhenKilledStartFailureIsNotReaped) {
+    setup_doris_home();
+    const std::string interpreter =
+            create_fake_python_without_socket_creation("python3.no_socket_reap", "3.9.16");
+
+    PythonServerManager mgr;
+    PythonVersion version("3.9.16", test_dir_, interpreter);
+
+    // Whether a SIGKILLed child becomes reapable within the bounded wait depends on kernel
+    // state, so only the wait results are forced here. That exercises the path where
+    // PythonServerManager::fork() hands waitpid ownership to the shared background reaper
+    // instead of detaching and losing the pid.
+    PythonUDFProcess::force_child_exit_timeouts_for_test(2);
+    ProcessPtr forked;
+    Status fork_status = mgr.fork(version, &forked);
+    PythonUDFProcess::force_child_exit_timeouts_for_test(0);
+
+    EXPECT_FALSE(fork_status.ok());
+    EXPECT_EQ(forked, nullptr);
+    const std::string status_text = fork_status.to_string();
+    EXPECT_NE(status_text.find("process did not exit after SIGKILL"), std::string::npos);
+
+    // The child's pid is parsed out of the error text ("pid=<n>").
+    const size_t pid_pos = status_text.find("pid=");
+    ASSERT_NE(pid_pos, std::string::npos) << status_text;
+    const auto child_pid = static_cast<pid_t>(std::stol(status_text.substr(pid_pos + 4)));
+    EXPECT_TRUE(PythonUDFProcess::wait_background_reaped_for_test(
+            child_pid, std::chrono::milliseconds(5000)));
+}
+
// ============================================================================
// PythonServerManager::_ensure_pool_initialized() - pool initialization test
// ============================================================================
TEST_F(PythonServerTest, EnsurePoolInitializedWithInvalidVersionFails) {
PythonServerManager mgr;
+ config::max_python_process_num = 1;
PythonVersion invalid_version("3.99.99", "/non/existent/path",
"/non/existent/python");
@@ -273,9 +378,39 @@ TEST_F(PythonServerTest,
EnsurePoolInitializedWithInvalidVersionFails) {
// Verify: invalid version should cause initialization to fail
EXPECT_FALSE(result.has_value());
- // Error message should indicate all process creations failed
+ // Error message should indicate process creation failure or bounded
initialization timeout.
EXPECT_TRUE(result.error().to_string().find("Failed") != std::string::npos
||
- result.error().to_string().find("failed") !=
std::string::npos);
+ result.error().to_string().find("failed") != std::string::npos
||
+ result.error().to_string().find("Timed out") !=
std::string::npos);
+}
+
+// With a non-existent interpreter every worker fails, so pool initialization must fail in
+// under 500 ms rather than wait out the init timeout.
+TEST_F(PythonServerTest, EnsurePoolInitializedReturnsImmediatelyWhenAllWorkersFail) {
+    using Clock = std::chrono::steady_clock;
+
+    PythonServerManager mgr;
+    config::max_python_process_num = 2;
+
+    PythonVersion missing_interpreter("3.9.16", test_dir_, test_dir_ + "/missing_python");
+
+    const auto began = Clock::now();
+    auto init_result = mgr._ensure_pool_initialized(missing_interpreter);
+    const auto spent_ms =
+            std::chrono::duration_cast<std::chrono::milliseconds>(Clock::now() - began).count();
+
+    EXPECT_FALSE(init_result.has_value());
+    EXPECT_LT(spent_ms, 500);
+}
+
+TEST_F(PythonServerTest,
EnsurePoolInitializedAfterShutdownReturnsServiceUnavailable) {
+ PythonServerManager mgr;
+ mgr.shutdown();
+
+ setup_doris_home();
+ std::string python_path =
create_fake_python_with_socket_creation("3.9.16");
+ PythonVersion version("3.9.16", test_dir_, python_path);
+
+ auto result = mgr._ensure_pool_initialized(version);
+
+ EXPECT_FALSE(result.has_value());
+ EXPECT_NE(result.error().to_string().find("shutting down"),
std::string::npos);
}
// ============================================================================
@@ -302,6 +437,7 @@ TEST_F(PythonServerTest,
ShutdownCalledMultipleTimesDoesNotCrash) {
TEST_F(PythonServerTest, ShutdownAfterFailedInitializationDoesNotCrash) {
PythonServerManager mgr;
+ config::max_python_process_num = 1;
// Try initialization first (expected to fail)
PythonVersion invalid_version("3.99.99", "/bad/path", "/bad/python");
@@ -312,6 +448,27 @@ TEST_F(PythonServerTest,
ShutdownAfterFailedInitializationDoesNotCrash) {
EXPECT_NO_THROW(mgr.shutdown());
}
+// A pool handle obtained before shutdown() must not hand out processes afterwards:
+// _get_process() fails, leaves the output null, and its error text mentions "stopped".
+TEST_F(PythonServerTest, GetProcessFromStoppedPoolReturnsUnavailable) {
+    setup_doris_home();
+    const std::string interpreter = create_fake_python_with_socket_creation("3.9.16");
+
+    config::max_python_process_num = 1;
+
+    PythonServerManager mgr;
+    PythonVersion version("3.9.16", test_dir_, interpreter);
+    auto pool_handle = mgr._ensure_pool_initialized(version);
+    ASSERT_TRUE(pool_handle.has_value()) << pool_handle.error().to_string();
+
+    mgr.shutdown();
+
+    ProcessPtr selected;
+    Status get_status = mgr._get_process(version, pool_handle.value(), &selected);
+
+    EXPECT_FALSE(get_status.ok());
+    EXPECT_EQ(selected, nullptr);
+    EXPECT_NE(get_status.to_string().find("stopped"), std::string::npos);
+}
+
TEST_F(PythonServerTest, ClearUdafStateCacheWithoutProcessesIsNoOp) {
PythonServerManager mgr;
@@ -351,6 +508,7 @@ TEST_F(PythonServerTest,
BroadcastActionWithInvalidProcessUriReturnsError) {
TEST_F(PythonServerTest, GetClientWithInvalidVersionFails) {
PythonServerManager mgr;
+ config::max_python_process_num = 1;
PythonVersion invalid_version("3.9.16", "/invalid/path",
"/invalid/python");
PythonUDFMeta meta;
@@ -447,7 +605,7 @@ TEST_F(PythonServerTest, EnsurePoolInitializedSuccess) {
TEST_F(PythonServerTest,
EnsurePoolInitializedLogsProgressWhileWaitingForSlowProcess) {
setup_doris_home();
std::string python_path =
-
create_fake_python_with_delay_and_socket_creation("python3.delayed", "3.9.16",
200);
+
create_fake_python_with_delay_and_socket_creation("python3.delayed", "3.9.16",
50);
config::max_python_process_num = 1;
@@ -461,6 +619,67 @@ TEST_F(PythonServerTest,
EnsurePoolInitializedLogsProgressWhileWaitingForSlowPro
mgr.shutdown();
}
+// With an interpreter that never creates its socket, both the first initialization attempt
+// and a retry on the same manager must fail, each in under 2 seconds.
+TEST_F(PythonServerTest, EnsurePoolInitializedRetriesAfterInitFailureWithBoundedWait) {
+    using Clock = std::chrono::steady_clock;
+
+    setup_doris_home();
+    const std::string interpreter =
+            create_fake_python_without_socket_creation("python3.no_socket", "3.9.16");
+
+    config::max_python_process_num = 1;
+
+    PythonServerManager mgr;
+    PythonVersion version("3.9.16", test_dir_, interpreter);
+
+    // First attempt.
+    auto began = Clock::now();
+    auto first_result = mgr._ensure_pool_initialized(version);
+    auto spent = std::chrono::duration_cast<std::chrono::milliseconds>(Clock::now() - began);
+
+    EXPECT_FALSE(first_result.has_value());
+    EXPECT_LT(spent.count(), 2000);
+
+    // Retry on the same manager and version.
+    began = Clock::now();
+    auto second_result = mgr._ensure_pool_initialized(version);
+    spent = std::chrono::duration_cast<std::chrono::milliseconds>(Clock::now() - began);
+
+    EXPECT_FALSE(second_result.has_value());
+    EXPECT_LT(spent.count(), 2000);
+
+    mgr.shutdown();
+}
+
+// With two slots where one start hangs and the other succeeds, initialization must return
+// a usable pool in under 2 seconds while the pool is still INITIALIZING, serve a live
+// process, and reach the initialized state within roughly 2 more seconds.
+TEST_F(PythonServerTest, EnsurePoolInitializedSucceedsWithOneStuckWorkerAndOneUsableWorker) {
+    using Clock = std::chrono::steady_clock;
+
+    setup_doris_home();
+    const std::string interpreter =
+            create_fake_python_with_one_stuck_and_others_socket("python3.mixed", "3.9.16");
+
+    config::max_python_process_num = 2;
+
+    PythonServerManager mgr;
+    PythonVersion version("3.9.16", test_dir_, interpreter);
+
+    const auto began = Clock::now();
+    auto pool_handle = mgr._ensure_pool_initialized(version);
+    const auto spent_ms =
+            std::chrono::duration_cast<std::chrono::milliseconds>(Clock::now() - began).count();
+
+    ASSERT_TRUE(pool_handle.has_value()) << pool_handle.error().to_string();
+    EXPECT_LT(spent_ms, 2000);
+    EXPECT_TRUE(mgr.process_pool_is_initializing_for_test(version));
+
+    ProcessPtr selected;
+    EXPECT_TRUE(mgr._get_process(version, pool_handle.value(), &selected).ok());
+    ASSERT_NE(selected, nullptr);
+    EXPECT_TRUE(selected->is_alive());
+
+    // Poll for the initialized state: at most 20 checks, 100 ms apart.
+    for (int attempt = 0; attempt < 20; ++attempt) {
+        if (mgr.process_pool_is_initialized_for_test(version)) {
+            break;
+        }
+        std::this_thread::sleep_for(std::chrono::milliseconds(100));
+    }
+    EXPECT_TRUE(mgr.process_pool_is_initialized_for_test(version));
+
+    mgr.shutdown();
+}
+
TEST_F(PythonServerTest, EnsurePoolInitializedIdempotent) {
setup_doris_home();
std::string python_path =
create_fake_python_with_socket_creation("3.9.16");
@@ -541,6 +760,8 @@ TEST_F(PythonServerTest,
GetProcessSkipsDeadProcessWhenAliveProcessExists) {
setup_doris_home();
std::string python_path =
create_fake_python_with_socket_creation("3.9.16");
+ config::max_python_process_num = 3;
+
PythonServerManager mgr;
PythonVersion version("3.9.16", test_dir_, python_path);
@@ -565,8 +786,10 @@ TEST_F(PythonServerTest,
GetProcessSkipsDeadProcessWhenAliveProcessExists) {
EXPECT_TRUE(status.ok()) << status.to_string();
EXPECT_EQ(selected, alive_process);
- EXPECT_FALSE(mgr.process_pool_for_test(version)[1]->is_alive());
- EXPECT_EQ(mgr.process_pool_for_test(version)[1]->get_child_pid(),
dead_pid);
+ auto pool_snapshot = mgr.process_pool_snapshot_for_test(version);
+ ASSERT_EQ(pool_snapshot.size(), 2);
+ EXPECT_FALSE(pool_snapshot[1]->is_alive());
+ EXPECT_EQ(pool_snapshot[1]->get_child_pid(), dead_pid);
mgr.shutdown();
}
@@ -685,9 +908,9 @@ TEST_F(PythonServerTest,
EnsurePoolInitializedForDifferentVersionsDoesNotShareVe
config::max_python_process_num = 1;
std::string python39_path =
- create_fake_python_with_delay_and_socket_creation("python3.9",
"3.9.16", 1200);
+ create_fake_python_with_delay_and_socket_creation("python3.9",
"3.9.16", 50);
std::string python310_path =
- create_fake_python_with_delay_and_socket_creation("python3.10",
"3.10.0", 1200);
+ create_fake_python_with_delay_and_socket_creation("python3.10",
"3.10.0", 50);
PythonServerManager mgr;
PythonVersion version39("3.9.16", test_dir_, python39_path);
@@ -706,9 +929,9 @@ TEST_F(PythonServerTest,
EnsurePoolInitializedForDifferentVersionsDoesNotShareVe
EXPECT_TRUE(result39.has_value()) << result39.error().to_string();
EXPECT_TRUE(result310.has_value()) << result310.error().to_string();
- // If both versions still contended on one manager-wide lock, the elapsed
time would
- // be close to two serialized 1.2s startups instead of a single startup
window.
- EXPECT_LT(elapsed.count(), 2200);
+ // Keep the assertion loose for ASAN/CI scheduling while still catching
full init-timeout
+ // serialization between versions.
+ EXPECT_LT(elapsed.count(), 2000);
mgr.shutdown();
}
@@ -721,6 +944,8 @@ TEST_F(PythonServerTest,
CheckAndRecreateProcessesRecreatesDeadProcess) {
setup_doris_home();
std::string python_path =
create_fake_python_with_socket_creation("3.9.16");
+ config::max_python_process_num = 3;
+
PythonServerManager mgr;
PythonVersion version("3.9.16", test_dir_, python_path);
@@ -740,22 +965,94 @@ TEST_F(PythonServerTest,
CheckAndRecreateProcessesRecreatesDeadProcess) {
mgr.check_and_recreate_processes_for_test();
- ASSERT_EQ(mgr.process_pool_for_test(version).size(), 3);
- EXPECT_EQ(mgr.process_pool_for_test(version)[0], alive_process);
- EXPECT_EQ(mgr.process_pool_for_test(version)[2], nullptr);
+ auto pool_snapshot = mgr.process_pool_snapshot_for_test(version);
+ ASSERT_EQ(pool_snapshot.size(), 3);
+ EXPECT_EQ(pool_snapshot[0], alive_process);
- ProcessPtr recreated = mgr.process_pool_for_test(version)[1];
+ ProcessPtr recreated = pool_snapshot[1];
ASSERT_NE(recreated, nullptr);
EXPECT_TRUE(recreated->is_alive());
EXPECT_NE(recreated->get_child_pid(), dead_pid_before);
+ ASSERT_NE(pool_snapshot[2], nullptr);
+ EXPECT_TRUE(pool_snapshot[2]->is_alive());
+
+ mgr.shutdown();
+}
+
+// While a pool is flagged as `repairing`, the health check must leave it alone: the dead
+// process stays in its slot with the same pid.
+TEST_F(PythonServerTest, CheckAndRecreateProcessesSkipsRepairingPool) {
+    setup_doris_home();
+    const std::string interpreter = create_fake_python_with_socket_creation("3.9.16");
+
+    config::max_python_process_num = 1;
+
+    PythonServerManager mgr;
+    PythonVersion version("3.9.16", test_dir_, interpreter);
+
+    // Fork a real process, then shut it down to obtain a dead pool entry.
+    ProcessPtr dead_process;
+    ASSERT_TRUE(mgr.fork(version, &dead_process).ok());
+    ASSERT_NE(dead_process, nullptr);
+    const pid_t dead_pid = dead_process->get_child_pid();
+    dead_process->shutdown();
+    ASSERT_FALSE(dead_process->is_alive());
+
+    mgr.set_process_pool_for_test(version, {dead_process});
+    auto pool_result = mgr._ensure_pool_initialized(version);
+    ASSERT_TRUE(pool_result.has_value()) << pool_result.error().to_string();
+    const auto set_repairing = [&pool_result](bool flag) {
+        std::lock_guard<std::mutex> lock(pool_result.value()->mutex);
+        pool_result.value()->repairing = flag;
+    };
+    set_repairing(true);
+
+    mgr.check_and_recreate_processes_for_test();
+
+    const auto slots = mgr.process_pool_snapshot_for_test(version);
+    ASSERT_EQ(slots.size(), 1);
+    ASSERT_NE(slots[0], nullptr);
+    EXPECT_FALSE(slots[0]->is_alive());
+    EXPECT_EQ(slots[0]->get_child_pid(), dead_pid);
+    // Clear the flag again before the manager is shut down.
+    set_repairing(false);
 mgr.shutdown();
 }
-TEST_F(PythonServerTest,
CheckAndRecreateProcessesErasesDeadProcessWhenRecreateFails) {
+// A pool installed with initialized=false must be skipped by the health check: the dead
+// process stays in its slot with the same pid.
+TEST_F(PythonServerTest, CheckAndRecreateProcessesSkipsUninitializedPool) {
 setup_doris_home();
 std::string python_path =
create_fake_python_with_socket_creation("3.9.16");
+    config::max_python_process_num = 1;
+
+    PythonServerManager mgr;
+    PythonVersion version("3.9.16", test_dir_, python_path);
+
+    // Fork a real process, then shut it down to obtain a dead pool entry.
+    ProcessPtr dead_process;
+    ASSERT_TRUE(mgr.fork(version, &dead_process).ok());
+    ASSERT_NE(dead_process, nullptr);
+    const pid_t dead_pid = dead_process->get_child_pid();
+    dead_process->shutdown();
+    ASSERT_FALSE(dead_process->is_alive());
+
+    mgr.set_process_pool_for_test(version, {dead_process}, false);
+
+    mgr.check_and_recreate_processes_for_test();
+
+    const auto slots = mgr.process_pool_snapshot_for_test(version);
+    ASSERT_EQ(slots.size(), 1);
+    ASSERT_NE(slots[0], nullptr);
+    EXPECT_FALSE(slots[0]->is_alive());
+    EXPECT_EQ(slots[0]->get_child_pid(), dead_pid);
+
+    mgr.shutdown();
+}
+
+TEST_F(PythonServerTest,
CheckAndRecreateProcessesKeepsDeadSlotsWhenRecreateFails) {
+ setup_doris_home();
+ std::string python_path =
create_fake_python_with_socket_creation("3.9.16");
+
+ config::max_python_process_num = 2;
+
PythonServerManager mgr;
PythonVersion live_version("3.9.16", test_dir_, python_path);
@@ -776,9 +1073,32 @@ TEST_F(PythonServerTest,
CheckAndRecreateProcessesErasesDeadProcessWhenRecreateF
mgr.check_and_recreate_processes_for_test();
- EXPECT_TRUE(mgr.process_pool_for_test(invalid_version).empty());
+ auto pool_snapshot = mgr.process_pool_snapshot_for_test(invalid_version);
+ ASSERT_EQ(pool_snapshot.size(), 2);
+ EXPECT_FALSE(pool_snapshot[0]->is_alive());
+ EXPECT_FALSE(pool_snapshot[1]->is_alive());
mgr.shutdown();
}
+// Reading memory usage of the test process itself must succeed and report a non-zero size.
+TEST_F(PythonServerTest, ReadProcessMemoryCurrentProcessSucceeds) {
+    PythonServerManager mgr;
+
+    size_t resident_bytes = 0;
+    Status read_status = mgr._read_process_memory(getpid(), &resident_bytes);
+
+    EXPECT_TRUE(read_status.ok()) << read_status.to_string();
+    EXPECT_GT(resident_bytes, 0);
+}
+
+TEST_F(PythonServerTest, ReadProcessMemoryInvalidPidFails) {
+ PythonServerManager mgr;
+ size_t rss_bytes = 0;
+
+ Status status = mgr._read_process_memory(-1, &rss_bytes);
+
+ EXPECT_FALSE(status.ok());
+ EXPECT_NE(status.to_string().find("/proc/-1/statm"), std::string::npos);
+}
+
} // namespace doris
diff --git a/be/test/udf/python/python_udf_runtime_test.cpp
b/be/test/udf/python/python_udf_runtime_test.cpp
index 99728c0500a..305dc9f5c06 100644
--- a/be/test/udf/python/python_udf_runtime_test.cpp
+++ b/be/test/udf/python/python_udf_runtime_test.cpp
@@ -20,6 +20,7 @@
#include <gtest/gtest.h>
#include <sys/socket.h>
#include <sys/un.h>
+#include <sys/wait.h>
#include <unistd.h>
#include <boost/process.hpp>
@@ -132,6 +133,73 @@ TEST_F(PythonUDFRuntimeTest, ProcessPtrIsSharedPtr) {
EXPECT_FALSE(ptr);
}
+TEST_F(PythonUDFRuntimeTest, WaitChildExitReturnsExitedForExitedChild) {
+ bp::ipstream output;
+ bp::child child("/bin/bash", "-c", "exit 7", bp::std_out > output);
+ ASSERT_TRUE(child.valid());
+
+ int exit_status = 0;
+ auto result = PythonUDFProcess::wait_child_exit(child.id(), std::chrono::milliseconds(1000),
+ &exit_status);
+ child.detach();
+
+ EXPECT_EQ(result, PythonUDFProcess::ChildExitWaitResult::EXITED);
+ EXPECT_TRUE(WIFEXITED(exit_status));
+ EXPECT_EQ(WEXITSTATUS(exit_status), 7);
+}
+
+TEST_F(PythonUDFRuntimeTest, WaitChildExitReturnsTimeoutForRunningChild) {
+ bp::ipstream output;
+ bp::child child("/bin/sleep", "60", bp::std_out > output);
+ ASSERT_TRUE(child.valid());
+ ASSERT_TRUE(child.running());
+
+ int exit_status = 0;
+ auto result = PythonUDFProcess::wait_child_exit(child.id(), std::chrono::milliseconds(20),
+ &exit_status);
+
+ EXPECT_EQ(result, PythonUDFProcess::ChildExitWaitResult::TIMEOUT);
+
+ child.terminate();
+ child.wait();
+}
+
+TEST_F(PythonUDFRuntimeTest, WaitChildExitReturnsAlreadyReapedForReapedChild) {
+ bp::ipstream output;
+ bp::child child("/bin/true", bp::std_out > output);
+ ASSERT_TRUE(child.valid());
+ pid_t child_pid = child.id();
+ child.wait();
+
+ int exit_status = 0;
+ auto result = PythonUDFProcess::wait_child_exit(child_pid, std::chrono::milliseconds(0),
+ &exit_status);
+
+ EXPECT_EQ(result, PythonUDFProcess::ChildExitWaitResult::ALREADY_REAPED);
+}
+
+TEST_F(PythonUDFRuntimeTest, BackgroundReaperReapsQueuedChild) {
+ bp::ipstream output;
+ bp::child child("/bin/bash", "-c", "sleep 0.1; exit 0", bp::std_out > output);
+ ASSERT_TRUE(child.valid());
+ pid_t child_pid = child.id();
+
+ // Do not try to force the real "SIGKILLed but still not reapable" case in UT. That usually
+ // needs kernel-level uninterruptible sleep. The behavior we must guarantee is that once such a
+ // pid is handed off, the background reaper keeps waitpid ownership until the child exits.
+ child.detach();
+ PythonUDFProcess::enqueue_child_for_reap(child_pid);
+
+ bool reaped = PythonUDFProcess::wait_background_reaped_for_test(
+ child_pid, std::chrono::milliseconds(5000));
+ EXPECT_TRUE(reaped);
+
+ int exit_status = 0;
+ auto result = PythonUDFProcess::wait_child_exit(child_pid, std::chrono::milliseconds(0),
+ &exit_status);
+ EXPECT_EQ(result, PythonUDFProcess::ChildExitWaitResult::ALREADY_REAPED);
+}
+
// Test socket file path generation for various PIDs
TEST_F(PythonUDFRuntimeTest, SocketPathGenerationEdgeCases) {
// Minimum PID
@@ -260,6 +328,27 @@ TEST_F(PythonUDFRuntimeTest, ShutdownWithStubbornProcess) {
EXPECT_FALSE(process.is_alive());
}
+TEST_F(PythonUDFRuntimeTest, ShutdownEnqueuesBackgroundReapWhenSigkillWaitTimesOut) {
+ bp::ipstream output;
+ bp::child child("/bin/bash", "-c", "trap '' TERM; exec sleep 60", bp::std_out > output);
+ ASSERT_TRUE(child.valid());
+ pid_t child_pid = child.id();
+
+ PythonUDFProcess process(std::move(child), std::move(output));
+ ASSERT_TRUE(process.is_alive());
+
+ // SIGKILL not becoming reapable inside a short bounded wait is rare and depends on kernel
+ // state, so force only the wait results here. This covers the shutdown handoff contract:
+ // a pid that was killed but not reaped synchronously must be owned by the background reaper.
+ PythonUDFProcess::force_child_exit_timeouts_for_test(2);
+ process.shutdown();
+ PythonUDFProcess::force_child_exit_timeouts_for_test(0);
+
+ EXPECT_TRUE(process.is_shutdown());
+ EXPECT_TRUE(PythonUDFProcess::wait_background_reaped_for_test(child_pid,
+ std::chrono::milliseconds(5000)));
+}
+
// ============================================================================
// PythonUDFProcess remove_unix_socket() tests
// ============================================================================
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]