This is an automated email from the ASF dual-hosted git repository.

hello-stephen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 3ece7b5ddc6 [Enhancement](pyudf) Enhance Python process pool init and 
repair (#64039)
3ece7b5ddc6 is described below

commit 3ece7b5ddc6b59700ea584a98538c97962c16d23
Author: linrrarity <[email protected]>
AuthorDate: Tue Jun 9 14:16:42 2026 +0800

    [Enhancement](pyudf) Enhance Python process pool init and repair (#64039)
    
    Problem Summary:
    
    Python UDF process pool initialization previously required the whole
    pool to finish initialization before BE could continue serving the
    query.
    
    In abnormal environments, Python process startup may hang or take a very
    long time in paths such as:
    
    - `fork` / child process creation
    - waiting for the Python Flight socket to appear
    - terminating and waiting for a failed child process to exit
    
    When one process slot gets stuck, the whole process pool initialization
    can be blocked. As a result, FE may hit the send fragments RPC timeout
    before BE returns a meaningful Python UDF error: `RpcException, msg:
    timeout when waiting for send fragments rpc, query timeout:900, left
    timeout for this operation:30`.
    
    be.log:
    ```text
    Initializing Python process pool for version 3.8.19 with 8 processes
    Python process pool initialization progress for version 3.8.19: 
waiting_slot=4/8, success=3, failed=0, elapsed_ms=20508
    Python process pool initialization progress for version 3.8.19: 
waiting_slot=4/8, success=3, failed=0, elapsed_ms=40508
    Python process pool initialization progress for version 3.8.19: 
waiting_slot=4/8, success=3, failed=0, elapsed_ms=60508
    Python process pool initialization progress for version 3.8.19: 
waiting_slot=4/8, success=3, failed=0, elapsed_ms=80508
    Python process pool initialization progress for version 3.8.19: 
waiting_slot=4/8, success=3, failed=0, elapsed_ms=100508
    Python process pool initialization progress for version 3.8.19: 
waiting_slot=4/8, success=3, failed=0, elapsed_ms=120508
    ```
    ### Solution
    
    Change Python process pool initialization from "wait until all processes
    are created" to "return once at least one usable process is available".
    
    The pool no longer treats full-size initialization as a prerequisite for
    serving queries. Once one Python process is alive, the current query can
    proceed. Missing or failed process slots are repaired asynchronously by
    the existing health check / repair path.
    
    - Bound process pool initialization time so that BE can return
    `SERVICE_UNAVAILABLE` before the FE send-fragments RPC times out.
    - Allow partial pool availability: initialization succeeds as long as
    one usable Python process exists.
    - Mark the first initialization round as completed after success or
    timeout, then rely on health check / repair to fill missing slots.
    - Add bounded wait/reap logic for Python child shutdown to avoid
    blocking indefinitely in `wait`.
    - Protect late init / repair workers from writing back after shutdown,
    and discard late duplicate processes safely.
    - Share repair guarding between foreground repair and health check to
    avoid duplicate repair pressure.
---
 be/src/udf/python/python_server.cpp            | 570 ++++++++++++++++++-------
 be/src/udf/python/python_server.h              |  50 ++-
 be/src/udf/python/python_udf_runtime.cpp       | 220 +++++++++-
 be/src/udf/python/python_udf_runtime.h         |  15 +-
 be/test/udf/python/python_server_test.cpp      | 364 +++++++++++++++-
 be/test/udf/python/python_udf_runtime_test.cpp |  89 ++++
 6 files changed, 1104 insertions(+), 204 deletions(-)

diff --git a/be/src/udf/python/python_server.cpp 
b/be/src/udf/python/python_server.cpp
index 5ea1ef41409..cf78591a2ef 100644
--- a/be/src/udf/python/python_server.cpp
+++ b/be/src/udf/python/python_server.cpp
@@ -21,19 +21,22 @@
 #include <butil/fd_utility.h>
 #include <dirent.h>
 #include <fmt/core.h>
+#include <signal.h>
 #include <sys/poll.h>
 #include <sys/stat.h>
+#include <unistd.h>
 
+#include <algorithm>
 #include <boost/asio.hpp>
 #include <boost/process.hpp>
 #include <chrono>
 #include <fstream>
-#include <future>
 #include <thread>
 
 #include "arrow/flight/client.h"
 #include "common/config.h"
 #include "common/status.h"
+#include "runtime/thread_context.h"
 #include "udf/python/python_udaf_client.h"
 #include "udf/python/python_udf_client.h"
 #include "udf/python/python_udtf_client.h"
@@ -41,9 +44,15 @@
 
 namespace doris {
 
-std::shared_ptr<PythonServerManager::VersionedProcessPool>
+Result<std::shared_ptr<PythonServerManager::VersionedProcessPool>>
 PythonServerManager::_get_or_create_process_pool(const PythonVersion& version) 
{
     std::lock_guard<std::mutex> lock(_pools_mutex);
+    // shutdown() owns the manager lifecycle. Once it starts, creating a new 
pool would let detached
+    // init workers publish Python processes that the manager no longer tracks.
+    if (_shutdown_flag.load(std::memory_order_acquire)) {
+        return ResultError(Status::Error<ErrorCode::SERVICE_UNAVAILABLE>(
+                "Python server manager is shutting down"));
+    }
     auto& pool = _process_pools[version];
     if (!pool) {
         pool = std::make_shared<VersionedProcessPool>();
@@ -66,18 +75,53 @@ PythonServerManager::_snapshot_process_pools() {
 void PythonServerManager::set_process_pool_for_test(const PythonVersion& 
version,
                                                     std::vector<ProcessPtr> 
processes,
                                                     bool initialized) {
-    auto versioned_pool = _get_or_create_process_pool(version);
+    auto versioned_pool = _get_or_create_process_pool(version).value();
     std::lock_guard<std::mutex> lock(versioned_pool->mutex);
     versioned_pool->processes = std::move(processes);
-    versioned_pool->initialized = initialized;
+    versioned_pool->state = initialized ? PoolState::INITIALIZED : 
PoolState::UNINITIALIZED;
+    versioned_pool->has_available_process =
+            std::any_of(versioned_pool->processes.begin(), 
versioned_pool->processes.end(),
+                        [](const ProcessPtr& process) { return process && 
process->is_alive(); });
 }
 
-std::vector<ProcessPtr>& PythonServerManager::process_pool_for_test(const 
PythonVersion& version) {
-    auto versioned_pool = _get_or_create_process_pool(version);
+std::vector<ProcessPtr> PythonServerManager::process_pool_snapshot_for_test(
+        const PythonVersion& version) {
+    auto versioned_pool = _get_or_create_process_pool(version).value();
+    std::lock_guard<std::mutex> lock(versioned_pool->mutex);
     return versioned_pool->processes;
 }
+
+bool PythonServerManager::process_pool_is_initializing_for_test(const 
PythonVersion& version) {
+    auto versioned_pool = _get_or_create_process_pool(version).value();
+    std::lock_guard<std::mutex> lock(versioned_pool->mutex);
+    return versioned_pool->state == PoolState::INITIALIZING;
+}
+
+bool PythonServerManager::process_pool_is_initialized_for_test(const 
PythonVersion& version) {
+    auto versioned_pool = _get_or_create_process_pool(version).value();
+    std::lock_guard<std::mutex> lock(versioned_pool->mutex);
+    return versioned_pool->state == PoolState::INITIALIZED;
+}
 #endif
 
+bool PythonServerManager::_select_alive_process_from_pool(const 
std::vector<ProcessPtr>& pool,
+                                                          ProcessPtr* process) 
{
+    auto alive_iter = std::min_element(pool.begin(), pool.end(),
+                                       [](const ProcessPtr& a, const 
ProcessPtr& b) {
+                                           const bool a_alive = a && 
a->is_alive();
+                                           const bool b_alive = b && 
b->is_alive();
+                                           if (a_alive != b_alive) {
+                                               return a_alive > b_alive;
+                                           }
+                                           return a.use_count() < 
b.use_count();
+                                       });
+    if (alive_iter == pool.end() || !*alive_iter || 
!(*alive_iter)->is_alive()) {
+        return false;
+    }
+    *process = *alive_iter;
+    return true;
+}
+
 template <typename ClientType>
 Status PythonServerManager::get_client(const PythonUDFMeta& func_meta, const 
PythonVersion& version,
                                        std::shared_ptr<ClientType>* client,
@@ -99,130 +143,199 @@ Status PythonServerManager::get_client(const 
PythonUDFMeta& func_meta, const Pyt
 
 Result<std::shared_ptr<PythonServerManager::VersionedProcessPool>>
 PythonServerManager::_ensure_pool_initialized(const PythonVersion& version) {
-    auto versioned_pool = _get_or_create_process_pool(version);
-    std::lock_guard<std::mutex> lock(versioned_pool->mutex);
-
-    // Check if already initialized
-    if (versioned_pool->initialized) return versioned_pool;
-
-    // 0 means use CPU core count as default, otherwise use the specified value
-    int max_pool_size = config::max_python_process_num > 0 ? 
config::max_python_process_num
-                                                           : 
CpuInfo::num_cores();
-
-    LOG(INFO) << "Initializing Python process pool for version " << 
version.to_string() << " with "
-              << max_pool_size
-              << " processes (config::max_python_process_num=" << 
config::max_python_process_num
-              << ", CPU cores=" << CpuInfo::num_cores() << ")";
-
-    std::vector<std::future<Status>> futures;
-    std::vector<ProcessPtr> temp_processes(max_pool_size);
+    auto versioned_pool_result = _get_or_create_process_pool(version);
+    if (!versioned_pool_result.has_value()) {
+        return ResultError(versioned_pool_result.error());
+    }
+    auto versioned_pool = versioned_pool_result.value();
+    const int max_pool_size = config::max_python_process_num > 0 ? 
config::max_python_process_num
+                                                                 : 
CpuInfo::num_cores();
 
-    for (int i = 0; i < max_pool_size; i++) {
-        futures.push_back(std::async(std::launch::async, [this, &version, i, 
&temp_processes]() {
-            ProcessPtr process;
-            Status s = fork(version, &process);
-            if (s.ok()) {
-                temp_processes[i] = std::move(process);
-            }
-            return s;
-        }));
+    std::unique_lock<std::mutex> lock(versioned_pool->mutex);
+    if (versioned_pool->state == PoolState::INITIALIZED) {
+        return versioned_pool;
     }
 
-    int success_count = 0;
-    int failure_count = 0;
-    const auto init_start_time = std::chrono::steady_clock::now();
-#ifdef BE_TEST
-    constexpr auto progress_log_interval = std::chrono::milliseconds(50);
-#else
-    constexpr auto progress_log_interval = std::chrono::seconds(20);
-#endif
-    for (int i = 0; i < max_pool_size; i++) {
-        // Print init log every 20s until the current slot is ready.
-        while (futures[i].wait_for(progress_log_interval) != 
std::future_status::ready) {
-            const auto now = std::chrono::steady_clock::now();
-            const auto total_elapsed_ms =
-                    std::chrono::duration_cast<std::chrono::milliseconds>(now 
- init_start_time)
-                            .count();
-            LOG(INFO) << "Python process pool initialization progress for 
version "
-                      << version.to_string() << ": waiting_slot=" << (i + 1) 
<< "/" << max_pool_size
-                      << ", success=" << success_count << ", failed=" << 
failure_count
-                      << ", elapsed_ms=" << total_elapsed_ms;
+    if (versioned_pool->state != PoolState::STOPPED) {
+        if (versioned_pool->state != PoolState::INITIALIZING) {
+            versioned_pool->state = PoolState::INITIALIZING;
+            versioned_pool->has_available_process = false;
+            versioned_pool->processes.resize(max_pool_size);
+            auto init_finished_count = std::make_shared<std::atomic<int>>(0);
+
+            LOG(INFO) << "Initializing Python process pool for version " << 
version.to_string()
+                      << " with " << max_pool_size << " processes 
(config::max_python_process_num="
+                      << config::max_python_process_num << ", CPU cores=" << 
CpuInfo::num_cores()
+                      << ")";
+
+            std::thread([this, versioned_pool, init_finished_count, 
max_pool_size]() {
+                SCOPED_INIT_THREAD_CONTEXT();
+                std::unique_lock<std::mutex> lock(versioned_pool->mutex);
+                versioned_pool->cv.wait_for(
+                        lock, PROCESS_POOL_INIT_TIMEOUT,
+                        [&versioned_pool, init_finished_count, 
max_pool_size]() {
+                            return versioned_pool->state != 
PoolState::INITIALIZING ||
+                                   
init_finished_count->load(std::memory_order_acquire) >=
+                                           max_pool_size;
+                        });
+                if (versioned_pool->state == PoolState::INITIALIZING) {
+                    if (versioned_pool->has_available_process) {
+                        // Keep this under the pool lock. shutdown() must 
acquire the same lock
+                        // before it can destroy manager-owned health-check 
state.
+                        _start_health_check_thread();
+                        versioned_pool->state = PoolState::INITIALIZED;
+                    } else {
+                        versioned_pool->state = PoolState::UNINITIALIZED;
+                    }
+                }
+                versioned_pool->cv.notify_all();
+            }).detach();
+
+            for (int i = 0; i < max_pool_size; ++i) {
+                std::thread([version, versioned_pool, i, max_pool_size, 
init_finished_count]() {
+                    SCOPED_INIT_THREAD_CONTEXT();
+                    ProcessPtr process;
+                    Status status = PythonServerManager::fork(version, 
&process);
+                    const bool ok = status.ok() && process;
+                    ProcessPtr process_to_shutdown;
+                    {
+                        std::lock_guard<std::mutex> 
lock(versioned_pool->mutex);
+                        // shutdown() and repair can race with detached init 
workers after timeout.
+                        // Late successful forks only fill slots that are 
still empty or dead.
+                        if (ok &&
+                            (versioned_pool->state == PoolState::INITIALIZING 
||
+                             versioned_pool->state == PoolState::INITIALIZED) 
&&
+                            i < versioned_pool->processes.size() &&
+                            (!versioned_pool->processes[i] ||
+                             !versioned_pool->processes[i]->is_alive())) {
+                            versioned_pool->processes[i] = std::move(process);
+                            versioned_pool->has_available_process = true;
+                        } else if (ok) {
+                            process_to_shutdown = std::move(process);
+                        } else [[unlikely]] {
+                            LOG(WARNING) << "Failed to create Python process " 
<< (i + 1) << "/"
+                                         << max_pool_size << " for version " 
<< version.to_string()
+                                         << ": " << status.to_string();
+                        }
+                    }
+                    init_finished_count->fetch_add(1, 
std::memory_order_acq_rel);
+                    versioned_pool->cv.notify_all();
+                    if (process_to_shutdown) {
+                        process_to_shutdown->shutdown();
+                    }
+                }).detach();
+            }
         }
 
-        Status s = futures[i].get();
-        if (s.ok() && temp_processes[i]) {
-            
versioned_pool->processes.emplace_back(std::move(temp_processes[i]));
-            success_count++;
-        } else {
-            failure_count++;
-            LOG(WARNING) << "Failed to create Python process " << (i + 1) << 
"/" << max_pool_size
-                         << ": " << s.to_string();
+        // Wait only for the first usable process. The watcher thread above later sets INITIALIZED
+        // (or UNINITIALIZED if nothing became available) once every slot has finished or the timeout hits.
+        versioned_pool->cv.wait_for(lock, PROCESS_POOL_INIT_TIMEOUT, 
[&versioned_pool]() {
+            return versioned_pool->has_available_process ||
+                   versioned_pool->state == PoolState::STOPPED ||
+                   versioned_pool->state != PoolState::INITIALIZING;
+        });
+        if (versioned_pool->has_available_process) {
+            return versioned_pool;
         }
+        versioned_pool->cv.notify_all();
     }
 
-    if (versioned_pool->processes.empty()) {
-        return ResultError(Status::Error<ErrorCode::SERVICE_UNAVAILABLE>(
-                "Failed to initialize Python process pool: all {} process 
creation attempts failed",
-                max_pool_size));
-    }
-
-    const auto total_elapsed_ms = 
std::chrono::duration_cast<std::chrono::milliseconds>(
-                                          std::chrono::steady_clock::now() - 
init_start_time)
-                                          .count();
-    LOG(INFO) << "Python process pool initialized for version " << 
version.to_string()
-              << ": created " << success_count << " processes"
-              << (failure_count > 0 ? fmt::format(" ({} failed)", 
failure_count) : "")
-              << ", elapsed_ms=" << total_elapsed_ms;
-
-    versioned_pool->initialized = true;
-    _start_health_check_thread();
-
-    return versioned_pool;
+    return ResultError(Status::Error<ErrorCode::SERVICE_UNAVAILABLE>(
+            "Failed to initialize Python process pool for version {}: no 
process became available "
+            "within {} ms",
+            version.to_string(), PROCESS_POOL_INIT_TIMEOUT.count()));
 }
 
 Status PythonServerManager::_get_process(
         const PythonVersion& version, const 
std::shared_ptr<VersionedProcessPool>& versioned_pool,
         ProcessPtr* process) {
-    std::lock_guard<std::mutex> lock(versioned_pool->mutex);
-    std::vector<ProcessPtr>& pool = versioned_pool->processes;
+    {
+        std::unique_lock<std::mutex> lock(versioned_pool->mutex);
+        std::vector<ProcessPtr>& pool = versioned_pool->processes;
 
-    if (UNLIKELY(pool.empty())) {
-        return Status::InternalError("Python process pool is empty for version 
{}",
-                                     version.to_string());
-    }
+        if (versioned_pool->state == PoolState::STOPPED) {
+            versioned_pool->has_available_process = false;
+            return Status::Error<ErrorCode::SERVICE_UNAVAILABLE>(
+                    "Python process pool has stopped for version {}", 
version.to_string());
+        }
 
-    // Prefer an already-alive process and only use load balancing inside that 
alive subset.
-    // keep dead entries stay in the pool for the background health checker
-    // unless there is no alive process left for the current request.
-    auto min_alive_iter = std::min_element(pool.begin(), pool.end(),
-                                           [](const ProcessPtr& a, const 
ProcessPtr& b) {
-                                               const bool a_alive = a && 
a->is_alive();
-                                               const bool b_alive = b && 
b->is_alive();
-                                               if (a_alive != b_alive) {
-                                                   return a_alive > b_alive;
-                                               }
-                                               return a.use_count() < 
b.use_count();
-                                           });
-
-    if (min_alive_iter != pool.end() && *min_alive_iter && 
(*min_alive_iter)->is_alive()) {
-        *process = *min_alive_iter;
-        return Status::OK();
-    }
+        if (_select_alive_process_from_pool(pool, process)) [[likely]] {
+            versioned_pool->has_available_process = true;
+            return Status::OK();
+        }
+        versioned_pool->has_available_process = false;
+
+        if (versioned_pool->state == PoolState::INITIALIZING) {
+            versioned_pool->cv.wait_for(lock, PROCESS_REPAIR_WAIT_TIMEOUT, 
[&versioned_pool]() {
+                return std::any_of(versioned_pool->processes.begin(),
+                                   versioned_pool->processes.end(),
+                                   [](const ProcessPtr& p) { return p && 
p->is_alive(); }) ||
+                       versioned_pool->state != PoolState::INITIALIZING;
+            });
+            if (versioned_pool->state == PoolState::STOPPED) {
+                return Status::Error<ErrorCode::SERVICE_UNAVAILABLE>(
+                        "Python process pool has stopped for version {}", 
version.to_string());
+            }
+            if (_select_alive_process_from_pool(pool, process)) {
+                versioned_pool->has_available_process = true;
+                return Status::OK();
+            }
+            versioned_pool->has_available_process = false;
+            return Status::Error<ErrorCode::SERVICE_UNAVAILABLE>(
+                    "Python process pool is initializing but has no available 
process for version "
+                    "{} after waiting {} ms",
+                    version.to_string(), PROCESS_REPAIR_WAIT_TIMEOUT.count());
+        }
+
+        if (versioned_pool->state != PoolState::INITIALIZED) {
+            return Status::Error<ErrorCode::SERVICE_UNAVAILABLE>(
+                    "Python process pool is not initialized for version {}", 
version.to_string());
+        }
+
+        if (!versioned_pool->repairing) {
+            versioned_pool->repairing = true;
+            // Repair is done in the background because fork can be slow. The 
current request still
+            // waits briefly below so a transient all-dead pool can recover 
without failing.
+            std::thread([version, versioned_pool]() {
+                SCOPED_INIT_THREAD_CONTEXT();
+                int recreated = 
PythonServerManager::_repair_process_pool(version, versioned_pool);
+                {
+                    std::lock_guard<std::mutex> lock(versioned_pool->mutex);
+                    versioned_pool->repairing = false;
+                }
+                versioned_pool->cv.notify_all();
+                if (recreated > 0) {
+                    LOG(INFO) << "Repaired Python process pool for version " 
<< version.to_string()
+                              << ": recreated=" << recreated;
+                }
+            }).detach();
+        }
+
+        // Keep the request recoverable in the common case where the Python 
runtime can fork
+        // normally and only the existing pool entries died. The wait is short 
so a wedged fork path
+        // still returns SERVICE_UNAVAILABLE promptly.
+        versioned_pool->cv.wait_for(lock, PROCESS_REPAIR_WAIT_TIMEOUT, 
[&versioned_pool]() {
+            return std::any_of(versioned_pool->processes.begin(), 
versioned_pool->processes.end(),
+                               [](const ProcessPtr& p) { return p && 
p->is_alive(); }) ||
+                   versioned_pool->state == PoolState::STOPPED;
+        });
+        if (versioned_pool->state == PoolState::STOPPED) {
+            versioned_pool->has_available_process = false;
+            return Status::Error<ErrorCode::SERVICE_UNAVAILABLE>(
+                    "Python process pool has stopped for version {}", 
version.to_string());
+        }
 
-    // Only reach here when the pool has no alive process at all. Try one 
foreground
-    // recovery so the caller has a chance to proceed; leave batch repair to 
health check.
-    auto& candidate = pool.front();
-    ProcessPtr replacement;
-    Status status = fork(version, &replacement);
-    if (!status.ok()) {
-        return Status::Error<ErrorCode::SERVICE_UNAVAILABLE>(
-                "Python process pool has no available process for version {}, 
reason: {}",
-                version.to_string(), status.to_string());
+        if (_select_alive_process_from_pool(pool, process)) {
+            versioned_pool->has_available_process = true;
+            return Status::OK();
+        }
+        versioned_pool->has_available_process = false;
     }
 
-    candidate = std::move(replacement);
-    *process = candidate;
-    return Status::OK();
+    return Status::Error<ErrorCode::SERVICE_UNAVAILABLE>(
+            "Python process pool has no available process for version {} after 
waiting repair for "
+            "{} ms",
+            version.to_string(), PROCESS_REPAIR_WAIT_TIMEOUT.count());
 }
 
 Status PythonServerManager::fork(const PythonVersion& version, ProcessPtr* 
process) {
@@ -243,13 +356,14 @@ Status PythonServerManager::fork(const PythonVersion& 
version, ProcessPtr* proce
                     }
                 }));
 
-        // Wait for socket file to be created (indicates server is ready)
-        std::string expected_socket_path = get_unix_socket_file_path(c.id());
+        // Bound socket readiness: a child process can start but never create 
the Flight socket.
+        // Without this, pool initialization can block until FE reports 
send-fragments RPC timeout.
+        pid_t child_pid = c.id();
+        std::string expected_socket_path = 
get_unix_socket_file_path(child_pid);
         bool started_successfully = false;
         std::chrono::steady_clock::time_point start = 
std::chrono::steady_clock::now();
-        const auto timeout = std::chrono::milliseconds(5000);
 
-        while (std::chrono::steady_clock::now() - start < timeout) {
+        while (std::chrono::steady_clock::now() - start < 
PROCESS_START_TIMEOUT) {
             struct stat buffer;
             if (stat(expected_socket_path.c_str(), &buffer) == 0) {
                 started_successfully = true;
@@ -259,14 +373,43 @@ Status PythonServerManager::fork(const PythonVersion& 
version, ProcessPtr* proce
             if (!c.running()) {
                 break;
             }
-            std::this_thread::sleep_for(std::chrono::milliseconds(1));
+            std::this_thread::sleep_for(std::chrono::milliseconds(100));
         }
 
         if (!started_successfully) {
+            int exit_status = 0;
             if (c.running()) {
-                c.terminate();
-                c.wait();
+                // Don't use boost::process's `wait`; send the signal with ::kill() and reap the child
+                // with a bounded waitpid instead. boost::process may block the initialization/repair
+                // thread for a long time, exceeding the timeout limit expected by the process pool.
+                ::kill(child_pid, SIGTERM);
+                auto wait_result = PythonUDFProcess::wait_child_exit(
+                        child_pid, PROCESS_TERMINATE_TIMEOUT, &exit_status);
+                if (wait_result == 
PythonUDFProcess::ChildExitWaitResult::TIMEOUT ||
+                    wait_result == 
PythonUDFProcess::ChildExitWaitResult::ERROR) {
+                    LOG(WARNING) << "Python server start timeout and terminate 
timeout exceeded,"
+                                 << " sending SIGKILL to pid=" << child_pid;
+                    ::kill(child_pid, SIGKILL);
+                    wait_result = PythonUDFProcess::wait_child_exit(
+                            child_pid, PROCESS_TERMINATE_TIMEOUT, 
&exit_status);
+                    if (wait_result == 
PythonUDFProcess::ChildExitWaitResult::TIMEOUT ||
+                        wait_result == 
PythonUDFProcess::ChildExitWaitResult::ERROR) [[unlikely]] {
+                        // The child was SIGKILLed but not reaped within the 
bounded wait. Do not
+                        // drop waitpid ownership after detach; otherwise a 
later exit can leave a
+                        // zombie Python process under BE.
+                        PythonUDFProcess::enqueue_child_for_reap(child_pid);
+                        c.detach();
+                        return Status::InternalError(
+                                "Python server start failed: process did not 
exit after SIGKILL, "
+                                "pid={}",
+                                child_pid);
+                    }
+                }
+            } else {
+                PythonUDFProcess::wait_child_exit(child_pid, 
std::chrono::milliseconds(0),
+                                                  &exit_status);
             }
+            c.detach();
             return Status::InternalError("Python server start failed: socket 
file not found at {}",
                                          expected_socket_path);
         }
@@ -282,7 +425,7 @@ Status PythonServerManager::fork(const PythonVersion& version, ProcessPtr* proce
 
 void PythonServerManager::_start_health_check_thread() {
     std::lock_guard<std::mutex> lock(_health_check_mutex);
-    if (_health_check_thread) return;
+    if (_health_check_thread || _shutdown_flag.load(std::memory_order_acquire)) return;
 
     LOG(INFO) << "Starting Python process health check thread (interval: 30 seconds)";
 
@@ -308,72 +451,181 @@ void PythonServerManager::_start_health_check_thread() {
 }
 
 void PythonServerManager::_check_and_recreate_processes() {
-    int total_checked = 0;
-    int total_dead = 0;
     int total_recreated = 0;
-
     for (auto& [version, versioned_pool] : _snapshot_process_pools()) {
+        {
+            std::lock_guard<std::mutex> lock(versioned_pool->mutex);
+            if (versioned_pool->state != PoolState::INITIALIZED || versioned_pool->repairing) {
+                continue;
+            }
+            // Share the same repair guard with foreground requests. Otherwise health check and
+            // _get_process() can fork the same empty/dead slots at the same time under failures.
+            versioned_pool->repairing = true;
+        }
+        int recreated = _repair_process_pool(version, versioned_pool);
+        {
+            std::lock_guard<std::mutex> lock(versioned_pool->mutex);
+            versioned_pool->repairing = false;
+        }
+        versioned_pool->cv.notify_all();
+        total_recreated += recreated;
+    }
+
+    if (total_recreated > 0) {
+        LOG(INFO) << "Health check completed: recreated=" << total_recreated;
+    }
+}
+
+int PythonServerManager::_repair_process_pool(
+        const PythonVersion& version, const std::shared_ptr<VersionedProcessPool>& versioned_pool) {
+    const int max_pool_size = config::max_python_process_num > 0 ? config::max_python_process_num
+                                                                 : CpuInfo::num_cores();
+    std::vector<size_t> died_process_indices;
+    {
         std::lock_guard<std::mutex> lock(versioned_pool->mutex);
+        if (versioned_pool->state != PoolState::INITIALIZED) {
+            return 0;
+        }
+
         auto& pool = versioned_pool->processes;
+        died_process_indices.reserve(std::max<size_t>(pool.size(), max_pool_size));
+        // Need to fix the following two cases
+        // 1. Existing processes have died
+        // 2. Process pool not filled due to fork timeouts and other issues
         for (size_t i = 0; i < pool.size(); ++i) {
-            auto& process = pool[i];
-            if (!process) continue;
-
-            total_checked++;
-            if (!process->is_alive()) {
-                total_dead++;
-                LOG(WARNING) << "Detected dead Python process (pid=" << process->get_child_pid()
-                             << ", version=" << version.to_string() << "), recreating...";
-
-                ProcessPtr new_process;
-                Status s = fork(version, &new_process);
-                if (s.ok()) {
-                    pool[i] = std::move(new_process);
-                    total_recreated++;
-                    LOG(INFO) << "Successfully recreated Python process for version "
-                              << version.to_string();
+            const auto& process = pool[i];
+            if (!process || !process->is_alive()) {
+                died_process_indices.push_back(i);
+            }
+        }
+        for (size_t i = pool.size(); i < static_cast<size_t>(max_pool_size); ++i) {
+            died_process_indices.push_back(i);
+        }
+    }
+
+    if (died_process_indices.empty()) [[likely]] {
+        return 0;
+    }
+
+    int recreated = 0;
+    std::vector<ProcessPtr> processes_to_shutdown;
+    for (size_t index : died_process_indices) {
+        {
+            std::lock_guard<std::mutex> lock(versioned_pool->mutex);
+            if (versioned_pool->state != PoolState::INITIALIZED) {
+                break;
+            }
+        }
+
+        ProcessPtr new_process;
+        Status status = fork(version, &new_process);
+        if (status.ok() && new_process) {
+            bool published = false;
+            {
+                std::lock_guard<std::mutex> lock(versioned_pool->mutex);
+                auto& pool = versioned_pool->processes;
+                if (versioned_pool->state != PoolState::INITIALIZED) {
+                    processes_to_shutdown.emplace_back(std::move(new_process));
+                } else if (index < pool.size()) {
+                    if (!pool[index] || !pool[index]->is_alive()) [[likely]] {
+                        pool[index] = std::move(new_process);
+                        versioned_pool->has_available_process = true;
+                        recreated++;
+                        published = true;
+                    } else {
+                        processes_to_shutdown.emplace_back(std::move(new_process));
+                    }
+                } else if (pool.size() < static_cast<size_t>(max_pool_size)) {
+                    pool.emplace_back(std::move(new_process));
+                    versioned_pool->has_available_process = true;
+                    recreated++;
+                    published = true;
                 } else {
-                    LOG(ERROR) << "Failed to recreate Python process for version "
-                               << version.to_string() << ": " << s.to_string();
-                    pool.erase(pool.begin() + i);
-                    --i;
+                    processes_to_shutdown.emplace_back(std::move(new_process));
                 }
             }
+            if (published) {
+                versioned_pool->cv.notify_all();
+            }
+        } else {
+            LOG(ERROR) << "Failed to recreate Python process for version " << version.to_string()
+                       << ": " << status.to_string();
         }
     }
 
-    if (total_dead > 0) {
-        LOG(INFO) << "Health check completed: checked=" << total_checked << ", dead=" << total_dead
-                  << ", recreated=" << total_recreated;
+    {
+        std::lock_guard<std::mutex> lock(versioned_pool->mutex);
+        auto& pool = versioned_pool->processes;
+
+        if (versioned_pool->state != PoolState::INITIALIZED) {
+            versioned_pool->has_available_process = false;
+        } else {
+            // Keep empty/dead slots instead of shrinking the vector. Init workers are detached and
+            // publish by original slot index; shrinking here would make a late successful init look
+            // out-of-range and discard a usable process.
+            versioned_pool->has_available_process = std::any_of(
+                    pool.begin(), pool.end(),
+                    [](const ProcessPtr& process) { return process && process->is_alive(); });
+        }
+    }
+    versioned_pool->cv.notify_all();
+    for (auto& process : processes_to_shutdown) {
+        process->shutdown();
     }
+    return recreated;
 }
 
 void PythonServerManager::shutdown() {
-    // Signal health check thread to stop
-    _shutdown_flag.store(true, std::memory_order_release);
-    _health_check_cv.notify_one();
+    std::vector<std::pair<PythonVersion, std::shared_ptr<VersionedProcessPool>>> pools_to_shutdown;
+    {
+        std::lock_guard<std::mutex> lock(_pools_mutex);
+        _shutdown_flag.store(true, std::memory_order_release);
+        pools_to_shutdown.reserve(_process_pools.size());
+        for (auto& [version, versioned_pool] : _process_pools) {
+            pools_to_shutdown.emplace_back(version, std::move(versioned_pool));
+        }
+        _process_pools.clear();
+    }
 
-    if (_health_check_thread && _health_check_thread->joinable()) {
-        _health_check_thread->join();
-        _health_check_thread.reset();
+    for (auto& [version, versioned_pool] : pools_to_shutdown) {
+        if (!versioned_pool) {
+            continue;
+        }
+        std::lock_guard<std::mutex> lock(versioned_pool->mutex);
+        versioned_pool->state = PoolState::STOPPED;
+        versioned_pool->has_available_process = false;
+        versioned_pool->repairing = false;
+        versioned_pool->cv.notify_all();
+    }
+
+    std::unique_ptr<std::thread> health_check_thread;
+    {
+        std::lock_guard<std::mutex> lock(_health_check_mutex);
+        health_check_thread = std::move(_health_check_thread);
+    }
+    _health_check_cv.notify_one();
+    if (health_check_thread && health_check_thread->joinable()) {
+        health_check_thread->join();
     }
 
     // Shutdown all processes
-    for (auto& [version, versioned_pool] : _snapshot_process_pools()) {
+    std::vector<ProcessPtr> processes_to_shutdown;
+    for (auto& [version, versioned_pool] : pools_to_shutdown) {
+        if (!versioned_pool) {
+            continue;
+        }
         std::lock_guard<std::mutex> lock(versioned_pool->mutex);
         auto& pool = versioned_pool->processes;
         for (auto& process : pool) {
             if (process) {
-                process->shutdown();
+                processes_to_shutdown.emplace_back(std::move(process));
             }
         }
         pool.clear();
-        versioned_pool->initialized = false;
+        versioned_pool->cv.notify_all();
     }
-
-    {
-        std::lock_guard<std::mutex> lock(_pools_mutex);
-        _process_pools.clear();
+    for (auto& process : processes_to_shutdown) {
+        process->shutdown();
     }
 }
 
diff --git a/be/src/udf/python/python_server.h b/be/src/udf/python/python_server.h
index 78cef72e06d..5ee763d5f52 100644
--- a/be/src/udf/python/python_server.h
+++ b/be/src/udf/python/python_server.h
@@ -18,6 +18,7 @@
 #pragma once
 
 #include <atomic>
+#include <chrono>
 #include <condition_variable>
 #include <memory>
 #include <mutex>
@@ -47,7 +48,7 @@ public:
                       std::shared_ptr<T>* client,
                       const std::shared_ptr<arrow::Schema>& data_schema = nullptr);
 
-    Status fork(const PythonVersion& version, ProcessPtr* process);
+    static Status fork(const PythonVersion& version, ProcessPtr* process);
 
     // Clear Python module cache for a specific UDF location across all processes
     Status clear_module_cache(const std::string& location);
@@ -64,7 +65,11 @@ public:
     void set_process_pool_for_test(const PythonVersion& version, std::vector<ProcessPtr> processes,
                                    bool initialized = true);
 
-    std::vector<ProcessPtr>& process_pool_for_test(const PythonVersion& version);
+    std::vector<ProcessPtr> process_pool_snapshot_for_test(const PythonVersion& version);
+
+    bool process_pool_is_initializing_for_test(const PythonVersion& version);
+
+    bool process_pool_is_initialized_for_test(const PythonVersion& version);
 
     Status broadcast_action_to_processes_for_test(const std::string& action_type,
                                                   const std::string& body,
@@ -74,10 +79,28 @@ public:
 #endif
 
 private:
+    enum class PoolState {
+        UNINITIALIZED,
+        INITIALIZING,
+        // `Initialized` means:
+        // 1. All process slots have attempted initialization.
+        // 2. At least one live process is available for requests.
+        // 3. The health-check thread has been started.
+        INITIALIZED,
+        // Prevent the pool from being incorrectly reused after `shutdown()`
+        STOPPED,
+    };
+
     struct VersionedProcessPool {
         std::mutex mutex;
+        // Coordinates initialization and repair workers with foreground requests.
+        std::condition_variable cv;
         std::vector<ProcessPtr> processes;
-        bool initialized = false;
+        PoolState state = PoolState::UNINITIALIZED;
+        // True when at least one process in the pool can serve requests.
+        bool has_available_process = false;
+        // True while a background repair is recreating dead or missing processes.
+        bool repairing = false;
     };
 
     /** 
@@ -104,6 +127,22 @@ private:
      */
     void _check_and_recreate_processes();
 
+#ifdef BE_TEST
+    static constexpr std::chrono::milliseconds PROCESS_START_TIMEOUT {500};
+    static constexpr std::chrono::milliseconds PROCESS_TERMINATE_TIMEOUT {100};
+    static constexpr std::chrono::milliseconds PROCESS_POOL_INIT_TIMEOUT {1000};
+    static constexpr std::chrono::milliseconds PROCESS_REPAIR_WAIT_TIMEOUT {200};
+#else
+    static constexpr std::chrono::milliseconds PROCESS_START_TIMEOUT {5000};
+    static constexpr std::chrono::milliseconds PROCESS_TERMINATE_TIMEOUT {1000};
+    // FE's default send-fragments RPC timeout is 30s. Keep BE's Python pool wait below it so the
+    // caller sees SERVICE_UNAVAILABLE with Python context instead of a generic RPC deadline error.
+    static constexpr std::chrono::milliseconds PROCESS_POOL_INIT_TIMEOUT {20000};
+    static constexpr std::chrono::milliseconds PROCESS_REPAIR_WAIT_TIMEOUT {1000};
+#endif
+    static int _repair_process_pool(const PythonVersion& version,
+                                    const std::shared_ptr<VersionedProcessPool>& versioned_pool);
+
     /**
      * Read resident set size (RSS) for a single process from /proc/{pid}/statm
      */
@@ -114,11 +153,14 @@ private:
      */
     void _refresh_memory_stats();
 
-    std::shared_ptr<VersionedProcessPool> _get_or_create_process_pool(const PythonVersion& version);
+    Result<std::shared_ptr<VersionedProcessPool>> _get_or_create_process_pool(
+            const PythonVersion& version);
     std::vector<std::pair<PythonVersion, std::shared_ptr<VersionedProcessPool>>>
     _snapshot_process_pools();
     Status _broadcast_action_to_processes(const std::string& action_type, const std::string& body,
                                           const std::string& log_name);
+    static bool _select_alive_process_from_pool(const std::vector<ProcessPtr>& pool,
+                                                ProcessPtr* process);
 
     std::unordered_map<PythonVersion, std::shared_ptr<VersionedProcessPool>> _process_pools;
     // Protects the version -> pool handle map only. Per-version process operations are guarded
diff --git a/be/src/udf/python/python_udf_runtime.cpp b/be/src/udf/python/python_udf_runtime.cpp
index 9d687c43f40..aa87360cbc3 100644
--- a/be/src/udf/python/python_udf_runtime.cpp
+++ b/be/src/udf/python/python_udf_runtime.cpp
@@ -18,15 +18,177 @@
 #include "udf/python/python_udf_runtime.h"
 
 #include <butil/fd_utility.h>
+#include <signal.h>
+#include <string.h>
 #include <sys/wait.h>
 #include <unistd.h>
 
+#include <algorithm>
 #include <boost/process.hpp>
+#include <cerrno>
+#include <chrono>
+#include <condition_variable>
+#include <deque>
+#include <mutex>
+#include <thread>
+#ifdef BE_TEST
+#include <atomic>
+#endif
 
 #include "common/logging.h"
+#include "runtime/thread_context.h"
 
 namespace doris {
 
+#ifdef BE_TEST
+static constexpr std::chrono::milliseconds PROCESS_TERMINATE_TIMEOUT {100};
+#else
+static constexpr std::chrono::milliseconds PROCESS_TERMINATE_TIMEOUT {1000};
+#endif
+static constexpr std::chrono::milliseconds BACKGROUND_REAP_INTERVAL {1000};
+
+#ifdef BE_TEST
+static std::atomic<int> FORCED_CHILD_EXIT_TIMEOUTS {0};
+
+static bool consume_forced_child_exit_timeout() {
+    int remaining = FORCED_CHILD_EXIT_TIMEOUTS.load(std::memory_order_relaxed);
+    while (remaining > 0) {
+        if (FORCED_CHILD_EXIT_TIMEOUTS.compare_exchange_weak(remaining, remaining - 1,
+                                                             std::memory_order_relaxed)) {
+            return true;
+        }
+    }
+    return false;
+}
+#endif
+
+struct BackgroundChildReaper {
+    std::mutex mutex;
+    std::condition_variable cv;
+    std::deque<pid_t> pids;
+#ifdef BE_TEST
+    std::deque<pid_t> reaped_pids;
+#endif
+    std::thread thread;
+};
+
+static BackgroundChildReaper& background_child_reaper() {
+    static auto* reaper = new BackgroundChildReaper();
+    return *reaper;
+}
+
+void PythonUDFProcess::enqueue_child_for_reap(pid_t pid) {
+    if (pid <= 0) [[unlikely]] {
+        return;
+    }
+
+    auto& reaper = background_child_reaper();
+    {
+        std::lock_guard<std::mutex> lock(reaper.mutex);
+        if (std::find(reaper.pids.begin(), reaper.pids.end(), pid) != reaper.pids.end()) {
+            return;
+        }
+        reaper.pids.push_back(pid);
+        if (!reaper.thread.joinable()) {
+            // This thread only owns pids that were already SIGKILLed but could not be reaped within
+            // the bounded shutdown wait. Such processes may be stuck in uninterruptible I/O; if they
+            // exit later and nobody calls waitpid(), they stay as zombies under BE. Keep reaping them
+            // asynchronously so foreground shutdown remains bounded without dropping wait ownership.
+            reaper.thread = std::thread([]() {
+                SCOPED_INIT_THREAD_CONTEXT();
+                std::deque<pid_t> pending_pids;
+                while (true) {
+                    auto& reaper_ref = background_child_reaper();
+                    std::unique_lock<std::mutex> lock(reaper_ref.mutex);
+                    if (pending_pids.empty()) {
+                        reaper_ref.cv.wait(lock,
+                                           [&reaper_ref]() { return !reaper_ref.pids.empty(); });
+                    } else {
+                        reaper_ref.cv.wait_for(lock, BACKGROUND_REAP_INTERVAL);
+                    }
+                    pending_pids.insert(pending_pids.end(), reaper_ref.pids.begin(),
+                                        reaper_ref.pids.end());
+                    reaper_ref.pids.clear();
+                    std::deque<pid_t> pids;
+                    pids.swap(pending_pids);
+                    lock.unlock();
+
+                    for (pid_t pending_pid : pids) {
+                        int exit_status = 0;
+                        auto wait_result = PythonUDFProcess::wait_child_exit(
+                                pending_pid, std::chrono::milliseconds(0), &exit_status);
+                        if (wait_result == ChildExitWaitResult::EXITED ||
+                            wait_result == ChildExitWaitResult::ALREADY_REAPED) {
+                            LOG(INFO) << "Background reaped Python process pid=" << pending_pid;
+#ifdef BE_TEST
+                            {
+                                std::lock_guard<std::mutex> reaped_lock(reaper_ref.mutex);
+                                reaper_ref.reaped_pids.push_back(pending_pid);
+                            }
+                            reaper_ref.cv.notify_all();
+#endif
+                        } else if (wait_result == ChildExitWaitResult::TIMEOUT) {
+                            pending_pids.push_back(pending_pid);
+                        } else {
+                            LOG(WARNING) << "Background failed to reap Python process pid="
+                                         << pending_pid;
+                        }
+                    }
+                }
+            });
+        }
+    }
+    reaper.cv.notify_one();
+}
+
+#ifdef BE_TEST
+bool PythonUDFProcess::wait_background_reaped_for_test(pid_t pid,
+                                                       std::chrono::milliseconds timeout) {
+    auto& reaper = background_child_reaper();
+    std::unique_lock<std::mutex> lock(reaper.mutex);
+    return reaper.cv.wait_for(lock, timeout, [&reaper, pid]() {
+        return std::find(reaper.reaped_pids.begin(), reaper.reaped_pids.end(), pid) !=
+               reaper.reaped_pids.end();
+    });
+}
+
+void PythonUDFProcess::force_child_exit_timeouts_for_test(int count) {
+    FORCED_CHILD_EXIT_TIMEOUTS.store(count, std::memory_order_relaxed);
+}
+#endif
+
+PythonUDFProcess::ChildExitWaitResult PythonUDFProcess::wait_child_exit(
+        pid_t pid, std::chrono::milliseconds timeout, int* exit_status) {
+#ifdef BE_TEST
+    if (consume_forced_child_exit_timeout()) {
+        return ChildExitWaitResult::TIMEOUT;
+    }
+#endif
+    const auto deadline = std::chrono::steady_clock::now() + timeout;
+    while (true) {
+        pid_t ret = waitpid(pid, exit_status, WNOHANG);
+        if (ret == pid) {
+            return ChildExitWaitResult::EXITED;
+        }
+        if (ret < 0) {
+            if (errno == EINTR) {
+                // retry if interrupted
+                continue;
+            }
+            // Another owner may already have observed the child exit through boost::process.
+            if (errno == ECHILD) {
+                return ChildExitWaitResult::ALREADY_REAPED;
+            }
+            LOG(WARNING) << "Failed to wait Python process pid=" << pid << ": " << strerror(errno);
+            return ChildExitWaitResult::ERROR;
+        }
+        if (std::chrono::steady_clock::now() >= deadline) {
+            return ChildExitWaitResult::TIMEOUT;
+        }
+        std::this_thread::sleep_for(std::chrono::milliseconds(10));
+    }
+}
+
 void PythonUDFProcess::remove_unix_socket() {
     if (_uri.empty() || _unix_socket_file_path.empty()) return;
 
@@ -47,29 +209,51 @@ void PythonUDFProcess::remove_unix_socket() {
 void PythonUDFProcess::shutdown() {
     if (!_child.valid() || _is_shutdown) return;
 
-    _child.terminate();
-    bool graceful = false;
-    constexpr std::chrono::milliseconds retry_interval(100); // 100ms
-
-    for (int i = 0; i < TERMINATE_RETRY_TIMES; ++i) {
-        if (!_child.running()) {
-            graceful = true;
-            break;
-        }
-        std::this_thread::sleep_for(retry_interval);
+    int exit_status = 0;
+    bool exited = !_child.running();
+    bool status_available = false;
+    bool already_reaped = false;
+    if (!exited) {
+        ::kill(_child_pid, SIGTERM);
+        auto wait_result = wait_child_exit(_child_pid, PROCESS_TERMINATE_TIMEOUT, &exit_status);
+        exited = wait_result == ChildExitWaitResult::EXITED ||
+                 wait_result == ChildExitWaitResult::ALREADY_REAPED;
+        status_available = wait_result == ChildExitWaitResult::EXITED;
+        already_reaped = wait_result == ChildExitWaitResult::ALREADY_REAPED;
+    } else {
+        auto wait_result = wait_child_exit(_child_pid, std::chrono::milliseconds(0), &exit_status);
+        status_available = wait_result == ChildExitWaitResult::EXITED;
+        already_reaped = wait_result == ChildExitWaitResult::ALREADY_REAPED;
     }
 
-    if (!graceful) {
-        LOG(WARNING) << "Python process did not terminate gracefully, sending SIGKILL";
+    if (!exited) {
+        LOG(WARNING) << "Python process did not terminate gracefully, sending SIGKILL, pid="
+                     << _child_pid;
         ::kill(_child_pid, SIGKILL);
-        _child.wait();
+        auto wait_result = wait_child_exit(_child_pid, PROCESS_TERMINATE_TIMEOUT, &exit_status);
+        exited = wait_result == ChildExitWaitResult::EXITED ||
+                 wait_result == ChildExitWaitResult::ALREADY_REAPED;
+        status_available = wait_result == ChildExitWaitResult::EXITED;
+        already_reaped = wait_result == ChildExitWaitResult::ALREADY_REAPED;
     }
+    _child.detach();
 
-    if (int exit_code = _child.exit_code(); exit_code > 128 && exit_code <= 255) {
-        int signal = exit_code - 128;
-        LOG(INFO) << "Python process was killed by signal " << signal;
+    if (!exited) [[unlikely]] {
+        LOG(WARNING) << "Python process did not exit after SIGKILL, enqueue background reap, pid="
+                     << _child_pid;
+        enqueue_child_for_reap(_child_pid);
+    } else if (already_reaped) {
+        LOG(INFO) << "Python process already reaped by another owner, pid=" << _child_pid;
+    } else if (!status_available) {
+        LOG(INFO) << "Python process exited but exit status is unavailable, pid=" << _child_pid;
     } else {
-        LOG(INFO) << "Python process exited normally with code: " << exit_code;
+        if (WIFSIGNALED(exit_status)) {
+            LOG(INFO) << "Python process was killed by signal " << WTERMSIG(exit_status);
+        } else if (WIFEXITED(exit_status)) {
+            LOG(INFO) << "Python process exited normally with code: " << WEXITSTATUS(exit_status);
+        } else {
+            LOG(INFO) << "Python process exited";
+        }
     }
 
     _output_stream.close();
@@ -84,4 +268,4 @@ std::string PythonUDFProcess::to_string() const {
             _child_pid, _uri, _unix_socket_file_path, _is_shutdown);
 }
 
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/udf/python/python_udf_runtime.h b/be/src/udf/python/python_udf_runtime.h
index 269b6d1a98e..2d635dc11a1 100644
--- a/be/src/udf/python/python_udf_runtime.h
+++ b/be/src/udf/python/python_udf_runtime.h
@@ -18,6 +18,7 @@
 #pragma once
 
 #include <boost/process.hpp>
+#include <chrono>
 
 #include "udf/python/python_env.h"
 
@@ -75,6 +76,15 @@ public:
 
     void shutdown();
 
+    enum class ChildExitWaitResult { EXITED, ALREADY_REAPED, TIMEOUT, ERROR };
+
+    static ChildExitWaitResult wait_child_exit(pid_t pid, std::chrono::milliseconds timeout,
+                                               int* exit_status);
+
+    // Hand off a killed child that could not be reaped synchronously. The background reaper keeps
+    // waitpid ownership so a later child exit will not become a zombie under BE.
+    static void enqueue_child_for_reap(pid_t pid);
+
     std::string to_string() const;
 
     pid_t get_child_pid() const { return _child_pid; }
@@ -85,10 +95,13 @@ public:
 
 #ifdef BE_TEST
     void set_uri_for_test(std::string uri) { _uri = std::move(uri); }
+
+    static bool wait_background_reaped_for_test(pid_t pid, std::chrono::milliseconds timeout);
+
+    static void force_child_exit_timeouts_for_test(int count);
 #endif
 
 private:
-    constexpr static int TERMINATE_RETRY_TIMES = 10;
     constexpr static size_t MAX_ACCUMULATED_LOG_SIZE = 65536;
 
     bool _is_shutdown {false};
diff --git a/be/test/udf/python/python_server_test.cpp b/be/test/udf/python/python_server_test.cpp
index f21e52e0730..9091d04971f 100644
--- a/be/test/udf/python/python_server_test.cpp
+++ b/be/test/udf/python/python_server_test.cpp
@@ -22,8 +22,10 @@
 #include <sys/un.h>
 
 #include <boost/process.hpp>
+#include <chrono>
 #include <filesystem>
 #include <fstream>
+#include <future>
 #include <string>
 #include <vector>
 
@@ -127,6 +129,55 @@ protected:
         return python_path;
     }
 
+    std::string create_fake_python_without_socket_creation(const std::string& binary_name,
+                                                           const std::string& version) {
+        std::string bin_dir = test_dir_ + "/bin";
+        std::string python_path = bin_dir + "/" + binary_name;
+        fs::create_directories(bin_dir);
+
+        std::ofstream ofs(python_path);
+        ofs << "#!/bin/bash\n";
+        ofs << "if [ \"$1\" = \"--version\" ]; then\n";
+        ofs << "    echo 'Python " << version << "'\n";
+        ofs << "    exit 0\n";
+        ofs << "fi\n";
+        ofs << "trap '' TERM\n";
+        ofs << "while true; do sleep 1; done\n";
+        ofs.close();
+        fs::permissions(python_path, fs::perms::owner_all);
+
+        return python_path;
+    }
+
+    std::string create_fake_python_with_one_stuck_and_others_socket(const std::string& binary_name,
+                                                                    const std::string& version) {
+        std::string bin_dir = test_dir_ + "/bin";
+        std::string python_path = bin_dir + "/" + binary_name;
+        std::string first_start_dir = test_dir_ + "/first_python_start";
+        fs::create_directories(bin_dir);
+
+        std::ofstream ofs(python_path);
+        ofs << "#!/bin/bash\n";
+        ofs << "if [ \"$1\" = \"--version\" ]; then\n";
+        ofs << "    echo 'Python " << version << "'\n";
+        ofs << "    exit 0\n";
+        ofs << "fi\n";
+        ofs << "if mkdir \"" << first_start_dir << "\" 2>/dev/null; then\n";
+        ofs << "    trap '' TERM\n";
+        ofs << "    while true; do sleep 1; done\n";
+        ofs << "fi\n";
+        ofs << "SOCKET_PREFIX=\"$3\"\n";
+        ofs << "SOCKET_BASE=\"${SOCKET_PREFIX#grpc+unix://}\"\n";
+        ofs << "SOCKET_FILE=\"${SOCKET_BASE}_$$.sock\"\n";
+        ofs << "touch \"$SOCKET_FILE\"\n";
+        ofs << "trap 'rm -f \"$SOCKET_FILE\"; exit 0' TERM INT\n";
+        ofs << "while true; do sleep 1; done\n";
+        ofs.close();
+        fs::permissions(python_path, fs::perms::owner_all);
+
+        return python_path;
+    }
+
     // Set DORIS_HOME and create flight server script directory
     void setup_doris_home() {
         setenv("DORIS_HOME", test_dir_.c_str(), 1);
@@ -162,10 +213,14 @@ TEST_F(PythonServerTest, SingletonReturnsSameInstance) {
 // PythonServerManager::_get_process() - process retrieval test
 // ============================================================================
 
-TEST_F(PythonServerTest, GetProcessFromEmptyPoolReturnsError) {
+TEST_F(PythonServerTest, EnsurePoolInitializedCanInitializeEmptyPoolForTest) {
     PythonServerManager mgr;
 
-    PythonVersion version("3.9.16", "/fake/path", "/fake/python");
+    setup_doris_home();
+    std::string python_path = create_fake_python_with_socket_creation("3.9.16");
+    PythonVersion version("3.9.16", test_dir_, python_path);
+    config::max_python_process_num = 1;
+
     mgr.set_process_pool_for_test(version, {});
     auto pool_result = mgr._ensure_pool_initialized(version);
     ASSERT_TRUE(pool_result.has_value()) << pool_result.error().to_string();
@@ -173,10 +228,11 @@ TEST_F(PythonServerTest, GetProcessFromEmptyPoolReturnsError) {
     ProcessPtr process;
     Status status = mgr._get_process(version, pool_result.value(), &process);
 
-    // Verify: empty pool should return an error before touching process slots.
-    EXPECT_FALSE(status.ok());
-    EXPECT_TRUE(status.to_string().find("pool is empty") != std::string::npos);
-    EXPECT_EQ(process, nullptr);
+    EXPECT_TRUE(status.ok()) << status.to_string();
+    ASSERT_NE(process, nullptr);
+    EXPECT_TRUE(process->is_alive());
+
+    mgr.shutdown();
 }
 
 // ============================================================================
@@ -260,12 +316,61 @@ TEST_F(PythonServerTest, ForkWithProcessThatExitsImmediatelyReturnsError) {
                 err_msg.find("start") != std::string::npos);
 }
 
+TEST_F(PythonServerTest, ForkWithoutSocketCreationReturnsAfterBoundedTerminate) {
+    setup_doris_home();
+    std::string python_path =
+            create_fake_python_without_socket_creation("python3.no_socket_direct", "3.9.16");
+
+    PythonServerManager mgr;
+    PythonVersion version("3.9.16", test_dir_, python_path);
+
+    auto start = std::chrono::steady_clock::now();
+    ProcessPtr process;
+    Status status = mgr.fork(version, &process);
+    auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
+            std::chrono::steady_clock::now() - start);
+
+    EXPECT_FALSE(status.ok());
+    EXPECT_EQ(process, nullptr);
+    EXPECT_NE(status.to_string().find("socket file not found"), std::string::npos);
+    EXPECT_LT(elapsed.count(), 2000);
+}
+
+TEST_F(PythonServerTest, ForkEnqueuesBackgroundReapWhenKilledStartFailureIsNotReaped) {
+    setup_doris_home();
+    std::string python_path =
+            create_fake_python_without_socket_creation("python3.no_socket_reap", "3.9.16");
+
+    PythonServerManager mgr;
+    PythonVersion version("3.9.16", test_dir_, python_path);
+
+    // SIGKILL not becoming reapable inside the bounded wait depends on kernel state. Force only the
+    // wait results so this test covers PythonServerManager::fork() handing waitpid ownership to the
+    // shared background reaper instead of detaching and losing the pid.
+    PythonUDFProcess::force_child_exit_timeouts_for_test(2);
+    ProcessPtr process;
+    Status status = mgr.fork(version, &process);
+    PythonUDFProcess::force_child_exit_timeouts_for_test(0);
+
+    EXPECT_FALSE(status.ok());
+    EXPECT_EQ(process, nullptr);
+    EXPECT_NE(status.to_string().find("process did not exit after SIGKILL"), std::string::npos);
+
+    std::string status_text = status.to_string();
+    size_t pid_pos = status_text.find("pid=");
+    ASSERT_NE(pid_pos, std::string::npos) << status_text;
+    pid_t child_pid = static_cast<pid_t>(std::stol(status_text.substr(pid_pos + 4)));
+    EXPECT_TRUE(PythonUDFProcess::wait_background_reaped_for_test(child_pid,
+                                                                  std::chrono::milliseconds(5000)));
+}
+
 // ============================================================================
 // PythonServerManager::_ensure_pool_initialized() - pool initialization test
 // ============================================================================
 
 TEST_F(PythonServerTest, EnsurePoolInitializedWithInvalidVersionFails) {
     PythonServerManager mgr;
+    config::max_python_process_num = 1;
 
     PythonVersion invalid_version("3.99.99", "/non/existent/path", "/non/existent/python");
 
@@ -273,9 +378,39 @@ TEST_F(PythonServerTest, EnsurePoolInitializedWithInvalidVersionFails) {
 
     // Verify: invalid version should cause initialization to fail
     EXPECT_FALSE(result.has_value());
-    // Error message should indicate all process creations failed
+    // Error message should indicate process creation failure or bounded initialization timeout.
     EXPECT_TRUE(result.error().to_string().find("Failed") != std::string::npos ||
-                result.error().to_string().find("failed") != std::string::npos);
+                result.error().to_string().find("failed") != std::string::npos ||
+                result.error().to_string().find("Timed out") != std::string::npos);
+}
+
+TEST_F(PythonServerTest, EnsurePoolInitializedReturnsImmediatelyWhenAllWorkersFail) {
+    PythonServerManager mgr;
+    config::max_python_process_num = 2;
+
+    PythonVersion invalid_version("3.9.16", test_dir_, test_dir_ + "/missing_python");
+
+    auto start = std::chrono::steady_clock::now();
+    auto result = mgr._ensure_pool_initialized(invalid_version);
+    auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
+            std::chrono::steady_clock::now() - start);
+
+    EXPECT_FALSE(result.has_value());
+    EXPECT_LT(elapsed.count(), 500);
+}
+
+TEST_F(PythonServerTest, EnsurePoolInitializedAfterShutdownReturnsServiceUnavailable) {
+    PythonServerManager mgr;
+    mgr.shutdown();
+
+    setup_doris_home();
+    std::string python_path = create_fake_python_with_socket_creation("3.9.16");
+    PythonVersion version("3.9.16", test_dir_, python_path);
+
+    auto result = mgr._ensure_pool_initialized(version);
+
+    EXPECT_FALSE(result.has_value());
+    EXPECT_NE(result.error().to_string().find("shutting down"), std::string::npos);
 }
 
 // ============================================================================
@@ -302,6 +437,7 @@ TEST_F(PythonServerTest, ShutdownCalledMultipleTimesDoesNotCrash) {
 
 TEST_F(PythonServerTest, ShutdownAfterFailedInitializationDoesNotCrash) {
     PythonServerManager mgr;
+    config::max_python_process_num = 1;
 
     // Try initialization first (expected to fail)
     PythonVersion invalid_version("3.99.99", "/bad/path", "/bad/python");
@@ -312,6 +448,27 @@ TEST_F(PythonServerTest, 
ShutdownAfterFailedInitializationDoesNotCrash) {
     EXPECT_NO_THROW(mgr.shutdown());
 }
 
+TEST_F(PythonServerTest, GetProcessFromStoppedPoolReturnsUnavailable) {
+    setup_doris_home();
+    std::string python_path = 
create_fake_python_with_socket_creation("3.9.16");
+
+    config::max_python_process_num = 1;
+
+    PythonServerManager mgr;
+    PythonVersion version("3.9.16", test_dir_, python_path);
+    auto pool_result = mgr._ensure_pool_initialized(version);
+    ASSERT_TRUE(pool_result.has_value()) << pool_result.error().to_string();
+
+    mgr.shutdown();
+
+    ProcessPtr process;
+    Status status = mgr._get_process(version, pool_result.value(), &process);
+
+    EXPECT_FALSE(status.ok());
+    EXPECT_EQ(process, nullptr);
+    EXPECT_NE(status.to_string().find("stopped"), std::string::npos);
+}
+
 TEST_F(PythonServerTest, ClearUdafStateCacheWithoutProcessesIsNoOp) {
     PythonServerManager mgr;
 
@@ -351,6 +508,7 @@ TEST_F(PythonServerTest, 
BroadcastActionWithInvalidProcessUriReturnsError) {
 
 TEST_F(PythonServerTest, GetClientWithInvalidVersionFails) {
     PythonServerManager mgr;
+    config::max_python_process_num = 1;
 
     PythonVersion invalid_version("3.9.16", "/invalid/path", 
"/invalid/python");
     PythonUDFMeta meta;
@@ -447,7 +605,7 @@ TEST_F(PythonServerTest, EnsurePoolInitializedSuccess) {
 TEST_F(PythonServerTest, 
EnsurePoolInitializedLogsProgressWhileWaitingForSlowProcess) {
     setup_doris_home();
     std::string python_path =
-            
create_fake_python_with_delay_and_socket_creation("python3.delayed", "3.9.16", 
200);
+            
create_fake_python_with_delay_and_socket_creation("python3.delayed", "3.9.16", 
50);
 
     config::max_python_process_num = 1;
 
@@ -461,6 +619,67 @@ TEST_F(PythonServerTest, 
EnsurePoolInitializedLogsProgressWhileWaitingForSlowPro
     mgr.shutdown();
 }
 
+TEST_F(PythonServerTest, 
EnsurePoolInitializedRetriesAfterInitFailureWithBoundedWait) {
+    setup_doris_home();
+    std::string python_path =
+            create_fake_python_without_socket_creation("python3.no_socket", 
"3.9.16");
+
+    config::max_python_process_num = 1;
+
+    PythonServerManager mgr;
+    PythonVersion version("3.9.16", test_dir_, python_path);
+
+    auto start = std::chrono::steady_clock::now();
+    auto result = mgr._ensure_pool_initialized(version);
+    auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
+            std::chrono::steady_clock::now() - start);
+
+    EXPECT_FALSE(result.has_value());
+    EXPECT_LT(elapsed.count(), 2000);
+
+    start = std::chrono::steady_clock::now();
+    auto retry_result = mgr._ensure_pool_initialized(version);
+    elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
+            std::chrono::steady_clock::now() - start);
+
+    EXPECT_FALSE(retry_result.has_value());
+    EXPECT_LT(elapsed.count(), 2000);
+
+    mgr.shutdown();
+}
+
+TEST_F(PythonServerTest, 
EnsurePoolInitializedSucceedsWithOneStuckWorkerAndOneUsableWorker) {
+    setup_doris_home();
+    std::string python_path =
+            
create_fake_python_with_one_stuck_and_others_socket("python3.mixed", "3.9.16");
+
+    config::max_python_process_num = 2;
+
+    PythonServerManager mgr;
+    PythonVersion version("3.9.16", test_dir_, python_path);
+
+    auto start = std::chrono::steady_clock::now();
+    auto result = mgr._ensure_pool_initialized(version);
+    auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
+            std::chrono::steady_clock::now() - start);
+
+    ASSERT_TRUE(result.has_value()) << result.error().to_string();
+    EXPECT_LT(elapsed.count(), 2000);
+    EXPECT_TRUE(mgr.process_pool_is_initializing_for_test(version));
+
+    ProcessPtr process;
+    EXPECT_TRUE(mgr._get_process(version, result.value(), &process).ok());
+    ASSERT_NE(process, nullptr);
+    EXPECT_TRUE(process->is_alive());
+
+    for (int i = 0; i < 20 && 
!mgr.process_pool_is_initialized_for_test(version); ++i) {
+        std::this_thread::sleep_for(std::chrono::milliseconds(100));
+    }
+    EXPECT_TRUE(mgr.process_pool_is_initialized_for_test(version));
+
+    mgr.shutdown();
+}
+
 TEST_F(PythonServerTest, EnsurePoolInitializedIdempotent) {
     setup_doris_home();
     std::string python_path = 
create_fake_python_with_socket_creation("3.9.16");
@@ -541,6 +760,8 @@ TEST_F(PythonServerTest, 
GetProcessSkipsDeadProcessWhenAliveProcessExists) {
     setup_doris_home();
     std::string python_path = 
create_fake_python_with_socket_creation("3.9.16");
 
+    config::max_python_process_num = 3;
+
     PythonServerManager mgr;
     PythonVersion version("3.9.16", test_dir_, python_path);
 
@@ -565,8 +786,10 @@ TEST_F(PythonServerTest, 
GetProcessSkipsDeadProcessWhenAliveProcessExists) {
 
     EXPECT_TRUE(status.ok()) << status.to_string();
     EXPECT_EQ(selected, alive_process);
-    EXPECT_FALSE(mgr.process_pool_for_test(version)[1]->is_alive());
-    EXPECT_EQ(mgr.process_pool_for_test(version)[1]->get_child_pid(), 
dead_pid);
+    auto pool_snapshot = mgr.process_pool_snapshot_for_test(version);
+    ASSERT_EQ(pool_snapshot.size(), 2);
+    EXPECT_FALSE(pool_snapshot[1]->is_alive());
+    EXPECT_EQ(pool_snapshot[1]->get_child_pid(), dead_pid);
 
     mgr.shutdown();
 }
@@ -685,9 +908,9 @@ TEST_F(PythonServerTest, 
EnsurePoolInitializedForDifferentVersionsDoesNotShareVe
     config::max_python_process_num = 1;
 
     std::string python39_path =
-            create_fake_python_with_delay_and_socket_creation("python3.9", 
"3.9.16", 1200);
+            create_fake_python_with_delay_and_socket_creation("python3.9", 
"3.9.16", 50);
     std::string python310_path =
-            create_fake_python_with_delay_and_socket_creation("python3.10", 
"3.10.0", 1200);
+            create_fake_python_with_delay_and_socket_creation("python3.10", 
"3.10.0", 50);
 
     PythonServerManager mgr;
     PythonVersion version39("3.9.16", test_dir_, python39_path);
@@ -706,9 +929,9 @@ TEST_F(PythonServerTest, 
EnsurePoolInitializedForDifferentVersionsDoesNotShareVe
 
     EXPECT_TRUE(result39.has_value()) << result39.error().to_string();
     EXPECT_TRUE(result310.has_value()) << result310.error().to_string();
-    // If both versions still contended on one manager-wide lock, the elapsed 
time would
-    // be close to two serialized 1.2s startups instead of a single startup 
window.
-    EXPECT_LT(elapsed.count(), 2200);
+    // Keep the assertion loose for ASAN/CI scheduling while still catching 
full init-timeout
+    // serialization between versions.
+    EXPECT_LT(elapsed.count(), 2000);
 
     mgr.shutdown();
 }
@@ -721,6 +944,8 @@ TEST_F(PythonServerTest, 
CheckAndRecreateProcessesRecreatesDeadProcess) {
     setup_doris_home();
     std::string python_path = 
create_fake_python_with_socket_creation("3.9.16");
 
+    config::max_python_process_num = 3;
+
     PythonServerManager mgr;
     PythonVersion version("3.9.16", test_dir_, python_path);
 
@@ -740,22 +965,94 @@ TEST_F(PythonServerTest, 
CheckAndRecreateProcessesRecreatesDeadProcess) {
 
     mgr.check_and_recreate_processes_for_test();
 
-    ASSERT_EQ(mgr.process_pool_for_test(version).size(), 3);
-    EXPECT_EQ(mgr.process_pool_for_test(version)[0], alive_process);
-    EXPECT_EQ(mgr.process_pool_for_test(version)[2], nullptr);
+    auto pool_snapshot = mgr.process_pool_snapshot_for_test(version);
+    ASSERT_EQ(pool_snapshot.size(), 3);
+    EXPECT_EQ(pool_snapshot[0], alive_process);
 
-    ProcessPtr recreated = mgr.process_pool_for_test(version)[1];
+    ProcessPtr recreated = pool_snapshot[1];
     ASSERT_NE(recreated, nullptr);
     EXPECT_TRUE(recreated->is_alive());
     EXPECT_NE(recreated->get_child_pid(), dead_pid_before);
+    ASSERT_NE(pool_snapshot[2], nullptr);
+    EXPECT_TRUE(pool_snapshot[2]->is_alive());
+
+    mgr.shutdown();
+}
+
+TEST_F(PythonServerTest, CheckAndRecreateProcessesSkipsRepairingPool) {
+    setup_doris_home();
+    std::string python_path = 
create_fake_python_with_socket_creation("3.9.16");
+
+    config::max_python_process_num = 1;
+
+    PythonServerManager mgr;
+    PythonVersion version("3.9.16", test_dir_, python_path);
+
+    ProcessPtr dead_process;
+    ASSERT_TRUE(mgr.fork(version, &dead_process).ok());
+    ASSERT_NE(dead_process, nullptr);
+    pid_t dead_pid = dead_process->get_child_pid();
+    dead_process->shutdown();
+    ASSERT_FALSE(dead_process->is_alive());
+
+    mgr.set_process_pool_for_test(version, {dead_process});
+    auto pool_result = mgr._ensure_pool_initialized(version);
+    ASSERT_TRUE(pool_result.has_value()) << pool_result.error().to_string();
+    {
+        std::lock_guard<std::mutex> lock(pool_result.value()->mutex);
+        pool_result.value()->repairing = true;
+    }
+
+    mgr.check_and_recreate_processes_for_test();
+
+    auto pool_snapshot = mgr.process_pool_snapshot_for_test(version);
+    ASSERT_EQ(pool_snapshot.size(), 1);
+    ASSERT_NE(pool_snapshot[0], nullptr);
+    EXPECT_FALSE(pool_snapshot[0]->is_alive());
+    EXPECT_EQ(pool_snapshot[0]->get_child_pid(), dead_pid);
+    {
+        std::lock_guard<std::mutex> lock(pool_result.value()->mutex);
+        pool_result.value()->repairing = false;
+    }
 
     mgr.shutdown();
 }
 
-TEST_F(PythonServerTest, 
CheckAndRecreateProcessesErasesDeadProcessWhenRecreateFails) {
+TEST_F(PythonServerTest, CheckAndRecreateProcessesSkipsUninitializedPool) {
     setup_doris_home();
     std::string python_path = 
create_fake_python_with_socket_creation("3.9.16");
 
+    config::max_python_process_num = 1;
+
+    PythonServerManager mgr;
+    PythonVersion version("3.9.16", test_dir_, python_path);
+
+    ProcessPtr dead_process;
+    ASSERT_TRUE(mgr.fork(version, &dead_process).ok());
+    ASSERT_NE(dead_process, nullptr);
+    pid_t dead_pid = dead_process->get_child_pid();
+    dead_process->shutdown();
+    ASSERT_FALSE(dead_process->is_alive());
+
+    mgr.set_process_pool_for_test(version, {dead_process}, false);
+
+    mgr.check_and_recreate_processes_for_test();
+
+    auto pool_snapshot = mgr.process_pool_snapshot_for_test(version);
+    ASSERT_EQ(pool_snapshot.size(), 1);
+    ASSERT_NE(pool_snapshot[0], nullptr);
+    EXPECT_FALSE(pool_snapshot[0]->is_alive());
+    EXPECT_EQ(pool_snapshot[0]->get_child_pid(), dead_pid);
+
+    mgr.shutdown();
+}
+
+TEST_F(PythonServerTest, 
CheckAndRecreateProcessesKeepsDeadSlotsWhenRecreateFails) {
+    setup_doris_home();
+    std::string python_path = 
create_fake_python_with_socket_creation("3.9.16");
+
+    config::max_python_process_num = 2;
+
     PythonServerManager mgr;
     PythonVersion live_version("3.9.16", test_dir_, python_path);
 
@@ -776,9 +1073,32 @@ TEST_F(PythonServerTest, 
CheckAndRecreateProcessesErasesDeadProcessWhenRecreateF
 
     mgr.check_and_recreate_processes_for_test();
 
-    EXPECT_TRUE(mgr.process_pool_for_test(invalid_version).empty());
+    auto pool_snapshot = mgr.process_pool_snapshot_for_test(invalid_version);
+    ASSERT_EQ(pool_snapshot.size(), 2);
+    EXPECT_FALSE(pool_snapshot[0]->is_alive());
+    EXPECT_FALSE(pool_snapshot[1]->is_alive());
 
     mgr.shutdown();
 }
 
+TEST_F(PythonServerTest, ReadProcessMemoryCurrentProcessSucceeds) {
+    PythonServerManager mgr;
+    size_t rss_bytes = 0;
+
+    Status status = mgr._read_process_memory(getpid(), &rss_bytes);
+
+    EXPECT_TRUE(status.ok()) << status.to_string();
+    EXPECT_GT(rss_bytes, 0);
+}
+
+TEST_F(PythonServerTest, ReadProcessMemoryInvalidPidFails) {
+    PythonServerManager mgr;
+    size_t rss_bytes = 0;
+
+    Status status = mgr._read_process_memory(-1, &rss_bytes);
+
+    EXPECT_FALSE(status.ok());
+    EXPECT_NE(status.to_string().find("/proc/-1/statm"), std::string::npos);
+}
+
 } // namespace doris
diff --git a/be/test/udf/python/python_udf_runtime_test.cpp 
b/be/test/udf/python/python_udf_runtime_test.cpp
index 99728c0500a..305dc9f5c06 100644
--- a/be/test/udf/python/python_udf_runtime_test.cpp
+++ b/be/test/udf/python/python_udf_runtime_test.cpp
@@ -20,6 +20,7 @@
 #include <gtest/gtest.h>
 #include <sys/socket.h>
 #include <sys/un.h>
+#include <sys/wait.h>
 #include <unistd.h>
 
 #include <boost/process.hpp>
@@ -132,6 +133,73 @@ TEST_F(PythonUDFRuntimeTest, ProcessPtrIsSharedPtr) {
     EXPECT_FALSE(ptr);
 }
 
+TEST_F(PythonUDFRuntimeTest, WaitChildExitReturnsExitedForExitedChild) {
+    bp::ipstream output;
+    bp::child child("/bin/bash", "-c", "exit 7", bp::std_out > output);
+    ASSERT_TRUE(child.valid());
+
+    int exit_status = 0;
+    auto result = PythonUDFProcess::wait_child_exit(child.id(), 
std::chrono::milliseconds(1000),
+                                                    &exit_status);
+    child.detach();
+
+    EXPECT_EQ(result, PythonUDFProcess::ChildExitWaitResult::EXITED);
+    EXPECT_TRUE(WIFEXITED(exit_status));
+    EXPECT_EQ(WEXITSTATUS(exit_status), 7);
+}
+
+TEST_F(PythonUDFRuntimeTest, WaitChildExitReturnsTimeoutForRunningChild) {
+    bp::ipstream output;
+    bp::child child("/bin/sleep", "60", bp::std_out > output);
+    ASSERT_TRUE(child.valid());
+    ASSERT_TRUE(child.running());
+
+    int exit_status = 0;
+    auto result = PythonUDFProcess::wait_child_exit(child.id(), 
std::chrono::milliseconds(20),
+                                                    &exit_status);
+
+    EXPECT_EQ(result, PythonUDFProcess::ChildExitWaitResult::TIMEOUT);
+
+    child.terminate();
+    child.wait();
+}
+
+TEST_F(PythonUDFRuntimeTest, WaitChildExitReturnsAlreadyReapedForReapedChild) {
+    bp::ipstream output;
+    bp::child child("/bin/true", bp::std_out > output);
+    ASSERT_TRUE(child.valid());
+    pid_t child_pid = child.id();
+    child.wait();
+
+    int exit_status = 0;
+    auto result = PythonUDFProcess::wait_child_exit(child_pid, 
std::chrono::milliseconds(0),
+                                                    &exit_status);
+
+    EXPECT_EQ(result, PythonUDFProcess::ChildExitWaitResult::ALREADY_REAPED);
+}
+
+TEST_F(PythonUDFRuntimeTest, BackgroundReaperReapsQueuedChild) {
+    bp::ipstream output;
+    bp::child child("/bin/bash", "-c", "sleep 0.1; exit 0", bp::std_out > 
output);
+    ASSERT_TRUE(child.valid());
+    pid_t child_pid = child.id();
+
+    // Do not try to force the real "SIGKILLed but still not reapable" case in 
UT. That usually
+    // needs kernel-level uninterruptible sleep. The behavior we must 
guarantee is that once such a
+    // pid is handed off, the background reaper keeps waitpid ownership until 
the child exits.
+    child.detach();
+    PythonUDFProcess::enqueue_child_for_reap(child_pid);
+
+    bool reaped = PythonUDFProcess::wait_background_reaped_for_test(
+            child_pid, std::chrono::milliseconds(5000));
+    EXPECT_TRUE(reaped);
+
+    int exit_status = 0;
+    auto result = PythonUDFProcess::wait_child_exit(child_pid, 
std::chrono::milliseconds(0),
+                                                    &exit_status);
+    EXPECT_EQ(result, PythonUDFProcess::ChildExitWaitResult::ALREADY_REAPED);
+}
+
 // Test socket file path generation for various PIDs
 TEST_F(PythonUDFRuntimeTest, SocketPathGenerationEdgeCases) {
     // Minimum PID
@@ -260,6 +328,27 @@ TEST_F(PythonUDFRuntimeTest, ShutdownWithStubbornProcess) {
     EXPECT_FALSE(process.is_alive());
 }
 
+TEST_F(PythonUDFRuntimeTest, 
ShutdownEnqueuesBackgroundReapWhenSigkillWaitTimesOut) {
+    bp::ipstream output;
+    bp::child child("/bin/bash", "-c", "trap '' TERM; exec sleep 60", 
bp::std_out > output);
+    ASSERT_TRUE(child.valid());
+    pid_t child_pid = child.id();
+
+    PythonUDFProcess process(std::move(child), std::move(output));
+    ASSERT_TRUE(process.is_alive());
+
+    // SIGKILL not becoming reapable inside a short bounded wait is rare and 
depends on kernel
+    // state, so force only the wait results here. This covers the shutdown 
handoff contract:
+    // a pid that was killed but not reaped synchronously must be owned by the 
background reaper.
+    PythonUDFProcess::force_child_exit_timeouts_for_test(2);
+    process.shutdown();
+    PythonUDFProcess::force_child_exit_timeouts_for_test(0);
+
+    EXPECT_TRUE(process.is_shutdown());
+    EXPECT_TRUE(PythonUDFProcess::wait_background_reaped_for_test(child_pid,
+                                                                  
std::chrono::milliseconds(5000)));
+}
+
 // ============================================================================
 // PythonUDFProcess remove_unix_socket() tests
 // ============================================================================


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to