This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new afb143c1e86 [improve](binlog) Add config to control whether enable persistent connection during ingesting (#49005)
afb143c1e86 is described below
commit afb143c1e8606c5e4879335ee1efe5429a9423a0
Author: walter <[email protected]>
AuthorDate: Sat Mar 15 10:45:53 2025 +0800
[improve](binlog) Add config to control whether enable persistent connection during ingesting (#49005)
Cherry-pick #48467, #48761
---
be/src/common/config.cpp | 3 +++
be/src/common/config.h | 3 +++
be/src/http/http_client.cpp | 29 ++++++++++++++++++++++++++---
be/src/http/http_client.h | 4 ++++
be/src/service/backend_service.cpp | 31 ++++++++++++++++++++++---------
5 files changed, 58 insertions(+), 12 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index c8c30ae5e9f..96a136153ce 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1240,6 +1240,9 @@ DEFINE_Bool(ignore_always_true_predicate_for_segment, "true");
// Ingest binlog work pool size, -1 is disable, 0 is hardware concurrency
DEFINE_Int32(ingest_binlog_work_pool_size, "-1");
+// Ingest binlog with persistent connection
+DEFINE_Bool(enable_ingest_binlog_with_persistent_connection, "false");
+
// Download binlog rate limit, unit is KB/s, 0 means no limit
DEFINE_Int32(download_binlog_rate_limit_kbs, "0");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 0d14fcf4cb6..ccd6706863c 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1310,6 +1310,9 @@ DECLARE_Bool(ignore_always_true_predicate_for_segment);
// Ingest binlog work pool size
DECLARE_Int32(ingest_binlog_work_pool_size);
+// Ingest binlog with persistent connection
+DECLARE_Bool(enable_ingest_binlog_with_persistent_connection);
+
// Download binlog rate limit, unit is KB/s
DECLARE_Int32(download_binlog_rate_limit_kbs);
diff --git a/be/src/http/http_client.cpp b/be/src/http/http_client.cpp
index a9c1a435650..8ead9387a4f 100644
--- a/be/src/http/http_client.cpp
+++ b/be/src/http/http_client.cpp
@@ -189,7 +189,7 @@ private:
_written_size += write_size;
if (_written_size == _file_size) {
// This file has been downloaded, switch to the next one.
- switchToNextFile();
+ switch_to_next_file();
}
return write_size;
@@ -197,7 +197,7 @@ private:
Status finish_inner() {
if (!_is_reading_header && _written_size == _file_size) {
- switchToNextFile();
+ switch_to_next_file();
}
if (_fd >= 0) {
@@ -220,7 +220,7 @@ private:
return Status::OK();
}
- void switchToNextFile() {
+ void switch_to_next_file() {
DCHECK(_fd >= 0);
DCHECK(_written_size == _file_size);
@@ -519,6 +519,29 @@ const char* HttpClient::_get_url() const {
return url;
}
+// execute remote call action with retry
+Status HttpClient::execute(int retry_times, int sleep_time,
+ const std::function<Status(HttpClient*)>& callback) {
+ Status status;
+ for (int i = 0; i < retry_times; ++i) {
+ status = callback(this);
+ if (status.ok()) {
+ auto http_status = get_http_status();
+ if (http_status == 200) {
+ return status;
+ } else {
+ std::string url = mask_token(_get_url());
+ auto error_msg = fmt::format("http status code is not 200, code={}, url={}",
+ http_status, url);
+ LOG(WARNING) << error_msg;
+ return Status::HttpError(error_msg);
+ }
+ }
+ sleep(sleep_time);
+ }
+ return status;
+}
+
Status HttpClient::execute_with_retry(int retry_times, int sleep_time,
const std::function<Status(HttpClient*)>& callback) {
Status status;
diff --git a/be/src/http/http_client.h b/be/src/http/http_client.h
index 8594c6c3832..e56e6b93e3d 100644
--- a/be/src/http/http_client.h
+++ b/be/src/http/http_client.h
@@ -149,6 +149,10 @@ public:
// execute remote call action
Status execute(const std::function<bool(const void* data, size_t length)>& callback = {});
+ // execute remote call action with retry, like execute_with_retry but keep the http client instance
+ Status execute(int retry_times, int sleep_time,
+ const std::function<Status(HttpClient*)>& callback);
+
size_t on_response_data(const void* data, size_t length);
// The file name of the variant column with the inverted index contains %
diff --git a/be/src/service/backend_service.cpp b/be/src/service/backend_service.cpp
index 7403b979f8b..8d0326f0c3e 100644
--- a/be/src/service/backend_service.cpp
+++ b/be/src/service/backend_service.cpp
@@ -95,7 +95,22 @@ struct IngestBinlogArg {
TStatus* tstatus;
};
+Status _exec_http_req(std::optional<HttpClient>& client, int retry_times, int sleep_time,
+ const std::function<Status(HttpClient*)>& callback) {
+ if (client.has_value()) {
+ return client->execute(retry_times, sleep_time, callback);
+ } else {
+ return HttpClient::execute_with_retry(retry_times, sleep_time, callback);
+ }
+}
+
void _ingest_binlog(IngestBinlogArg* arg) {
+ std::optional<HttpClient> client;
+ if (config::enable_ingest_binlog_with_persistent_connection) {
+ // Save the http client instance for persistent connection
+ client = std::make_optional<HttpClient>();
+ }
+
auto txn_id = arg->txn_id;
auto partition_id = arg->partition_id;
auto local_tablet_id = arg->local_tablet_id;
@@ -174,7 +189,7 @@ void _ingest_binlog(IngestBinlogArg* arg) {
client->set_timeout_ms(config::download_binlog_meta_timeout_ms);
return client->execute(&binlog_info);
};
- auto status = HttpClient::execute_with_retry(max_retry, 1, get_binlog_info_cb);
+ auto status = _exec_http_req(client, max_retry, 1, get_binlog_info_cb);
if (!status.ok()) {
LOG(WARNING) << "failed to get binlog info from " << get_binlog_info_url
<< ", status=" << status.to_string();
@@ -213,7 +228,7 @@ void _ingest_binlog(IngestBinlogArg* arg) {
client->set_timeout_ms(config::download_binlog_meta_timeout_ms);
return client->execute(&rowset_meta_str);
};
- status = HttpClient::execute_with_retry(max_retry, 1, get_rowset_meta_cb);
+ status = _exec_http_req(client, max_retry, 1, get_rowset_meta_cb);
if (!status.ok()) {
LOG(WARNING) << "failed to get rowset meta from " << get_rowset_meta_url
<< ", status=" << status.to_string();
@@ -264,7 +279,7 @@ void _ingest_binlog(IngestBinlogArg* arg) {
return client->get_content_length(&segment_file_size);
};
- status = HttpClient::execute_with_retry(max_retry, 1, get_segment_file_size_cb);
+ status = _exec_http_req(client, max_retry, 1, get_segment_file_size_cb);
if (!status.ok()) {
LOG(WARNING) << "failed to get segment file size from " << get_segment_file_size_url
<< ", status=" << status.to_string();
@@ -353,7 +368,7 @@ void _ingest_binlog(IngestBinlogArg* arg) {
io::LocalFileSystem::PERMS_OWNER_RW);
};
- auto status = HttpClient::execute_with_retry(max_retry, 1, get_segment_file_cb);
+ auto status = _exec_http_req(client, max_retry, 1, get_segment_file_cb);
if (!status.ok()) {
LOG(WARNING) << "failed to get segment file from " << get_segment_file_url
<< ", status=" << status.to_string();
@@ -394,8 +409,7 @@ void _ingest_binlog(IngestBinlogArg* arg) {
index_id, index.get_index_suffix());
segment_index_file_names.push_back(index_file);
- status = HttpClient::execute_with_retry(max_retry, 1,
- get_segment_index_file_size_cb);
+ status = _exec_http_req(client, max_retry, 1, get_segment_index_file_size_cb);
if (!status.ok()) {
LOG(WARNING) << "failed to get segment file size from "
<< get_segment_index_file_size_url
@@ -430,8 +444,7 @@ void _ingest_binlog(IngestBinlogArg* arg) {
auto index_file = InvertedIndexDescriptor::get_index_file_name(local_segment_path);
segment_index_file_names.push_back(index_file);
- status = HttpClient::execute_with_retry(max_retry, 1,
- get_segment_index_file_size_cb);
+ status = _exec_http_req(client, max_retry, 1, get_segment_index_file_size_cb);
if (!status.ok()) {
LOG(WARNING) << "failed to get segment file size from "
<< get_segment_index_file_size_url
@@ -524,7 +537,7 @@ void _ingest_binlog(IngestBinlogArg* arg) {
io::LocalFileSystem::PERMS_OWNER_RW);
};
- status = HttpClient::execute_with_retry(max_retry, 1, get_segment_index_file_cb);
+ status = _exec_http_req(client, max_retry, 1, get_segment_index_file_cb);
if (!status.ok()) {
LOG(WARNING) << "failed to get segment index file from " << get_segment_index_file_url
<< ", status=" << status.to_string();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]