This is an automated email from the ASF dual-hosted git repository.
wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push:
new 313258e9 Support async logging (#2413)
313258e9 is described below
commit 313258e9825a9b773cf0a7b52dff52bcf22a4b0e
Author: Bright Chen <[email protected]>
AuthorDate: Tue Jan 23 13:29:46 2024 +0800
Support async logging (#2413)
* Support async logging
* Set max async log queue size
* Flush async log before exit
* Use sync log for the first LogRequest
* Support usec of logging for macOS
---
src/butil/logging.cc | 342 +++++++++++++++++++++++++++++++++++++++++++----
test/logging_unittest.cc | 162 +++++++++++++++++++++-
2 files changed, 476 insertions(+), 28 deletions(-)
diff --git a/src/butil/logging.cc b/src/butil/logging.cc
index 544a5cab..032a5448 100644
--- a/src/butil/logging.cc
+++ b/src/butil/logging.cc
@@ -73,8 +73,11 @@ typedef pthread_mutex_t* MutexHandle;
#include "butil/strings/string_util.h"
#include "butil/strings/stringprintf.h"
#include "butil/strings/utf_string_conversions.h"
-#include "butil/synchronization/lock.h"
+#include "butil/synchronization/condition_variable.h"
#include "butil/threading/platform_thread.h"
+#include "butil/threading/simple_thread.h"
+#include "butil/object_pool.h"
+
#if defined(OS_POSIX)
#include "butil/errno.h"
#include "butil/fd_guard.h"
@@ -144,6 +147,17 @@ DEFINE_bool(log_year, false, "Log year in datetime part in each log");
DEFINE_bool(log_func_name, false, "Log function name in each log");
+// Routes file logging through AsyncLogger instead of writing synchronously.
+DEFINE_bool(async_log, false, "Use async log");
+
+// When false, the thread that wins the right to write logs its own request
+// synchronously before handing the rest to the background thread.
+DEFINE_bool(async_log_in_background_always, false, "Async log written in background always.");
+
+// Upper bound on queued requests; values <= 0 disable the limit.
+DEFINE_int32(max_async_log_queue_size, 100000, "Max async log size. "
+             "If current log count of async log > max_async_log_queue_size, "
+             "Use sync log to protect process.");
+
+// Extra delay applied by the atexit handler after the logger was stopped.
+DEFINE_int32(sleep_to_flush_async_log_s, 0,
+             "If the value > 0, sleep before atexit to flush async log");
+
namespace {
LoggingDestination logging_destination = LOG_DEFAULT;
@@ -399,8 +413,300 @@ void CloseLogFileUnlocked() {
log_file = NULL;
}
+// Appends `log` to the log file while holding the logging lock.
+// Nothing is written when the log file handle cannot be initialized.
+void Log2File(const std::string& log) {
+    // Several threads and/or processes may log concurrently, so writes are
+    // serialized to keep them from clobbering each other.
+    // The lock is created on demand when the client never called
+    // InitLogging. Two threads doing that at the same time would race on
+    // the creation, which is why InitLogging should be called from the
+    // main thread at the beginning of execution.
+    LoggingLock::Init(LOCK_LOG_FILE, NULL);
+    LoggingLock logging_lock;
+    if (!InitializeLogFileHandle()) {
+        return;
+    }
+#if defined(OS_WIN)
+    // Move to the end of the file before writing.
+    SetFilePointer(log_file, 0, 0, SEEK_END);
+    DWORD bytes_written;
+    WriteFile(log_file,
+              static_cast<const void*>(log.data()),
+              static_cast<DWORD>(log.size()),
+              &bytes_written,
+              NULL);
+#else
+    fwrite(log.data(), log.size(), 1, log_file);
+    fflush(log_file);
+#endif
+}
+
} // namespace
+// One pending log message, linked into AsyncLogger's request list.
+struct BAIDU_CACHELINE_ALIGNMENT LogRequest {
+    // Sentinel for `next`: the request has been published as the list head
+    // but has not been linked to its predecessor yet.
+    static LogRequest* const UNCONNECTED;
+
+    // Next request in the list, NULL at the end, or UNCONNECTED.
+    LogRequest* next{NULL};
+    // Text to write; cleared once the request has been logged.
+    std::string data;
+};
+
+LogRequest* const LogRequest::UNCONNECTED =
+    reinterpret_cast<LogRequest*>(static_cast<intptr_t>(-1));
+
+// Background logger: callers push LogRequests onto an atomic list and a
+// dedicated thread (this SimpleThread) writes them through Log2File.
+class AsyncLogger : public butil::SimpleThread {
+public:
+ // Returns the process-wide instance (a leaky singleton, see definition).
+ static AsyncLogger* GetInstance();
+
+ // Queues `log` for writing, or writes it synchronously when the queue is
+ // full, the logger is stopped, or no LogRequest could be obtained.
+ // Empty strings are ignored.
+ void Log(const std::string& log);
+ void Log(std::string&& log);
+ // Marks the logger as stopped, wakes the thread and joins it.
+ void StopAndJoin();
+
+private:
+friend struct DefaultSingletonTraits<AsyncLogger>;
+
+ // NOTE(review): declared here but not referenced in the visible code.
+ static LogRequest _stop_req;
+
+ AsyncLogger();
+ ~AsyncLogger() override;
+
+ // atexit handler registered by the constructor: stops and joins the
+ // logger, then optionally sleeps for FLAGS_sleep_to_flush_async_log_s.
+ static void AtExit() {
+ GetInstance()->StopAndJoin();
+ if (FLAGS_sleep_to_flush_async_log_s > 0) {
+ ::sleep(FLAGS_sleep_to_flush_async_log_s);
+ }
+ }
+
+ // Publishes `log_req` as the new list head and, when this caller won the
+ // right to write, logs it and/or wakes the background thread.
+ void LogImpl(LogRequest* log_req);
+
+ // Thread body: waits for a request handed over by LogImpl and logs it.
+ void Run() override;
+
+ // Logs `req` and every request linked after it until the list is drained.
+ void LogTask(LogRequest* req);
+
+ // Returns true if no request was pushed after `old_head`; otherwise links
+ // the new requests behind `old_head` in push order and returns false.
+ bool IsLogComplete(LogRequest* old_head);
+
+ // Write one request / one string to the log file.
+ void DoLog(LogRequest* req);
+ void DoLog(const std::string& log);
+
+ // Most recently pushed request; NULL when nobody holds the right to write.
+ butil::atomic<LogRequest*> _log_head;
+ butil::Mutex _mutex;
+ butil::ConditionVariable _cond;
+ // Request handed to the background thread; accessed under _mutex.
+ LogRequest* _current_log_request;
+ // Incremented in Log(), decremented in DoLog(const std::string&).
+ butil::atomic<int32_t> _log_request_count;
+ butil::atomic<bool> _stop;
+};
+
+// Returns the singleton AsyncLogger. LeakySingletonTraits is used, so the
+// instance is presumably never destroyed at exit -- confirm against the
+// Singleton implementation.
+AsyncLogger* AsyncLogger::GetInstance() {
+ return Singleton<AsyncLogger,
+ LeakySingletonTraits<AsyncLogger>>::get();
+}
+
+// Starts the background logging thread and registers AtExit so that queued
+// logs are flushed before the process exits.
+AsyncLogger::AsyncLogger()
+    : butil::SimpleThread("async_log_thread")
+    , _log_head(NULL)
+    , _cond(&_mutex)
+    , _current_log_request(NULL)
+    // Must start at zero: Log() compares this counter against
+    // FLAGS_max_async_log_queue_size, and a default-constructed atomic
+    // holds an indeterminate value.
+    , _log_request_count(0)
+    , _stop(false) {
+    Start();
+    // We need to stop async logger and
+    // flush all async log before exit.
+    atexit(AtExit);
+}
+
+// Stops the background thread and waits for it before destruction.
+AsyncLogger::~AsyncLogger() {
+ StopAndJoin();
+}
+
+// Queues a copy of `log` for the background thread. Falls back to a
+// synchronous write when the queue is full, the logger has been stopped,
+// or no LogRequest could be obtained. Empty strings are ignored.
+void AsyncLogger::Log(const std::string& log) {
+    if (log.empty()) {
+        return;
+    }
+
+    // Count every request unconditionally: DoLog() always decrements the
+    // counter, so skipping the increment when the limit is disabled
+    // (flag <= 0) would let the counter drift negative.
+    // The flag is read once so both comparisons see the same value.
+    const int32_t max_queue_size = FLAGS_max_async_log_queue_size;
+    const int32_t pending =
+        _log_request_count.fetch_add(1, butil::memory_order_relaxed);
+    const bool is_full = max_queue_size > 0 && pending > max_queue_size;
+    if (is_full || _stop.load(butil::memory_order_relaxed)) {
+        // Async logger is full or stopped, fallback to sync log.
+        DoLog(log);
+        return;
+    }
+
+    auto log_req = butil::get_object<LogRequest>();
+    if (!log_req) {
+        // Async log failed, fallback to sync log.
+        DoLog(log);
+        return;
+    }
+    log_req->data = log;
+    LogImpl(log_req);
+}
+
+// Rvalue overload: moves `log` into the queued request instead of copying.
+// Falls back to a synchronous write when the queue is full, the logger has
+// been stopped, or no LogRequest could be obtained. Empty strings are
+// ignored.
+void AsyncLogger::Log(std::string&& log) {
+    if (log.empty()) {
+        return;
+    }
+
+    // Count every request unconditionally: DoLog() always decrements the
+    // counter, so skipping the increment when the limit is disabled
+    // (flag <= 0) would let the counter drift negative.
+    // The flag is read once so both comparisons see the same value.
+    const int32_t max_queue_size = FLAGS_max_async_log_queue_size;
+    const int32_t pending =
+        _log_request_count.fetch_add(1, butil::memory_order_relaxed);
+    const bool is_full = max_queue_size > 0 && pending > max_queue_size;
+    if (is_full || _stop.load(butil::memory_order_relaxed)) {
+        // Async logger is full or stopped, fallback to sync log.
+        DoLog(log);
+        return;
+    }
+
+    auto log_req = butil::get_object<LogRequest>();
+    if (!log_req) {
+        // Async log failed, fallback to sync log.
+        DoLog(log);
+        return;
+    }
+    log_req->data = std::move(log);
+    LogImpl(log_req);
+}
+
+// Publishes `log_req` as the new head of the request list. If the list was
+// empty, this caller has won the right to write: it logs its own request
+// synchronously (unless FLAGS_async_log_in_background_always is set) and
+// passes any remaining work to the background thread.
+void AsyncLogger::LogImpl(LogRequest* log_req) {
+ // Mark as not-yet-linked so a reader walking the list waits for `next`.
+ log_req->next = LogRequest::UNCONNECTED;
+ // Release fence makes sure the thread getting request sees *log_req
+ LogRequest* const prev_head =
+ _log_head.exchange(log_req, butil::memory_order_release);
+ if (prev_head != NULL) {
+ // Someone is logging. The async_log_thread thread may spin
+ // until log_req->next becomes non-UNCONNECTED. This process is not
+ // lock-free, but the duration is so short(1~2 instructions,
+ // depending on compiler) that the spin rarely occurs in practice
+ // (I've not seen any spin in highly contended tests).
+ log_req->next = prev_head;
+ return;
+ }
+ // We've got the right to write.
+ log_req->next = NULL;
+
+ if (!FLAGS_async_log_in_background_always) {
+ // Use sync log for the LogRequest
+ // which has got the right to write.
+ DoLog(log_req);
+ // Return when there's no more LogRequests.
+ if (IsLogComplete(log_req)) {
+ butil::return_object(log_req);
+ return;
+ }
+ }
+
+ // More requests are pending (or background-only mode): hand over under
+ // the mutex that also guards _current_log_request.
+ BAIDU_SCOPED_LOCK(_mutex);
+ if (_stop.load(butil::memory_order_relaxed)) {
+ // Async logger is stopped, fallback to sync log.
+ LogTask(log_req);
+ } else {
+ // Wake up async logger.
+ _current_log_request = log_req;
+ _cond.Signal();
+ }
+}
+
+// Sets the stop flag (signalling the condition variable only on the first
+// call) and joins the background thread if it has not been joined yet.
+void AsyncLogger::StopAndJoin() {
+ // exchange() returns the previous value, so only the caller that flips
+ // the flag from false to true signals the waiting thread.
+ if (!_stop.exchange(true, butil::memory_order_relaxed)) {
+ BAIDU_SCOPED_LOCK(_mutex);
+ _cond.Signal();
+ }
+ if (!HasBeenJoined()) {
+ Join();
+ }
+}
+
+// Thread body: waits until a request is handed over or the logger is
+// stopped, logs the pending request, and exits once stopped with nothing
+// left to log.
+void AsyncLogger::Run() {
+ while (true) {
+ // NOTE(review): the scoped lock is declared at loop-body scope, so
+ // LogTask() below appears to run with _mutex held -- confirm intended.
+ BAIDU_SCOPED_LOCK(_mutex);
+ while (!_stop.load(butil::memory_order_relaxed) &&
+ !_current_log_request) {
+ _cond.Wait();
+ }
+ // Stop only after a pending request, if any, has been processed.
+ if (_stop.load(butil::memory_order_relaxed) &&
+ !_current_log_request) {
+ break;
+ }
+
+ LogTask(_current_log_request);
+ _current_log_request = NULL;
+ }
+}
+
+// Writes `req` and all requests linked after it, returning each finished
+// request to the object pool. Repeats until IsLogComplete() reports that
+// no new request was pushed in the meantime.
+void AsyncLogger::LogTask(LogRequest* req) {
+ do {
+ // req was logged, skip it.
+ // (DoLog(LogRequest*) clears `data`, so empty data means "already
+ // written".)
+ if (req->next != NULL && req->data.empty()) {
+ LogRequest* const saved_req = req;
+ req = req->next;
+ butil::return_object(saved_req);
+ }
+
+ // Log all requests to file.
+ while (req->next != NULL) {
+ LogRequest* const saved_req = req;
+ req = req->next;
+ if (!saved_req->data.empty()) {
+ DoLog(saved_req);
+ }
+ // Release LogRequests until last request.
+ butil::return_object(saved_req);
+ }
+ // The last request is kept alive: IsLogComplete() needs it as the
+ // expected head of the list.
+ if (!req->data.empty()) {
+ DoLog(req);
+ }
+
+ // Return when there's no more LogRequests.
+ if (IsLogComplete(req)) {
+ butil::return_object(req);
+ return;
+ }
+ } while (true);
+}
+
+// Tries to release the right to write. Returns true when `old_head` is
+// still the list head (the head is reset to NULL). Otherwise reverses the
+// newly pushed requests into push order, links them behind `old_head`,
+// and returns false so the caller keeps logging.
+bool AsyncLogger::IsLogComplete(LogRequest* old_head) {
+ // Diagnostic only: execution continues after the message.
+ if (old_head->next) {
+ fprintf(stderr, "old_head->next should be NULL\n");
+ }
+ LogRequest* new_head = old_head;
+ LogRequest* desired = NULL;
+ if (_log_head.compare_exchange_strong(
+ new_head, desired, butil::memory_order_acquire)) {
+ // No one added new requests.
+ return true;
+ }
+ // On failure compare_exchange_strong stored the current head in new_head.
+ if (new_head == old_head) {
+ fprintf(stderr, "new_head should not be equal to old_head\n");
+ }
+ // Above acquire fence pairs release fence of exchange in LogImpl() to
+ // make sure that we see all fields of requests set.
+
+ // Someone added new requests.
+ // Reverse the list until old_head.
+ LogRequest* tail = NULL;
+ LogRequest* p = new_head;
+ do {
+ // The pusher has published p but not linked it yet; wait for it.
+ while (p->next == LogRequest::UNCONNECTED) {
+ sched_yield();
+ }
+ LogRequest* const saved_next = p->next;
+ p->next = tail;
+ tail = p;
+ p = saved_next;
+ if (!p) {
+ fprintf(stderr, "p should not be NULL\n");
+ }
+ } while (p != old_head);
+
+ // Link old list with new list.
+ old_head->next = tail;
+ return false;
+}
+
+// Writes the request's text, then clears it so that an empty `data` marks
+// the request as already logged.
+void AsyncLogger::DoLog(LogRequest* req) {
+ DoLog(req->data);
+ req->data.clear();
+}
+
+// Writes `log` to the log file and decrements the pending-request counter.
+void AsyncLogger::DoLog(const std::string& log) {
+ Log2File(log);
+ _log_request_count.fetch_sub(1);
+}
+
LoggingSettings::LoggingSettings()
: logging_dest(LOG_DEFAULT),
log_file(NULL),
@@ -473,7 +779,7 @@ void PrintLogPrefix(std::ostream& os, int severity,
const char* file, int line,
const char* func) {
PrintLogSeverity(os, severity);
-#if defined(OS_LINUX)
+#if defined(OS_LINUX) || defined(OS_MACOSX)
timeval tv;
gettimeofday(&tv, NULL);
time_t t = tv.tv_sec;
@@ -495,7 +801,7 @@ void PrintLogPrefix(std::ostream& os, int severity,
<< std::setw(2) << local_tm.tm_hour << ':'
<< std::setw(2) << local_tm.tm_min << ':'
<< std::setw(2) << local_tm.tm_sec;
-#if defined(OS_LINUX)
+#if defined(OS_LINUX) || defined(OS_MACOSX)
os << '.' << std::setw(6) << tv.tv_usec;
#endif
if (FLAGS_log_pid) {
@@ -957,35 +1263,17 @@ public:
// write to log file
if ((logging_destination & LOG_TO_FILE) != 0) {
- // We can have multiple threads and/or processes, so try to prevent them
- // from clobbering each other's writes.
- // If the client app did not call InitLogging, and the lock has not
- // been created do it now. We do this on demand, but if two threads try
- // to do this at the same time, there will be a race condition to create
- // the lock. This is why InitLogging should be called from the main
- // thread at the beginning of execution.
- LoggingLock::Init(LOCK_LOG_FILE, NULL);
- LoggingLock logging_lock;
- if (InitializeLogFileHandle()) {
-#if defined(OS_WIN)
- SetFilePointer(log_file, 0, 0, SEEK_END);
- DWORD num_written;
- WriteFile(log_file,
- static_cast<const void*>(log.data()),
- static_cast<DWORD>(log.size()),
- &num_written,
- NULL);
-#else
- fwrite(log.data(), log.size(), 1, log_file);
- fflush(log_file);
-#endif
+ if (FLAGS_async_log) {
+ AsyncLogger::GetInstance()->Log(std::move(log));
+ } else {
+ Log2File(log);
}
}
return true;
}
private:
- DefaultLogSink() {}
- ~DefaultLogSink() {}
+ DefaultLogSink() = default;
+ ~DefaultLogSink() override = default;
friend struct DefaultSingletonTraits<DefaultLogSink>;
};
diff --git a/test/logging_unittest.cc b/test/logging_unittest.cc
index ca025ee9..66834b98 100644
--- a/test/logging_unittest.cc
+++ b/test/logging_unittest.cc
@@ -4,7 +4,9 @@
#include "butil/basictypes.h"
#include "butil/logging.h"
-
+#include "butil/gperftools_profiler.h"
+#include "butil/files/temp_file.h"
+#include "butil/popen.h"
#include <gtest/gtest.h>
#include <gflags/gflags.h>
@@ -14,6 +16,7 @@ namespace logging {
DECLARE_bool(crash_on_fatal_log);
DECLARE_int32(v);
DECLARE_bool(log_func_name);
+DECLARE_bool(async_log);
namespace {
@@ -475,6 +478,163 @@ TEST_F(LoggingTest, log_func) {
::logging::FLAGS_crash_on_fatal_log = old_crash_on_fatal_log;
}
+// Start/stop flags are written by the test body and polled by the worker
+// threads, so they are atomic to avoid a data race on a plain bool.
+butil::atomic<bool> g_started(false);
+butil::atomic<bool> g_stopped(false);
+// Suffix for the profile file written by each PerfTest() run.
+int g_prof_name_counter = 0;
+// Number of LOG statements executed by the worker threads.
+butil::atomic<uint64_t> test_logging_count(0);
+
+// Thread entry point: logs the std::string passed via `arg` in a loop until
+// g_stopped is set, counting each statement in test_logging_count.
+// Does nothing when `arg` is NULL.
+void* test_async_log(void* arg) {
+    const std::string* message = static_cast<std::string*>(arg);
+    if (message != NULL) {
+        while (!g_stopped) {
+            LOG(INFO) << *message;
+            test_logging_count.fetch_add(1);
+        }
+    }
+    return NULL;
+}
+
+// Logs the same line from several threads with async logging enabled, then
+// checks that the number of matching lines in the log file equals the
+// number of LOG statements executed.
+TEST_F(LoggingTest, async_log) {
+    const bool saved_async_log = FLAGS_async_log;
+    FLAGS_async_log = true;
+    // Reset shared state so the result does not depend on test order or
+    // on repeated runs.
+    g_stopped = false;
+    test_logging_count.store(0);
+
+    butil::TempFile temp_file;
+    LoggingSettings settings;
+    settings.logging_dest = LOG_TO_FILE;
+    settings.log_file = temp_file.fname();
+    settings.delete_old = DELETE_OLD_LOG_FILE;
+    InitLogging(settings);
+
+    std::string log = "135792468";
+    // Compile-time constant: keeps `threads` a regular array, not a VLA.
+    const int thread_num = 8;
+    pthread_t threads[thread_num];
+    for (int i = 0; i < thread_num; ++i) {
+        ASSERT_EQ(0, pthread_create(&threads[i], NULL, test_async_log, &log));
+    }
+
+    sleep(5);
+
+    g_stopped = true;
+    for (int i = 0; i < thread_num; ++i) {
+        pthread_join(threads[i], NULL);
+    }
+    // Wait for async log thread to flush all logs to file.
+    sleep(10);
+
+    // All logging is done: restore the flag before the assertions below so
+    // a failure does not leave async logging enabled for later tests.
+    FLAGS_async_log = saved_async_log;
+
+    std::ostringstream oss;
+    std::string cmd = butil::string_printf("grep -c %s %s",
+                                           log.c_str(), temp_file.fname());
+    ASSERT_LE(0, butil::read_command_output(oss, cmd.c_str()));
+    // The count is unsigned 64-bit, so parse it with strtoull.
+    uint64_t log_count = std::strtoull(oss.str().c_str(), NULL, 10);
+    ASSERT_EQ(log_count, test_logging_count.load());
+}
+
+// Per-thread arguments and results for the logging performance test.
+struct BAIDU_CACHELINE_ALIGNMENT PerfArgs {
+    // Text each worker logs.
+    const std::string* log{NULL};
+    // Number of log statements the worker executed.
+    int64_t counter{0};
+    // Time the worker spent in its logging loop, in nanoseconds.
+    int64_t elapse_ns{0};
+    // Set by the worker once it has started.
+    bool ready{false};
+
+    PerfArgs() = default;
+};
+
+// Thread entry point for PerfTest(): signals readiness, waits for
+// g_started (or g_stopped), then logs in a loop until g_stopped, recording
+// the iteration count and elapsed time in the PerfArgs passed via
+// `void_arg`.
+void* test_log(void* void_arg) {
+    PerfArgs* const perf = static_cast<PerfArgs*>(void_arg);
+    perf->ready = true;
+    butil::Timer timer;
+    // Poll until the test starts or is cancelled.
+    while (!g_stopped && !g_started) {
+        usleep(10);
+    }
+    timer.start();
+    while (!g_stopped) {
+        LOG(INFO) << *perf->log;
+        test_logging_count.fetch_add(1, butil::memory_order_relaxed);
+        ++perf->counter;
+    }
+    timer.stop();
+    perf->elapse_ns = timer.n_elapsed();
+    return NULL;
+}
+
+// Runs `thread_num` threads that log `log` for five seconds, with async or
+// sync logging as selected by `async`, and prints the total statement
+// count and the average time per statement. A CPU profile is written to
+// logging_<N>.prof.
+void PerfTest(int thread_num, const std::string& log, bool async) {
+    FLAGS_async_log = async;
+
+    g_started = false;
+    g_stopped = false;
+    // std::vector instead of a variable-length array, which is not
+    // standard C++.
+    std::vector<pthread_t> threads(thread_num);
+    std::vector<PerfArgs> args(thread_num);
+    for (int i = 0; i < thread_num; ++i) {
+        args[i].log = &log;
+        ASSERT_EQ(0, pthread_create(&threads[i], NULL, test_log, &args[i]));
+    }
+    // Wait until every worker has reported that it is running.
+    while (true) {
+        bool all_ready = true;
+        for (int i = 0; i < thread_num; ++i) {
+            if (!args[i].ready) {
+                all_ready = false;
+                break;
+            }
+        }
+        if (all_ready) {
+            break;
+        }
+        usleep(1000);
+    }
+    g_started = true;
+    char prof_name[32];
+    snprintf(prof_name, sizeof(prof_name), "logging_%d.prof", ++g_prof_name_counter);
+    ProfilerStart(prof_name);
+    sleep(5);
+    ProfilerStop();
+    g_stopped = true;
+    int64_t wait_time = 0;
+    int64_t count = 0;
+    for (int i = 0; i < thread_num; ++i) {
+        pthread_join(threads[i], NULL);
+        wait_time += args[i].elapse_ns;
+        count += args[i].counter;
+    }
+    // Avoid dividing by zero when no statement was logged.
+    const double average_time = count > 0 ? wait_time / (double)count : 0.0;
+    std::cout << " thread_num=" << thread_num
+              << " log_type=" << (async ? "async" : "sync")
+              << " log_size=" << log.size()
+              << " count=" << count
+              << " average_time=" << average_time
+              << std::endl;
+}
+
+// Compares async and sync file logging throughput with 1, 2 and 4 threads.
+TEST_F(LoggingTest, performance) {
+    const bool saved_async_log = FLAGS_async_log;
+
+    LoggingSettings settings;
+    settings.logging_dest = LOG_TO_FILE;
+    InitLogging(settings);
+    std::string log(64, 'a');
+
+    const int thread_nums[] = {1, 2, 4};
+    for (int thread_num : thread_nums) {
+        PerfTest(thread_num, log, true);
+        // Pause between the async run and the sync run.
+        sleep(10);
+        PerfTest(thread_num, log, false);
+    }
+
+    FLAGS_async_log = saved_async_log;
+}
+
} // namespace
} // namespace logging
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]