This is an automated email from the ASF dual-hosted git repository.
twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 8c92c97b0 chore(log): replace logging calls in server/* (#2928)
8c92c97b0 is described below
commit 8c92c97b09076ee0bcaae9a8b7a53ef608f80568
Author: Jensen <[email protected]>
AuthorDate: Wed May 7 00:14:11 2025 +0800
chore(log): replace logging calls in server/* (#2928)
Co-authored-by: Twice <[email protected]>
Co-authored-by: Copilot <[email protected]>
---
src/server/redis_connection.cc | 25 +++++-----
src/server/redis_request.cc | 2 +-
src/server/server.cc | 109 ++++++++++++++++++++++-------------------
src/server/tls_util.cc | 22 ++++-----
src/server/worker.cc | 38 +++++++-------
5 files changed, 105 insertions(+), 91 deletions(-)
diff --git a/src/server/redis_connection.cc b/src/server/redis_connection.cc
index aa21a5595..397810873 100644
--- a/src/server/redis_connection.cc
+++ b/src/server/redis_connection.cc
@@ -89,7 +89,7 @@ void Connection::OnRead([[maybe_unused]] struct bufferevent *bev) {
if (!s.IsOK()) {
EnableFlag(redis::Connection::kCloseAfterReply);
Reply(redis::Error(s));
- LOG(INFO) << "[connection] Failed to tokenize the request. Error: " <<
s.Msg();
+ info("[connection] Failed to tokenize the request. Error: {}", s.Msg());
return;
}
@@ -107,24 +107,26 @@ void Connection::OnWrite([[maybe_unused]] bufferevent
*bev) {
void Connection::OnEvent(bufferevent *bev, int16_t events) {
if (events & BEV_EVENT_ERROR) {
- LOG(ERROR) << "[connection] Going to remove the client: " << GetAddr()
- << ", while encounter error: " <<
evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR())
#ifdef ENABLE_OPENSSL
- << ", SSL Error: " <<
SSLError(bufferevent_get_openssl_error(bev)) // NOLINT
+ error("[connection] Removing client: {}, error: {}, SSL Error: {}",
GetAddr(),
+ evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR()),
+ fmt::streamed(SSLError(bufferevent_get_openssl_error(bev)))); // NOLINT
+#else
+ error("[connection] Removing client: {}, error: {}", GetAddr(),
+ evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR()));
#endif
- ; // NOLINT
Close();
return;
}
if (events & BEV_EVENT_EOF) {
- DLOG(INFO) << "[connection] Going to remove the client: " << GetAddr() <<
", while closed by client";
+ debug("[connection] Going to remove the client: {}, while closed by client", GetAddr());
Close();
return;
}
if (events & BEV_EVENT_TIMEOUT) {
- DLOG(INFO) << "[connection] The client: " << GetAddr() << "] reached timeout";
+ debug("[connection] The client: {} reached timeout", GetAddr());
bufferevent_enable(bev, EV_READ | EV_WRITE);
}
}
@@ -387,8 +389,9 @@ void Connection::ExecuteCommands(std::deque<CommandTokens>
*to_process_cmds) {
if (!cmd_s.IsOK()) {
auto cmd_name = cmd_tokens.front();
if (util::EqualICase(cmd_name, "host:") || util::EqualICase(cmd_name,
"post")) {
- LOG(WARNING) << "A likely HTTP request is detected in the RESP
connection, indicating a potential "
- "Cross-Protocol Scripting attack. Connection aborted.";
+ warn(
+ "[connection] A likely HTTP request is detected in the RESP
connection, indicating a potential "
+ "Cross-Protocol Scripting attack. Connection aborted.");
EnableFlag(kCloseAsync);
return;
}
@@ -540,7 +543,7 @@ void Connection::ExecuteCommands(std::deque<CommandTokens>
*to_process_cmds) {
if (res.IsOK()) {
index_records.push_back(*res);
} else if (!res.Is<Status::NoPrefixMatched>() &&
!res.Is<Status::TypeMismatched>()) {
- LOG(WARNING) << "index recording failed for key: " <<
key;
+ warn("[connection] index recording failed for key: {}",
key);
}
},
args);
@@ -552,7 +555,7 @@ void Connection::ExecuteCommands(std::deque<CommandTokens>
*to_process_cmds) {
for (const auto &record : index_records) {
auto s = GlobalIndexer::Update(ctx, record);
if (!s.IsOK() && !s.Is<Status::TypeMismatched>()) {
- LOG(WARNING) << "index updating failed for key: " << record.key;
+ warn("[connection] index updating failed for key: {}", record.key);
}
}
}
diff --git a/src/server/redis_request.cc b/src/server/redis_request.cc
index a216416c6..2d6399368 100644
--- a/src/server/redis_request.cc
+++ b/src/server/redis_request.cc
@@ -54,7 +54,7 @@ Status Request::Tokenize(evbuffer *input) {
if (!line || line.length <= 0) {
if (pipeline_size > 128) {
- LOG(INFO) << "Large pipeline detected: " << pipeline_size;
+ info("[request] Large pipeline detected: {}", pipeline_size);
}
if (line) {
continue;
diff --git a/src/server/server.cc b/src/server/server.cc
index fed86c734..09aef5147 100644
--- a/src/server/server.cc
+++ b/src/server/server.cc
@@ -105,10 +105,10 @@ Server::Server(engine::Storage *storage, Config *config)
if (!config->unixsocket.empty() && i == 0) {
Status s = worker->ListenUnixSocket(config->unixsocket,
config->unixsocketperm, config->backlog);
if (!s.IsOK()) {
- LOG(ERROR) << "[server] Failed to listen on unix socket: " <<
config->unixsocket << ". Error: " << s.Msg();
+ error("[server] Failed to listen on unix socket: {}. Error: {}",
config->unixsocket, s.Msg());
exit(1);
}
- LOG(INFO) << "[server] Listening on unix socket: " << config->unixsocket;
+ info("[server] Listening on unix socket: {}", config->unixsocket);
}
worker_threads_.emplace_back(std::make_unique<WorkerThread>(std::move(worker)));
}
@@ -125,8 +125,8 @@ Server::~Server() {
while (GetFetchFileThreadNum() != 0) {
usleep(100000);
if (++counter == 600) {
- LOG(WARNING) << "[server] Will force destroy the server after waiting
60s, leave " << GetFetchFileThreadNum()
- << " fetch file threads are still running";
+ warn("[server] Will force destroy the server after waiting 60s, leave {} fetch file threads are still running",
+ GetFetchFileThreadNum());
break;
}
}
@@ -199,7 +199,7 @@ Status Server::Start() {
}
if (auto s = task_runner_.Start(); !s) {
- LOG(WARNING) << "Failed to start task runner: " << s.Msg();
+ warn("Failed to start task runner: {}", s.Msg());
}
// setup server cron thread
cron_thread_ = GET_OR_RET(util::CreateThread("server-cron", [this] {
this->cron(); }));
@@ -239,7 +239,7 @@ Status Server::Start() {
}));
memory_startup_use_.store(Stats::GetMemoryRSS(), std::memory_order_relaxed);
- LOG(INFO) << "[server] Ready to accept connections";
+ info("[server] Ready to accept connections");
return Status::OK();
}
@@ -261,13 +261,13 @@ void Server::Stop() {
void Server::Join() {
if (auto s = util::ThreadJoin(cron_thread_); !s) {
- LOG(WARNING) << "Cron thread operation failed: " << s.Msg();
+ warn("Cron thread operation failed: {}", s.Msg());
}
if (auto s = util::ThreadJoin(compaction_checker_thread_); !s) {
- LOG(WARNING) << "Compaction checker thread operation failed: " << s.Msg();
+ warn("Compaction checker thread operation failed: {}", s.Msg());
}
if (auto s = task_runner_.Join(); !s) {
- LOG(WARNING) << s.Msg();
+ warn("{}", s.Msg());
}
for (const auto &worker : worker_threads_) {
worker->Join();
@@ -298,7 +298,7 @@ Status Server::AddMaster(const std::string &host, uint32_t
port, bool force_reco
[this]() {
this->is_loading_ = false;
if (auto s = task_runner_.Start(); !s)
{
- LOG(WARNING) << "Failed to start
task runner: " << s.Msg();
+ warn("Failed to start task runner:
{}", s.Msg());
}
});
if (s.IsOK()) {
@@ -672,7 +672,7 @@ void Server::WakeupBlockingConns(const std::string &key,
size_t n_conns) {
auto conn_ctx = iter->second.front();
auto s = conn_ctx.owner->EnableWriteEvent(conn_ctx.fd);
if (!s.IsOK()) {
- LOG(ERROR) << "[server] Failed to enable write event on blocked client "
<< conn_ctx.fd << ": " << s.Msg();
+ error("[server] Failed to enable write event on blocked client {}: {}",
conn_ctx.fd, s.Msg());
}
iter->second.pop_front();
}
@@ -691,8 +691,7 @@ void Server::OnEntryAddedToStream(const std::string &ns,
const std::string &key,
if (consumer->ns == ns && entry_id > consumer->last_consumed_id) {
auto s = consumer->owner->EnableWriteEvent(consumer->fd);
if (!s.IsOK()) {
- LOG(ERROR) << "[server] Failed to enable write event on blocked stream
consumer " << consumer->fd << ": "
- << s.Msg();
+ error("[server] Failed to enable write event on blocked stream
consumer {}: {}", consumer->fd, s.Msg());
}
it = iter->second.erase(it);
} else {
@@ -776,11 +775,11 @@ void Server::cron() {
if (!config_->compaction_checker_cron.IsEnabled() &&
config_->compact_cron.IsEnabled() &&
config_->compact_cron.IsTimeMatch(&now)) {
Status s = AsyncCompactDB();
- LOG(INFO) << "[server] Schedule to compact the db, result: " <<
s.Msg();
+ info("[server] Schedule to compact the db, result: {}", s.Msg());
}
if (config_->bgsave_cron.IsEnabled() &&
config_->bgsave_cron.IsTimeMatch(&now)) {
Status s = AsyncBgSaveDB();
- LOG(INFO) << "[server] Schedule to bgsave the db, result: " << s.Msg();
+ info("[server] Schedule to bgsave the db, result: {}", s.Msg());
}
if (config_->dbsize_scan_cron.IsEnabled() &&
config_->dbsize_scan_cron.IsTimeMatch(&now)) {
auto tokens = namespace_.List();
@@ -797,7 +796,7 @@ void Server::cron() {
for (auto &ns : namespaces) {
Status s = AsyncScanDBSize(ns);
- LOG(INFO) << "[server] Schedule to recalculate the db size on
namespace: " << ns << ", result: " << s.Msg();
+ info("[server] Schedule to recalculate the db size on namespace: {},
result: {}", ns, s.Msg());
}
}
}
@@ -824,9 +823,9 @@ void Server::cron() {
(now_secs - create_time_secs > 24 * 60 * 60)) {
auto s = rocksdb::DestroyDB(config_->checkpoint_dir,
rocksdb::Options());
if (!s.ok()) {
- LOG(WARNING) << "[server] Fail to clean checkpoint, error: " <<
s.ToString();
+ warn("[server] Fail to clean checkpoint, error: {}", s.ToString());
} else {
- LOG(INFO) << "[server] Clean checkpoint successfully";
+ info("[server] Clean checkpoint successfully");
}
}
}
@@ -840,9 +839,9 @@ void Server::cron() {
if (counter != 0 && counter % 600 == 0 &&
storage->IsDBInRetryableIOError()) {
auto s = storage->GetDB()->Resume();
if (s.ok()) {
- LOG(WARNING) << "[server] Successfully resumed DB after retryable IO
error";
+ warn("[server] Successfully resumed DB after retryable IO error");
} else {
- LOG(ERROR) << "[server] Failed to resume DB after retryable IO error:
" << s.ToString();
+ error("[server] Failed to resume DB after retryable IO error: {}",
s.ToString());
}
storage->SetDBInRetryableIOError(false);
}
@@ -1339,7 +1338,7 @@ std::string Server::GetRocksDBStatsJson() const {
// guarantee other threads don't access DB and its column families, then close db.
bool Server::PrepareRestoreDB() {
// Stop feeding slaves thread
- LOG(INFO) << "[server] Disconnecting slaves...";
+ info("[server] Disconnecting slaves...");
DisconnectSlaves();
// If the DB is restored, the object 'db_' will be destroyed, but
@@ -1355,7 +1354,7 @@ bool Server::PrepareRestoreDB() {
// To guarantee work threads don't access DB, we should release
'ExclusivityGuard'
// ASAP to avoid user can't receive responses for long time, because the
following
// 'CloseDB' may cost much time to acquire DB mutex.
- LOG(INFO) << "[server] Waiting workers for finishing executing commands...";
+ info("[server] Waiting workers for finishing executing commands...");
while (!works_concurrency_rw_lock_.try_lock()) {
if (replication_thread_->IsStopped()) {
is_loading_ = false;
@@ -1366,22 +1365,22 @@ bool Server::PrepareRestoreDB() {
works_concurrency_rw_lock_.unlock();
// Stop task runner
- LOG(INFO) << "[server] Stopping the task runner and clear task queue...";
+ info("[server] Stopping the task runner and clear task queue...");
task_runner_.Cancel();
if (auto s = task_runner_.Join(); !s) {
- LOG(WARNING) << "[server] " << s.Msg();
+ warn("[server] {}", s.Msg());
}
// Cron thread, compaction checker thread, full synchronization thread
// may always run in the background, we need to close db, so they don't
actually work.
- LOG(INFO) << "[server] Waiting for closing DB...";
+ info("[server] Waiting for closing DB...");
storage->CloseDB();
return true;
}
void Server::WaitNoMigrateProcessing() {
if (config_->cluster_enabled) {
- LOG(INFO) << "[server] Waiting until no migration task is running...";
+ info("[server] Waiting until no migration task is running...");
slot_migrator->SetStopMigrationFlag(true);
while (slot_migrator->GetCurrentSlotMigrationStage() !=
SlotMigrationStage::kNone) {
usleep(500);
@@ -1408,7 +1407,7 @@ Status Server::AsyncCompactDB(const std::string
&begin_key, const std::string &e
auto s = storage->Compact(nullptr, begin.get(), end.get());
if (!s.ok()) {
- LOG(ERROR) << "[task runner] Failed to do compaction: " << s.ToString();
+ error("[task runner] Failed to do compaction: {}", s.ToString());
}
std::lock_guard<std::mutex> lg(db_job_mu_);
@@ -1463,7 +1462,7 @@ Status Server::AsyncScanDBSize(const std::string &ns) {
engine::Context ctx(storage);
auto s = db.GetKeyNumStats(ctx, "", &stats);
if (!s.ok()) {
- LOG(ERROR) << "failed to retrieve key num stats: " << s.ToString();
+ error("failed to retrieve key num stats: {}", s.ToString());
}
std::lock_guard<std::mutex> lg(db_job_mu_);
@@ -1517,9 +1516,10 @@ Status Server::autoResizeBlockAndSST() {
if (target_file_size_base != config_->rocks_db.target_file_size_base) {
auto old_target_file_size_base = config_->rocks_db.target_file_size_base;
auto s = config_->Set(this, "rocksdb.target_file_size_base",
std::to_string(target_file_size_base));
- LOG(INFO) << "[server] Resize rocksdb.target_file_size_base from " <<
old_target_file_size_base << " to "
- << target_file_size_base << ", average_kv_size: " <<
average_kv_size << ", total_size: " << total_size
- << ", total_keys: " << total_keys << ", result: " << s.Msg();
+ info(
+ "[server] Resize rocksdb.target_file_size_base from {} to {}, "
+ "average_kv_size: {}, total_size: {}, total_keys: {}, result: {}",
+ old_target_file_size_base, target_file_size_base, average_kv_size,
total_size, total_keys, s.Msg());
if (!s.IsOK()) {
return s;
}
@@ -1528,9 +1528,11 @@ Status Server::autoResizeBlockAndSST() {
if (target_file_size_base != config_->rocks_db.write_buffer_size) {
auto old_write_buffer_size = config_->rocks_db.write_buffer_size;
auto s = config_->Set(this, "rocksdb.write_buffer_size",
std::to_string(target_file_size_base));
- LOG(INFO) << "[server] Resize rocksdb.write_buffer_size from " <<
old_write_buffer_size << " to "
- << target_file_size_base << ", average_kv_size: " <<
average_kv_size << ", total_size: " << total_size
- << ", total_keys: " << total_keys << ", result: " << s.Msg();
+ info(
+ "[server] Resize rocksdb.write_buffer_size from {} to {}, "
+ "average_kv_size: {}, total_size: {}, "
+ "total_keys: {}, result: {}",
+ old_write_buffer_size, target_file_size_base, average_kv_size,
total_size, total_keys, s.Msg());
if (!s.IsOK()) {
return s;
}
@@ -1538,9 +1540,11 @@ Status Server::autoResizeBlockAndSST() {
if (block_size != config_->rocks_db.block_size) {
auto s =
storage->SetOptionForAllColumnFamilies("table_factory.block_size",
std::to_string(block_size));
- LOG(INFO) << "[server] Resize rocksdb.block_size from " <<
config_->rocks_db.block_size << " to " << block_size
- << ", average_kv_size: " << average_kv_size << ", total_size: "
<< total_size
- << ", total_keys: " << total_keys << ", result: " << s.Msg();
+ info(
+ "[server] Resize rocksdb.block_size from {} to {}, "
+ "average_kv_size: {}, total_size: {}, "
+ "total_keys: {}, result: {}",
+ config_->rocks_db.block_size, block_size, average_kv_size, total_size,
total_keys, s.Msg());
if (!s.IsOK()) {
return s;
}
@@ -1549,7 +1553,7 @@ Status Server::autoResizeBlockAndSST() {
}
auto s = config_->Rewrite(namespace_.List());
- LOG(INFO) << "[server] Rewrite config, result: " << s.Msg();
+ info("[server] Rewrite config, result: {}", s.Msg());
return Status::OK();
}
@@ -1671,8 +1675,7 @@ void Server::KillClient(int64_t *killed, const
std::string &addr, uint64_t id, u
(type & kTypeMaster || (!addr.empty() && addr == master_host_ + ":" +
std::to_string(master_port_)))) {
// Stop replication thread and start a new one to replicate
if (auto s = AddMaster(master_host_, master_port_, true); !s.IsOK()) {
- LOG(ERROR) << "[server] Failed to add master " << master_host_ << ":" <<
master_port_
- << " with error: " << s.Msg();
+ error("[server] Failed to add master {}:{} with error: {}",
master_host_, master_port_, s.Msg());
}
(*killed)++;
}
@@ -1850,18 +1853,24 @@ void Server::AdjustOpenFilesLimit() {
if (best_limit < max_files) {
if (best_limit <= static_cast<int>(min_reserved_fds)) {
- LOG(WARNING) << "[server] Your current 'ulimit -n' of " << old_limit <<
" is not enough for the server to start."
- << "Please increase your open file limit to at least " <<
max_files << ". Exiting.";
+ warn(
+ "[server] Your current 'ulimit -n' of {} is not enough for the
server to start. "
+ "Please increase your open file limit to at least {}. Exiting.",
+ old_limit, max_files);
exit(1);
}
- LOG(WARNING) << "[server] You requested max clients of " << max_clients <<
" and RocksDB max open files of "
- << rocksdb_max_open_file << " requiring at least " <<
max_files << " max file descriptors.";
- LOG(WARNING) << "[server] Server can't set maximum open files to " <<
max_files
- << " because of OS error: " << strerror(setrlimit_error);
+ warn(
+ "[server] You requested max clients of {} and RocksDB max open files
of {} "
+ "requiring at least {} max file descriptors.",
+ max_clients, rocksdb_max_open_file, max_files);
+
+ warn(
+ "[server] Server can't set maximum open files to {} "
+ "because of OS error: {}",
+ max_files, strerror(setrlimit_error));
} else {
- LOG(WARNING) << "[server] Increased maximum number of open files to " <<
max_files << " (it's originally set to "
- << old_limit << ")";
+ warn("[server] Increased maximum number of open files to {} (it's originally set to {})", max_files, old_limit);
}
}
@@ -1874,12 +1883,12 @@ void Server::AdjustWorkerThreads() {
if (new_worker_threads > worker_threads_.size()) {
delta = new_worker_threads - worker_threads_.size();
increaseWorkerThreads(delta);
- LOG(INFO) << "[server] Increase worker threads from " <<
worker_threads_.size() << " to " << new_worker_threads;
+ info("[server] Increase worker threads from {} to {}",
worker_threads_.size(), new_worker_threads);
return;
}
delta = worker_threads_.size() - new_worker_threads;
- LOG(INFO) << "[server] Decrease worker threads from " <<
worker_threads_.size() << " to " << new_worker_threads;
+ info("[server] Decrease worker threads from {} to {}",
worker_threads_.size(), new_worker_threads);
decreaseWorkerThreads(delta);
}
diff --git a/src/server/tls_util.cc b/src/server/tls_util.cc
index 3767a32a4..95a7a3ab6 100644
--- a/src/server/tls_util.cc
+++ b/src/server/tls_util.cc
@@ -45,7 +45,7 @@ void InitSSL() {
ERR_load_crypto_strings();
if (!RAND_poll()) {
- LOG(ERROR) << "OpenSSL failed to generate random seed";
+ error("OpenSSL failed to generate random seed");
exit(1);
}
@@ -117,19 +117,19 @@ StatusOr<unsigned long> ParseSSLProtocols(const
std::string &protocols) { // NO
UniqueSSLContext CreateSSLContext(const Config *config, const SSL_METHOD
*method) {
if (config->tls_cert_file.empty() || config->tls_key_file.empty()) {
- LOG(ERROR) << "Both tls-cert-file and tls-key-file must be specified while
TLS is enabled";
+ error("Both tls-cert-file and tls-key-file must be specified while TLS is
enabled");
return nullptr;
}
auto ssl_ctx = UniqueSSLContext(method);
if (!ssl_ctx) {
- LOG(ERROR) << "Failed to construct SSL context: " << SSLErrors{};
+ error("Failed to construct SSL context: {}", fmt::streamed(SSLErrors{}));
return nullptr;
}
auto proto_status = ParseSSLProtocols(config->tls_protocols);
if (!proto_status) {
- LOG(ERROR) << proto_status.Msg();
+ error("{}", proto_status.Msg());
return nullptr;
}
@@ -176,16 +176,16 @@ UniqueSSLContext CreateSSLContext(const Config *config,
const SSL_METHOD *method
auto ca_dir = config->tls_ca_cert_dir.empty() ? nullptr :
config->tls_ca_cert_dir.c_str();
if (ca_file || ca_dir) {
if (SSL_CTX_load_verify_locations(ssl_ctx.get(), ca_file, ca_dir) != 1) {
- LOG(ERROR) << "Failed to load CA certificates: " << SSLErrors{};
+ error("Failed to load CA certificates: {}", fmt::streamed(SSLErrors{}));
return nullptr;
}
} else if (config->tls_auth_clients != TLS_AUTH_CLIENTS_NO) {
- LOG(ERROR) << "Either tls-ca-cert-file or tls-ca-cert-dir must be
specified while tls-auth-clients is enabled";
+ error("Either tls-ca-cert-file or tls-ca-cert-dir must be specified while
tls-auth-clients is enabled");
return nullptr;
}
if (SSL_CTX_use_certificate_chain_file(ssl_ctx.get(),
config->tls_cert_file.c_str()) != 1) {
- LOG(ERROR) << "Failed to load SSL certificate file: " << SSLErrors{};
+ error("Failed to load SSL certificate file: {}",
fmt::streamed(SSLErrors{}));
return nullptr;
}
@@ -199,23 +199,23 @@ UniqueSSLContext CreateSSLContext(const Config *config,
const SSL_METHOD *method
}
if (SSL_CTX_use_PrivateKey_file(ssl_ctx.get(), config->tls_key_file.c_str(),
SSL_FILETYPE_PEM) != 1) {
- LOG(ERROR) << "Failed to load SSL private key file: " << SSLErrors{};
+ error("Failed to load SSL private key file: {}",
fmt::streamed(SSLErrors{}));
return nullptr;
}
if (SSL_CTX_check_private_key(ssl_ctx.get()) != 1) {
- LOG(ERROR) << "Failed to check the loaded private key: " << SSLErrors{};
+ error("Failed to check the loaded private key: {}",
fmt::streamed(SSLErrors{}));
return nullptr;
}
if (!config->tls_ciphers.empty() && !SSL_CTX_set_cipher_list(ssl_ctx.get(),
config->tls_ciphers.c_str())) {
- LOG(ERROR) << "Failed to set SSL ciphers: " << SSLErrors{};
+ error("Failed to set SSL ciphers: {}", fmt::streamed(SSLErrors{}));
return nullptr;
}
#ifdef SSL_OP_NO_TLSv1_3
if (!config->tls_ciphersuites.empty() &&
!SSL_CTX_set_ciphersuites(ssl_ctx.get(), config->tls_ciphersuites.c_str())) {
- LOG(ERROR) << "Failed to set SSL ciphersuites: " << SSLErrors{};
+ error("Failed to set SSL ciphersuites: {}", fmt::streamed(SSLErrors{}));
return nullptr;
}
#endif
diff --git a/src/server/worker.cc b/src/server/worker.cc
index 2984c7686..e65467f2c 100644
--- a/src/server/worker.cc
+++ b/src/server/worker.cc
@@ -23,6 +23,7 @@
#include <event2/util.h>
#include <unistd.h>
+#include <cstdint>
#include <stdexcept>
#include <string>
@@ -60,7 +61,7 @@ Worker::Worker(Server *srv, Config *config) : srv(srv),
base_(event_base_new())
if (config->socket_fd != -1) {
if (const Status s = listenFD(config->socket_fd, config->port,
config->backlog); !s.IsOK()) {
- LOG(ERROR) << "[worker] Failed to listen to socket with fd: " <<
config->socket_fd << ". Error: " << s.Msg();
+ error("[worker] Failed to listen to socket with fd: {}, Error: {}",
config->socket_fd, s.Msg());
exit(1);
}
} else {
@@ -69,10 +70,10 @@ Worker::Worker(Server *srv, Config *config) : srv(srv),
base_(event_base_new())
for (const uint32_t *port = ports; *port; ++port) {
for (const auto &bind : config->binds) {
if (const Status s = listenTCP(bind, *port, config->backlog);
!s.IsOK()) {
- LOG(ERROR) << "[worker] Failed to listen on: " << bind << ":" <<
*port << ". Error: " << s.Msg();
+ error("[worker] Failed to listen on: {}:{}, Error: {}", bind, *port,
s.Msg());
exit(1);
}
- LOG(INFO) << "[worker] Listening on: " << bind << ":" << *port;
+ info("[worker] Listening on: {}:{}", bind, *port);
}
}
}
@@ -113,18 +114,18 @@ void Worker::TimerCB(int, [[maybe_unused]] int16_t
events) {
void Worker::newTCPConnection(evconnlistener *listener, evutil_socket_t fd,
[[maybe_unused]] sockaddr *address,
[[maybe_unused]] int socklen) {
int local_port = util::GetLocalPort(fd); // NOLINT
- DLOG(INFO) << "[worker] New connection: fd=" << fd << " from port: " <<
local_port << " thread #" << tid_;
+ debug("[worker] New connection: fd={} from port: {} thread #{}", fd,
local_port, fmt::streamed(tid_));
auto s = util::SockSetTcpKeepalive(fd, 120);
if (!s.IsOK()) {
- LOG(ERROR) << "[worker] Failed to set tcp-keepalive on socket. Error: " <<
s.Msg();
+ error("[worker] Failed to set tcp-keepalive on socket. Error: {}",
s.Msg());
evutil_closesocket(fd);
return;
}
s = util::SockSetTcpNoDelay(fd, 1);
if (!s.IsOK()) {
- LOG(ERROR) << "[worker] Failed to set tcp-nodelay on socket. Error: " <<
s.Msg();
+ error("[worker] Failed to set tcp-nodelay on socket. Error: {}", s.Msg());
evutil_closesocket(fd);
return;
}
@@ -139,7 +140,7 @@ void Worker::newTCPConnection(evconnlistener *listener,
evutil_socket_t fd, [[ma
if (uint32_t(local_port) == srv->GetConfig()->tls_port) {
ssl = SSL_new(srv->ssl_ctx.get());
if (!ssl) {
- LOG(ERROR) << "Failed to construct SSL structure for new connection: "
<< SSLErrors{};
+ error("[worker] Failed to construct SSL structure for new connection:
{}", fmt::streamed(SSLErrors{}));
evutil_closesocket(fd);
return;
}
@@ -153,10 +154,11 @@ void Worker::newTCPConnection(evconnlistener *listener,
evutil_socket_t fd, [[ma
if (!bev) {
auto socket_err = evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR());
#ifdef ENABLE_OPENSSL
- LOG(ERROR) << "Failed to construct socket for new connection: " <<
socket_err << ", SSL error: " << SSLErrors{};
+ error("[worker] Failed to construct socket for new connection: {}, SSL
error: {}", socket_err,
+ fmt::streamed(SSLErrors{}));
if (ssl) SSL_free(ssl);
#else
- LOG(ERROR) << "Failed to construct socket for new connection: " <<
socket_err;
+ error("[worker] Failed to construct socket for new connection: {}",
socket_err);
#endif
evutil_closesocket(fd);
return;
@@ -175,7 +177,7 @@ void Worker::newTCPConnection(evconnlistener *listener,
evutil_socket_t fd, [[ma
std::string err_msg = redis::Error({Status::NotOK, s.Msg()});
s = util::SockSend(fd, err_msg, ssl);
if (!s.IsOK()) {
- LOG(WARNING) << "Failed to send error response to socket: " << s.Msg();
+ warn("[worker] Failed to send error response to socket: {}", s.Msg());
}
conn->Close();
return;
@@ -193,8 +195,8 @@ void Worker::newTCPConnection(evconnlistener *listener,
evutil_socket_t fd, [[ma
void Worker::newUnixSocketConnection(evconnlistener *listener, evutil_socket_t
fd, [[maybe_unused]] sockaddr *address,
[[maybe_unused]] int socklen) {
- DLOG(INFO) << "[worker] New connection: fd=" << fd << " from unixsocket: "
<< srv->GetConfig()->unixsocket
- << " thread #" << tid_;
+ debug("[worker] New connection: fd={} from unixsocket: {} thread #{}", fd,
srv->GetConfig()->unixsocket,
+ fmt::streamed(tid_));
event_base *base = evconnlistener_get_base(listener);
auto ev_thread_safe_flags =
BEV_OPT_THREADSAFE | BEV_OPT_DEFER_CALLBACKS | BEV_OPT_UNLOCK_CALLBACKS
| BEV_OPT_CLOSE_ON_FREE;
@@ -208,7 +210,7 @@ void Worker::newUnixSocketConnection(evconnlistener
*listener, evutil_socket_t f
if (!s.IsOK()) {
s = util::SockSend(fd, redis::Error(s));
if (!s.IsOK()) {
- LOG(WARNING) << "Failed to send error response to socket: " << s.Msg();
+ warn("[worker] Failed to send error response to socket: {}", s.Msg());
}
conn->Close();
return;
@@ -232,7 +234,7 @@ Status Worker::listenFD(int fd, uint32_t expected_port, int
backlog) {
evconnlistener *lev =
NewEvconnlistener<&Worker::newTCPConnection>(base_, LEV_OPT_THREADSAFE |
LEV_OPT_CLOSE_ON_FREE, backlog, dup_fd);
listen_events_.emplace_back(lev);
- LOG(INFO) << "Listening on dup'ed fd: " << dup_fd;
+ info("[worker] Listening on dup'ed fd: {}", dup_fd);
return Status::OK();
}
@@ -312,7 +314,7 @@ Status Worker::ListenUnixSocket(const std::string &path,
int perm, int backlog)
void Worker::Run(std::thread::id tid) {
tid_ = tid;
if (event_base_dispatch(base_) != 0) {
- LOG(ERROR) << "[worker] Failed to run server, err: " << strerror(errno);
+ error("[worker] Failed to run server, err: {}", strerror(errno));
}
is_terminated_ = true;
}
@@ -586,17 +588,17 @@ void WorkerThread::Start() {
if (s) {
t_ = std::move(*s);
} else {
- LOG(ERROR) << "[worker] Failed to start worker thread, err: " << s.Msg();
+ error("[worker] Failed to start worker thread, err: {}", s.Msg());
return;
}
- LOG(INFO) << "[worker] Thread #" << t_.get_id() << " started";
+ info("[worker] Thread #{} started", fmt::streamed(t_.get_id()));
}
void WorkerThread::Stop(uint32_t wait_seconds) { worker_->Stop(wait_seconds); }
void WorkerThread::Join() {
if (auto s = util::ThreadJoin(t_); !s) {
- LOG(WARNING) << "[worker] " << s.Msg();
+ warn("[worker] {}", s.Msg());
}
}