This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new afb143c1e86 [improve](binlog) Add config to control whether enable persistent connection during ingesting (#49005)
afb143c1e86 is described below

commit afb143c1e8606c5e4879335ee1efe5429a9423a0
Author: walter <[email protected]>
AuthorDate: Sat Mar 15 10:45:53 2025 +0800

    [improve](binlog) Add config to control whether enable persistent connection during ingesting (#49005)
    
    Cherry-pick #48467, #48761
---
 be/src/common/config.cpp           |  3 +++
 be/src/common/config.h             |  3 +++
 be/src/http/http_client.cpp        | 29 ++++++++++++++++++++++++++---
 be/src/http/http_client.h          |  4 ++++
 be/src/service/backend_service.cpp | 31 ++++++++++++++++++++++---------
 5 files changed, 58 insertions(+), 12 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index c8c30ae5e9f..96a136153ce 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1240,6 +1240,9 @@ DEFINE_Bool(ignore_always_true_predicate_for_segment, "true");
 // Ingest binlog work pool size, -1 is disable, 0 is hardware concurrency
 DEFINE_Int32(ingest_binlog_work_pool_size, "-1");
 
+// Ingest binlog with persistent connection
+DEFINE_Bool(enable_ingest_binlog_with_persistent_connection, "false");
+
 // Download binlog rate limit, unit is KB/s, 0 means no limit
 DEFINE_Int32(download_binlog_rate_limit_kbs, "0");
 
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 0d14fcf4cb6..ccd6706863c 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1310,6 +1310,9 @@ DECLARE_Bool(ignore_always_true_predicate_for_segment);
 // Ingest binlog work pool size
 DECLARE_Int32(ingest_binlog_work_pool_size);
 
+// Ingest binlog with persistent connection
+DECLARE_Bool(enable_ingest_binlog_with_persistent_connection);
+
 // Download binlog rate limit, unit is KB/s
 DECLARE_Int32(download_binlog_rate_limit_kbs);
 
diff --git a/be/src/http/http_client.cpp b/be/src/http/http_client.cpp
index a9c1a435650..8ead9387a4f 100644
--- a/be/src/http/http_client.cpp
+++ b/be/src/http/http_client.cpp
@@ -189,7 +189,7 @@ private:
         _written_size += write_size;
         if (_written_size == _file_size) {
             // This file has been downloaded, switch to the next one.
-            switchToNextFile();
+            switch_to_next_file();
         }
 
         return write_size;
@@ -197,7 +197,7 @@ private:
 
     Status finish_inner() {
         if (!_is_reading_header && _written_size == _file_size) {
-            switchToNextFile();
+            switch_to_next_file();
         }
 
         if (_fd >= 0) {
@@ -220,7 +220,7 @@ private:
         return Status::OK();
     }
 
-    void switchToNextFile() {
+    void switch_to_next_file() {
         DCHECK(_fd >= 0);
         DCHECK(_written_size == _file_size);
 
@@ -519,6 +519,29 @@ const char* HttpClient::_get_url() const {
     return url;
 }
 
+// execute remote call action with retry
+Status HttpClient::execute(int retry_times, int sleep_time,
+                           const std::function<Status(HttpClient*)>& callback) {
+    Status status;
+    for (int i = 0; i < retry_times; ++i) {
+        status = callback(this);
+        if (status.ok()) {
+            auto http_status = get_http_status();
+            if (http_status == 200) {
+                return status;
+            } else {
+                std::string url = mask_token(_get_url());
+                auto error_msg = fmt::format("http status code is not 200, code={}, url={}",
+                                             http_status, url);
+                LOG(WARNING) << error_msg;
+                return Status::HttpError(error_msg);
+            }
+        }
+        sleep(sleep_time);
+    }
+    return status;
+}
+
 Status HttpClient::execute_with_retry(int retry_times, int sleep_time,
                                       const std::function<Status(HttpClient*)>& callback) {
     Status status;
diff --git a/be/src/http/http_client.h b/be/src/http/http_client.h
index 8594c6c3832..e56e6b93e3d 100644
--- a/be/src/http/http_client.h
+++ b/be/src/http/http_client.h
@@ -149,6 +149,10 @@ public:
     // execute remote call action
     Status execute(const std::function<bool(const void* data, size_t length)>& callback = {});
 
+    // execute remote call action with retry, like execute_with_retry but keep the http client instance
+    Status execute(int retry_times, int sleep_time,
+                   const std::function<Status(HttpClient*)>& callback);
+
     size_t on_response_data(const void* data, size_t length);
 
     // The file name of the variant column with the inverted index contains %
diff --git a/be/src/service/backend_service.cpp b/be/src/service/backend_service.cpp
index 7403b979f8b..8d0326f0c3e 100644
--- a/be/src/service/backend_service.cpp
+++ b/be/src/service/backend_service.cpp
@@ -95,7 +95,22 @@ struct IngestBinlogArg {
     TStatus* tstatus;
 };
 
+Status _exec_http_req(std::optional<HttpClient>& client, int retry_times, int sleep_time,
+                      const std::function<Status(HttpClient*)>& callback) {
+    if (client.has_value()) {
+        return client->execute(retry_times, sleep_time, callback);
+    } else {
+        return HttpClient::execute_with_retry(retry_times, sleep_time, callback);
+    }
+}
+
 void _ingest_binlog(IngestBinlogArg* arg) {
+    std::optional<HttpClient> client;
+    if (config::enable_ingest_binlog_with_persistent_connection) {
+        // Save the http client instance for persistent connection
+        client = std::make_optional<HttpClient>();
+    }
+
     auto txn_id = arg->txn_id;
     auto partition_id = arg->partition_id;
     auto local_tablet_id = arg->local_tablet_id;
@@ -174,7 +189,7 @@ void _ingest_binlog(IngestBinlogArg* arg) {
         client->set_timeout_ms(config::download_binlog_meta_timeout_ms);
         return client->execute(&binlog_info);
     };
-    auto status = HttpClient::execute_with_retry(max_retry, 1, get_binlog_info_cb);
+    auto status = _exec_http_req(client, max_retry, 1, get_binlog_info_cb);
     if (!status.ok()) {
         LOG(WARNING) << "failed to get binlog info from " << get_binlog_info_url
                      << ", status=" << status.to_string();
@@ -213,7 +228,7 @@ void _ingest_binlog(IngestBinlogArg* arg) {
         client->set_timeout_ms(config::download_binlog_meta_timeout_ms);
         return client->execute(&rowset_meta_str);
     };
-    status = HttpClient::execute_with_retry(max_retry, 1, get_rowset_meta_cb);
+    status = _exec_http_req(client, max_retry, 1, get_rowset_meta_cb);
     if (!status.ok()) {
         LOG(WARNING) << "failed to get rowset meta from " << get_rowset_meta_url
                      << ", status=" << status.to_string();
@@ -264,7 +279,7 @@ void _ingest_binlog(IngestBinlogArg* arg) {
             return client->get_content_length(&segment_file_size);
         };
 
-        status = HttpClient::execute_with_retry(max_retry, 1, get_segment_file_size_cb);
+        status = _exec_http_req(client, max_retry, 1, get_segment_file_size_cb);
         if (!status.ok()) {
             LOG(WARNING) << "failed to get segment file size from " << get_segment_file_size_url
                          << ", status=" << status.to_string();
@@ -353,7 +368,7 @@ void _ingest_binlog(IngestBinlogArg* arg) {
                                                              
io::LocalFileSystem::PERMS_OWNER_RW);
         };
 
-        auto status = HttpClient::execute_with_retry(max_retry, 1, get_segment_file_cb);
+        auto status = _exec_http_req(client, max_retry, 1, get_segment_file_cb);
         if (!status.ok()) {
             LOG(WARNING) << "failed to get segment file from " << get_segment_file_url
                          << ", status=" << status.to_string();
@@ -394,8 +409,7 @@ void _ingest_binlog(IngestBinlogArg* arg) {
                         index_id, index.get_index_suffix());
                 segment_index_file_names.push_back(index_file);
 
-                status = HttpClient::execute_with_retry(max_retry, 1,
-                                                        get_segment_index_file_size_cb);
+                status = _exec_http_req(client, max_retry, 1, get_segment_index_file_size_cb);
                 if (!status.ok()) {
                     LOG(WARNING) << "failed to get segment file size from "
                                  << get_segment_index_file_size_url
@@ -430,8 +444,7 @@ void _ingest_binlog(IngestBinlogArg* arg) {
                 auto index_file = InvertedIndexDescriptor::get_index_file_name(local_segment_path);
                 segment_index_file_names.push_back(index_file);
 
-                status = HttpClient::execute_with_retry(max_retry, 1,
-                                                        get_segment_index_file_size_cb);
+                status = _exec_http_req(client, max_retry, 1, get_segment_index_file_size_cb);
                 if (!status.ok()) {
                     LOG(WARNING) << "failed to get segment file size from "
                                  << get_segment_index_file_size_url
@@ -524,7 +537,7 @@ void _ingest_binlog(IngestBinlogArg* arg) {
                                                              
io::LocalFileSystem::PERMS_OWNER_RW);
         };
 
-        status = HttpClient::execute_with_retry(max_retry, 1, get_segment_index_file_cb);
+        status = _exec_http_req(client, max_retry, 1, get_segment_index_file_cb);
         if (!status.ok()) {
             LOG(WARNING) << "failed to get segment index file from " << get_segment_index_file_url
                          << ", status=" << status.to_string();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to