This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 31d4a4a5 refactor(rest): switch HttpClient to use connection pool (#530)
31d4a4a5 is described below

commit 31d4a4a5808b5868f2636d96e6025b6dea16e2bd
Author: Feiyang Li <[email protected]>
AuthorDate: Mon Jan 26 22:48:09 2026 +0800

    refactor(rest): switch HttpClient to use connection pool (#530)
---
 src/iceberg/catalog/rest/http_client.cc | 97 +++++++--------------------------
 src/iceberg/catalog/rest/http_client.h  | 14 +----
 2 files changed, 23 insertions(+), 88 deletions(-)

diff --git a/src/iceberg/catalog/rest/http_client.cc b/src/iceberg/catalog/rest/http_client.cc
index 84d458b9..41be14ce 100644
--- a/src/iceberg/catalog/rest/http_client.cc
+++ b/src/iceberg/catalog/rest/http_client.cc
@@ -134,41 +134,9 @@ Status HandleFailureResponse(const cpr::Response& response,
 
 }  // namespace
 
-void HttpClient::PrepareSession(
-    const std::string& path, HttpMethod method,
-    const std::unordered_map<std::string, std::string>& params,
-    const std::unordered_map<std::string, std::string>& headers) {
-  session_->SetUrl(cpr::Url{path});
-  session_->SetParameters(GetParameters(params));
-  session_->RemoveContent();
-  // clear lingering POST mode state from prior requests. CURLOPT_POST is 
implicitly set
-  // to 1 by POST requests, and this state is not reset by RemoveContent(), so 
we must
-  // manually enforce HTTP GET to clear it.
-  curl_easy_setopt(session_->GetCurlHolder()->handle, CURLOPT_HTTPGET, 1L);
-  switch (method) {
-    case HttpMethod::kGet:
-      session_->PrepareGet();
-      break;
-    case HttpMethod::kPost:
-      session_->PreparePost();
-      break;
-    case HttpMethod::kPut:
-      session_->PreparePut();
-      break;
-    case HttpMethod::kDelete:
-      session_->PrepareDelete();
-      break;
-    case HttpMethod::kHead:
-      session_->PrepareHead();
-      break;
-  }
-  auto final_headers = MergeHeaders(default_headers_, headers);
-  session_->SetHeader(final_headers);
-}
-
 HttpClient::HttpClient(std::unordered_map<std::string, std::string> 
default_headers)
     : default_headers_{std::move(default_headers)},
-      session_{std::make_unique<cpr::Session>()} {
+      connection_pool_{std::make_unique<cpr::ConnectionPool>()} {
   // Set default Content-Type for all requests (including GET/HEAD/DELETE).
   // Many systems require that content type is set regardless and will fail,
   // even on an empty bodied request.
@@ -182,12 +150,9 @@ Result<HttpResponse> HttpClient::Get(
     const std::string& path, const std::unordered_map<std::string, 
std::string>& params,
     const std::unordered_map<std::string, std::string>& headers,
     const ErrorHandler& error_handler) {
-  cpr::Response response;
-  {
-    std::lock_guard guard(session_mutex_);
-    PrepareSession(path, HttpMethod::kGet, params, headers);
-    response = session_->Get();
-  }
+  auto final_headers = MergeHeaders(default_headers_, headers);
+  cpr::Response response =
+      cpr::Get(cpr::Url{path}, GetParameters(params), final_headers, 
*connection_pool_);
 
   ICEBERG_RETURN_UNEXPECTED(HandleFailureResponse(response, error_handler));
   HttpResponse http_response;
@@ -199,13 +164,9 @@ Result<HttpResponse> HttpClient::Post(
     const std::string& path, const std::string& body,
     const std::unordered_map<std::string, std::string>& headers,
     const ErrorHandler& error_handler) {
-  cpr::Response response;
-  {
-    std::lock_guard guard(session_mutex_);
-    PrepareSession(path, HttpMethod::kPost, /*params=*/{}, headers);
-    session_->SetBody(cpr::Body{body});
-    response = session_->Post();
-  }
+  auto final_headers = MergeHeaders(default_headers_, headers);
+  cpr::Response response =
+      cpr::Post(cpr::Url{path}, cpr::Body{body}, final_headers, 
*connection_pool_);
 
   ICEBERG_RETURN_UNEXPECTED(HandleFailureResponse(response, error_handler));
   HttpResponse http_response;
@@ -218,25 +179,16 @@ Result<HttpResponse> HttpClient::PostForm(
     const std::unordered_map<std::string, std::string>& form_data,
     const std::unordered_map<std::string, std::string>& headers,
     const ErrorHandler& error_handler) {
-  cpr::Response response;
-
-  {
-    std::lock_guard guard(session_mutex_);
-
-    // Override default Content-Type (application/json) with form-urlencoded
-    auto form_headers = headers;
-    form_headers[kHeaderContentType] = kMimeTypeFormUrlEncoded;
-
-    PrepareSession(path, HttpMethod::kPost, /*params=*/{}, form_headers);
-    std::vector<cpr::Pair> pair_list;
-    pair_list.reserve(form_data.size());
-    for (const auto& [key, val] : form_data) {
-      pair_list.emplace_back(key, val);
-    }
-    session_->SetPayload(cpr::Payload(pair_list.begin(), pair_list.end()));
-
-    response = session_->Post();
+  auto final_headers = MergeHeaders(default_headers_, headers);
+  final_headers.insert_or_assign(kHeaderContentType, kMimeTypeFormUrlEncoded);
+  std::vector<cpr::Pair> pair_list;
+  pair_list.reserve(form_data.size());
+  for (const auto& [key, val] : form_data) {
+    pair_list.emplace_back(key, val);
   }
+  cpr::Response response =
+      cpr::Post(cpr::Url{path}, cpr::Payload(pair_list.begin(), 
pair_list.end()),
+                final_headers, *connection_pool_);
 
   ICEBERG_RETURN_UNEXPECTED(HandleFailureResponse(response, error_handler));
   HttpResponse http_response;
@@ -247,12 +199,8 @@ Result<HttpResponse> HttpClient::PostForm(
 Result<HttpResponse> HttpClient::Head(
     const std::string& path, const std::unordered_map<std::string, 
std::string>& headers,
     const ErrorHandler& error_handler) {
-  cpr::Response response;
-  {
-    std::lock_guard guard(session_mutex_);
-    PrepareSession(path, HttpMethod::kHead, /*params=*/{}, headers);
-    response = session_->Head();
-  }
+  auto final_headers = MergeHeaders(default_headers_, headers);
+  cpr::Response response = cpr::Head(cpr::Url{path}, final_headers, 
*connection_pool_);
 
   ICEBERG_RETURN_UNEXPECTED(HandleFailureResponse(response, error_handler));
   HttpResponse http_response;
@@ -264,12 +212,9 @@ Result<HttpResponse> HttpClient::Delete(
     const std::string& path, const std::unordered_map<std::string, 
std::string>& params,
     const std::unordered_map<std::string, std::string>& headers,
     const ErrorHandler& error_handler) {
-  cpr::Response response;
-  {
-    std::lock_guard guard(session_mutex_);
-    PrepareSession(path, HttpMethod::kDelete, params, headers);
-    response = session_->Delete();
-  }
+  auto final_headers = MergeHeaders(default_headers_, headers);
+  cpr::Response response = cpr::Delete(cpr::Url{path}, GetParameters(params),
+                                       final_headers, *connection_pool_);
 
   ICEBERG_RETURN_UNEXPECTED(HandleFailureResponse(response, error_handler));
   HttpResponse http_response;
diff --git a/src/iceberg/catalog/rest/http_client.h b/src/iceberg/catalog/rest/http_client.h
index 84f8e590..38f902e4 100644
--- a/src/iceberg/catalog/rest/http_client.h
+++ b/src/iceberg/catalog/rest/http_client.h
@@ -21,11 +21,9 @@
 
 #include <cstdint>
 #include <memory>
-#include <mutex>
 #include <string>
 #include <unordered_map>
 
-#include "iceberg/catalog/rest/endpoint.h"
 #include "iceberg/catalog/rest/iceberg_rest_export.h"
 #include "iceberg/catalog/rest/type_fwd.h"
 #include "iceberg/result.h"
@@ -34,7 +32,7 @@
 /// \brief Http client for Iceberg REST API.
 
 namespace cpr {
-class Session;
+class ConnectionPool;
 }  // namespace cpr
 
 namespace iceberg::rest {
@@ -110,16 +108,8 @@ class ICEBERG_REST_EXPORT HttpClient {
                               const ErrorHandler& error_handler);
 
  private:
-  void PrepareSession(const std::string& path, HttpMethod method,
-                      const std::unordered_map<std::string, std::string>& 
params,
-                      const std::unordered_map<std::string, std::string>& 
headers);
-
   std::unordered_map<std::string, std::string> default_headers_;
-
-  // TODO(Li Feiyang): use connection pool to support external multi-threaded 
concurrent
-  // calls
-  std::unique_ptr<cpr::Session> session_;
-  mutable std::mutex session_mutex_;
+  std::unique_ptr<cpr::ConnectionPool> connection_pool_;
 };
 
 }  // namespace iceberg::rest

Reply via email to