This is an automated email from the ASF dual-hosted git repository.

JNSimba pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new a1ce2cd5dcf [improve](streaming-job) support cdc_client JVM opts and adopt externally-managed cdc_client (#63898)
a1ce2cd5dcf is described below

commit a1ce2cd5dcf93fbe6155f1b001702c511591b6bf
Author: wudi <[email protected]>
AuthorDate: Thu Jun 4 16:08:03 2026 +0800

    [improve](streaming-job) support cdc_client JVM opts and adopt externally-managed cdc_client (#63898)
    
    ## What
    
    Two improvements to the BE-managed cdc_client lifecycle:
    
    ### 1. `cdc_client_java_opts` BE config + hardcoded OOM safety net
    
    A new `be.conf` entry passes extra JVM options to the BE-forked
    cdc_client process. The value is whitespace-tokenized and inserted into
    the child argv ahead of BE's own JVM options and `-jar`.
    
    **Example `be.conf`:**
    
    ```conf
    # single option
    cdc_client_java_opts = -Xmx512m
    
    # multiple options — whitespace-separated, NOT quoted
    cdc_client_java_opts = -Xmx512m -Xms256m -XX:+UseG1GC -Djdk.tls.client.protocols=TLSv1.2
    ```
    
    The resulting cdc_client child argv becomes (user opts first, then BE's
    own `-Dlog.path`, then the hardcoded OOM flag as the last JVM option,
    then `-jar`):
    
    ```
    java -Xmx512m -Xms256m -XX:+UseG1GC -Djdk.tls.client.protocols=TLSv1.2 -Dlog.path=<log_dir> -XX:+ExitOnOutOfMemoryError -jar <DORIS_HOME>/lib/cdc_client/cdc-client.jar --server.port=... ...
    ```
    
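    > Do **not** wrap the value in quotes. The `be.conf` parser keeps the
    value verbatim (only trimming leading/trailing whitespace), so quotes
    would be passed into the JVM args and break startup. Because the value
    is split purely on whitespace, a single option cannot itself contain
    spaces. Leaving it empty (the default) adds no user options; the only
    change to the argv is then the hardcoded OOM flag described below.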
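    
    To see why quoting breaks startup, here is a standalone sketch of the
    same whitespace split the diff performs with `std::istringstream` and
    `std::istream_iterator<std::string>`. `tokenize_java_opts` is a
    hypothetical helper name; in BE the logic is inlined in
    `start_cdc_client()`.
    
    ```cpp
    #include <iterator>
    #include <sstream>
    #include <string>
    #include <vector>
    
    // Hypothetical helper: split a cdc_client_java_opts value on whitespace,
    // the same way the diff fills argv_storage. Quotes are NOT interpreted;
    // they are ordinary characters that end up inside the tokens.
    std::vector<std::string> tokenize_java_opts(const std::string& opts) {
        std::vector<std::string> tokens;
        std::istringstream iss(opts);
        tokens.insert(tokens.end(), std::istream_iterator<std::string>(iss),
                      std::istream_iterator<std::string>());
        return tokens;
    }
    ```
    
    For example, `-Xmx512m   -Xms256m` yields two tokens, while the quoted
    value `"-Xmx512m -Xms256m"` yields `"-Xmx512m` and `-Xms256m"`, neither
    of which is a valid JVM option.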
    
    `-XX:+ExitOnOutOfMemoryError` is **hardcoded** in BE and placed after the
    user opts, so the JVM's last-wins rule for repeated flags prevents a
    `-XX:-ExitOnOutOfMemoryError` in `cdc_client_java_opts` from disabling
    it. This ensures every BE-forked cdc_client exits on OOM so BE can
    detect the dead child and re-fork. Previously the JVM survived OOM in an
    unresponsive state and BE kept reporting "CDC client X unresponsive"
    without restarting.
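    
    The ordering argument can be modelled in a few lines. This sketch
    assumes HotSpot's behavior that when a boolean `-XX:+Name` /
    `-XX:-Name` flag is repeated, the last occurrence wins;
    `effective_bool_flag` and `build_jvm_args` are hypothetical helpers that
    only model that rule and the argv order, not the JVM's real option
    parser.
    
    ```cpp
    #include <optional>
    #include <string>
    #include <vector>
    
    // Hypothetical model of the last-wins rule for boolean -XX flags: scan the
    // JVM options in order and let later occurrences overwrite earlier ones.
    // Returns std::nullopt if the flag is never mentioned.
    std::optional<bool> effective_bool_flag(const std::vector<std::string>& args,
                                            const std::string& name) {
        std::optional<bool> value;
        for (const auto& opt : args) {
            if (opt == "-jar") break;  // everything after -jar belongs to the app
            if (opt == "-XX:+" + name) value = true;
            if (opt == "-XX:-" + name) value = false;
        }
        return value;
    }
    
    // Mirrors the argv order built in start_cdc_client(): user opts first,
    // then BE's own option, then the hardcoded OOM flag, then -jar.
    std::vector<std::string> build_jvm_args(const std::vector<std::string>& user_opts,
                                            const std::string& log_path_opt) {
        std::vector<std::string> args(user_opts);
        args.push_back(log_path_opt);
        args.push_back("-XX:+ExitOnOutOfMemoryError");
        args.push_back("-jar");
        return args;
    }
    ```
    
    Even if an admin puts `-XX:-ExitOnOutOfMemoryError` in
    `cdc_client_java_opts`, the model reports the flag as enabled because
    BE's copy comes later.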
    
    Existing clusters get the OOM flag automatically by picking up the new
    BE binary; no `be.conf` edit required.
    
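    The startup uses `execv` instead of `execlp` to support a variable-length
    argv (`java_bin` is built from `JAVA_HOME`, so no `PATH` lookup is
    needed). All heap-backed argv / path construction is done before
    `fork()`, and until `execv()` the child only calls `prctl` (Linux only),
    `open`, `dup2`, `close`, `execv`, and `_exit`; the `perror()` calls on
    the two failure paths are the only calls outside the POSIX
    async-signal-safe set. This avoids deadlocking on libc/libstdc++ locks
    inherited from BE worker threads, and `_exit()` replaces `exit()` so the
    child never runs `atexit` handlers or flushes stdio buffers inherited
    from BE.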
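    
    A self-contained sketch of the same pre-fork pattern, assuming a POSIX
    host: the argv array is built in the parent, and the child calls only
    `open`, `dup2`, `close`, `write`, `execv`, and `_exit`.
    `spawn_redirected` and `wait_exit_code` are hypothetical names. Unlike
    the diff, the sketch reports child-side errors with `write(2)` instead
    of `perror()` to stay within the async-signal-safe set, and it omits the
    Linux-only `prctl(PR_SET_PDEATHSIG, SIGKILL)` call.
    
    ```cpp
    #include <fcntl.h>
    #include <sys/types.h>
    #include <sys/wait.h>
    #include <unistd.h>
    
    #include <string>
    #include <vector>
    
    // Hypothetical helper: fork + execv `bin` with `args` (args[0] is argv[0]),
    // appending the child's stdout/stderr to `out_path`. Returns the child pid,
    // or -1 if fork() failed.
    pid_t spawn_redirected(const std::string& bin, const std::vector<std::string>& args,
                           const std::string& out_path) {
        // Build the NULL-terminated argv in the parent: no allocation after fork().
        std::vector<char*> argv;
        argv.reserve(args.size() + 1);
        for (const auto& s : args) argv.push_back(const_cast<char*>(s.c_str()));
        argv.push_back(nullptr);
    
        pid_t pid = fork();
        if (pid != 0) return pid;  // parent, or -1 on failure
    
        // Child: async-signal-safe calls only until execv().
        int fd = open(out_path.c_str(), O_WRONLY | O_CREAT | O_APPEND | O_CLOEXEC, 0644);
        if (fd < 0) {
            static const char kOpenErr[] = "open output file failed\n";
            write(STDERR_FILENO, kOpenErr, sizeof(kOpenErr) - 1);
            _exit(1);
        }
        // dup2() clears FD_CLOEXEC on the new descriptors, so they survive exec.
        dup2(fd, STDOUT_FILENO);
        dup2(fd, STDERR_FILENO);
        close(fd);
        execv(bin.c_str(), argv.data());
        static const char kExecErr[] = "execv failed\n";
        write(STDERR_FILENO, kExecErr, sizeof(kExecErr) - 1);
        _exit(127);
    }
    
    // Parent side: reap the child; return its exit code, or -1 if it did not exit normally.
    int wait_exit_code(pid_t pid) {
        int status = 0;
        if (pid <= 0 || waitpid(pid, &status, 0) != pid) return -1;
        return WIFEXITED(status) ? WEXITSTATUS(status) : -1;
    }
    ```
    
    Exit code 127 for a failed `execv()` follows the shell's "command not
    found" convention; the diff uses 1.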
    
    `cdc_client_java_opts` is registered as `DEFINE_String` (immutable),
    consistent with `cdc_client_port` — admins change it via `be.conf` + BE
    restart. This prevents a data race between admin `set_config()` writes
    and `start_cdc_client()` reads.
    
    ### 2. Adopt externally-managed cdc_client
    
    `start_cdc_client()` now probes
    `127.0.0.1:cdc_client_port/actuator/health` before forking. If a healthy
    cdc_client is already listening (e.g. one started manually for debug /
    hotfix), BE adopts it and skips fork instead of fork-looping against a
    port it cannot bind. Edge cases:
    
    - Forked child binds the port, runs normally: unchanged (BE manages it).
    - BE-forked child died and user manually started a replacement on the
    same port: next RPC adopts the external instance.
    - User stops their external cdc_client: next RPC's probe fails, BE falls
    back to fork.
    - `fork()` succeeds and the post-fork health check passes, but
    `kill(pid, 0)` shows the new child has already exited (the port is held
    by an external process answering health): BE clears the stored PID and
    treats this as adoption instead of reporting the dead PID as "Start
    success".
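    
    The edge cases above reduce to a small decision table, modelled here as
    a pure function. `StartOutcome` and `classify_start` are hypothetical
    names: `probe_healthy` stands for the pre-fork probe, `post_fork_healthy`
    for the health wait after `fork()` (assuming, as the diff context
    suggests, that the branch just before the new `kill(pid, 0)` check is the
    post-fork health failure), and `child_alive` for `kill(pid, 0) == 0`.
    Fork failure and logging are left out.
    
    ```cpp
    enum class StartOutcome { kAdoptedExternal, kStartedChild, kFailed };
    
    // Hypothetical model of start_cdc_client()'s branches.
    // post_fork_healthy and child_alive are only meaningful if BE forked,
    // i.e. when probe_healthy is false.
    StartOutcome classify_start(bool probe_healthy, bool post_fork_healthy, bool child_alive) {
        if (probe_healthy) return StartOutcome::kAdoptedExternal;  // port already served: no fork
        if (!post_fork_healthy) return StartOutcome::kFailed;       // nothing healthy after fork
        if (!child_alive) return StartOutcome::kAdoptedExternal;    // healthy port, not our child
        return StartOutcome::kStartedChild;                         // normal BE-managed child
    }
    ```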
    
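    An `_adopted_external` atomic flag, set with an edge-triggered
    `exchange(true)`, throttles the adoption log messages so each one is
    printed once per transition into adopted mode rather than on every RPC.
    The flag is cleared when the probe fails or a forked child starts
    successfully, and while it is set BE also skips the "CDC client has
    never been started" log.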
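    
    The once-per-transition logging relies on `std::atomic<bool>::exchange`
    returning the previous value, so only the call that flips the flag from
    `false` to `true` logs. A minimal sketch, assuming a hypothetical
    `AdoptionTracker` wrapper (BE keeps a bare `_adopted_external` member
    instead):
    
    ```cpp
    #include <atomic>
    
    // Hypothetical wrapper around the edge-triggered adoption flag.
    class AdoptionTracker {
    public:
        // Called when the health probe succeeds. Returns true only on the
        // false -> true edge, i.e. when the adoption log line should be written.
        bool on_probe_healthy() { return !_adopted.exchange(true); }
    
        // Called when the probe fails (BE falls back to fork) or a forked child
        // starts successfully: re-arms the log for the next adoption.
        void on_leave_adopted() { _adopted.store(false); }
    
        bool adopted() const { return _adopted.load(); }
    
    private:
        std::atomic<bool> _adopted {false};
    };
    ```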
    
    ## Tests
    
    - Existing `cdc_client_mgr_test.cpp` cases unchanged (all new lifecycle
    logic lives behind `#ifndef BE_TEST`).
    - Two new tests covering the `_adopted_external` flag default value and
    setter/getter round-trip.
    - Real adoption / probe / fallback path is not yet covered by unit tests
    because `check_cdc_client_health` is compiled out under `BE_TEST`. A
    follow-up PR will add a test seam (function-pointer indirection or local
    HTTP fixture) to exercise these paths.
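    
    One possible shape for the function-pointer seam, as a hedged sketch
    only: `HealthProbeFn`, `ProbeSeamMgr`, and `set_health_probe_for_test`
    are hypothetical names, the signature is simplified to return `bool`
    rather than the `Status` that `check_cdc_client_health` returns, and the
    follow-up PR may look different. Injecting the probe lets a unit test
    drive the adopt / fall-back branches without a live HTTP server.
    
    ```cpp
    #include <atomic>
    #include <string>
    
    // Hypothetical probe signature: returns true when the port answers healthy.
    using HealthProbeFn = bool (*)(std::string& response);
    
    class ProbeSeamMgr {
    public:
        explicit ProbeSeamMgr(HealthProbeFn probe) : _probe(probe) {}
    
        // Test-only hook: swap in a fake probe.
        void set_health_probe_for_test(HealthProbeFn probe) { _probe = probe; }
    
        // Returns true if an external instance was adopted (no fork needed).
        bool try_adopt_external() {
            std::string response;
            if (_probe(response)) {
                _adopted_external.store(true);
                return true;
            }
            _adopted_external.store(false);  // caller would fall back to fork()
            return false;
        }
    
        bool adopted_external() const { return _adopted_external.load(); }
    
    private:
        HealthProbeFn _probe;
        std::atomic<bool> _adopted_external {false};
    };
    ```
    
    Captureless lambdas convert implicitly to `HealthProbeFn`, so a test can
    pass `[](std::string&) { return false; }` to simulate a stopped
    external instance.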
    
    ## Test plan
    
    - [ ] Unit: `cdc_client_mgr_test`
    - [ ] Manual: kill BE-forked cdc_client, `nohup java -jar cdc-client.jar
    ...` on the same host; verify BE adopts it without fork-looping
    (`be.INFO` shows one-time `Adopting external cdc client on port 9096`).
    - [ ] Manual: trigger OOM in cdc_client; verify JVM exits and BE forks a
    healthy replacement.
---
 be/src/common/config.cpp                |  2 +
 be/src/common/config.h                  |  3 ++
 be/src/runtime/cdc_client_mgr.cpp       | 77 +++++++++++++++++++++++++++------
 be/src/runtime/cdc_client_mgr.h         |  4 ++
 be/test/runtime/cdc_client_mgr_test.cpp | 24 ++++++++++
 5 files changed, 96 insertions(+), 14 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 1b3be586faa..7bd2ced5796 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -73,6 +73,8 @@ DEFINE_Int32(arrow_flight_sql_port, "8050");
 
 DEFINE_Int32(cdc_client_port, "9096");
 
+DEFINE_String(cdc_client_java_opts, "");
+
 // If the external client cannot directly access priority_networks, set public_host to be accessible
 // to external client.
 // There are usually two usage scenarios:
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 661dc99d07a..b68783656e2 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -122,6 +122,9 @@ DECLARE_Int32(arrow_flight_sql_port);
 // port for cdc client scan oltp cdc data
 DECLARE_Int32(cdc_client_port);
 
+// JVM options passed to cdc_client (whitespace-separated). Inserted before -jar.
+DECLARE_String(cdc_client_java_opts);
+
 // If the external client cannot directly access priority_networks, set public_host to be accessible
 // to external client.
 // There are usually two usage scenarios:
diff --git a/be/src/runtime/cdc_client_mgr.cpp b/be/src/runtime/cdc_client_mgr.cpp
index f992aae41af..b37cadc980c 100644
--- a/be/src/runtime/cdc_client_mgr.cpp
+++ b/be/src/runtime/cdc_client_mgr.cpp
@@ -34,9 +34,12 @@
 
 #include <atomic>
 #include <chrono>
+#include <iterator>
 #include <mutex>
+#include <sstream>
 #include <string>
 #include <thread>
+#include <vector>
 
 #include "common/config.h"
 #include "common/logging.h"
@@ -149,10 +152,26 @@ Status CdcClientMgr::start_cdc_client(PRequestCdcClientResult* result) {
             _child_pid.store(0);
         }
 #endif
-    } else {
+    } else if (!_adopted_external.load()) {
         LOG(INFO) << "CDC client has never been started";
     }
 
+#ifndef BE_TEST
+    // Adopt an externally-managed cdc_client if the port already answers
+    // healthy (e.g. one started manually for debug / hotfix).
+    {
+        std::string adopt_response;
+        if (check_cdc_client_health(1, 0, adopt_response).ok()) {
+            if (!_adopted_external.exchange(true)) {
+                LOG(INFO) << "Adopting external cdc client on port "
+                          << doris::config::cdc_client_port;
+            }
+            return Status::OK();
+        }
+    }
+    _adopted_external.store(false);
+#endif
+
     const char* doris_home = getenv("DORIS_HOME");
     const char* log_dir = getenv("LOG_DIR");
     const std::string cdc_jar_path = std::string(doris_home) + "/lib/cdc_client/cdc-client.jar";
@@ -181,7 +200,35 @@ Status CdcClientMgr::start_cdc_client(PRequestCdcClientResult* result) {
     }
     std::string path(java_home);
     std::string java_bin = path + "/bin/java";
-    // Capture signal to prevent child process from becoming a zombie process
+
+    // Pre-build everything the child needs before fork(): heap allocation after
+    // fork() in a multi-threaded process can deadlock on inherited libc locks.
+    std::vector<std::string> argv_storage;
+    argv_storage.emplace_back("java");
+    const std::string user_java_opts = doris::config::cdc_client_java_opts;
+    if (!user_java_opts.empty()) {
+        std::istringstream iss(user_java_opts);
+        argv_storage.insert(argv_storage.end(), std::istream_iterator<std::string>(iss),
+                            std::istream_iterator<std::string>());
+    }
+    argv_storage.emplace_back(java_opts);
+    // OOM safety net (last-wins, user opts cannot disable).
+    argv_storage.emplace_back("-XX:+ExitOnOutOfMemoryError");
+    argv_storage.emplace_back("-jar");
+    argv_storage.emplace_back(cdc_jar_path);
+    argv_storage.emplace_back(cdc_jar_port);
+    argv_storage.emplace_back(backend_http_port);
+    argv_storage.emplace_back(cluster_token);
+
+    std::vector<char*> argv;
+    argv.reserve(argv_storage.size() + 1);
+    for (auto& s : argv_storage) {
+        argv.push_back(const_cast<char*>(s.c_str()));
+    }
+    argv.push_back(nullptr);
+
+    const std::string cdc_out_file = std::string(log_dir) + "/cdc-client.out";
+
     struct sigaction act;
     act.sa_flags = 0;
     act.sa_handler = handle_sigchld;
@@ -194,33 +241,25 @@ Status CdcClientMgr::start_cdc_client(PRequestCdcClientResult* result) {
 #else
     pid_t pid = fork();
     if (pid < 0) {
-        // Fork failed
         st = Status::InternalError("Fork cdc client failed.");
         st.to_protobuf(result->mutable_status());
         return st;
     } else if (pid == 0) {
-        // Child process
-        // When the parent process is killed, the child process also needs to exit
+        // Child: async-signal-safe operations only until execv().
 #ifndef __APPLE__
         prctl(PR_SET_PDEATHSIG, SIGKILL);
 #endif
-        // Redirect stdout and stderr to log out file
-        std::string cdc_out_file = std::string(log_dir) + "/cdc-client.out";
         int out_fd = open(cdc_out_file.c_str(), O_WRONLY | O_CREAT | O_APPEND | O_CLOEXEC, 0644);
         if (out_fd < 0) {
             perror("open cdc-client.out file failed");
-            exit(1);
+            _exit(1);
         }
         dup2(out_fd, STDOUT_FILENO);
         dup2(out_fd, STDERR_FILENO);
         close(out_fd);
-
-        // java -jar -Dlog.path=xx cdc-client.jar --server.port=9096 --backend.http.port=8040
-        execlp(java_bin.c_str(), "java", java_opts.c_str(), "-jar", cdc_jar_path.c_str(),
-               cdc_jar_port.c_str(), backend_http_port.c_str(), cluster_token.c_str(), (char*)NULL);
-        // If execlp returns, it means it failed
+        execv(java_bin.c_str(), argv.data());
         perror("Cdc client child process error");
-        exit(1);
+        _exit(1);
     } else {
         // Parent process: save PID and wait for startup
         _child_pid.store(pid);
@@ -233,7 +272,17 @@ Status CdcClientMgr::start_cdc_client(PRequestCdcClientResult* result) {
             _child_pid.store(0);
             st = Status::InternalError("Start cdc client failed.");
             st.to_protobuf(result->mutable_status());
+        } else if (kill(pid, 0) != 0) {
+            // Port healthy but our child has exited: an external process is
+            // answering. Treat as adoption instead of masking dead PID as success.
+            _child_pid.store(0);
+            if (!_adopted_external.exchange(true)) {
+                LOG(INFO) << "Forked cdc client " << pid << " exited but port "
+                          << doris::config::cdc_client_port
+                          << " is healthy, adopting external instance";
+            }
         } else {
+            _adopted_external.store(false);
             LOG(INFO) << "Start cdc client success, pid=" << pid
                       << ", status=" << status.to_string() << ", response=" << health_response;
         }
diff --git a/be/src/runtime/cdc_client_mgr.h b/be/src/runtime/cdc_client_mgr.h
index 077097086da..b3b350154b6 100644
--- a/be/src/runtime/cdc_client_mgr.h
+++ b/be/src/runtime/cdc_client_mgr.h
@@ -53,11 +53,15 @@ public:
     pid_t get_child_pid() const { return _child_pid.load(); }
     // For testing only: set child PID directly
     void set_child_pid_for_test(pid_t pid) { _child_pid.store(pid); }
+    // For testing only: inspect / drive the adopt-external flag
+    bool get_adopted_external_for_test() const { return _adopted_external.load(); }
+    void set_adopted_external_for_test(bool v) { _adopted_external.store(v); }
 #endif
 
 private:
     std::mutex _start_mutex;
     std::atomic<pid_t> _child_pid {0};
+    std::atomic<bool> _adopted_external {false};
 };
 
 } // namespace doris
diff --git a/be/test/runtime/cdc_client_mgr_test.cpp b/be/test/runtime/cdc_client_mgr_test.cpp
index 3ab05c394d7..c68e71956ad 100644
--- a/be/test/runtime/cdc_client_mgr_test.cpp
+++ b/be/test/runtime/cdc_client_mgr_test.cpp
@@ -638,6 +638,30 @@ TEST_F(CdcClientMgrTest, StartWithPreExistingResultStatus) {
     EXPECT_EQ(result.status().status_code(), 999);
 }
 
+// Verify _adopted_external defaults to false and start_cdc_client (BE_TEST
+// short-circuit) does not alter it.
+TEST_F(CdcClientMgrTest, AdoptedExternalDefaultFalse) {
+    CdcClientMgr mgr;
+    EXPECT_FALSE(mgr.get_adopted_external_for_test());
+
+    PRequestCdcClientResult result;
+    Status status = mgr.start_cdc_client(&result);
+    EXPECT_TRUE(status.ok());
+    EXPECT_FALSE(mgr.get_adopted_external_for_test());
+}
+
+// Verify the _adopted_external flag round-trips through the setter/getter.
+TEST_F(CdcClientMgrTest, AdoptedExternalSetterRoundTrip) {
+    CdcClientMgr mgr;
+    EXPECT_FALSE(mgr.get_adopted_external_for_test());
+
+    mgr.set_adopted_external_for_test(true);
+    EXPECT_TRUE(mgr.get_adopted_external_for_test());
+
+    mgr.set_adopted_external_for_test(false);
+    EXPECT_FALSE(mgr.get_adopted_external_for_test());
+}
+
 // Test send_request_to_cdc_client with empty API
 TEST_F(CdcClientMgrTest, SendRequestEmptyApi) {
     CdcClientMgr mgr;

