This is an automated email from the ASF dual-hosted git repository.
JNSimba pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new a1ce2cd5dcf [improve](streaming-job) support cdc_client JVM opts and
adopt externally-managed cdc_client (#63898)
a1ce2cd5dcf is described below
commit a1ce2cd5dcf93fbe6155f1b001702c511591b6bf
Author: wudi <[email protected]>
AuthorDate: Thu Jun 4 16:08:03 2026 +0800
[improve](streaming-job) support cdc_client JVM opts and adopt
externally-managed cdc_client (#63898)
## What
Two improvements to the BE-managed cdc_client lifecycle:
### 1. `cdc_client_java_opts` BE config + hardcoded OOM safety net
New `be.conf` entry to pass extra JVM options to the BE-forked
cdc_client process. The value is whitespace-tokenized and inserted into
the child argv before `-jar`.
**Example `be.conf`:**
```conf
# single option
cdc_client_java_opts = -Xmx512m
# multiple options — whitespace-separated, NOT quoted
cdc_client_java_opts = -Xmx512m -Xms256m -XX:+UseG1GC -Djdk.tls.client.protocols=TLSv1.2
```
The resulting cdc_client child argv becomes (user opts first, then the
built-in `-Dlog.path`, then the hardcoded OOM flag, then `-jar`):
```
java -Xmx512m -Xms256m -XX:+UseG1GC -Djdk.tls.client.protocols=TLSv1.2
-Dlog.path=<log_dir> -XX:+ExitOnOutOfMemoryError -jar cdc-client.jar
--server.port=... ...
```
> Do **not** wrap the value in quotes. The `be.conf` parser keeps the
value verbatim (only trimming leading/trailing whitespace), so quotes
would be passed into the JVM args and break startup. Leaving it empty
(the default) keeps the previous behavior.
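
The tokenization and ordering described above can be sketched as follows. This is a minimal illustration, not the code in the patch: `build_cdc_argv` and its parameter names are hypothetical, and only the whitespace split via `std::istream_iterator` mirrors what the diff does. It also shows why quoting the value breaks startup: the quote characters are not interpreted and end up inside the individual tokens.

```cpp
#include <cassert>
#include <iterator>
#include <sstream>
#include <string>
#include <vector>

// Hypothetical helper (not a function in the patch): assembles the child argv
// in the order the commit message describes.
std::vector<std::string> build_cdc_argv(const std::string& user_java_opts,
                                        const std::string& log_path_opt,
                                        const std::string& jar_path,
                                        const std::vector<std::string>& app_args) {
    assert(!jar_path.empty());
    std::vector<std::string> argv{"java"};
    // Split on whitespace only; quote characters are kept as ordinary text.
    std::istringstream iss(user_java_opts);
    argv.insert(argv.end(), std::istream_iterator<std::string>(iss),
                std::istream_iterator<std::string>());
    argv.push_back(log_path_opt);
    // Appended after the user options so that it is the last occurrence of
    // this flag on the command line.
    argv.emplace_back("-XX:+ExitOnOutOfMemoryError");
    argv.emplace_back("-jar");
    argv.push_back(jar_path);
    argv.insert(argv.end(), app_args.begin(), app_args.end());
    return argv;
}
```

With `user_java_opts` set to `"-Xmx512m -Xms256m"` (quotes included), the second and third argv entries would be `"-Xmx512m` and `-Xms256m"`, which the JVM does not accept as options.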
`-XX:+ExitOnOutOfMemoryError` is **hardcoded** in BE (placed after the
user opts, so JVM's last-wins rule prevents accidental disabling via
`cdc_client_java_opts`). This guarantees every BE-forked cdc_client
exits on OOM so BE can detect the dead child and re-fork — previously
the JVM survived OOM in an unresponsive state and BE kept reporting "CDC
client X unresponsive" without restarting.
Existing clusters get the OOM flag automatically by picking up the new
BE binary; no `be.conf` edit required.
Startup now uses `execv` instead of `execlp` so that argv can have
variable length. All heap-backed argv / path construction is done before
`fork()`. Between `fork()` and `execv()` the child avoids heap allocation
and sticks to direct system calls (`prctl`, `open`, `dup2`, `close`,
`execv`, `_exit`); the only stdio use left is `perror` on the failure
paths. This avoids deadlocking on libc/libstdc++ locks inherited from BE
worker threads.
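
The build-before-fork pattern described above can be sketched in isolation. This is a minimal example under stated assumptions: `spawn_and_wait` is a hypothetical helper, it waits for the child synchronously (the BE code instead stores the PID and polls the health endpoint), and it leaves out the stdout/stderr redirection.

```cpp
#include <cassert>
#include <cerrno>
#include <string>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include <vector>

// Hypothetical helper: runs `path` with `args` and returns the child's exit
// code, or -1 if fork/wait fails or the child did not exit normally.
int spawn_and_wait(const std::string& path, const std::vector<std::string>& args) {
    assert(!args.empty());
    // Build the char* array before fork(); the strings in `args` own the memory.
    std::vector<char*> argv;
    argv.reserve(args.size() + 1);
    for (const auto& s : args) {
        argv.push_back(const_cast<char*>(s.c_str()));
    }
    argv.push_back(nullptr);
    const char* exe = path.c_str();

    pid_t pid = fork();
    if (pid < 0) {
        return -1;
    }
    if (pid == 0) {
        // Child: no allocation and no stdio, only execv() and _exit().
        execv(exe, argv.data());
        _exit(127); // reached only if execv() failed
    }
    int status = 0;
    while (waitpid(pid, &status, 0) < 0) {
        if (errno != EINTR) {
            return -1;
        }
    }
    return WIFEXITED(status) ? WEXITSTATUS(status) : -1;
}
```

For example, `spawn_and_wait("/bin/sh", {"sh", "-c", "exit 7"})` returns 7 on a system where `/bin/sh` exists. `_exit` is used rather than `exit` so the child does not run `atexit` handlers or flush stdio buffers copied from the parent.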
`cdc_client_java_opts` is registered as `DEFINE_String` (immutable),
consistent with `cdc_client_port` — admins change it via `be.conf` + BE
restart. This prevents a data race between admin `set_config()` writes
and `start_cdc_client()` reads.
### 2. Adopt externally-managed cdc_client
`start_cdc_client()` now probes
`127.0.0.1:cdc_client_port/actuator/health` before forking. If a healthy
cdc_client is already listening (e.g. one started manually for debug /
hotfix), BE adopts it and skips fork instead of fork-looping against a
port it cannot bind. Edge cases:
- Forked child binds the port, runs normally: unchanged (BE manages it).
- BE-forked child died and user manually started a replacement on the
same port: next RPC adopts the external instance.
- User stops their external cdc_client: next RPC's probe fails, BE falls
back to fork.
- `fork()` returns success and health passes but the new child has
already exited (port held by an external process answering health):
treated as adoption rather than masking the dead PID as "Start success".
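
The last edge case relies on `kill(pid, 0)` as a liveness check. A minimal sketch of that idiom follows; both helper names are hypothetical. One caveat: an exited child that has not been reaped (a zombie) still counts as existing, so the check only reports "gone" after the child has been waited for. The sketch reaps explicitly, and the BE code presumably relies on its `SIGCHLD` handler for that.

```cpp
#include <cassert>
#include <cerrno>
#include <signal.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

// Hypothetical helper: true if signal 0 can be delivered to `pid`.
// Signal 0 runs the existence and permission checks without sending anything.
bool process_exists(pid_t pid) {
    assert(pid > 0);
    return kill(pid, 0) == 0;
}

// Hypothetical helper: forks a child that exits at once, reaps it, and
// returns its PID (or -1 if fork() failed).
pid_t fork_and_reap_short_lived_child() {
    pid_t pid = fork();
    if (pid == 0) {
        _exit(0);
    }
    if (pid > 0) {
        int status = 0;
        while (waitpid(pid, &status, 0) < 0 && errno == EINTR) {
        }
    }
    return pid;
}
```

`process_exists(getpid())` is true, while `process_exists` on the PID returned by `fork_and_reap_short_lived_child()` is false unless the kernel has already reused that PID.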
An atomic, edge-triggered `_adopted_external` flag throttles the "Adopting
external cdc client" log so the message prints exactly once per mode transition.
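
The edge-triggered behaviour comes from `std::atomic<bool>::exchange`, which returns the previous value. A minimal sketch, assuming a hypothetical `AdoptionLogger` class whose `log` vector stands in for `LOG(INFO)`:

```cpp
#include <atomic>
#include <cassert>
#include <string>
#include <vector>

// Hypothetical stand-in for the manager's flag handling.
class AdoptionLogger {
public:
    // Call whenever the health probe finds an external instance.
    void on_adopted(int port) {
        assert(port > 0);
        // exchange() returns the old value, so only the false -> true
        // transition records a message.
        if (!_adopted_external.exchange(true)) {
            log.push_back("Adopting external cdc client on port " + std::to_string(port));
        }
    }

    // Call when BE goes back to managing its own child; re-arms the message.
    void on_managed() { _adopted_external.store(false); }

    std::vector<std::string> log; // stands in for LOG(INFO)

private:
    std::atomic<bool> _adopted_external {false};
};
```

Calling `on_adopted(9096)` three times in a row records one message; a further message is recorded only after `on_managed()` has reset the flag.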
## Tests
- Existing `cdc_client_mgr_test.cpp` cases unchanged (all new lifecycle
logic lives behind `#ifndef BE_TEST`).
- Two new tests covering the `_adopted_external` flag default value and
setter/getter round-trip.
- Real adoption / probe / fallback path is not yet covered by unit tests
because `check_cdc_client_health` is compiled out under `BE_TEST`. A
follow-up PR will add a test seam (function-pointer indirection or local
HTTP fixture) to exercise these paths.
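
One possible shape for such a seam is sketched below. It is an assumption about how the follow-up could look, not something in this patch: `CdcStarter`, `StartOutcome`, `Probe` and `Launcher` are hypothetical names, and the sketch covers only the probe-then-fork decision, not PID tracking.

```cpp
#include <cassert>
#include <functional>
#include <utility>

enum class StartOutcome { kAdopted, kForked, kFailed };

// Hypothetical seam: the health probe and the launch step are injected, so a
// unit test can replace the HTTP check and fork()/execv() with fakes.
class CdcStarter {
public:
    using Probe = std::function<bool()>;    // true if the port answers healthy
    using Launcher = std::function<bool()>; // true if the child was started

    CdcStarter(Probe probe, Launcher launcher)
            : _probe(std::move(probe)), _launch(std::move(launcher)) {
        assert(_probe && _launch);
    }

    StartOutcome start() {
        if (_probe()) {
            return StartOutcome::kAdopted; // healthy listener already there: skip fork
        }
        if (!_launch()) {
            return StartOutcome::kFailed;
        }
        return _probe() ? StartOutcome::kForked : StartOutcome::kFailed;
    }

private:
    Probe _probe;
    Launcher _launch;
};
```

A test can then pass a probe that always returns true and assert that the launcher is never invoked, which is the adoption path that `BE_TEST` currently compiles out.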
## Test plan
- [ ] Unit: `cdc_client_mgr_test`
- [ ] Manual: kill BE-forked cdc_client, `nohup java -jar cdc-client.jar
...` on the same host; verify BE adopts it without fork-looping
(`be.INFO` shows one-time `Adopting external cdc client on port 9096`).
- [ ] Manual: trigger OOM in cdc_client; verify JVM exits and BE forks a
healthy replacement.
---
be/src/common/config.cpp | 2 +
be/src/common/config.h | 3 ++
be/src/runtime/cdc_client_mgr.cpp | 77 +++++++++++++++++++++++++++------
be/src/runtime/cdc_client_mgr.h | 4 ++
be/test/runtime/cdc_client_mgr_test.cpp | 24 ++++++++++
5 files changed, 96 insertions(+), 14 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 1b3be586faa..7bd2ced5796 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -73,6 +73,8 @@ DEFINE_Int32(arrow_flight_sql_port, "8050");
DEFINE_Int32(cdc_client_port, "9096");
+DEFINE_String(cdc_client_java_opts, "");
+
// If the external client cannot directly access priority_networks, set public_host to be accessible
// to external client.
// There are usually two usage scenarios:
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 661dc99d07a..b68783656e2 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -122,6 +122,9 @@ DECLARE_Int32(arrow_flight_sql_port);
// port for cdc client scan oltp cdc data
DECLARE_Int32(cdc_client_port);
+// JVM options passed to cdc_client (whitespace-separated). Inserted before -jar.
+DECLARE_String(cdc_client_java_opts);
+
// If the external client cannot directly access priority_networks, set public_host to be accessible
// to external client.
// There are usually two usage scenarios:
diff --git a/be/src/runtime/cdc_client_mgr.cpp b/be/src/runtime/cdc_client_mgr.cpp
index f992aae41af..b37cadc980c 100644
--- a/be/src/runtime/cdc_client_mgr.cpp
+++ b/be/src/runtime/cdc_client_mgr.cpp
@@ -34,9 +34,12 @@
#include <atomic>
#include <chrono>
+#include <iterator>
#include <mutex>
+#include <sstream>
#include <string>
#include <thread>
+#include <vector>
#include "common/config.h"
#include "common/logging.h"
@@ -149,10 +152,26 @@ Status CdcClientMgr::start_cdc_client(PRequestCdcClientResult* result) {
_child_pid.store(0);
}
#endif
- } else {
+ } else if (!_adopted_external.load()) {
LOG(INFO) << "CDC client has never been started";
}
+#ifndef BE_TEST
+ // Adopt an externally-managed cdc_client if the port already answers
+ // healthy (e.g. one started manually for debug / hotfix).
+ {
+ std::string adopt_response;
+ if (check_cdc_client_health(1, 0, adopt_response).ok()) {
+ if (!_adopted_external.exchange(true)) {
+ LOG(INFO) << "Adopting external cdc client on port "
+ << doris::config::cdc_client_port;
+ }
+ return Status::OK();
+ }
+ }
+ _adopted_external.store(false);
+#endif
+
const char* doris_home = getenv("DORIS_HOME");
const char* log_dir = getenv("LOG_DIR");
const std::string cdc_jar_path = std::string(doris_home) + "/lib/cdc_client/cdc-client.jar";
@@ -181,7 +200,35 @@ Status CdcClientMgr::start_cdc_client(PRequestCdcClientResult* result) {
}
std::string path(java_home);
std::string java_bin = path + "/bin/java";
- // Capture signal to prevent child process from becoming a zombie process
+
+ // Pre-build everything the child needs before fork(): heap allocation after
+ // fork() in a multi-threaded process can deadlock on inherited libc locks.
+ std::vector<std::string> argv_storage;
+ argv_storage.emplace_back("java");
+ const std::string user_java_opts = doris::config::cdc_client_java_opts;
+ if (!user_java_opts.empty()) {
+ std::istringstream iss(user_java_opts);
+ argv_storage.insert(argv_storage.end(), std::istream_iterator<std::string>(iss),
+ std::istream_iterator<std::string>());
+ }
+ argv_storage.emplace_back(java_opts);
+ // OOM safety net (last-wins, user opts cannot disable).
+ argv_storage.emplace_back("-XX:+ExitOnOutOfMemoryError");
+ argv_storage.emplace_back("-jar");
+ argv_storage.emplace_back(cdc_jar_path);
+ argv_storage.emplace_back(cdc_jar_port);
+ argv_storage.emplace_back(backend_http_port);
+ argv_storage.emplace_back(cluster_token);
+
+ std::vector<char*> argv;
+ argv.reserve(argv_storage.size() + 1);
+ for (auto& s : argv_storage) {
+ argv.push_back(const_cast<char*>(s.c_str()));
+ }
+ argv.push_back(nullptr);
+
+ const std::string cdc_out_file = std::string(log_dir) + "/cdc-client.out";
+
struct sigaction act;
act.sa_flags = 0;
act.sa_handler = handle_sigchld;
@@ -194,33 +241,25 @@ Status CdcClientMgr::start_cdc_client(PRequestCdcClientResult* result) {
#else
pid_t pid = fork();
if (pid < 0) {
- // Fork failed
st = Status::InternalError("Fork cdc client failed.");
st.to_protobuf(result->mutable_status());
return st;
} else if (pid == 0) {
- // Child process
- // When the parent process is killed, the child process also needs to exit
+ // Child: async-signal-safe operations only until execv().
#ifndef __APPLE__
prctl(PR_SET_PDEATHSIG, SIGKILL);
#endif
- // Redirect stdout and stderr to log out file
- std::string cdc_out_file = std::string(log_dir) + "/cdc-client.out";
int out_fd = open(cdc_out_file.c_str(), O_WRONLY | O_CREAT | O_APPEND | O_CLOEXEC, 0644);
if (out_fd < 0) {
perror("open cdc-client.out file failed");
- exit(1);
+ _exit(1);
}
dup2(out_fd, STDOUT_FILENO);
dup2(out_fd, STDERR_FILENO);
close(out_fd);
-
- // java -jar -Dlog.path=xx cdc-client.jar --server.port=9096 --backend.http.port=8040
- execlp(java_bin.c_str(), "java", java_opts.c_str(), "-jar", cdc_jar_path.c_str(),
- cdc_jar_port.c_str(), backend_http_port.c_str(), cluster_token.c_str(), (char*)NULL);
- // If execlp returns, it means it failed
+ execv(java_bin.c_str(), argv.data());
perror("Cdc client child process error");
- exit(1);
+ _exit(1);
} else {
// Parent process: save PID and wait for startup
_child_pid.store(pid);
@@ -233,7 +272,17 @@ Status CdcClientMgr::start_cdc_client(PRequestCdcClientResult* result) {
_child_pid.store(0);
st = Status::InternalError("Start cdc client failed.");
st.to_protobuf(result->mutable_status());
+ } else if (kill(pid, 0) != 0) {
+ // Port healthy but our child has exited: an external process is
+ // answering. Treat as adoption instead of masking dead PID as success.
+ _child_pid.store(0);
+ if (!_adopted_external.exchange(true)) {
+ LOG(INFO) << "Forked cdc client " << pid << " exited but port "
+ << doris::config::cdc_client_port
+ << " is healthy, adopting external instance";
+ }
} else {
+ _adopted_external.store(false);
LOG(INFO) << "Start cdc client success, pid=" << pid
<< ", status=" << status.to_string() << ", response=" << health_response;
}
diff --git a/be/src/runtime/cdc_client_mgr.h b/be/src/runtime/cdc_client_mgr.h
index 077097086da..b3b350154b6 100644
--- a/be/src/runtime/cdc_client_mgr.h
+++ b/be/src/runtime/cdc_client_mgr.h
@@ -53,11 +53,15 @@ public:
pid_t get_child_pid() const { return _child_pid.load(); }
// For testing only: set child PID directly
void set_child_pid_for_test(pid_t pid) { _child_pid.store(pid); }
+ // For testing only: inspect / drive the adopt-external flag
+ bool get_adopted_external_for_test() const { return _adopted_external.load(); }
+ void set_adopted_external_for_test(bool v) { _adopted_external.store(v); }
#endif
private:
std::mutex _start_mutex;
std::atomic<pid_t> _child_pid {0};
+ std::atomic<bool> _adopted_external {false};
};
} // namespace doris
diff --git a/be/test/runtime/cdc_client_mgr_test.cpp b/be/test/runtime/cdc_client_mgr_test.cpp
index 3ab05c394d7..c68e71956ad 100644
--- a/be/test/runtime/cdc_client_mgr_test.cpp
+++ b/be/test/runtime/cdc_client_mgr_test.cpp
@@ -638,6 +638,30 @@ TEST_F(CdcClientMgrTest, StartWithPreExistingResultStatus) {
EXPECT_EQ(result.status().status_code(), 999);
}
+// Verify _adopted_external defaults to false and start_cdc_client (BE_TEST
+// short-circuit) does not alter it.
+TEST_F(CdcClientMgrTest, AdoptedExternalDefaultFalse) {
+ CdcClientMgr mgr;
+ EXPECT_FALSE(mgr.get_adopted_external_for_test());
+
+ PRequestCdcClientResult result;
+ Status status = mgr.start_cdc_client(&result);
+ EXPECT_TRUE(status.ok());
+ EXPECT_FALSE(mgr.get_adopted_external_for_test());
+}
+
+// Verify the _adopted_external flag round-trips through the setter/getter.
+TEST_F(CdcClientMgrTest, AdoptedExternalSetterRoundTrip) {
+ CdcClientMgr mgr;
+ EXPECT_FALSE(mgr.get_adopted_external_for_test());
+
+ mgr.set_adopted_external_for_test(true);
+ EXPECT_TRUE(mgr.get_adopted_external_for_test());
+
+ mgr.set_adopted_external_for_test(false);
+ EXPECT_FALSE(mgr.get_adopted_external_for_test());
+}
+
// Test send_request_to_cdc_client with empty API
TEST_F(CdcClientMgrTest, SendRequestEmptyApi) {
CdcClientMgr mgr;