This is an automated email from the ASF dual-hosted git repository. wwbmmm pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push: new 313258e9 Support async logging (#2413) 313258e9 is described below commit 313258e9825a9b773cf0a7b52dff52bcf22a4b0e Author: Bright Chen <chenguangmin...@foxmail.com> AuthorDate: Tue Jan 23 13:29:46 2024 +0800 Support async logging (#2413) * Support async logging * Set max async log queue size * Flush async log before exit * Use sync log for the first LogRequest * Support usec of logging for macOS --- src/butil/logging.cc | 342 +++++++++++++++++++++++++++++++++++++++++++---- test/logging_unittest.cc | 162 +++++++++++++++++++++- 2 files changed, 476 insertions(+), 28 deletions(-) diff --git a/src/butil/logging.cc b/src/butil/logging.cc index 544a5cab..032a5448 100644 --- a/src/butil/logging.cc +++ b/src/butil/logging.cc @@ -73,8 +73,11 @@ typedef pthread_mutex_t* MutexHandle; #include "butil/strings/string_util.h" #include "butil/strings/stringprintf.h" #include "butil/strings/utf_string_conversions.h" -#include "butil/synchronization/lock.h" +#include "butil/synchronization/condition_variable.h" #include "butil/threading/platform_thread.h" +#include "butil/threading/simple_thread.h" +#include "butil/object_pool.h" + #if defined(OS_POSIX) #include "butil/errno.h" #include "butil/fd_guard.h" @@ -144,6 +147,17 @@ DEFINE_bool(log_year, false, "Log year in datetime part in each log"); DEFINE_bool(log_func_name, false, "Log function name in each log"); +DEFINE_bool(async_log, false, "Use async log"); + +DEFINE_bool(async_log_in_background_always, false, "Async log written in background always."); + +DEFINE_int32(max_async_log_queue_size, 100000, "Max async log size. " + "If current log count of async log > max_async_log_size, " + "Use sync log to protect process."); + +DEFINE_int32(sleep_to_flush_async_log_s, 0, + "If the value > 0, sleep before atexit to flush async log"); + namespace { LoggingDestination logging_destination = LOG_DEFAULT; @@ -399,8 +413,300 @@ void CloseLogFileUnlocked() { log_file = NULL; } +void Log2File(const std::string& log) { + // We can have multiple threads and/or processes, so try to prevent them + // from clobbering each other's writes. + // If the client app did not call InitLogging, and the lock has not + // been created do it now. We do this on demand, but if two threads try + // to do this at the same time, there will be a race condition to create + // the lock. This is why InitLogging should be called from the main + // thread at the beginning of execution. + LoggingLock::Init(LOCK_LOG_FILE, NULL); + LoggingLock logging_lock; + if (InitializeLogFileHandle()) { +#if defined(OS_WIN) + SetFilePointer(log_file, 0, 0, SEEK_END); + DWORD num_written; + WriteFile(log_file, + static_cast<const void*>(log.data()), + static_cast<DWORD>(log.size()), + &num_written, + NULL); +#else + fwrite(log.data(), log.size(), 1, log_file); + fflush(log_file); +#endif + } +} + } // namespace +struct BAIDU_CACHELINE_ALIGNMENT LogRequest { + static LogRequest* const UNCONNECTED; + + LogRequest* next{NULL}; + std::string data; +}; + +LogRequest* const LogRequest::UNCONNECTED = (LogRequest*)(intptr_t)-1; + +class AsyncLogger : public butil::SimpleThread { +public: + static AsyncLogger* GetInstance(); + + void Log(const std::string& log); + void Log(std::string&& log); + void StopAndJoin(); + +private: +friend struct DefaultSingletonTraits<AsyncLogger>; + + static LogRequest _stop_req; + + AsyncLogger(); + ~AsyncLogger() override; + + static void AtExit() { + GetInstance()->StopAndJoin(); + if (FLAGS_sleep_to_flush_async_log_s > 0) { + ::sleep(FLAGS_sleep_to_flush_async_log_s); + } + } + + void LogImpl(LogRequest* log_req); + + void Run() override; + + void LogTask(LogRequest* req); + + bool IsLogComplete(LogRequest* old_head); + + void DoLog(LogRequest* req); + void DoLog(const std::string& log); + + butil::atomic<LogRequest*> _log_head; + butil::Mutex _mutex; + butil::ConditionVariable _cond; + LogRequest* _current_log_request; + butil::atomic<int32_t> _log_request_count; + butil::atomic<bool> _stop; +}; + +AsyncLogger* AsyncLogger::GetInstance() { + return Singleton<AsyncLogger, + LeakySingletonTraits<AsyncLogger>>::get(); +} + +AsyncLogger::AsyncLogger() + : butil::SimpleThread("async_log_thread") + , _log_head(NULL) + , _cond(&_mutex) + , _current_log_request(NULL) + , _stop(false) { + Start(); + // We need to stop async logger and + // flush all async log before exit. + atexit(AtExit); +} + +AsyncLogger::~AsyncLogger() { + StopAndJoin(); +} + +void AsyncLogger::Log(const std::string& log) { + if (log.empty()) { + return; + } + + bool is_full = FLAGS_max_async_log_queue_size > 0 && + _log_request_count.fetch_add(1, butil::memory_order_relaxed) > + FLAGS_max_async_log_queue_size; + if (is_full || _stop.load(butil::memory_order_relaxed)) { + // Async logger is full or stopped, fallback to sync log. + DoLog(log); + return; + } + + auto log_req = butil::get_object<LogRequest>(); + if (!log_req) { + // Async log failed, fallback to sync log. + DoLog(log); + return; + } + log_req->data = log; + LogImpl(log_req); +} + +void AsyncLogger::Log(std::string&& log) { + if (log.empty()) { + return; + } + + bool is_full = FLAGS_max_async_log_queue_size > 0 && + _log_request_count.fetch_add(1, butil::memory_order_relaxed) > + FLAGS_max_async_log_queue_size; + if (is_full || _stop.load(butil::memory_order_relaxed)) { + // Async logger is full or stopped, fallback to sync log. + DoLog(log); + return; + } + + auto log_req = butil::get_object<LogRequest>(); + if (!log_req) { + // Async log failed, fallback to sync log. + DoLog(log); + return; + } + log_req->data = std::move(log); + LogImpl(log_req); +} + +void AsyncLogger::LogImpl(LogRequest* log_req) { + log_req->next = LogRequest::UNCONNECTED; + // Release fence makes sure the thread getting request sees *req + LogRequest* const prev_head = + _log_head.exchange(log_req, butil::memory_order_release); + if (prev_head != NULL) { + // Someone is logging. The async_log_thread thread may spin + // until req->next to be non-UNCONNECTED. This process is not + // lock-free, but the duration is so short(1~2 instructions, + // depending on compiler) that the spin rarely occurs in practice + // (I've not seen any spin in highly contended tests). + log_req->next = prev_head; + return; + } + // We've got the right to write. + log_req->next = NULL; + + if (!FLAGS_async_log_in_background_always) { + // Use sync log for the LogRequest + // which has got the right to write. + DoLog(log_req); + // Return when there's no more LogRequests. + if (IsLogComplete(log_req)) { + butil::return_object(log_req); + return; + } + } + + BAIDU_SCOPED_LOCK(_mutex); + if (_stop.load(butil::memory_order_relaxed)) { + // Async logger is stopped, fallback to sync log. + LogTask(log_req); + } else { + // Wake up async logger. + _current_log_request = log_req; + _cond.Signal(); + } +} + +void AsyncLogger::StopAndJoin() { + if (!_stop.exchange(true, butil::memory_order_relaxed)) { + BAIDU_SCOPED_LOCK(_mutex); + _cond.Signal(); + } + if (!HasBeenJoined()) { + Join(); + } +} + +void AsyncLogger::Run() { + while (true) { + BAIDU_SCOPED_LOCK(_mutex); + while (!_stop.load(butil::memory_order_relaxed) && + !_current_log_request) { + _cond.Wait(); + } + if (_stop.load(butil::memory_order_relaxed) && + !_current_log_request) { + break; + } + + LogTask(_current_log_request); + _current_log_request = NULL; + } +} + +void AsyncLogger::LogTask(LogRequest* req) { + do { + // req was logged, skip it. + if (req->next != NULL && req->data.empty()) { + LogRequest* const saved_req = req; + req = req->next; + butil::return_object(saved_req); + } + + // Log all requests to file. + while (req->next != NULL) { + LogRequest* const saved_req = req; + req = req->next; + if (!saved_req->data.empty()) { + DoLog(saved_req); + } + // Release LogRequests until last request. + butil::return_object(saved_req); + } + if (!req->data.empty()) { + DoLog(req); + } + + // Return when there's no more LogRequests. + if (IsLogComplete(req)) { + butil::return_object(req); + return; + } + } while (true); +} + +bool AsyncLogger::IsLogComplete(LogRequest* old_head) { + if (old_head->next) { + fprintf(stderr, "old_head->next should be NULL\n"); + } + LogRequest* new_head = old_head; + LogRequest* desired = NULL; + if (_log_head.compare_exchange_strong( + new_head, desired, butil::memory_order_acquire)) { + // No one added new requests. + return true; + } + if (new_head == old_head) { + fprintf(stderr, "new_head should not be equal to old_head\n"); + } + // Above acquire fence pairs release fence of exchange in Log() to make + // sure that we see all fields of requests set. + + // Someone added new requests. + // Reverse the list until old_head. + LogRequest* tail = NULL; + LogRequest* p = new_head; + do { + while (p->next == LogRequest::UNCONNECTED) { + sched_yield(); + } + LogRequest* const saved_next = p->next; + p->next = tail; + tail = p; + p = saved_next; + if (!p) { + fprintf(stderr, "p should not be NULL\n"); + } + } while (p != old_head); + + // Link old list with new list. + old_head->next = tail; + return false; +} + +void AsyncLogger::DoLog(LogRequest* req) { + DoLog(req->data); + req->data.clear(); +} + +void AsyncLogger::DoLog(const std::string& log) { + Log2File(log); + _log_request_count.fetch_sub(1); +} + LoggingSettings::LoggingSettings() : logging_dest(LOG_DEFAULT), log_file(NULL), @@ -473,7 +779,7 @@ void PrintLogPrefix(std::ostream& os, int severity, const char* file, int line, const char* func) { PrintLogSeverity(os, severity); -#if defined(OS_LINUX) +#if defined(OS_LINUX) || defined(OS_MACOSX) timeval tv; gettimeofday(&tv, NULL); time_t t = tv.tv_sec; @@ -495,7 +801,7 @@ void PrintLogPrefix(std::ostream& os, int severity, << std::setw(2) << local_tm.tm_hour << ':' << std::setw(2) << local_tm.tm_min << ':' << std::setw(2) << local_tm.tm_sec; -#if defined(OS_LINUX) +#if defined(OS_LINUX) || defined(OS_MACOSX) os << '.' << std::setw(6) << tv.tv_usec; #endif if (FLAGS_log_pid) { @@ -957,35 +1263,17 @@ public: // write to log file if ((logging_destination & LOG_TO_FILE) != 0) { - // We can have multiple threads and/or processes, so try to prevent them - // from clobbering each other's writes. - // If the client app did not call InitLogging, and the lock has not - // been created do it now. We do this on demand, but if two threads try - // to do this at the same time, there will be a race condition to create - // the lock. This is why InitLogging should be called from the main - // thread at the beginning of execution. - LoggingLock::Init(LOCK_LOG_FILE, NULL); - LoggingLock logging_lock; - if (InitializeLogFileHandle()) { -#if defined(OS_WIN) - SetFilePointer(log_file, 0, 0, SEEK_END); - DWORD num_written; - WriteFile(log_file, - static_cast<const void*>(log.data()), - static_cast<DWORD>(log.size()), - &num_written, - NULL); -#else - fwrite(log.data(), log.size(), 1, log_file); - fflush(log_file); -#endif + if (FLAGS_async_log) { + AsyncLogger::GetInstance()->Log(std::move(log)); + } else { + Log2File(log); } } return true; } private: - DefaultLogSink() {} - ~DefaultLogSink() {} + DefaultLogSink() = default; + ~DefaultLogSink() override = default; friend struct DefaultSingletonTraits<DefaultLogSink>; }; diff --git a/test/logging_unittest.cc b/test/logging_unittest.cc index ca025ee9..66834b98 100644 --- a/test/logging_unittest.cc +++ b/test/logging_unittest.cc @@ -4,7 +4,9 @@ #include "butil/basictypes.h" #include "butil/logging.h" - +#include "butil/gperftools_profiler.h" +#include "butil/files/temp_file.h" +#include "butil/popen.h" #include <gtest/gtest.h> #include <gflags/gflags.h> @@ -14,6 +16,7 @@ namespace logging { DECLARE_bool(crash_on_fatal_log); DECLARE_int32(v); DECLARE_bool(log_func_name); +DECLARE_bool(async_log); namespace { @@ -475,6 +478,163 @@ TEST_F(LoggingTest, log_func) { ::logging::FLAGS_crash_on_fatal_log = old_crash_on_fatal_log; } +bool g_started = false; +bool g_stopped = false; +int g_prof_name_counter = 0; +butil::atomic<uint64_t> test_logging_count(0); + +void* test_async_log(void* arg) { + if (arg == NULL) { + return NULL; + } + auto log = (std::string*)(arg); + while (!g_stopped) { + LOG(INFO) << *log; + test_logging_count.fetch_add(1); + } + + return NULL; +} + +TEST_F(LoggingTest, async_log) { + bool saved_async_log = FLAGS_async_log; + FLAGS_async_log = true; + butil::TempFile temp_file; + LoggingSettings settings; + settings.logging_dest = LOG_TO_FILE; + settings.log_file = temp_file.fname(); + settings.delete_old = DELETE_OLD_LOG_FILE; + InitLogging(settings); + + std::string log = "135792468"; + int thread_num = 8; + pthread_t threads[thread_num]; + for (int i = 0; i < thread_num; ++i) { + ASSERT_EQ(0, pthread_create(&threads[i], NULL, test_async_log, &log)); + } + + sleep(5); + + g_stopped = true; + for (int i = 0; i < thread_num; ++i) { + pthread_join(threads[i], NULL); + } + // Wait for async log thread to flush all logs to file. + sleep(10); + + std::ostringstream oss; + std::string cmd = butil::string_printf("grep -c %s %s", + log.c_str(), temp_file.fname()); + ASSERT_LE(0, butil::read_command_output(oss, cmd.c_str())); + uint64_t log_count = std::strtol(oss.str().c_str(), NULL, 10); + ASSERT_EQ(log_count, test_logging_count.load()); + + FLAGS_async_log = saved_async_log; +} + +struct BAIDU_CACHELINE_ALIGNMENT PerfArgs { + const std::string* log; + int64_t counter; + int64_t elapse_ns; + bool ready; + + PerfArgs() : log(NULL), counter(0), elapse_ns(0), ready(false) {} +}; + +void* test_log(void* void_arg) { + auto args = (PerfArgs*)void_arg; + args->ready = true; + butil::Timer t; + while (!g_stopped) { + if (g_started) { + break; + } + usleep(10); + } + t.start(); + while (!g_stopped) { + { + LOG(INFO) << *args->log; + test_logging_count.fetch_add(1, butil::memory_order_relaxed); + } + ++args->counter; + } + t.stop(); + args->elapse_ns = t.n_elapsed(); + return NULL; +} + +void PerfTest(int thread_num, const std::string& log, bool async) { + FLAGS_async_log = async; + + g_started = false; + g_stopped = false; + pthread_t threads[thread_num]; + std::vector<PerfArgs> args(thread_num); + for (int i = 0; i < thread_num; ++i) { + args[i].log = &log; + ASSERT_EQ(0, pthread_create(&threads[i], NULL, test_log, &args[i])); + } + while (true) { + bool all_ready = true; + for (int i = 0; i < thread_num; ++i) { + if (!args[i].ready) { + all_ready = false; + break; + } + } + if (all_ready) { + break; + } + usleep(1000); + } + g_started = true; + char prof_name[32]; + snprintf(prof_name, sizeof(prof_name), "logging_%d.prof", ++g_prof_name_counter); + ProfilerStart(prof_name); + sleep(5); + ProfilerStop(); + g_stopped = true; + int64_t wait_time = 0; + int64_t count = 0; + for (int i = 0; i < thread_num; ++i) { + pthread_join(threads[i], NULL); + wait_time += args[i].elapse_ns; + count += args[i].counter; + } + std::cout << " thread_num=" << thread_num + << " log_type=" << (async ? "async" : "sync") + << " log_size=" << log.size() + << " count=" << count + << " average_time=" << wait_time / (double)count + << std::endl; +} + +TEST_F(LoggingTest, performance) { + bool saved_async_log = FLAGS_async_log; + + LoggingSettings settings; + settings.logging_dest = LOG_TO_FILE; + InitLogging(settings); + std::string log(64, 'a'); + int thread_num = 1; + PerfTest(thread_num, log, true); + sleep(10); + PerfTest(thread_num, log, false); + + thread_num = 2; + PerfTest(thread_num, log, true); + sleep(10); + PerfTest(thread_num, log, false); + + thread_num = 4; + PerfTest(thread_num, log, true); + sleep(10); + PerfTest(thread_num, log, false); + + FLAGS_async_log = saved_async_log; +} + } // namespace } // namespace logging --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@brpc.apache.org For additional commands, e-mail: dev-h...@brpc.apache.org