This is an automated email from the ASF dual-hosted git repository.

HappenLee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 3f3c79c01c7 [Fix](pyudf) make Python server pool selection alive-aware 
and version-isolated (#62620)
3f3c79c01c7 is described below

commit 3f3c79c01c7d386122058a33db8bcd1fc3b45c77
Author: linrrarity <[email protected]>
AuthorDate: Thu May 7 21:04:28 2026 +0800

    [Fix](pyudf) make Python server pool selection alive-aware and 
version-isolated (#62620)
    
    `PythonServerManager` did not check whether the Python process selected
    for a version was still alive before handing it out, which could cause
    errors like:
    ```text
    java.lang.IllegalStateException: PYTHON_UDF_BLOCKED 
suite=python_udf_cross_feature_import_storage 
scenario=python_udf_cross_feature_import_storage.inline_runtime reason=inline 
probe failed. reason=errCode = 2, detailMessage = 
(172.20.49.73)[INTERNAL_ERROR]IOError: Flight stream finish failed with gRPC 
code 14, message: failed to connect to all addresses; last error: UNKNOWN: 
unix:/tmp/doris_python_udf_55799.sock: Connection refused
    ```
    
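    After this change, process selection prefers alive processes (choosing the
    least-loaded one among them). If no process in the pool is alive, a single
    replacement is forked in the foreground before returning, and the remaining
    dead entries are left for the background health checker. Each Python version
    now also has its own pool guarded by its own mutex, so initializing one
    version no longer blocks requests for another.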
---
 be/src/udf/python/python_server.cpp                | 153 +++++++++++----
 be/src/udf/python/python_server.h                  |  46 +++--
 be/test/udf/python/python_server_test.cpp          | 212 +++++++++++++++++----
 .../data/pythonudaf_p0/test_pythonudaf_drop.out    |   6 +
 .../data/pythonudf_p0/test_pythonudf_drop.out      |   6 +
 .../data/pythonudtf_p0/test_pythonudtf_drop.out    |   8 +
 .../pythonudaf_p0/test_pythonudaf_drop.groovy      |  39 +++-
 .../suites/pythonudf_p0/test_pythonudf_drop.groovy |  39 +++-
 .../pythonudtf_p0/test_pythonudtf_drop.groovy      |  51 ++++-
 9 files changed, 465 insertions(+), 95 deletions(-)

diff --git a/be/src/udf/python/python_server.cpp 
b/be/src/udf/python/python_server.cpp
index 646e1e79039..228cab8d905 100644
--- a/be/src/udf/python/python_server.cpp
+++ b/be/src/udf/python/python_server.cpp
@@ -27,6 +27,7 @@
 #include <boost/asio.hpp>
 #include <boost/process.hpp>
 #include <fstream>
+#include <future>
 
 #include "arrow/flight/client.h"
 #include "common/config.h"
@@ -37,32 +38,70 @@
 
 namespace doris {
 
-template <typename T>
+std::shared_ptr<PythonServerManager::VersionedProcessPool>
+PythonServerManager::_get_or_create_process_pool(const PythonVersion& version) 
{
+    std::lock_guard<std::mutex> lock(_pools_mutex);
+    auto& pool = _process_pools[version];
+    if (!pool) {
+        pool = std::make_shared<VersionedProcessPool>();
+    }
+    return pool;
+}
+
+std::vector<std::pair<PythonVersion, 
std::shared_ptr<PythonServerManager::VersionedProcessPool>>>
+PythonServerManager::_snapshot_process_pools() {
+    std::lock_guard<std::mutex> lock(_pools_mutex);
+    std::vector<std::pair<PythonVersion, 
std::shared_ptr<VersionedProcessPool>>> snapshot;
+    snapshot.reserve(_process_pools.size());
+    for (const auto& [version, pool] : _process_pools) {
+        snapshot.emplace_back(version, pool);
+    }
+    return snapshot;
+}
+
+#ifdef BE_TEST
+void PythonServerManager::set_process_pool_for_test(const PythonVersion& 
version,
+                                                    std::vector<ProcessPtr> 
processes,
+                                                    bool initialized) {
+    auto versioned_pool = _get_or_create_process_pool(version);
+    std::lock_guard<std::mutex> lock(versioned_pool->mutex);
+    versioned_pool->processes = std::move(processes);
+    versioned_pool->initialized = initialized;
+}
+
+std::vector<ProcessPtr>& PythonServerManager::process_pool_for_test(const 
PythonVersion& version) {
+    auto versioned_pool = _get_or_create_process_pool(version);
+    return versioned_pool->processes;
+}
+#endif
+
+template <typename ClientType>
 Status PythonServerManager::get_client(const PythonUDFMeta& func_meta, const 
PythonVersion& version,
-                                       std::shared_ptr<T>* client,
+                                       std::shared_ptr<ClientType>* client,
                                        const std::shared_ptr<arrow::Schema>& 
data_schema) {
-    // Ensure process pool is initialized for this version
-    RETURN_IF_ERROR(ensure_pool_initialized(version));
+    std::shared_ptr<VersionedProcessPool> versioned_pool =
+            DORIS_TRY(_ensure_pool_initialized(version));
 
     ProcessPtr process;
-    RETURN_IF_ERROR(get_process(version, &process));
+    RETURN_IF_ERROR(_get_process(version, versioned_pool, &process));
 
-    if constexpr (std::is_same_v<T, PythonUDAFClient>) {
-        RETURN_IF_ERROR(T::create(func_meta, std::move(process), data_schema, 
client));
+    if constexpr (std::is_same_v<ClientType, PythonUDAFClient>) {
+        RETURN_IF_ERROR(ClientType::create(func_meta, std::move(process), 
data_schema, client));
     } else {
-        RETURN_IF_ERROR(T::create(func_meta, std::move(process), client));
+        RETURN_IF_ERROR(ClientType::create(func_meta, std::move(process), 
client));
     }
 
     return Status::OK();
 }
 
-Status PythonServerManager::ensure_pool_initialized(const PythonVersion& 
version) {
-    std::lock_guard<std::mutex> lock(_pools_mutex);
+Result<std::shared_ptr<PythonServerManager::VersionedProcessPool>>
+PythonServerManager::_ensure_pool_initialized(const PythonVersion& version) {
+    auto versioned_pool = _get_or_create_process_pool(version);
+    std::lock_guard<std::mutex> lock(versioned_pool->mutex);
 
     // Check if already initialized
-    if (_initialized_versions.count(version)) return Status::OK();
+    if (versioned_pool->initialized) return versioned_pool;
 
-    std::vector<ProcessPtr>& pool = _process_pools[version];
     // 0 means use CPU core count as default, otherwise use the specified value
     int max_pool_size = config::max_python_process_num > 0 ? 
config::max_python_process_num
                                                            : 
CpuInfo::num_cores();
@@ -91,7 +130,7 @@ Status PythonServerManager::ensure_pool_initialized(const 
PythonVersion& version
     for (int i = 0; i < max_pool_size; i++) {
         Status s = futures[i].get();
         if (s.ok() && temp_processes[i]) {
-            pool.push_back(std::move(temp_processes[i]));
+            
versioned_pool->processes.emplace_back(std::move(temp_processes[i]));
             success_count++;
         } else {
             failure_count++;
@@ -100,38 +139,64 @@ Status PythonServerManager::ensure_pool_initialized(const 
PythonVersion& version
         }
     }
 
-    if (pool.empty()) {
-        return Status::InternalError(
+    if (versioned_pool->processes.empty()) {
+        return ResultError(Status::Error<ErrorCode::SERVICE_UNAVAILABLE>(
                 "Failed to initialize Python process pool: all {} process 
creation attempts failed",
-                max_pool_size);
+                max_pool_size));
     }
 
     LOG(INFO) << "Python process pool initialized for version " << 
version.to_string()
               << ": created " << success_count << " processes"
               << (failure_count > 0 ? fmt::format(" ({} failed)", 
failure_count) : "");
 
-    _initialized_versions.insert(version);
+    versioned_pool->initialized = true;
     _start_health_check_thread();
 
-    return Status::OK();
+    return versioned_pool;
 }
 
-Status PythonServerManager::get_process(const PythonVersion& version, 
ProcessPtr* process) {
-    std::lock_guard<std::mutex> lock(_pools_mutex);
-    std::vector<ProcessPtr>& pool = _process_pools[version];
+Status PythonServerManager::_get_process(
+        const PythonVersion& version, const 
std::shared_ptr<VersionedProcessPool>& versioned_pool,
+        ProcessPtr* process) {
+    std::lock_guard<std::mutex> lock(versioned_pool->mutex);
+    std::vector<ProcessPtr>& pool = versioned_pool->processes;
 
     if (UNLIKELY(pool.empty())) {
         return Status::InternalError("Python process pool is empty for version 
{}",
                                      version.to_string());
     }
 
-    // Find process with minimum load (use_count - 1 gives active client count)
-    auto min_iter = std::min_element(
-            pool.begin(), pool.end(),
-            [](const ProcessPtr& a, const ProcessPtr& b) { return 
a.use_count() < b.use_count(); });
+    // Prefer an already-alive process and only use load balancing inside that 
alive subset.
+    // Dead entries are left in the pool for the background health checker to
+    // repair, unless no alive process remains for the current request.
+    auto min_alive_iter = std::min_element(pool.begin(), pool.end(),
+                                           [](const ProcessPtr& a, const 
ProcessPtr& b) {
+                                               const bool a_alive = a && 
a->is_alive();
+                                               const bool b_alive = b && 
b->is_alive();
+                                               if (a_alive != b_alive) {
+                                                   return a_alive > b_alive;
+                                               }
+                                               return a.use_count() < 
b.use_count();
+                                           });
+
+    if (min_alive_iter != pool.end() && *min_alive_iter && 
(*min_alive_iter)->is_alive()) {
+        *process = *min_alive_iter;
+        return Status::OK();
+    }
+
+    // Only reach here when the pool has no alive process at all. Try one 
foreground
+    // recovery so the caller has a chance to proceed; leave batch repair to 
health check.
+    auto& candidate = pool.front();
+    ProcessPtr replacement;
+    Status status = fork(version, &replacement);
+    if (!status.ok()) {
+        return Status::Error<ErrorCode::SERVICE_UNAVAILABLE>(
+                "Python process pool has no available process for version {}, 
reason: {}",
+                version.to_string(), status.to_string());
+    }
 
-    // Return process with minimum load
-    *process = *min_iter;
+    candidate = std::move(replacement);
+    *process = candidate;
     return Status::OK();
 }
 
@@ -191,6 +256,7 @@ Status PythonServerManager::fork(const PythonVersion& 
version, ProcessPtr* proce
 }
 
 void PythonServerManager::_start_health_check_thread() {
+    std::lock_guard<std::mutex> lock(_health_check_mutex);
     if (_health_check_thread) return;
 
     LOG(INFO) << "Starting Python process health check thread (interval: 30 
seconds)";
@@ -217,13 +283,13 @@ void PythonServerManager::_start_health_check_thread() {
 }
 
 void PythonServerManager::_check_and_recreate_processes() {
-    std::lock_guard<std::mutex> lock(_pools_mutex);
-
     int total_checked = 0;
     int total_dead = 0;
     int total_recreated = 0;
 
-    for (auto& [version, pool] : _process_pools) {
+    for (auto& [version, versioned_pool] : _snapshot_process_pools()) {
+        std::lock_guard<std::mutex> lock(versioned_pool->mutex);
+        auto& pool = versioned_pool->processes;
         for (size_t i = 0; i < pool.size(); ++i) {
             auto& process = pool[i];
             if (!process) continue;
@@ -268,15 +334,22 @@ void PythonServerManager::shutdown() {
     }
 
     // Shutdown all processes
-    std::lock_guard<std::mutex> lock(_pools_mutex);
-    for (auto& [version, pool] : _process_pools) {
+    for (auto& [version, versioned_pool] : _snapshot_process_pools()) {
+        std::lock_guard<std::mutex> lock(versioned_pool->mutex);
+        auto& pool = versioned_pool->processes;
         for (auto& process : pool) {
             if (process) {
                 process->shutdown();
             }
         }
+        pool.clear();
+        versioned_pool->initialized = false;
+    }
+
+    {
+        std::lock_guard<std::mutex> lock(_pools_mutex);
+        _process_pools.clear();
     }
-    _process_pools.clear();
 }
 
 Status PythonServerManager::_read_process_memory(pid_t pid, size_t* rss_bytes) 
{
@@ -305,11 +378,11 @@ Status PythonServerManager::_read_process_memory(pid_t 
pid, size_t* rss_bytes) {
 }
 
 void PythonServerManager::_refresh_memory_stats() {
-    std::lock_guard<std::mutex> lock(_pools_mutex);
-
     int64_t total_rss = 0;
 
-    for (const auto& [version, pool] : _process_pools) {
+    for (const auto& [version, versioned_pool] : _snapshot_process_pools()) {
+        std::lock_guard<std::mutex> lock(versioned_pool->mutex);
+        const auto& pool = versioned_pool->processes;
         for (const auto& process : pool) {
             if (!process || !process->is_alive()) continue;
 
@@ -339,15 +412,15 @@ Status PythonServerManager::clear_module_cache(const 
std::string& location) {
         return Status::InvalidArgument("Empty location for 
clear_module_cache");
     }
 
-    std::lock_guard<std::mutex> lock(_pools_mutex);
-
     std::string body = fmt::format(R"({{"location": "{}"}})", location);
 
     int success_count = 0;
     int fail_count = 0;
     bool has_active_process = false;
 
-    for (auto& [version, pool] : _process_pools) {
+    for (auto& [version, versioned_pool] : _snapshot_process_pools()) {
+        std::lock_guard<std::mutex> lock(versioned_pool->mutex);
+        auto& pool = versioned_pool->processes;
         for (auto& process : pool) {
             if (!process || !process->is_alive()) {
                 continue;
@@ -422,4 +495,4 @@ template Status 
PythonServerManager::get_client<PythonUDTFClient>(
         std::shared_ptr<PythonUDTFClient>* client,
         const std::shared_ptr<arrow::Schema>& data_schema);
 
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/udf/python/python_server.h 
b/be/src/udf/python/python_server.h
index 6427cb7e63c..7aa452740c7 100644
--- a/be/src/udf/python/python_server.h
+++ b/be/src/udf/python/python_server.h
@@ -20,7 +20,10 @@
 #include <atomic>
 #include <condition_variable>
 #include <memory>
+#include <mutex>
 #include <thread>
+#include <unordered_map>
+#include <vector>
 
 #include "common/status.h"
 #include "runtime/memory/mem_tracker.h"
@@ -46,25 +49,41 @@ public:
 
     Status fork(const PythonVersion& version, ProcessPtr* process);
 
-    Status get_process(const PythonVersion& version, ProcessPtr* process);
-
     // Clear Python module cache for a specific UDF location across all 
processes
     Status clear_module_cache(const std::string& location);
 
-    Status ensure_pool_initialized(const PythonVersion& version);
-
     void shutdown();
 
 #ifdef BE_TEST
     // For unit testing only.
     void check_and_recreate_processes_for_test() { 
_check_and_recreate_processes(); }
 
-    std::unordered_map<PythonVersion, std::vector<ProcessPtr>>& 
process_pools_for_test() {
-        return _process_pools;
-    }
+    void set_process_pool_for_test(const PythonVersion& version, 
std::vector<ProcessPtr> processes,
+                                   bool initialized = true);
+
+    std::vector<ProcessPtr>& process_pool_for_test(const PythonVersion& 
version);
 #endif
 
 private:
+    struct VersionedProcessPool {
+        std::mutex mutex;
+        std::vector<ProcessPtr> processes;
+        bool initialized = false;
+    };
+
+    /** 
+     * Lazily initialize and return the process pool for specific Python 
version. 
+     */
+    Result<std::shared_ptr<VersionedProcessPool>> _ensure_pool_initialized(
+            const PythonVersion& version);
+
+    /**
+     * Pick an available process from specific pool, recreating one on demand 
if needed.
+     */
+    Status _get_process(const PythonVersion& version,
+                        const std::shared_ptr<VersionedProcessPool>& 
versioned_pool,
+                        ProcessPtr* process);
+
     /**
      * Start health check background thread (called once by 
ensure_pool_initialized)
      * Thread periodically checks process health and refreshes memory stats
@@ -86,11 +105,14 @@ private:
      */
     void _refresh_memory_stats();
 
-    std::unordered_map<PythonVersion, std::vector<ProcessPtr>> _process_pools;
-    // Protects _process_pools access
+    std::shared_ptr<VersionedProcessPool> _get_or_create_process_pool(const 
PythonVersion& version);
+    std::vector<std::pair<PythonVersion, 
std::shared_ptr<VersionedProcessPool>>>
+    _snapshot_process_pools();
+
+    std::unordered_map<PythonVersion, std::shared_ptr<VersionedProcessPool>> 
_process_pools;
+    // Protects the version -> pool handle map only. Per-version process 
operations are guarded
+    // by VersionedProcessPool::mutex.
     std::mutex _pools_mutex;
-    // Track which versions have been initialized
-    std::unordered_set<PythonVersion> _initialized_versions;
     // Health check background thread
     std::unique_ptr<std::thread> _health_check_thread;
     std::atomic<bool> _shutdown_flag {false};
@@ -99,4 +121,4 @@ private:
     MemTracker _mem_tracker {"PythonUDFProcesses"};
 };
 
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/test/udf/python/python_server_test.cpp 
b/be/test/udf/python/python_server_test.cpp
index 40e4ab3a11a..557815f7506 100644
--- a/be/test/udf/python/python_server_test.cpp
+++ b/be/test/udf/python/python_server_test.cpp
@@ -23,6 +23,7 @@
 
 #include <filesystem>
 #include <fstream>
+#include <future>
 #include <string>
 
 #include "common/config.h"
@@ -99,6 +100,32 @@ protected:
         return python_path;
     }
 
+    std::string create_fake_python_with_delay_and_socket_creation(const 
std::string& binary_name,
+                                                                  const 
std::string& version,
+                                                                  int 
delay_ms) {
+        std::string bin_dir = test_dir_ + "/bin";
+        std::string python_path = bin_dir + "/" + binary_name;
+        fs::create_directories(bin_dir);
+
+        std::ofstream ofs(python_path);
+        ofs << "#!/bin/bash\n";
+        ofs << "if [ \"$1\" = \"--version\" ]; then\n";
+        ofs << "    echo 'Python " << version << "'\n";
+        ofs << "    exit 0\n";
+        ofs << "fi\n";
+        ofs << "sleep " << (delay_ms / 1000.0) << "\n";
+        ofs << "SOCKET_PREFIX=\"$3\"\n";
+        ofs << "SOCKET_BASE=\"${SOCKET_PREFIX#grpc+unix://}\"\n";
+        ofs << "SOCKET_FILE=\"${SOCKET_BASE}_$$.sock\"\n";
+        ofs << "touch \"$SOCKET_FILE\"\n";
+        ofs << "trap 'rm -f \"$SOCKET_FILE\"; exit 0' TERM INT\n";
+        ofs << "while true; do sleep 1; done\n";
+        ofs.close();
+        fs::permissions(python_path, fs::perms::owner_all);
+
+        return python_path;
+    }
+
     // Set DORIS_HOME and create flight server script directory
     void setup_doris_home() {
         setenv("DORIS_HOME", test_dir_.c_str(), 1);
@@ -124,18 +151,21 @@ TEST_F(PythonServerTest, SingletonReturnsSameInstance) {
 }
 
 // ============================================================================
-// PythonServerManager::get_process() - process retrieval test
+// PythonServerManager::_get_process() - process retrieval test
 // ============================================================================
 
 TEST_F(PythonServerTest, GetProcessFromEmptyPoolReturnsError) {
     PythonServerManager mgr;
 
     PythonVersion version("3.9.16", "/fake/path", "/fake/python");
-    ProcessPtr process;
+    mgr.set_process_pool_for_test(version, {});
+    auto pool_result = mgr._ensure_pool_initialized(version);
+    ASSERT_TRUE(pool_result.has_value()) << pool_result.error().to_string();
 
-    Status status = mgr.get_process(version, &process);
+    ProcessPtr process;
+    Status status = mgr._get_process(version, pool_result.value(), &process);
 
-    // Verify: empty pool should return an error with message containing "pool 
is empty"
+    // Verify: empty pool should return an error before touching process slots.
     EXPECT_FALSE(status.ok());
     EXPECT_TRUE(status.to_string().find("pool is empty") != std::string::npos);
     EXPECT_EQ(process, nullptr);
@@ -223,7 +253,7 @@ TEST_F(PythonServerTest, 
ForkWithProcessThatExitsImmediatelyReturnsError) {
 }
 
 // ============================================================================
-// PythonServerManager::ensure_pool_initialized() - pool initialization test
+// PythonServerManager::_ensure_pool_initialized() - pool initialization test
 // ============================================================================
 
 TEST_F(PythonServerTest, EnsurePoolInitializedWithInvalidVersionFails) {
@@ -231,13 +261,13 @@ TEST_F(PythonServerTest, 
EnsurePoolInitializedWithInvalidVersionFails) {
 
     PythonVersion invalid_version("3.99.99", "/non/existent/path", 
"/non/existent/python");
 
-    Status status = mgr.ensure_pool_initialized(invalid_version);
+    auto result = mgr._ensure_pool_initialized(invalid_version);
 
     // Verify: invalid version should cause initialization to fail
-    EXPECT_FALSE(status.ok());
+    EXPECT_FALSE(result.has_value());
     // Error message should indicate all process creations failed
-    EXPECT_TRUE(status.to_string().find("Failed") != std::string::npos ||
-                status.to_string().find("failed") != std::string::npos);
+    EXPECT_TRUE(result.error().to_string().find("Failed") != std::string::npos 
||
+                result.error().to_string().find("failed") != 
std::string::npos);
 }
 
 // ============================================================================
@@ -267,8 +297,8 @@ TEST_F(PythonServerTest, 
ShutdownAfterFailedInitializationDoesNotCrash) {
 
     // Try initialization first (expected to fail)
     PythonVersion invalid_version("3.99.99", "/bad/path", "/bad/python");
-    Status status = mgr.ensure_pool_initialized(invalid_version);
-    EXPECT_FALSE(status.ok());
+    auto result = mgr._ensure_pool_initialized(invalid_version);
+    EXPECT_FALSE(result.has_value());
 
     // Verify: calling shutdown after failed initialization does not crash
     EXPECT_NO_THROW(mgr.shutdown());
@@ -364,10 +394,10 @@ TEST_F(PythonServerTest, EnsurePoolInitializedSuccess) {
     PythonServerManager mgr;
     PythonVersion version("3.9.16", test_dir_, python_path);
 
-    Status status = mgr.ensure_pool_initialized(version);
+    auto result = mgr._ensure_pool_initialized(version);
 
     // Verify pool initialization succeeded
-    EXPECT_TRUE(status.ok()) << status.to_string();
+    EXPECT_TRUE(result.has_value()) << result.error().to_string();
 
     // Cleanup
     mgr.shutdown();
@@ -383,12 +413,12 @@ TEST_F(PythonServerTest, EnsurePoolInitializedIdempotent) 
{
     PythonVersion version("3.9.16", test_dir_, python_path);
 
     // First initialization
-    Status status1 = mgr.ensure_pool_initialized(version);
-    EXPECT_TRUE(status1.ok()) << status1.to_string();
+    auto result1 = mgr._ensure_pool_initialized(version);
+    EXPECT_TRUE(result1.has_value()) << result1.error().to_string();
 
     // Second initialization should return immediately (version already 
initialized)
-    Status status2 = mgr.ensure_pool_initialized(version);
-    EXPECT_TRUE(status2.ok()) << status2.to_string();
+    auto result2 = mgr._ensure_pool_initialized(version);
+    EXPECT_TRUE(result2.has_value()) << result2.error().to_string();
 
     mgr.shutdown();
 }
@@ -403,12 +433,12 @@ TEST_F(PythonServerTest, GetProcessFromInitializedPool) {
     PythonVersion version("3.9.16", test_dir_, python_path);
 
     // Initialize the pool first
-    Status init_status = mgr.ensure_pool_initialized(version);
-    EXPECT_TRUE(init_status.ok()) << init_status.to_string();
+    auto init_result = mgr._ensure_pool_initialized(version);
+    EXPECT_TRUE(init_result.has_value()) << init_result.error().to_string();
 
     // Get a process
     ProcessPtr process;
-    Status status = mgr.get_process(version, &process);
+    Status status = mgr._get_process(version, init_result.value(), &process);
 
     EXPECT_TRUE(status.ok()) << status.to_string();
     EXPECT_NE(process, nullptr);
@@ -417,6 +447,72 @@ TEST_F(PythonServerTest, GetProcessFromInitializedPool) {
     mgr.shutdown();
 }
 
+TEST_F(PythonServerTest, GetProcessRecreatesDeadProcessWhenNoAliveProcess) {
+    setup_doris_home();
+    std::string python_path = 
create_fake_python_with_socket_creation("3.9.16");
+
+    config::max_python_process_num = 1;
+
+    PythonServerManager mgr;
+    PythonVersion version("3.9.16", test_dir_, python_path);
+
+    auto pool_result = mgr._ensure_pool_initialized(version);
+    ASSERT_TRUE(pool_result.has_value()) << pool_result.error().to_string();
+
+    ProcessPtr first_process;
+    ASSERT_TRUE(mgr._get_process(version, pool_result.value(), 
&first_process).ok());
+    ASSERT_NE(first_process, nullptr);
+    ASSERT_TRUE(first_process->is_alive());
+    pid_t first_pid = first_process->get_child_pid();
+
+    first_process->shutdown();
+    ASSERT_FALSE(first_process->is_alive());
+
+    ProcessPtr replacement;
+    Status status = mgr._get_process(version, pool_result.value(), 
&replacement);
+
+    EXPECT_TRUE(status.ok()) << status.to_string();
+    ASSERT_NE(replacement, nullptr);
+    EXPECT_TRUE(replacement->is_alive());
+    EXPECT_NE(replacement->get_child_pid(), first_pid);
+
+    mgr.shutdown();
+}
+
+TEST_F(PythonServerTest, GetProcessSkipsDeadProcessWhenAliveProcessExists) {
+    setup_doris_home();
+    std::string python_path = 
create_fake_python_with_socket_creation("3.9.16");
+
+    PythonServerManager mgr;
+    PythonVersion version("3.9.16", test_dir_, python_path);
+
+    ProcessPtr alive_process;
+    ASSERT_TRUE(mgr.fork(version, &alive_process).ok());
+    ASSERT_NE(alive_process, nullptr);
+    ASSERT_TRUE(alive_process->is_alive());
+
+    ProcessPtr dead_process;
+    ASSERT_TRUE(mgr.fork(version, &dead_process).ok());
+    ASSERT_NE(dead_process, nullptr);
+    pid_t dead_pid = dead_process->get_child_pid();
+    dead_process->shutdown();
+    ASSERT_FALSE(dead_process->is_alive());
+
+    mgr.set_process_pool_for_test(version, {alive_process, dead_process});
+    auto pool_result = mgr._ensure_pool_initialized(version);
+    ASSERT_TRUE(pool_result.has_value()) << pool_result.error().to_string();
+
+    ProcessPtr selected;
+    Status status = mgr._get_process(version, pool_result.value(), &selected);
+
+    EXPECT_TRUE(status.ok()) << status.to_string();
+    EXPECT_EQ(selected, alive_process);
+    EXPECT_FALSE(mgr.process_pool_for_test(version)[1]->is_alive());
+    EXPECT_EQ(mgr.process_pool_for_test(version)[1]->get_child_pid(), 
dead_pid);
+
+    mgr.shutdown();
+}
+
 TEST_F(PythonServerTest, GetProcessLoadBalancing) {
     setup_doris_home();
     std::string python_path = 
create_fake_python_with_socket_creation("3.9.16");
@@ -427,15 +523,15 @@ TEST_F(PythonServerTest, GetProcessLoadBalancing) {
     PythonServerManager mgr;
     PythonVersion version("3.9.16", test_dir_, python_path);
 
-    Status init_status = mgr.ensure_pool_initialized(version);
-    EXPECT_TRUE(init_status.ok()) << init_status.to_string();
+    auto init_result = mgr._ensure_pool_initialized(version);
+    EXPECT_TRUE(init_result.has_value()) << init_result.error().to_string();
 
     // Get multiple processes to verify load balancing
     ProcessPtr p1, p2, p3, p4;
-    EXPECT_TRUE(mgr.get_process(version, &p1).ok());
-    EXPECT_TRUE(mgr.get_process(version, &p2).ok());
-    EXPECT_TRUE(mgr.get_process(version, &p3).ok());
-    EXPECT_TRUE(mgr.get_process(version, &p4).ok());
+    EXPECT_TRUE(mgr._get_process(version, init_result.value(), &p1).ok());
+    EXPECT_TRUE(mgr._get_process(version, init_result.value(), &p2).ok());
+    EXPECT_TRUE(mgr._get_process(version, init_result.value(), &p3).ok());
+    EXPECT_TRUE(mgr._get_process(version, init_result.value(), &p4).ok());
 
     // With 2 processes, load balancing distributes requests across different 
processes
     // p1 and p2 may be same or different processes
@@ -455,12 +551,12 @@ TEST_F(PythonServerTest, ShutdownWithRunningProcesses) {
     PythonVersion version("3.9.16", test_dir_, python_path);
 
     // Initialize the pool
-    Status init_status = mgr.ensure_pool_initialized(version);
-    EXPECT_TRUE(init_status.ok()) << init_status.to_string();
+    auto init_result = mgr._ensure_pool_initialized(version);
+    EXPECT_TRUE(init_result.has_value()) << init_result.error().to_string();
 
     // Get a process reference
     ProcessPtr process;
-    EXPECT_TRUE(mgr.get_process(version, &process).ok());
+    EXPECT_TRUE(mgr._get_process(version, init_result.value(), &process).ok());
     EXPECT_TRUE(process->is_alive());
 
     // Shutdown should terminate all processes
@@ -509,13 +605,15 @@ TEST_F(PythonServerTest, MultipleVersionPools) {
     PythonVersion version310("3.10.0", test_dir_, python310_path);
 
     // Initialize pools for two versions
-    EXPECT_TRUE(mgr.ensure_pool_initialized(version39).ok());
-    EXPECT_TRUE(mgr.ensure_pool_initialized(version310).ok());
+    auto pool39_result = mgr._ensure_pool_initialized(version39);
+    auto pool310_result = mgr._ensure_pool_initialized(version310);
+    EXPECT_TRUE(pool39_result.has_value()) << 
pool39_result.error().to_string();
+    EXPECT_TRUE(pool310_result.has_value()) << 
pool310_result.error().to_string();
 
     // Retrieve processes from both pools
     ProcessPtr p39, p310;
-    EXPECT_TRUE(mgr.get_process(version39, &p39).ok());
-    EXPECT_TRUE(mgr.get_process(version310, &p310).ok());
+    EXPECT_TRUE(mgr._get_process(version39, pool39_result.value(), &p39).ok());
+    EXPECT_TRUE(mgr._get_process(version310, pool310_result.value(), 
&p310).ok());
 
     // Verify they are different processes
     EXPECT_NE(p39->get_child_pid(), p310->get_child_pid());
@@ -523,6 +621,40 @@ TEST_F(PythonServerTest, MultipleVersionPools) {
     mgr.shutdown();
 }
 
+TEST_F(PythonServerTest, 
EnsurePoolInitializedForDifferentVersionsDoesNotShareVersionLock) {
+    setup_doris_home();
+
+    config::max_python_process_num = 1;
+
+    std::string python39_path =
+            create_fake_python_with_delay_and_socket_creation("python3.9", 
"3.9.16", 1200);
+    std::string python310_path =
+            create_fake_python_with_delay_and_socket_creation("python3.10", 
"3.10.0", 1200);
+
+    PythonServerManager mgr;
+    PythonVersion version39("3.9.16", test_dir_, python39_path);
+    PythonVersion version310("3.10.0", test_dir_, python310_path);
+
+    auto start = std::chrono::steady_clock::now();
+    auto future39 = std::async(std::launch::async,
+                               [&]() { return 
mgr._ensure_pool_initialized(version39); });
+    auto future310 = std::async(std::launch::async,
+                                [&]() { return 
mgr._ensure_pool_initialized(version310); });
+
+    auto result39 = future39.get();
+    auto result310 = future310.get();
+    auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
+            std::chrono::steady_clock::now() - start);
+
+    EXPECT_TRUE(result39.has_value()) << result39.error().to_string();
+    EXPECT_TRUE(result310.has_value()) << result310.error().to_string();
+    // If both versions still contended on one manager-wide lock, the elapsed 
time would
+    // be close to two serialized 1.2s startups instead of a single startup 
window.
+    EXPECT_LT(elapsed.count(), 2200);
+
+    mgr.shutdown();
+}
+
 // ============================================================================
 // PythonServerManager::_check_and_recreate_processes() - health-check 
recreation test
 // ============================================================================
@@ -546,15 +678,15 @@ TEST_F(PythonServerTest, 
CheckAndRecreateProcessesRecreatesDeadProcess) {
     dead_process->shutdown();
     ASSERT_FALSE(dead_process->is_alive());
 
-    mgr.process_pools_for_test()[version] = {alive_process, dead_process, 
nullptr};
+    mgr.set_process_pool_for_test(version, {alive_process, dead_process, 
nullptr});
 
     mgr.check_and_recreate_processes_for_test();
 
-    ASSERT_EQ(mgr.process_pools_for_test()[version].size(), 3);
-    EXPECT_EQ(mgr.process_pools_for_test()[version][0], alive_process);
-    EXPECT_EQ(mgr.process_pools_for_test()[version][2], nullptr);
+    ASSERT_EQ(mgr.process_pool_for_test(version).size(), 3);
+    EXPECT_EQ(mgr.process_pool_for_test(version)[0], alive_process);
+    EXPECT_EQ(mgr.process_pool_for_test(version)[2], nullptr);
 
-    ProcessPtr recreated = mgr.process_pools_for_test()[version][1];
+    ProcessPtr recreated = mgr.process_pool_for_test(version)[1];
     ASSERT_NE(recreated, nullptr);
     EXPECT_TRUE(recreated->is_alive());
     EXPECT_NE(recreated->get_child_pid(), dead_pid_before);
@@ -582,11 +714,11 @@ TEST_F(PythonServerTest, 
CheckAndRecreateProcessesErasesDeadProcessWhenRecreateF
     ASSERT_FALSE(dead_process_2->is_alive());
 
     PythonVersion invalid_version("3.9.16", test_dir_, test_dir_ + 
"/bin/nonexistent_python");
-    mgr.process_pools_for_test()[invalid_version] = {dead_process_1, 
dead_process_2};
+    mgr.set_process_pool_for_test(invalid_version, {dead_process_1, 
dead_process_2});
 
     mgr.check_and_recreate_processes_for_test();
 
-    EXPECT_TRUE(mgr.process_pools_for_test()[invalid_version].empty());
+    EXPECT_TRUE(mgr.process_pool_for_test(invalid_version).empty());
 
     mgr.shutdown();
 }
diff --git a/regression-test/data/pythonudaf_p0/test_pythonudaf_drop.out 
b/regression-test/data/pythonudaf_p0/test_pythonudaf_drop.out
index 79e35e30ee5..8c1eb081162 100644
--- a/regression-test/data/pythonudaf_p0/test_pythonudaf_drop.out
+++ b/regression-test/data/pythonudaf_p0/test_pythonudaf_drop.out
@@ -8,3 +8,9 @@
 -- !py_udaf_drop_3 --
 6
 
+-- !py_udaf_drop_4 --
+6
+
+-- !py_udaf_drop_5 --
+6
+
diff --git a/regression-test/data/pythonudf_p0/test_pythonudf_drop.out 
b/regression-test/data/pythonudf_p0/test_pythonudf_drop.out
index 254ebe44809..84fa346f641 100644
--- a/regression-test/data/pythonudf_p0/test_pythonudf_drop.out
+++ b/regression-test/data/pythonudf_p0/test_pythonudf_drop.out
@@ -8,3 +8,9 @@
 -- !py_udf_drop_3 --
 8
 
+-- !py_udf_drop_4 --
+32
+
+-- !py_udf_drop_5 --
+33
+
diff --git a/regression-test/data/pythonudtf_p0/test_pythonudtf_drop.out 
b/regression-test/data/pythonudtf_p0/test_pythonudtf_drop.out
index 6f1159a95d1..c86ee5bdc5c 100644
--- a/regression-test/data/pythonudtf_p0/test_pythonudtf_drop.out
+++ b/regression-test/data/pythonudtf_p0/test_pythonudtf_drop.out
@@ -11,3 +11,11 @@
 1
 2
 
+-- !py_udtf_drop_4 --
+1
+2
+
+-- !py_udtf_drop_5 --
+1
+2
+
diff --git a/regression-test/suites/pythonudaf_p0/test_pythonudaf_drop.groovy 
b/regression-test/suites/pythonudaf_p0/test_pythonudaf_drop.groovy
index 964413828ab..4b64921676f 100644
--- a/regression-test/suites/pythonudaf_p0/test_pythonudaf_drop.groovy
+++ b/regression-test/suites/pythonudaf_p0/test_pythonudaf_drop.groovy
@@ -15,10 +15,23 @@
 // specific language governing permissions and limitations
 // under the License.
 
-suite('test_pythonudaf_drop') {
+suite('test_pythonudaf_drop', "nonConcurrent") {
     def runtime_version = getPythonUdfRuntimeVersion()
     def zipA = 
"""${context.file.parent}/udaf_scripts/python_udaf_drop_a/python_udaf_drop_test.zip"""
     def zipB = 
"""${context.file.parent}/udaf_scripts/python_udaf_drop_b/python_udaf_drop_test.zip"""
+    def localDorisHome = System.getenv("DORIS_HOME")
+    def localUdfRoot = localDorisHome != null ? "${localDorisHome}/lib/udf" : 
"/tmp"
+    def backendId_to_backendIP = [:]
+    def backendId_to_backendHttpPort = [:]
+    getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort)
+
+    def execOnBackend = { be_ip, localCmd, remoteCmd ->
+        if (be_ip == "127.0.0.1" || be_ip == "localhost") {
+            cmd(localCmd)
+        } else {
+            sshExec("root", be_ip, remoteCmd, false)
+        }
+    }
 
     scp_udf_file_to_all_be(zipA)
     scp_udf_file_to_all_be(zipB)
@@ -88,9 +101,33 @@ suite('test_pythonudaf_drop') {
             sql '''SELECT py_drop_sum_a(v) FROM py_udaf_drop_tbl;'''
             exception 'Can not found function'
         }
+
+        // Case 3: kill Python servers between two aggregate queries, next 
CREATE handshake should recover
+        sql '''DROP FUNCTION IF EXISTS py_drop_sum_reconnect(INT)'''
+        sql """
+            CREATE AGGREGATE FUNCTION py_drop_sum_reconnect(INT) RETURNS 
BIGINT PROPERTIES (
+                "type" = "PYTHON_UDF",
+                "file" = "file://${zipA}",
+                "symbol" = "drop_udaf.SumAgg",
+                "runtime_version" = "${runtime_version}"
+            )
+        """
+
+        qt_py_udaf_drop_4 '''SELECT py_drop_sum_reconnect(v) FROM 
py_udaf_drop_tbl;'''
+
+        backendId_to_backendIP.values().each { be_ip ->
+            execOnBackend(
+                be_ip,
+                "pkill -f 'python_server.py grpc+unix:///tmp/doris_python_udf' 
|| true",
+                "pkill -f 'python_server.py grpc+unix:///tmp/doris_python_udf' 
|| true")
+        }
+
+        qt_py_udaf_drop_5 '''SELECT py_drop_sum_reconnect(v) FROM 
py_udaf_drop_tbl;'''
+        try_sql('DROP FUNCTION IF EXISTS py_drop_sum_reconnect(INT);')
     } finally {
         try_sql('DROP FUNCTION IF EXISTS py_drop_sum_once(INT);')
         try_sql('DROP FUNCTION IF EXISTS py_drop_sum_a(INT);')
         try_sql('DROP FUNCTION IF EXISTS py_drop_sum_b(INT);')
+        try_sql('DROP FUNCTION IF EXISTS py_drop_sum_reconnect(INT);')
     }
 }
diff --git a/regression-test/suites/pythonudf_p0/test_pythonudf_drop.groovy 
b/regression-test/suites/pythonudf_p0/test_pythonudf_drop.groovy
index ab103c21f25..3c2c5f9258b 100644
--- a/regression-test/suites/pythonudf_p0/test_pythonudf_drop.groovy
+++ b/regression-test/suites/pythonudf_p0/test_pythonudf_drop.groovy
@@ -15,10 +15,23 @@
 // specific language governing permissions and limitations
 // under the License.
 
-suite("test_pythonudf_drop") {
+suite("test_pythonudf_drop", "nonConcurrent") {
     def runtime_version = getPythonUdfRuntimeVersion()
     def zipA = 
"""${context.file.parent}/udf_scripts/python_udf_drop_a/python_udf_drop_test.zip"""
     def zipB = 
"""${context.file.parent}/udf_scripts/python_udf_drop_b/python_udf_drop_test.zip"""
+    def localDorisHome = System.getenv("DORIS_HOME")
+    def localUdfRoot = localDorisHome != null ? "${localDorisHome}/lib/udf" : 
"/tmp"
+    def backendId_to_backendIP = [:]
+    def backendId_to_backendHttpPort = [:]
+    getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort)
+
+    def execOnBackend = { be_ip, localCmd, remoteCmd ->
+        if (be_ip == "127.0.0.1" || be_ip == "localhost") {
+            cmd(localCmd)
+        } else {
+            sshExec("root", be_ip, remoteCmd, false)
+        }
+    }
 
     scp_udf_file_to_all_be(zipA)
     scp_udf_file_to_all_be(zipB)
@@ -88,9 +101,33 @@ suite("test_pythonudf_drop") {
             sql """SELECT py_drop_a(1);"""
             exception "Can not found function"
         }
+
+        // Case 3: kill Python servers between two queries, next client 
handshake should recover
+        sql """DROP FUNCTION IF EXISTS py_drop_reconnect(INT)"""
+        sql """
+            CREATE FUNCTION py_drop_reconnect(INT) RETURNS INT PROPERTIES (
+                "type" = "PYTHON_UDF",
+                "file" = "file://${zipA}",
+                "symbol" = "drop_udf.evaluate",
+                "runtime_version" = "${runtime_version}"
+            )
+        """
+
+        qt_py_udf_drop_4 """SELECT py_drop_reconnect(31);"""
+
+        backendId_to_backendIP.values().each { be_ip ->
+            execOnBackend(
+                be_ip,
+                "pkill -f 'python_server.py grpc+unix:///tmp/doris_python_udf' 
|| true",
+                "pkill -f 'python_server.py grpc+unix:///tmp/doris_python_udf' 
|| true")
+        }
+
+        qt_py_udf_drop_5 """SELECT py_drop_reconnect(32);"""
+        try_sql("DROP FUNCTION IF EXISTS py_drop_reconnect(INT);")
     } finally {
         try_sql("DROP FUNCTION IF EXISTS py_drop_once(INT);")
         try_sql("DROP FUNCTION IF EXISTS py_drop_a(INT);")
         try_sql("DROP FUNCTION IF EXISTS py_drop_b(INT);")
+        try_sql("DROP FUNCTION IF EXISTS py_drop_reconnect(INT);")
     }
 }
diff --git a/regression-test/suites/pythonudtf_p0/test_pythonudtf_drop.groovy 
b/regression-test/suites/pythonudtf_p0/test_pythonudtf_drop.groovy
index 1f454243fb0..04abde6c146 100644
--- a/regression-test/suites/pythonudtf_p0/test_pythonudtf_drop.groovy
+++ b/regression-test/suites/pythonudtf_p0/test_pythonudtf_drop.groovy
@@ -15,10 +15,23 @@
 // specific language governing permissions and limitations
 // under the License.
 
-suite("test_pythonudtf_drop") {
+suite("test_pythonudtf_drop", "nonConcurrent") {
     def runtime_version = getPythonUdfRuntimeVersion()
     def zipA = 
"""${context.file.parent}/udtf_scripts/python_udtf_drop_a/python_udtf_drop_test.zip"""
     def zipB = 
"""${context.file.parent}/udtf_scripts/python_udtf_drop_b/python_udtf_drop_test.zip"""
+    def localDorisHome = System.getenv("DORIS_HOME")
+    def localUdfRoot = localDorisHome != null ? "${localDorisHome}/lib/udf" : 
"/tmp"
+    def backendId_to_backendIP = [:]
+    def backendId_to_backendHttpPort = [:]
+    getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort)
+
+    def execOnBackend = { be_ip, localCmd, remoteCmd ->
+        if (be_ip == "127.0.0.1" || be_ip == "localhost") {
+            cmd(localCmd)
+        } else {
+            sshExec("root", be_ip, remoteCmd, false)
+        }
+    }
 
     scp_udf_file_to_all_be(zipA)
     scp_udf_file_to_all_be(zipB)
@@ -122,9 +135,45 @@ suite("test_pythonudtf_drop") {
             """
             exception "Can not found function"
         }
+
+        // Case 4: kill Python servers between two table-function queries, 
next handshake should recover
+        sql """DROP FUNCTION IF EXISTS py_drop_t_reconnect(INT)"""
+        sql """
+            CREATE TABLES FUNCTION py_drop_t_reconnect(INT)
+            RETURNS ARRAY<INT>
+            PROPERTIES (
+                "type" = "PYTHON_UDF",
+                "file" = "file://${zipA}",
+                "symbol" = "drop_udtf.process",
+                "runtime_version" = "${runtime_version}"
+            )
+        """
+
+        qt_py_udtf_drop_4 """
+            SELECT c
+            FROM py_udtf_drop_tbl
+            LATERAL VIEW py_drop_t_reconnect(v) tmp AS c
+            ORDER BY c;
+        """
+
+        backendId_to_backendIP.values().each { be_ip ->
+            execOnBackend(
+                be_ip,
+                "pkill -f 'python_server.py grpc+unix:///tmp/doris_python_udf' 
|| true",
+                "pkill -f 'python_server.py grpc+unix:///tmp/doris_python_udf' 
|| true")
+        }
+
+        qt_py_udtf_drop_5 """
+            SELECT c
+            FROM py_udtf_drop_tbl
+            LATERAL VIEW py_drop_t_reconnect(v) tmp AS c
+            ORDER BY c;
+        """
+        try_sql("DROP FUNCTION IF EXISTS py_drop_t_reconnect(INT);")
     } finally {
         try_sql("DROP FUNCTION IF EXISTS py_drop_t_once(INT);")
         try_sql("DROP FUNCTION IF EXISTS py_drop_t_a(INT);")
         try_sql("DROP FUNCTION IF EXISTS py_drop_t_b(INT);")
+        try_sql("DROP FUNCTION IF EXISTS py_drop_t_reconnect(INT);")
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to