This is an automated email from the ASF dual-hosted git repository.

twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 8c92c97b0 chore(log): replace logging calls in server/* (#2928)
8c92c97b0 is described below

commit 8c92c97b09076ee0bcaae9a8b7a53ef608f80568
Author: Jensen <[email protected]>
AuthorDate: Wed May 7 00:14:11 2025 +0800

    chore(log): replace logging calls in server/* (#2928)
    
    Co-authored-by: Twice <[email protected]>
    Co-authored-by: Copilot <[email protected]>
---
 src/server/redis_connection.cc |  25 +++++-----
 src/server/redis_request.cc    |   2 +-
 src/server/server.cc           | 109 ++++++++++++++++++++++-------------------
 src/server/tls_util.cc         |  22 ++++-----
 src/server/worker.cc           |  38 +++++++-------
 5 files changed, 105 insertions(+), 91 deletions(-)

diff --git a/src/server/redis_connection.cc b/src/server/redis_connection.cc
index aa21a5595..397810873 100644
--- a/src/server/redis_connection.cc
+++ b/src/server/redis_connection.cc
@@ -89,7 +89,7 @@ void Connection::OnRead([[maybe_unused]] struct bufferevent 
*bev) {
   if (!s.IsOK()) {
     EnableFlag(redis::Connection::kCloseAfterReply);
     Reply(redis::Error(s));
-    LOG(INFO) << "[connection] Failed to tokenize the request. Error: " << 
s.Msg();
+    info("[connection] Failed to tokenize the request. Error: {}", s.Msg());
     return;
   }
 
@@ -107,24 +107,26 @@ void Connection::OnWrite([[maybe_unused]] bufferevent 
*bev) {
 
 void Connection::OnEvent(bufferevent *bev, int16_t events) {
   if (events & BEV_EVENT_ERROR) {
-    LOG(ERROR) << "[connection] Going to remove the client: " << GetAddr()
-               << ", while encounter error: " << 
evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR())
 #ifdef ENABLE_OPENSSL
-               << ", SSL Error: " << 
SSLError(bufferevent_get_openssl_error(bev))  // NOLINT
+    error("[connection] Removing client: {}, error: {}, SSL Error: {}", 
GetAddr(),
+          evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR()),
+          fmt::streamed(SSLError(bufferevent_get_openssl_error(bev))));  // 
NOLINT
+#else
+    error("[connection] Removing client: {}, error: {}", GetAddr(),
+          evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR()));
 #endif
-        ;  // NOLINT
     Close();
     return;
   }
 
   if (events & BEV_EVENT_EOF) {
-    DLOG(INFO) << "[connection] Going to remove the client: " << GetAddr() << 
", while closed by client";
+    debug("[connection] Going to remove the client: {}, while closed by 
client", GetAddr());
     Close();
     return;
   }
 
   if (events & BEV_EVENT_TIMEOUT) {
-    DLOG(INFO) << "[connection] The client: " << GetAddr() << "] reached 
timeout";
+    debug("[connection] The client: {} reached timeout", GetAddr());
     bufferevent_enable(bev, EV_READ | EV_WRITE);
   }
 }
@@ -387,8 +389,9 @@ void Connection::ExecuteCommands(std::deque<CommandTokens> 
*to_process_cmds) {
     if (!cmd_s.IsOK()) {
       auto cmd_name = cmd_tokens.front();
       if (util::EqualICase(cmd_name, "host:") || util::EqualICase(cmd_name, 
"post")) {
-        LOG(WARNING) << "A likely HTTP request is detected in the RESP 
connection, indicating a potential "
-                        "Cross-Protocol Scripting attack. Connection aborted.";
+        warn(
+            "[connection] A likely HTTP request is detected in the RESP 
connection, indicating a potential "
+            "Cross-Protocol Scripting attack. Connection aborted.");
         EnableFlag(kCloseAsync);
         return;
       }
@@ -540,7 +543,7 @@ void Connection::ExecuteCommands(std::deque<CommandTokens> 
*to_process_cmds) {
                     if (res.IsOK()) {
                       index_records.push_back(*res);
                     } else if (!res.Is<Status::NoPrefixMatched>() && 
!res.Is<Status::TypeMismatched>()) {
-                      LOG(WARNING) << "index recording failed for key: " << 
key;
+                      warn("[connection] index recording failed for key: {}", 
key);
                     }
                   },
                   args);
@@ -552,7 +555,7 @@ void Connection::ExecuteCommands(std::deque<CommandTokens> 
*to_process_cmds) {
       for (const auto &record : index_records) {
         auto s = GlobalIndexer::Update(ctx, record);
         if (!s.IsOK() && !s.Is<Status::TypeMismatched>()) {
-          LOG(WARNING) << "index updating failed for key: " << record.key;
+          warn("[connection] index updating failed for key: {}", record.key);
         }
       }
     }
diff --git a/src/server/redis_request.cc b/src/server/redis_request.cc
index a216416c6..2d6399368 100644
--- a/src/server/redis_request.cc
+++ b/src/server/redis_request.cc
@@ -54,7 +54,7 @@ Status Request::Tokenize(evbuffer *input) {
 
         if (!line || line.length <= 0) {
           if (pipeline_size > 128) {
-            LOG(INFO) << "Large pipeline detected: " << pipeline_size;
+            info("[request] Large pipeline detected: {}", pipeline_size);
           }
           if (line) {
             continue;
diff --git a/src/server/server.cc b/src/server/server.cc
index fed86c734..09aef5147 100644
--- a/src/server/server.cc
+++ b/src/server/server.cc
@@ -105,10 +105,10 @@ Server::Server(engine::Storage *storage, Config *config)
     if (!config->unixsocket.empty() && i == 0) {
       Status s = worker->ListenUnixSocket(config->unixsocket, 
config->unixsocketperm, config->backlog);
       if (!s.IsOK()) {
-        LOG(ERROR) << "[server] Failed to listen on unix socket: " << 
config->unixsocket << ". Error: " << s.Msg();
+        error("[server] Failed to listen on unix socket: {}. Error: {}", 
config->unixsocket, s.Msg());
         exit(1);
       }
-      LOG(INFO) << "[server] Listening on unix socket: " << config->unixsocket;
+      info("[server] Listening on unix socket: {}", config->unixsocket);
     }
     
worker_threads_.emplace_back(std::make_unique<WorkerThread>(std::move(worker)));
   }
@@ -125,8 +125,8 @@ Server::~Server() {
   while (GetFetchFileThreadNum() != 0) {
     usleep(100000);
     if (++counter == 600) {
-      LOG(WARNING) << "[server] Will force destroy the server after waiting 
60s, leave " << GetFetchFileThreadNum()
-                   << " fetch file threads are still running";
+      warn("[server] Will force destroy the server after waiting 60s, leave {} 
fetch file threads are still running",
+           GetFetchFileThreadNum());
       break;
     }
   }
@@ -199,7 +199,7 @@ Status Server::Start() {
   }
 
   if (auto s = task_runner_.Start(); !s) {
-    LOG(WARNING) << "Failed to start task runner: " << s.Msg();
+    warn("Failed to start task runner: {}", s.Msg());
   }
   // setup server cron thread
   cron_thread_ = GET_OR_RET(util::CreateThread("server-cron", [this] { 
this->cron(); }));
@@ -239,7 +239,7 @@ Status Server::Start() {
   }));
 
   memory_startup_use_.store(Stats::GetMemoryRSS(), std::memory_order_relaxed);
-  LOG(INFO) << "[server] Ready to accept connections";
+  info("[server] Ready to accept connections");
 
   return Status::OK();
 }
@@ -261,13 +261,13 @@ void Server::Stop() {
 
 void Server::Join() {
   if (auto s = util::ThreadJoin(cron_thread_); !s) {
-    LOG(WARNING) << "Cron thread operation failed: " << s.Msg();
+    warn("Cron thread operation failed: {}", s.Msg());
   }
   if (auto s = util::ThreadJoin(compaction_checker_thread_); !s) {
-    LOG(WARNING) << "Compaction checker thread operation failed: " << s.Msg();
+    warn("Compaction checker thread operation failed: {}", s.Msg());
   }
   if (auto s = task_runner_.Join(); !s) {
-    LOG(WARNING) << s.Msg();
+    warn("{}", s.Msg());
   }
   for (const auto &worker : worker_threads_) {
     worker->Join();
@@ -298,7 +298,7 @@ Status Server::AddMaster(const std::string &host, uint32_t 
port, bool force_reco
                                       [this]() {
                                         this->is_loading_ = false;
                                         if (auto s = task_runner_.Start(); !s) 
{
-                                          LOG(WARNING) << "Failed to start 
task runner: " << s.Msg();
+                                          warn("Failed to start task runner: 
{}", s.Msg());
                                         }
                                       });
   if (s.IsOK()) {
@@ -672,7 +672,7 @@ void Server::WakeupBlockingConns(const std::string &key, 
size_t n_conns) {
     auto conn_ctx = iter->second.front();
     auto s = conn_ctx.owner->EnableWriteEvent(conn_ctx.fd);
     if (!s.IsOK()) {
-      LOG(ERROR) << "[server] Failed to enable write event on blocked client " 
<< conn_ctx.fd << ": " << s.Msg();
+      error("[server] Failed to enable write event on blocked client {}: {}", 
conn_ctx.fd, s.Msg());
     }
     iter->second.pop_front();
   }
@@ -691,8 +691,7 @@ void Server::OnEntryAddedToStream(const std::string &ns, 
const std::string &key,
     if (consumer->ns == ns && entry_id > consumer->last_consumed_id) {
       auto s = consumer->owner->EnableWriteEvent(consumer->fd);
       if (!s.IsOK()) {
-        LOG(ERROR) << "[server] Failed to enable write event on blocked stream 
consumer " << consumer->fd << ": "
-                   << s.Msg();
+        error("[server] Failed to enable write event on blocked stream 
consumer {}: {}", consumer->fd, s.Msg());
       }
       it = iter->second.erase(it);
     } else {
@@ -776,11 +775,11 @@ void Server::cron() {
       if (!config_->compaction_checker_cron.IsEnabled() && 
config_->compact_cron.IsEnabled() &&
           config_->compact_cron.IsTimeMatch(&now)) {
         Status s = AsyncCompactDB();
-        LOG(INFO) << "[server] Schedule to compact the db, result: " << 
s.Msg();
+        info("[server] Schedule to compact the db, result: {}", s.Msg());
       }
       if (config_->bgsave_cron.IsEnabled() && 
config_->bgsave_cron.IsTimeMatch(&now)) {
         Status s = AsyncBgSaveDB();
-        LOG(INFO) << "[server] Schedule to bgsave the db, result: " << s.Msg();
+        info("[server] Schedule to bgsave the db, result: {}", s.Msg());
       }
       if (config_->dbsize_scan_cron.IsEnabled() && 
config_->dbsize_scan_cron.IsTimeMatch(&now)) {
         auto tokens = namespace_.List();
@@ -797,7 +796,7 @@ void Server::cron() {
 
         for (auto &ns : namespaces) {
           Status s = AsyncScanDBSize(ns);
-          LOG(INFO) << "[server] Schedule to recalculate the db size on 
namespace: " << ns << ", result: " << s.Msg();
+          info("[server] Schedule to recalculate the db size on namespace: {}, 
result: {}", ns, s.Msg());
         }
       }
     }
@@ -824,9 +823,9 @@ void Server::cron() {
             (now_secs - create_time_secs > 24 * 60 * 60)) {
           auto s = rocksdb::DestroyDB(config_->checkpoint_dir, 
rocksdb::Options());
           if (!s.ok()) {
-            LOG(WARNING) << "[server] Fail to clean checkpoint, error: " << 
s.ToString();
+            warn("[server] Fail to clean checkpoint, error: {}", s.ToString());
           } else {
-            LOG(INFO) << "[server] Clean checkpoint successfully";
+            info("[server] Clean checkpoint successfully");
           }
         }
       }
@@ -840,9 +839,9 @@ void Server::cron() {
     if (counter != 0 && counter % 600 == 0 && 
storage->IsDBInRetryableIOError()) {
       auto s = storage->GetDB()->Resume();
       if (s.ok()) {
-        LOG(WARNING) << "[server] Successfully resumed DB after retryable IO 
error";
+        warn("[server] Successfully resumed DB after retryable IO error");
       } else {
-        LOG(ERROR) << "[server] Failed to resume DB after retryable IO error: 
" << s.ToString();
+        error("[server] Failed to resume DB after retryable IO error: {}", 
s.ToString());
       }
       storage->SetDBInRetryableIOError(false);
     }
@@ -1339,7 +1338,7 @@ std::string Server::GetRocksDBStatsJson() const {
 // guarantee other threads don't access DB and its column families, then close 
db.
 bool Server::PrepareRestoreDB() {
   // Stop feeding slaves thread
-  LOG(INFO) << "[server] Disconnecting slaves...";
+  info("[server] Disconnecting slaves...");
   DisconnectSlaves();
 
   // If the DB is restored, the object 'db_' will be destroyed, but
@@ -1355,7 +1354,7 @@ bool Server::PrepareRestoreDB() {
   // To guarantee work threads don't access DB, we should release 
'ExclusivityGuard'
   // ASAP to avoid user can't receive responses for long time, because the 
following
   // 'CloseDB' may cost much time to acquire DB mutex.
-  LOG(INFO) << "[server] Waiting workers for finishing executing commands...";
+  info("[server] Waiting workers for finishing executing commands...");
   while (!works_concurrency_rw_lock_.try_lock()) {
     if (replication_thread_->IsStopped()) {
       is_loading_ = false;
@@ -1366,22 +1365,22 @@ bool Server::PrepareRestoreDB() {
   works_concurrency_rw_lock_.unlock();
 
   // Stop task runner
-  LOG(INFO) << "[server] Stopping the task runner and clear task queue...";
+  info("[server] Stopping the task runner and clear task queue...");
   task_runner_.Cancel();
   if (auto s = task_runner_.Join(); !s) {
-    LOG(WARNING) << "[server] " << s.Msg();
+    warn("[server] {}", s.Msg());
   }
 
   // Cron thread, compaction checker thread, full synchronization thread
   // may always run in the background, we need to close db, so they don't 
actually work.
-  LOG(INFO) << "[server] Waiting for closing DB...";
+  info("[server] Waiting for closing DB...");
   storage->CloseDB();
   return true;
 }
 
 void Server::WaitNoMigrateProcessing() {
   if (config_->cluster_enabled) {
-    LOG(INFO) << "[server] Waiting until no migration task is running...";
+    info("[server] Waiting until no migration task is running...");
     slot_migrator->SetStopMigrationFlag(true);
     while (slot_migrator->GetCurrentSlotMigrationStage() != 
SlotMigrationStage::kNone) {
       usleep(500);
@@ -1408,7 +1407,7 @@ Status Server::AsyncCompactDB(const std::string 
&begin_key, const std::string &e
 
     auto s = storage->Compact(nullptr, begin.get(), end.get());
     if (!s.ok()) {
-      LOG(ERROR) << "[task runner] Failed to do compaction: " << s.ToString();
+      error("[task runner] Failed to do compaction: {}", s.ToString());
     }
 
     std::lock_guard<std::mutex> lg(db_job_mu_);
@@ -1463,7 +1462,7 @@ Status Server::AsyncScanDBSize(const std::string &ns) {
     engine::Context ctx(storage);
     auto s = db.GetKeyNumStats(ctx, "", &stats);
     if (!s.ok()) {
-      LOG(ERROR) << "failed to retrieve key num stats: " << s.ToString();
+      error("failed to retrieve key num stats: {}", s.ToString());
     }
 
     std::lock_guard<std::mutex> lg(db_job_mu_);
@@ -1517,9 +1516,10 @@ Status Server::autoResizeBlockAndSST() {
   if (target_file_size_base != config_->rocks_db.target_file_size_base) {
     auto old_target_file_size_base = config_->rocks_db.target_file_size_base;
     auto s = config_->Set(this, "rocksdb.target_file_size_base", 
std::to_string(target_file_size_base));
-    LOG(INFO) << "[server] Resize rocksdb.target_file_size_base from " << 
old_target_file_size_base << " to "
-              << target_file_size_base << ", average_kv_size: " << 
average_kv_size << ", total_size: " << total_size
-              << ", total_keys: " << total_keys << ", result: " << s.Msg();
+    info(
+        "[server] Resize rocksdb.target_file_size_base from {} to {}, "
+        "average_kv_size: {}, total_size: {}, total_keys: {}, result: {}",
+        old_target_file_size_base, target_file_size_base, average_kv_size, 
total_size, total_keys, s.Msg());
     if (!s.IsOK()) {
       return s;
     }
@@ -1528,9 +1528,11 @@ Status Server::autoResizeBlockAndSST() {
   if (target_file_size_base != config_->rocks_db.write_buffer_size) {
     auto old_write_buffer_size = config_->rocks_db.write_buffer_size;
     auto s = config_->Set(this, "rocksdb.write_buffer_size", 
std::to_string(target_file_size_base));
-    LOG(INFO) << "[server] Resize rocksdb.write_buffer_size from " << 
old_write_buffer_size << " to "
-              << target_file_size_base << ", average_kv_size: " << 
average_kv_size << ", total_size: " << total_size
-              << ", total_keys: " << total_keys << ", result: " << s.Msg();
+    info(
+        "[server] Resize rocksdb.write_buffer_size from {} to {}, "
+        "average_kv_size: {}, total_size: {}, "
+        "total_keys: {}, result: {}",
+        old_write_buffer_size, target_file_size_base, average_kv_size, 
total_size, total_keys, s.Msg());
     if (!s.IsOK()) {
       return s;
     }
@@ -1538,9 +1540,11 @@ Status Server::autoResizeBlockAndSST() {
 
   if (block_size != config_->rocks_db.block_size) {
     auto s = 
storage->SetOptionForAllColumnFamilies("table_factory.block_size", 
std::to_string(block_size));
-    LOG(INFO) << "[server] Resize rocksdb.block_size from " << 
config_->rocks_db.block_size << " to " << block_size
-              << ", average_kv_size: " << average_kv_size << ", total_size: " 
<< total_size
-              << ", total_keys: " << total_keys << ", result: " << s.Msg();
+    info(
+        "[server] Resize rocksdb.block_size from {} to {}, "
+        "average_kv_size: {}, total_size: {}, "
+        "total_keys: {}, result: {}",
+        config_->rocks_db.block_size, block_size, average_kv_size, total_size, 
total_keys, s.Msg());
     if (!s.IsOK()) {
       return s;
     }
@@ -1549,7 +1553,7 @@ Status Server::autoResizeBlockAndSST() {
   }
 
   auto s = config_->Rewrite(namespace_.List());
-  LOG(INFO) << "[server] Rewrite config, result: " << s.Msg();
+  info("[server] Rewrite config, result: {}", s.Msg());
 
   return Status::OK();
 }
@@ -1671,8 +1675,7 @@ void Server::KillClient(int64_t *killed, const 
std::string &addr, uint64_t id, u
       (type & kTypeMaster || (!addr.empty() && addr == master_host_ + ":" + 
std::to_string(master_port_)))) {
     // Stop replication thread and start a new one to replicate
     if (auto s = AddMaster(master_host_, master_port_, true); !s.IsOK()) {
-      LOG(ERROR) << "[server] Failed to add master " << master_host_ << ":" << 
master_port_
-                 << " with error: " << s.Msg();
+      error("[server] Failed to add master {}:{} with error: {}", 
master_host_, master_port_, s.Msg());
     }
     (*killed)++;
   }
@@ -1850,18 +1853,24 @@ void Server::AdjustOpenFilesLimit() {
 
   if (best_limit < max_files) {
     if (best_limit <= static_cast<int>(min_reserved_fds)) {
-      LOG(WARNING) << "[server] Your current 'ulimit -n' of " << old_limit << 
" is not enough for the server to start."
-                   << "Please increase your open file limit to at least " << 
max_files << ". Exiting.";
+      warn(
+          "[server] Your current 'ulimit -n' of {} is not enough for the 
server to start. "
+          "Please increase your open file limit to at least {}. Exiting.",
+          old_limit, max_files);
       exit(1);
     }
 
-    LOG(WARNING) << "[server] You requested max clients of " << max_clients << 
" and RocksDB max open files of "
-                 << rocksdb_max_open_file << " requiring at least " << 
max_files << " max file descriptors.";
-    LOG(WARNING) << "[server] Server can't set maximum open files to " << 
max_files
-                 << " because of OS error: " << strerror(setrlimit_error);
+    warn(
+        "[server] You requested max clients of {} and RocksDB max open files 
of {} "
+        "requiring at least {} max file descriptors.",
+        max_clients, rocksdb_max_open_file, max_files);
+
+    warn(
+        "[server] Server can't set maximum open files to {} "
+        "because of OS error: {}",
+        max_files, strerror(setrlimit_error));
   } else {
-    LOG(WARNING) << "[server] Increased maximum number of open files to " << 
max_files << " (it's originally set to "
-                 << old_limit << ")";
+    warn("[server] Increased maximum number of open files to {} (it's 
originally set to {})", max_files, old_limit);
   }
 }
 
@@ -1874,12 +1883,12 @@ void Server::AdjustWorkerThreads() {
   if (new_worker_threads > worker_threads_.size()) {
     delta = new_worker_threads - worker_threads_.size();
     increaseWorkerThreads(delta);
-    LOG(INFO) << "[server] Increase worker threads from " << 
worker_threads_.size() << " to " << new_worker_threads;
+    info("[server] Increase worker threads from {} to {}", 
worker_threads_.size(), new_worker_threads);
     return;
   }
 
   delta = worker_threads_.size() - new_worker_threads;
-  LOG(INFO) << "[server] Decrease worker threads from " << 
worker_threads_.size() << " to " << new_worker_threads;
+  info("[server] Decrease worker threads from {} to {}", 
worker_threads_.size(), new_worker_threads);
   decreaseWorkerThreads(delta);
 }
 
diff --git a/src/server/tls_util.cc b/src/server/tls_util.cc
index 3767a32a4..95a7a3ab6 100644
--- a/src/server/tls_util.cc
+++ b/src/server/tls_util.cc
@@ -45,7 +45,7 @@ void InitSSL() {
   ERR_load_crypto_strings();
 
   if (!RAND_poll()) {
-    LOG(ERROR) << "OpenSSL failed to generate random seed";
+    error("OpenSSL failed to generate random seed");
     exit(1);
   }
 
@@ -117,19 +117,19 @@ StatusOr<unsigned long> ParseSSLProtocols(const 
std::string &protocols) {  // NO
 
 UniqueSSLContext CreateSSLContext(const Config *config, const SSL_METHOD 
*method) {
   if (config->tls_cert_file.empty() || config->tls_key_file.empty()) {
-    LOG(ERROR) << "Both tls-cert-file and tls-key-file must be specified while 
TLS is enabled";
+    error("Both tls-cert-file and tls-key-file must be specified while TLS is 
enabled");
     return nullptr;
   }
 
   auto ssl_ctx = UniqueSSLContext(method);
   if (!ssl_ctx) {
-    LOG(ERROR) << "Failed to construct SSL context: " << SSLErrors{};
+    error("Failed to construct SSL context: {}", fmt::streamed(SSLErrors{}));
     return nullptr;
   }
 
   auto proto_status = ParseSSLProtocols(config->tls_protocols);
   if (!proto_status) {
-    LOG(ERROR) << proto_status.Msg();
+    error("{}", proto_status.Msg());
     return nullptr;
   }
 
@@ -176,16 +176,16 @@ UniqueSSLContext CreateSSLContext(const Config *config, 
const SSL_METHOD *method
   auto ca_dir = config->tls_ca_cert_dir.empty() ? nullptr : 
config->tls_ca_cert_dir.c_str();
   if (ca_file || ca_dir) {
     if (SSL_CTX_load_verify_locations(ssl_ctx.get(), ca_file, ca_dir) != 1) {
-      LOG(ERROR) << "Failed to load CA certificates: " << SSLErrors{};
+      error("Failed to load CA certificates: {}", fmt::streamed(SSLErrors{}));
       return nullptr;
     }
   } else if (config->tls_auth_clients != TLS_AUTH_CLIENTS_NO) {
-    LOG(ERROR) << "Either tls-ca-cert-file or tls-ca-cert-dir must be 
specified while tls-auth-clients is enabled";
+    error("Either tls-ca-cert-file or tls-ca-cert-dir must be specified while 
tls-auth-clients is enabled");
     return nullptr;
   }
 
   if (SSL_CTX_use_certificate_chain_file(ssl_ctx.get(), 
config->tls_cert_file.c_str()) != 1) {
-    LOG(ERROR) << "Failed to load SSL certificate file: " << SSLErrors{};
+    error("Failed to load SSL certificate file: {}", 
fmt::streamed(SSLErrors{}));
     return nullptr;
   }
 
@@ -199,23 +199,23 @@ UniqueSSLContext CreateSSLContext(const Config *config, 
const SSL_METHOD *method
   }
 
   if (SSL_CTX_use_PrivateKey_file(ssl_ctx.get(), config->tls_key_file.c_str(), 
SSL_FILETYPE_PEM) != 1) {
-    LOG(ERROR) << "Failed to load SSL private key file: " << SSLErrors{};
+    error("Failed to load SSL private key file: {}", 
fmt::streamed(SSLErrors{}));
     return nullptr;
   }
 
   if (SSL_CTX_check_private_key(ssl_ctx.get()) != 1) {
-    LOG(ERROR) << "Failed to check the loaded private key: " << SSLErrors{};
+    error("Failed to check the loaded private key: {}", 
fmt::streamed(SSLErrors{}));
     return nullptr;
   }
 
   if (!config->tls_ciphers.empty() && !SSL_CTX_set_cipher_list(ssl_ctx.get(), 
config->tls_ciphers.c_str())) {
-    LOG(ERROR) << "Failed to set SSL ciphers: " << SSLErrors{};
+    error("Failed to set SSL ciphers: {}", fmt::streamed(SSLErrors{}));
     return nullptr;
   }
 
 #ifdef SSL_OP_NO_TLSv1_3
   if (!config->tls_ciphersuites.empty() && 
!SSL_CTX_set_ciphersuites(ssl_ctx.get(), config->tls_ciphersuites.c_str())) {
-    LOG(ERROR) << "Failed to set SSL ciphersuites: " << SSLErrors{};
+    error("Failed to set SSL ciphersuites: {}", fmt::streamed(SSLErrors{}));
     return nullptr;
   }
 #endif
diff --git a/src/server/worker.cc b/src/server/worker.cc
index 2984c7686..e65467f2c 100644
--- a/src/server/worker.cc
+++ b/src/server/worker.cc
@@ -23,6 +23,7 @@
 #include <event2/util.h>
 #include <unistd.h>
 
+#include <cstdint>
 #include <stdexcept>
 #include <string>
 
@@ -60,7 +61,7 @@ Worker::Worker(Server *srv, Config *config) : srv(srv), 
base_(event_base_new())
 
   if (config->socket_fd != -1) {
     if (const Status s = listenFD(config->socket_fd, config->port, 
config->backlog); !s.IsOK()) {
-      LOG(ERROR) << "[worker] Failed to listen to socket with fd: " << 
config->socket_fd << ". Error: " << s.Msg();
+      error("[worker] Failed to listen to socket with fd: {}, Error: {}", 
config->socket_fd, s.Msg());
       exit(1);
     }
   } else {
@@ -69,10 +70,10 @@ Worker::Worker(Server *srv, Config *config) : srv(srv), 
base_(event_base_new())
     for (const uint32_t *port = ports; *port; ++port) {
       for (const auto &bind : config->binds) {
         if (const Status s = listenTCP(bind, *port, config->backlog); 
!s.IsOK()) {
-          LOG(ERROR) << "[worker] Failed to listen on: " << bind << ":" << 
*port << ". Error: " << s.Msg();
+          error("[worker] Failed to listen on: {}:{}, Error: {}", bind, *port, 
s.Msg());
           exit(1);
         }
-        LOG(INFO) << "[worker] Listening on: " << bind << ":" << *port;
+        info("[worker] Listening on: {}:{}", bind, *port);
       }
     }
   }
@@ -113,18 +114,18 @@ void Worker::TimerCB(int, [[maybe_unused]] int16_t 
events) {
 void Worker::newTCPConnection(evconnlistener *listener, evutil_socket_t fd, 
[[maybe_unused]] sockaddr *address,
                               [[maybe_unused]] int socklen) {
   int local_port = util::GetLocalPort(fd);  // NOLINT
-  DLOG(INFO) << "[worker] New connection: fd=" << fd << " from port: " << 
local_port << " thread #" << tid_;
+  debug("[worker] New connection: fd={} from port: {} thread #{}", fd, 
local_port, fmt::streamed(tid_));
 
   auto s = util::SockSetTcpKeepalive(fd, 120);
   if (!s.IsOK()) {
-    LOG(ERROR) << "[worker] Failed to set tcp-keepalive on socket. Error: " << 
s.Msg();
+    error("[worker] Failed to set tcp-keepalive on socket. Error: {}", 
s.Msg());
     evutil_closesocket(fd);
     return;
   }
 
   s = util::SockSetTcpNoDelay(fd, 1);
   if (!s.IsOK()) {
-    LOG(ERROR) << "[worker] Failed to set tcp-nodelay on socket. Error: " << 
s.Msg();
+    error("[worker] Failed to set tcp-nodelay on socket. Error: {}", s.Msg());
     evutil_closesocket(fd);
     return;
   }
@@ -139,7 +140,7 @@ void Worker::newTCPConnection(evconnlistener *listener, 
evutil_socket_t fd, [[ma
   if (uint32_t(local_port) == srv->GetConfig()->tls_port) {
     ssl = SSL_new(srv->ssl_ctx.get());
     if (!ssl) {
-      LOG(ERROR) << "Failed to construct SSL structure for new connection: " 
<< SSLErrors{};
+      error("[worker] Failed to construct SSL structure for new connection: 
{}", fmt::streamed(SSLErrors{}));
       evutil_closesocket(fd);
       return;
     }
@@ -153,10 +154,11 @@ void Worker::newTCPConnection(evconnlistener *listener, 
evutil_socket_t fd, [[ma
   if (!bev) {
     auto socket_err = evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR());
 #ifdef ENABLE_OPENSSL
-    LOG(ERROR) << "Failed to construct socket for new connection: " << 
socket_err << ", SSL error: " << SSLErrors{};
+    error("[worker] Failed to construct socket for new connection: {}, SSL 
error: {}", socket_err,
+          fmt::streamed(SSLErrors{}));
     if (ssl) SSL_free(ssl);
 #else
-    LOG(ERROR) << "Failed to construct socket for new connection: " << 
socket_err;
+    error("[worker] Failed to construct socket for new connection: {}", 
socket_err);
 #endif
     evutil_closesocket(fd);
     return;
@@ -175,7 +177,7 @@ void Worker::newTCPConnection(evconnlistener *listener, 
evutil_socket_t fd, [[ma
     std::string err_msg = redis::Error({Status::NotOK, s.Msg()});
     s = util::SockSend(fd, err_msg, ssl);
     if (!s.IsOK()) {
-      LOG(WARNING) << "Failed to send error response to socket: " << s.Msg();
+      warn("[worker] Failed to send error response to socket: {}", s.Msg());
     }
     conn->Close();
     return;
@@ -193,8 +195,8 @@ void Worker::newTCPConnection(evconnlistener *listener, 
evutil_socket_t fd, [[ma
 
 void Worker::newUnixSocketConnection(evconnlistener *listener, evutil_socket_t 
fd, [[maybe_unused]] sockaddr *address,
                                      [[maybe_unused]] int socklen) {
-  DLOG(INFO) << "[worker] New connection: fd=" << fd << " from unixsocket: " 
<< srv->GetConfig()->unixsocket
-             << " thread #" << tid_;
+  debug("[worker] New connection: fd={} from unixsocket: {} thread #{}", fd, 
srv->GetConfig()->unixsocket,
+        fmt::streamed(tid_));
   event_base *base = evconnlistener_get_base(listener);
   auto ev_thread_safe_flags =
       BEV_OPT_THREADSAFE | BEV_OPT_DEFER_CALLBACKS | BEV_OPT_UNLOCK_CALLBACKS 
| BEV_OPT_CLOSE_ON_FREE;
@@ -208,7 +210,7 @@ void Worker::newUnixSocketConnection(evconnlistener 
*listener, evutil_socket_t f
   if (!s.IsOK()) {
     s = util::SockSend(fd, redis::Error(s));
     if (!s.IsOK()) {
-      LOG(WARNING) << "Failed to send error response to socket: " << s.Msg();
+      warn("[worker] Failed to send error response to socket: {}", s.Msg());
     }
     conn->Close();
     return;
@@ -232,7 +234,7 @@ Status Worker::listenFD(int fd, uint32_t expected_port, int 
backlog) {
   evconnlistener *lev =
       NewEvconnlistener<&Worker::newTCPConnection>(base_, LEV_OPT_THREADSAFE | 
LEV_OPT_CLOSE_ON_FREE, backlog, dup_fd);
   listen_events_.emplace_back(lev);
-  LOG(INFO) << "Listening on dup'ed fd: " << dup_fd;
+  info("[worker] Listening on dup'ed fd: {}", dup_fd);
   return Status::OK();
 }
 
@@ -312,7 +314,7 @@ Status Worker::ListenUnixSocket(const std::string &path, 
int perm, int backlog)
 void Worker::Run(std::thread::id tid) {
   tid_ = tid;
   if (event_base_dispatch(base_) != 0) {
-    LOG(ERROR) << "[worker] Failed to run server, err: " << strerror(errno);
+    error("[worker] Failed to run server, err: {}", strerror(errno));
   }
   is_terminated_ = true;
 }
@@ -586,17 +588,17 @@ void WorkerThread::Start() {
   if (s) {
     t_ = std::move(*s);
   } else {
-    LOG(ERROR) << "[worker] Failed to start worker thread, err: " << s.Msg();
+    error("[worker] Failed to start worker thread, err: {}", s.Msg());
     return;
   }
 
-  LOG(INFO) << "[worker] Thread #" << t_.get_id() << " started";
+  info("[worker] Thread #{} started", fmt::streamed(t_.get_id()));
 }
 
 void WorkerThread::Stop(uint32_t wait_seconds) { worker_->Stop(wait_seconds); }
 
 void WorkerThread::Join() {
   if (auto s = util::ThreadJoin(t_); !s) {
-    LOG(WARNING) << "[worker] " << s.Msg();
+    warn("[worker] {}", s.Msg());
   }
 }

Reply via email to