This is an automated email from the ASF dual-hosted git repository.

wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git


The following commit(s) were added to refs/heads/master by this push:
     new 313258e9 Support async logging (#2413)
313258e9 is described below

commit 313258e9825a9b773cf0a7b52dff52bcf22a4b0e
Author: Bright Chen <chenguangmin...@foxmail.com>
AuthorDate: Tue Jan 23 13:29:46 2024 +0800

    Support async logging (#2413)
    
    * Support async logging
    
    * Set max async log queue size
    
    * Flush async log before exit
    
    * Use sync log for the first LogRequest
    
    * Support usec of logging for macOS
---
 src/butil/logging.cc     | 342 +++++++++++++++++++++++++++++++++++++++++++----
 test/logging_unittest.cc | 162 +++++++++++++++++++++-
 2 files changed, 476 insertions(+), 28 deletions(-)

diff --git a/src/butil/logging.cc b/src/butil/logging.cc
index 544a5cab..032a5448 100644
--- a/src/butil/logging.cc
+++ b/src/butil/logging.cc
@@ -73,8 +73,11 @@ typedef pthread_mutex_t* MutexHandle;
 #include "butil/strings/string_util.h"
 #include "butil/strings/stringprintf.h"
 #include "butil/strings/utf_string_conversions.h"
-#include "butil/synchronization/lock.h"
+#include "butil/synchronization/condition_variable.h"
 #include "butil/threading/platform_thread.h"
+#include "butil/threading/simple_thread.h"
+#include "butil/object_pool.h"
+
 #if defined(OS_POSIX)
 #include "butil/errno.h"
 #include "butil/fd_guard.h"
@@ -144,6 +147,17 @@ DEFINE_bool(log_year, false, "Log year in datetime part in 
each log");
 
 DEFINE_bool(log_func_name, false, "Log function name in each log");
 
+DEFINE_bool(async_log, false, "Use async log");
+
+DEFINE_bool(async_log_in_background_always, false, "Async log written in 
background always.");
+
+DEFINE_int32(max_async_log_queue_size, 100000, "Max async log size. "
+             "If current log count of async log > max_async_log_size, "
+             "Use sync log to protect process.");
+
+DEFINE_int32(sleep_to_flush_async_log_s, 0,
+             "If the value > 0, sleep before atexit to flush async log");
+
 namespace {
 
 LoggingDestination logging_destination = LOG_DEFAULT;
@@ -399,8 +413,300 @@ void CloseLogFileUnlocked() {
     log_file = NULL;
 }
 
+void Log2File(const std::string& log) {
+    // We can have multiple threads and/or processes, so try to prevent them
+    // from clobbering each other's writes.
+    // If the client app did not call InitLogging, and the lock has not
+    // been created do it now. We do this on demand, but if two threads try
+    // to do this at the same time, there will be a race condition to create
+    // the lock. This is why InitLogging should be called from the main
+    // thread at the beginning of execution.
+    LoggingLock::Init(LOCK_LOG_FILE, NULL);
+    LoggingLock logging_lock;
+    if (InitializeLogFileHandle()) {
+#if defined(OS_WIN)
+        SetFilePointer(log_file, 0, 0, SEEK_END);
+                DWORD num_written;
+                WriteFile(log_file,
+                          static_cast<const void*>(log.data()),
+                          static_cast<DWORD>(log.size()),
+                          &num_written,
+                          NULL);
+#else
+        fwrite(log.data(), log.size(), 1, log_file);
+        fflush(log_file);
+#endif
+    }
+}
+
 }  // namespace
 
+struct BAIDU_CACHELINE_ALIGNMENT LogRequest {
+    static LogRequest* const UNCONNECTED;
+
+    LogRequest* next{NULL};
+    std::string data;
+};
+
+LogRequest* const LogRequest::UNCONNECTED = (LogRequest*)(intptr_t)-1;
+
+class AsyncLogger : public butil::SimpleThread {
+public:
+    static AsyncLogger* GetInstance();
+
+    void Log(const std::string& log);
+    void Log(std::string&& log);
+    void StopAndJoin();
+
+private:
+friend struct DefaultSingletonTraits<AsyncLogger>;
+
+    static LogRequest _stop_req;
+
+    AsyncLogger();
+    ~AsyncLogger() override;
+
+    static void AtExit() {
+        GetInstance()->StopAndJoin();
+        if (FLAGS_sleep_to_flush_async_log_s > 0) {
+            ::sleep(FLAGS_sleep_to_flush_async_log_s);
+        }
+    }
+
+    void LogImpl(LogRequest* log_req);
+
+    void Run() override;
+
+    void LogTask(LogRequest* req);
+
+    bool IsLogComplete(LogRequest* old_head);
+
+    void DoLog(LogRequest* req);
+    void DoLog(const std::string& log);
+
+    butil::atomic<LogRequest*> _log_head;
+    butil::Mutex _mutex;
+    butil::ConditionVariable _cond;
+    LogRequest* _current_log_request;
+    butil::atomic<int32_t> _log_request_count;
+    butil::atomic<bool> _stop;
+};
+
+AsyncLogger* AsyncLogger::GetInstance() {
+    return Singleton<AsyncLogger,
+                     LeakySingletonTraits<AsyncLogger>>::get();
+}
+
+AsyncLogger::AsyncLogger()
+    : butil::SimpleThread("async_log_thread")
+    , _log_head(NULL)
+    , _cond(&_mutex)
+    , _current_log_request(NULL)
+    , _stop(false) {
+    Start();
+    // We need to stop async logger and
+    // flush all async log before exit.
+    atexit(AtExit);
+}
+
+AsyncLogger::~AsyncLogger() {
+    StopAndJoin();
+}
+
+void AsyncLogger::Log(const std::string& log) {
+    if (log.empty()) {
+        return;
+    }
+
+    bool is_full = FLAGS_max_async_log_queue_size > 0 &&
+        _log_request_count.fetch_add(1, butil::memory_order_relaxed) >
+        FLAGS_max_async_log_queue_size;
+    if (is_full || _stop.load(butil::memory_order_relaxed)) {
+        // Async logger is full or stopped, fallback to sync log.
+        DoLog(log);
+        return;
+    }
+
+    auto log_req = butil::get_object<LogRequest>();
+    if (!log_req) {
+        // Async log failed, fallback to sync log.
+        DoLog(log);
+        return;
+    }
+    log_req->data = log;
+    LogImpl(log_req);
+}
+
+void AsyncLogger::Log(std::string&& log) {
+    if (log.empty()) {
+        return;
+    }
+
+    bool is_full = FLAGS_max_async_log_queue_size > 0 &&
+        _log_request_count.fetch_add(1, butil::memory_order_relaxed) >
+        FLAGS_max_async_log_queue_size;
+    if (is_full || _stop.load(butil::memory_order_relaxed)) {
+        // Async logger is full or stopped, fallback to sync log.
+        DoLog(log);
+        return;
+    }
+
+    auto log_req = butil::get_object<LogRequest>();
+    if (!log_req) {
+        // Async log failed, fallback to sync log.
+        DoLog(log);
+        return;
+    }
+    log_req->data = std::move(log);
+    LogImpl(log_req);
+}
+
+void AsyncLogger::LogImpl(LogRequest* log_req) {
+    log_req->next = LogRequest::UNCONNECTED;
+    // Release fence makes sure the thread getting request sees *req
+    LogRequest* const prev_head =
+        _log_head.exchange(log_req, butil::memory_order_release);
+    if (prev_head != NULL) {
+        // Someone is logging. The async_log_thread thread may spin
+        // until req->next to be non-UNCONNECTED. This process is not
+        // lock-free, but the duration is so short(1~2 instructions,
+        // depending on compiler) that the spin rarely occurs in practice
+        // (I've not seen any spin in highly contended tests).
+        log_req->next = prev_head;
+        return;
+    }
+    // We've got the right to write.
+    log_req->next = NULL;
+
+    if (!FLAGS_async_log_in_background_always) {
+        // Use sync log for the LogRequest
+        // which has got the right to write.
+        DoLog(log_req);
+        // Return when there's no more LogRequests.
+        if (IsLogComplete(log_req)) {
+            butil::return_object(log_req);
+            return;
+        }
+    }
+
+    BAIDU_SCOPED_LOCK(_mutex);
+    if (_stop.load(butil::memory_order_relaxed)) {
+        // Async logger is stopped, fallback to sync log.
+        LogTask(log_req);
+    } else {
+        // Wake up async logger.
+        _current_log_request = log_req;
+        _cond.Signal();
+    }
+}
+
+void AsyncLogger::StopAndJoin() {
+    if (!_stop.exchange(true, butil::memory_order_relaxed)) {
+        BAIDU_SCOPED_LOCK(_mutex);
+        _cond.Signal();
+    }
+    if (!HasBeenJoined()) {
+        Join();
+    }
+}
+
+void AsyncLogger::Run() {
+    while (true) {
+        BAIDU_SCOPED_LOCK(_mutex);
+        while (!_stop.load(butil::memory_order_relaxed) &&
+               !_current_log_request) {
+            _cond.Wait();
+        }
+        if (_stop.load(butil::memory_order_relaxed) &&
+            !_current_log_request) {
+            break;
+        }
+
+        LogTask(_current_log_request);
+        _current_log_request = NULL;
+    }
+}
+
+void AsyncLogger::LogTask(LogRequest* req) {
+    do {
+        // req was logged, skip it.
+        if (req->next != NULL && req->data.empty()) {
+            LogRequest* const saved_req = req;
+            req = req->next;
+            butil::return_object(saved_req);
+        }
+
+        // Log all requests to file.
+        while (req->next != NULL) {
+            LogRequest* const saved_req = req;
+            req = req->next;
+            if (!saved_req->data.empty()) {
+                DoLog(saved_req);
+            }
+            // Release LogRequests until last request.
+            butil::return_object(saved_req);
+        }
+        if (!req->data.empty()) {
+            DoLog(req);
+        }
+
+        // Return when there's no more LogRequests.
+        if (IsLogComplete(req)) {
+            butil::return_object(req);
+            return;
+        }
+    } while (true);
+}
+
+bool AsyncLogger::IsLogComplete(LogRequest* old_head) {
+    if (old_head->next) {
+        fprintf(stderr, "old_head->next should be NULL\n");
+    }
+    LogRequest* new_head = old_head;
+    LogRequest* desired = NULL;
+    if (_log_head.compare_exchange_strong(
+        new_head, desired, butil::memory_order_acquire)) {
+        // No one added new requests.
+        return true;
+    }
+    if (new_head == old_head) {
+        fprintf(stderr, "new_head should not be equal to old_head\n");
+    }
+    // Above acquire fence pairs release fence of exchange in Log() to make
+    // sure that we see all fields of requests set.
+
+    // Someone added new requests.
+    // Reverse the list until old_head.
+    LogRequest* tail = NULL;
+    LogRequest* p = new_head;
+    do {
+        while (p->next == LogRequest::UNCONNECTED) {
+            sched_yield();
+        }
+        LogRequest* const saved_next = p->next;
+        p->next = tail;
+        tail = p;
+        p = saved_next;
+        if (!p) {
+            fprintf(stderr, "p should not be NULL\n");
+        }
+    } while (p != old_head);
+
+    // Link old list with new list.
+    old_head->next = tail;
+    return false;
+}
+
+void AsyncLogger::DoLog(LogRequest* req) {
+    DoLog(req->data);
+    req->data.clear();
+}
+
+void AsyncLogger::DoLog(const std::string& log) {
+    Log2File(log);
+    _log_request_count.fetch_sub(1);
+}
+
 LoggingSettings::LoggingSettings()
     : logging_dest(LOG_DEFAULT),
       log_file(NULL),
@@ -473,7 +779,7 @@ void PrintLogPrefix(std::ostream& os, int severity,
                     const char* file, int line,
                     const char* func) {
     PrintLogSeverity(os, severity);
-#if defined(OS_LINUX)
+#if defined(OS_LINUX) || defined(OS_MACOSX)
     timeval tv;
     gettimeofday(&tv, NULL);
     time_t t = tv.tv_sec;
@@ -495,7 +801,7 @@ void PrintLogPrefix(std::ostream& os, int severity,
        << std::setw(2) << local_tm.tm_hour << ':'
        << std::setw(2) << local_tm.tm_min << ':'
        << std::setw(2) << local_tm.tm_sec;
-#if defined(OS_LINUX)
+#if defined(OS_LINUX) || defined(OS_MACOSX)
     os << '.' << std::setw(6) << tv.tv_usec;
 #endif
     if (FLAGS_log_pid) {
@@ -957,35 +1263,17 @@ public:
 
         // write to log file
         if ((logging_destination & LOG_TO_FILE) != 0) {
-            // We can have multiple threads and/or processes, so try to 
prevent them
-            // from clobbering each other's writes.
-            // If the client app did not call InitLogging, and the lock has not
-            // been created do it now. We do this on demand, but if two 
threads try
-            // to do this at the same time, there will be a race condition to 
create
-            // the lock. This is why InitLogging should be called from the main
-            // thread at the beginning of execution.
-            LoggingLock::Init(LOCK_LOG_FILE, NULL);
-            LoggingLock logging_lock;
-            if (InitializeLogFileHandle()) {
-#if defined(OS_WIN)
-                SetFilePointer(log_file, 0, 0, SEEK_END);
-                DWORD num_written;
-                WriteFile(log_file,
-                          static_cast<const void*>(log.data()),
-                          static_cast<DWORD>(log.size()),
-                          &num_written,
-                          NULL);
-#else
-                fwrite(log.data(), log.size(), 1, log_file);
-                fflush(log_file);
-#endif
+            if (FLAGS_async_log) {
+                AsyncLogger::GetInstance()->Log(std::move(log));
+            } else {
+                Log2File(log);
             }
         }
         return true;
     }
 private:
-    DefaultLogSink() {}
-    ~DefaultLogSink() {}
+    DefaultLogSink() = default;
+    ~DefaultLogSink() override = default;
 friend struct DefaultSingletonTraits<DefaultLogSink>;
 };
 
diff --git a/test/logging_unittest.cc b/test/logging_unittest.cc
index ca025ee9..66834b98 100644
--- a/test/logging_unittest.cc
+++ b/test/logging_unittest.cc
@@ -4,7 +4,9 @@
 
 #include "butil/basictypes.h"
 #include "butil/logging.h"
-
+#include "butil/gperftools_profiler.h"
+#include "butil/files/temp_file.h"
+#include "butil/popen.h"
 #include <gtest/gtest.h>
 #include <gflags/gflags.h>
 
@@ -14,6 +16,7 @@ namespace logging {
 DECLARE_bool(crash_on_fatal_log);
 DECLARE_int32(v);
 DECLARE_bool(log_func_name);
+DECLARE_bool(async_log);
 
 namespace {
 
@@ -475,6 +478,163 @@ TEST_F(LoggingTest, log_func) {
     ::logging::FLAGS_crash_on_fatal_log = old_crash_on_fatal_log;
 }
 
+bool g_started = false;
+bool g_stopped = false;
+int g_prof_name_counter = 0;
+butil::atomic<uint64_t> test_logging_count(0);
+
+void* test_async_log(void* arg) {
+    if (arg == NULL) {
+        return NULL;
+    }
+    auto log = (std::string*)(arg);
+    while (!g_stopped) {
+        LOG(INFO) << *log;
+        test_logging_count.fetch_add(1);
+    }
+
+    return NULL;
+}
+
+TEST_F(LoggingTest, async_log) {
+    bool saved_async_log = FLAGS_async_log;
+    FLAGS_async_log = true;
+    butil::TempFile temp_file;
+    LoggingSettings settings;
+    settings.logging_dest = LOG_TO_FILE;
+    settings.log_file = temp_file.fname();
+    settings.delete_old = DELETE_OLD_LOG_FILE;
+    InitLogging(settings);
+
+    std::string log = "135792468";
+    int thread_num = 8;
+    pthread_t threads[thread_num];
+    for (int i = 0; i < thread_num; ++i) {
+        ASSERT_EQ(0, pthread_create(&threads[i], NULL, test_async_log, &log));
+    }
+
+    sleep(5);
+
+    g_stopped = true;
+    for (int i = 0; i < thread_num; ++i) {
+        pthread_join(threads[i], NULL);
+    }
+    // Wait for async log thread to flush all logs to file.
+    sleep(10);
+
+    std::ostringstream oss;
+    std::string cmd = butil::string_printf("grep -c %s %s",
+        log.c_str(), temp_file.fname());
+    ASSERT_LE(0, butil::read_command_output(oss, cmd.c_str()));
+    uint64_t log_count = std::strtol(oss.str().c_str(), NULL, 10);
+    ASSERT_EQ(log_count, test_logging_count.load());
+
+    FLAGS_async_log = saved_async_log;
+}
+
+struct BAIDU_CACHELINE_ALIGNMENT PerfArgs {
+    const std::string* log;
+    int64_t counter;
+    int64_t elapse_ns;
+    bool ready;
+
+    PerfArgs() : log(NULL), counter(0), elapse_ns(0), ready(false) {}
+};
+
+void* test_log(void* void_arg) {
+    auto args = (PerfArgs*)void_arg;
+    args->ready = true;
+    butil::Timer t;
+    while (!g_stopped) {
+        if (g_started) {
+            break;
+        }
+        usleep(10);
+    }
+    t.start();
+    while (!g_stopped) {
+        {
+            LOG(INFO) << *args->log;
+            test_logging_count.fetch_add(1, butil::memory_order_relaxed);
+        }
+        ++args->counter;
+    }
+    t.stop();
+    args->elapse_ns = t.n_elapsed();
+    return NULL;
+}
+
+void PerfTest(int thread_num, const std::string& log, bool async) {
+    FLAGS_async_log = async;
+
+    g_started = false;
+    g_stopped = false;
+    pthread_t threads[thread_num];
+    std::vector<PerfArgs> args(thread_num);
+    for (int i = 0; i < thread_num; ++i) {
+        args[i].log = &log;
+        ASSERT_EQ(0, pthread_create(&threads[i], NULL, test_log, &args[i]));
+    }
+    while (true) {
+        bool all_ready = true;
+        for (int i = 0; i < thread_num; ++i) {
+            if (!args[i].ready) {
+                all_ready = false;
+                break;
+            }
+        }
+        if (all_ready) {
+            break;
+        }
+        usleep(1000);
+    }
+    g_started = true;
+    char prof_name[32];
+    snprintf(prof_name, sizeof(prof_name), "logging_%d.prof", 
++g_prof_name_counter);
+    ProfilerStart(prof_name);
+    sleep(5);
+    ProfilerStop();
+    g_stopped = true;
+    int64_t wait_time = 0;
+    int64_t count = 0;
+    for (int i = 0; i < thread_num; ++i) {
+        pthread_join(threads[i], NULL);
+        wait_time += args[i].elapse_ns;
+        count += args[i].counter;
+    }
+    std::cout << " thread_num=" << thread_num
+              << " log_type=" << (async ? "async" : "sync")
+              << " log_size=" << log.size()
+              << " count=" << count
+              << " average_time=" << wait_time / (double)count
+              << std::endl;
+}
+
+TEST_F(LoggingTest, performance) {
+    bool saved_async_log = FLAGS_async_log;
+
+    LoggingSettings settings;
+    settings.logging_dest = LOG_TO_FILE;
+    InitLogging(settings);
+    std::string log(64, 'a');
+    int thread_num = 1;
+    PerfTest(thread_num, log, true);
+    sleep(10);
+    PerfTest(thread_num, log, false);
+
+    thread_num = 2;
+    PerfTest(thread_num, log, true);
+    sleep(10);
+    PerfTest(thread_num, log, false);
+
+    thread_num = 4;
+    PerfTest(thread_num, log, true);
+    sleep(10);
+    PerfTest(thread_num, log, false);
+
+    FLAGS_async_log = saved_async_log;
+}
+
 }  // namespace
 
 }  // namespace logging


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@brpc.apache.org
For additional commands, e-mail: dev-h...@brpc.apache.org

Reply via email to