This is an automated email from the ASF dual-hosted git repository.

wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git


The following commit(s) were added to refs/heads/master by this push:
     new bdc141c9 Opt performance of async log (#2602)
bdc141c9 is described below

commit bdc141c9e4f287f3b6e6397e09ea821838d273ee
Author: Bright Chen <chenguangmin...@foxmail.com>
AuthorDate: Mon May 27 09:38:03 2024 +0800

    Opt performance of async log (#2602)
    
    * Opt performance of async log
    
    * Fix ci
---
 src/butil/logging.cc     | 174 +++++++++++++++++++++++++++++------------------
 test/logging_unittest.cc |  43 ++++++------
 2 files changed, 132 insertions(+), 85 deletions(-)

diff --git a/src/butil/logging.cc b/src/butil/logging.cc
index 78436ec6..14c977bd 100644
--- a/src/butil/logging.cc
+++ b/src/butil/logging.cc
@@ -428,12 +428,9 @@ void Log2File(const std::string& log) {
     if (InitializeLogFileHandle()) {
 #if defined(OS_WIN)
         SetFilePointer(log_file, 0, 0, SEEK_END);
-                DWORD num_written;
-                WriteFile(log_file,
-                          static_cast<const void*>(log.data()),
-                          static_cast<DWORD>(log.size()),
-                          &num_written,
-                          NULL);
+        DWORD num_written;
+        WriteFile(log_file, static_cast<const void*>(log.data()),
+                  static_cast<DWORD>(log.size()), &num_written, NULL);
 #else
         fwrite(log.data(), log.size(), 1, log_file);
         fflush(log_file);
@@ -443,11 +440,41 @@ void Log2File(const std::string& log) {
 
 }  // namespace
 
+#if defined(OS_LINUX) || defined(OS_MACOSX)
+typedef timeval TimeVal;
+#else
+struct TimeVal {
+    time_t tv_sec;
+};
+#endif
+
+TimeVal GetTimestamp() {
+#if defined(OS_LINUX) || defined(OS_MACOSX)
+    timeval tv;
+    gettimeofday(&tv, NULL);
+    return tv;
+#else
+    return { time(NULL) };
+#endif
+}
+
+struct BAIDU_CACHELINE_ALIGNMENT LogInfo {
+    std::string file;
+    std::string func;
+    std::string content;
+    TimeVal timestamp{};
+    int severity{0};
+    int line{0};
+    // If `raw' is false, content has been a complete log.
+    // If raw is true, a complete log consists of all properties of LogInfo.
+    bool raw{false};
+};
+
 struct BAIDU_CACHELINE_ALIGNMENT LogRequest {
     static LogRequest* const UNCONNECTED;
 
     LogRequest* next{NULL};
-    std::string data;
+    LogInfo log_info;
 };
 
 LogRequest* const LogRequest::UNCONNECTED = (LogRequest*)(intptr_t)-1;
@@ -456,8 +483,8 @@ class AsyncLogger : public butil::SimpleThread {
 public:
     static AsyncLogger* GetInstance();
 
-    void Log(const std::string& log);
-    void Log(std::string&& log);
+    void Log(const LogInfo& log_info);
+    void Log(LogInfo&& log_info);
     void StopAndJoin();
 
 private:
@@ -484,7 +511,7 @@ friend struct DefaultSingletonTraits<AsyncLogger>;
     bool IsLogComplete(LogRequest* old_head);
 
     void DoLog(LogRequest* req);
-    void DoLog(const std::string& log);
+    void DoLog(const LogInfo& log_info);
 
     butil::atomic<LogRequest*> _log_head;
     butil::Mutex _mutex;
@@ -515,8 +542,26 @@ AsyncLogger::~AsyncLogger() {
     StopAndJoin();
 }
 
-void AsyncLogger::Log(const std::string& log) {
-    if (log.empty()) {
+std::string LogInfoToLogStr(int severity, butil::StringPiece file,
+                            int line, butil::StringPiece func,
+                            butil::StringPiece content) {
+    // There's a copy here to concatenate prefix and content. Since
+    // DefaultLogSink is hardly used right now, the copy is irrelevant.
+    // A LogSink focused on performance should also be able to handle
+    // non-continuous inputs which is a must to maximize performance.
+    std::ostringstream os;
+    PrintLog(os, severity, file.data(), line, func.data(), content);
+    os << '\n';
+    return os.str();
+}
+
+std::string LogInfo2LogStr(const LogInfo& log_info) {
+    return LogInfoToLogStr(log_info.severity, log_info.file, log_info.line,
+                           log_info.func, log_info.content);
+}
+
+void AsyncLogger::Log(const LogInfo& log_info) {
+    if (log_info.content.empty()) {
         return;
     }
 
@@ -525,22 +570,22 @@ void AsyncLogger::Log(const std::string& log) {
         FLAGS_max_async_log_queue_size;
     if (is_full || _stop.load(butil::memory_order_relaxed)) {
         // Async logger is full or stopped, fallback to sync log.
-        DoLog(log);
+        DoLog(log_info);
         return;
     }
 
     auto log_req = butil::get_object<LogRequest>();
     if (!log_req) {
         // Async log failed, fallback to sync log.
-        DoLog(log);
+        DoLog(log_info);
         return;
     }
-    log_req->data = log;
+    log_req->log_info = log_info;
     LogImpl(log_req);
 }
 
-void AsyncLogger::Log(std::string&& log) {
-    if (log.empty()) {
+void AsyncLogger::Log(LogInfo&& log_info) {
+    if (log_info.content.empty()) {
         return;
     }
 
@@ -549,17 +594,17 @@ void AsyncLogger::Log(std::string&& log) {
         FLAGS_max_async_log_queue_size;
     if (is_full || _stop.load(butil::memory_order_relaxed)) {
         // Async logger is full or stopped, fallback to sync log.
-        DoLog(log);
+        DoLog(log_info);
         return;
     }
 
     auto log_req = butil::get_object<LogRequest>();
     if (!log_req) {
         // Async log failed, fallback to sync log.
-        DoLog(log);
+        DoLog(log_info);
         return;
     }
-    log_req->data = std::move(log);
+    log_req->log_info = std::move(log_info);
     LogImpl(log_req);
 }
 
@@ -632,7 +677,7 @@ void AsyncLogger::Run() {
 void AsyncLogger::LogTask(LogRequest* req) {
     do {
         // req was logged, skip it.
-        if (req->next != NULL && req->data.empty()) {
+        if (req->next != NULL && req->log_info.content.empty()) {
             LogRequest* const saved_req = req;
             req = req->next;
             butil::return_object(saved_req);
@@ -642,13 +687,13 @@ void AsyncLogger::LogTask(LogRequest* req) {
         while (req->next != NULL) {
             LogRequest* const saved_req = req;
             req = req->next;
-            if (!saved_req->data.empty()) {
+            if (!saved_req->log_info.content.empty()) {
                 DoLog(saved_req);
             }
             // Release LogRequests until last request.
             butil::return_object(saved_req);
         }
-        if (!req->data.empty()) {
+        if (!req->log_info.content.empty()) {
             DoLog(req);
         }
 
@@ -700,13 +745,17 @@ bool AsyncLogger::IsLogComplete(LogRequest* old_head) {
 }
 
 void AsyncLogger::DoLog(LogRequest* req) {
-    DoLog(req->data);
-    req->data.clear();
+    DoLog(req->log_info);
+    req->log_info.content.clear();
 }
 
-void AsyncLogger::DoLog(const std::string& log) {
-    Log2File(log);
-    _log_request_count.fetch_sub(1);
+void AsyncLogger::DoLog(const LogInfo& log_info) {
+    if (log_info.raw) {
+        Log2File(LogInfo2LogStr(log_info));
+    } else {
+        Log2File(log_info.content);
+    }
+    _log_request_count.fetch_sub(1, butil::memory_order_relaxed);
 }
 
 LoggingSettings::LoggingSettings()
@@ -778,16 +827,10 @@ static void PrintLogSeverity(std::ostream& os, int 
severity) {
 }
 
 void PrintLogPrefix(std::ostream& os, int severity,
-                    const char* file, int line,
-                    const char* func) {
+                    butil::StringPiece file, int line,
+                    butil::StringPiece func, TimeVal tv) {
     PrintLogSeverity(os, severity);
-#if defined(OS_LINUX) || defined(OS_MACOSX)
-    timeval tv;
-    gettimeofday(&tv, NULL);
     time_t t = tv.tv_sec;
-#else
-    time_t t = time(NULL);
-#endif
     struct tm local_tm = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, NULL};
 #if _MSC_VER >= 1400
     localtime_s(&local_tm, &t);
@@ -822,7 +865,7 @@ void PrintLogPrefix(std::ostream& os, int severity,
         os << ' ' << hostname;
     }
     os << ' ' << file << ':' << line;
-    if (func && *func != '\0') {
+    if (!func.empty()) {
         os << " " << func;
     }
     os << "] ";
@@ -832,12 +875,13 @@ void PrintLogPrefix(std::ostream& os, int severity,
 
 void PrintLogPrefix(std::ostream& os, int severity,
                     const char* file, int line) {
-    PrintLogPrefix(os, severity, file, line, "");
+    PrintLogPrefix(os, severity, file, line, "", GetTimestamp());
 }
 
 static void PrintLogPrefixAsJSON(std::ostream& os, int severity,
-                                 const char* file, const char* func,
-                                 int line) {
+                                 butil::StringPiece file,
+                                 butil::StringPiece func,
+                                 int line, TimeVal tv) {
     // severity
     os << "\"L\":\"";
     if (severity < 0) {
@@ -849,13 +893,7 @@ static void PrintLogPrefixAsJSON(std::ostream& os, int 
severity,
     }
     // time
     os << "\",\"T\":\"";
-#if defined(OS_LINUX)
-    timeval tv;
-    gettimeofday(&tv, NULL);
     time_t t = tv.tv_sec;
-#else
-    time_t t = time(NULL);
-#endif
     struct tm local_tm = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, NULL};
 #if _MSC_VER >= 1400
     localtime_s(&local_tm, &t);
@@ -871,7 +909,7 @@ static void PrintLogPrefixAsJSON(std::ostream& os, int 
severity,
        << std::setw(2) << local_tm.tm_hour << ':'
        << std::setw(2) << local_tm.tm_min << ':'
        << std::setw(2) << local_tm.tm_sec;
-#if defined(OS_LINUX)
+#if defined(OS_LINUX) || defined(OS_MACOSX)
     os << '.' << std::setw(6) << tv.tv_usec;
 #endif
     os << "\",";
@@ -889,7 +927,7 @@ static void PrintLogPrefixAsJSON(std::ostream& os, int 
severity,
         os << "\"host\":\"" << hostname << "\",";
     }
     os << "\"C\":\"" << file << ':' << line;
-    if (func && *func != '\0') {
+    if (!func.empty()) {
         os << " " << func;
     }
     os << "\"";
@@ -922,11 +960,11 @@ inline void OutputLog(std::ostream& os, const 
butil::StringPiece& s) {
 void PrintLog(std::ostream& os, int severity, const char* file, int line,
               const char* func, const butil::StringPiece& content) {
     if (!FLAGS_log_as_json) {
-        PrintLogPrefix(os, severity, file, line, func);
+        PrintLogPrefix(os, severity, file, line, func, GetTimestamp());
         OutputLog(os, content);
     } else {
         os << '{';
-        PrintLogPrefixAsJSON(os, severity, file, func, line);
+        PrintLogPrefixAsJSON(os, severity, file, func, line, GetTimestamp());
         bool pair_quote = false;
         if (content.empty() || content[0] != '"') {
             // not a json, add a 'M' field
@@ -1246,33 +1284,39 @@ public:
     bool OnLogMessage(int severity, const char* file,
                       int line, const char* func,
                       const butil::StringPiece& content) override {
-        // There's a copy here to concatenate prefix and content. Since
-        // DefaultLogSink is hardly used right now, the copy is irrelevant.
-        // A LogSink focused on performance should also be able to handle
-        // non-continuous inputs which is a must to maximize performance.
-        std::ostringstream os;
-        PrintLog(os, severity, file, line, func, content);
-        os << '\n';
-        std::string log = os.str();
-        
-        if ((logging_destination & LOG_TO_SYSTEM_DEBUG_LOG) != 0) {
-            fwrite(log.data(), log.size(), 1, stderr);
-            fflush(stderr);
-        } else if (severity >= kAlwaysPrintErrorLevel) {
+        std::string log;
+        if ((logging_destination & LOG_TO_SYSTEM_DEBUG_LOG) != 0 ||
+            severity >= kAlwaysPrintErrorLevel) {
+            log = LogInfoToLogStr(severity, file, line, func, content);
             // When we're only outputting to a log file, above a certain log 
level, we
             // should still output to stderr so that we can better detect and 
diagnose
             // problems with unit tests, especially on the buildbots.
             fwrite(log.data(), log.size(), 1, stderr);
             fflush(stderr);
         }
-
         // write to log file
         if ((logging_destination & LOG_TO_FILE) != 0) {
             if ((FLAGS_crash_on_fatal_log && severity == BLOG_FATAL) ||
                 !FLAGS_async_log) {
+                if (log.empty()) {
+                    log = LogInfoToLogStr(severity, file, line, func, content);
+                }
                 Log2File(log);
             } else {
-                AsyncLogger::GetInstance()->Log(std::move(log));
+                LogInfo info;
+                if (log.empty()) {
+                    info.severity = severity;
+                    info.timestamp = GetTimestamp();
+                    info.file = file;
+                    info.func = func;
+                    info.line = line;
+                    info.content = content.as_string();
+                    info.raw = true;
+                } else {
+                    info.content = std::move(log);
+                    info.raw = false;
+                }
+                AsyncLogger::GetInstance()->Log(std::move(info));
             }
         }
         return true;
diff --git a/test/logging_unittest.cc b/test/logging_unittest.cc
index 66834b98..6072044b 100644
--- a/test/logging_unittest.cc
+++ b/test/logging_unittest.cc
@@ -17,6 +17,8 @@ DECLARE_bool(crash_on_fatal_log);
 DECLARE_int32(v);
 DECLARE_bool(log_func_name);
 DECLARE_bool(async_log);
+DECLARE_bool(async_log_in_background_always);
+DECLARE_int32(max_async_log_queue_size);
 
 namespace {
 
@@ -513,14 +515,14 @@ TEST_F(LoggingTest, async_log) {
         ASSERT_EQ(0, pthread_create(&threads[i], NULL, test_async_log, &log));
     }
 
-    sleep(5);
+    usleep(1000 * 500);
 
     g_stopped = true;
     for (int i = 0; i < thread_num; ++i) {
         pthread_join(threads[i], NULL);
     }
     // Wait for async log thread to flush all logs to file.
-    sleep(10);
+    sleep(15);
 
     std::ostringstream oss;
     std::string cmd = butil::string_printf("grep -c %s %s",
@@ -545,6 +547,8 @@ void* test_log(void* void_arg) {
     auto args = (PerfArgs*)void_arg;
     args->ready = true;
     butil::Timer t;
+    std::string log = *args->log;
+    int counter = 0;
     while (!g_stopped) {
         if (g_started) {
             break;
@@ -554,13 +558,14 @@ void* test_log(void* void_arg) {
     t.start();
     while (!g_stopped) {
         {
-            LOG(INFO) << *args->log;
+            LOG(INFO) << log;
             test_logging_count.fetch_add(1, butil::memory_order_relaxed);
         }
-        ++args->counter;
+        ++counter;
     }
     t.stop();
     args->elapse_ns = t.n_elapsed();
+    args->counter = counter;
     return NULL;
 }
 
@@ -588,11 +593,12 @@ void PerfTest(int thread_num, const std::string& log, 
bool async) {
         }
         usleep(1000);
     }
+    int sleep_s = 2;
     g_started = true;
     char prof_name[32];
     snprintf(prof_name, sizeof(prof_name), "logging_%d.prof", 
++g_prof_name_counter);
     ProfilerStart(prof_name);
-    sleep(5);
+    sleep(sleep_s);
     ProfilerStop();
     g_stopped = true;
     int64_t wait_time = 0;
@@ -606,31 +612,28 @@ void PerfTest(int thread_num, const std::string& log, 
bool async) {
               << " log_type=" << (async ? "async" : "sync")
               << " log_size=" << log.size()
               << " count=" << count
-              << " average_time=" << wait_time / (double)count
+              << " duration=" << sleep_s << "s"
+              << " qps=" << (int)(count / (double)sleep_s)
+              << " average_time=" << wait_time / (double)count << "us"
               << std::endl;
 }
 
 TEST_F(LoggingTest, performance) {
     bool saved_async_log = FLAGS_async_log;
+    FLAGS_max_async_log_queue_size =
+        std::numeric_limits<int32_t>::max();
+    FLAGS_async_log_in_background_always = true;
 
     LoggingSettings settings;
     settings.logging_dest = LOG_TO_FILE;
+    settings.delete_old = DELETE_OLD_LOG_FILE;
     InitLogging(settings);
-    std::string log(64, 'a');
-    int thread_num = 1;
-    PerfTest(thread_num, log, true);
-    sleep(10);
-    PerfTest(thread_num, log, false);
-
-    thread_num = 2;
-    PerfTest(thread_num, log, true);
-    sleep(10);
-    PerfTest(thread_num, log, false);
-
-    thread_num = 4;
-    PerfTest(thread_num, log, true);
+    std::string log(100, 'a');
+    PerfTest(1, log, false);
+    PerfTest(8, log, false);
+    PerfTest(1, log, true);
     sleep(10);
-    PerfTest(thread_num, log, false);
+    PerfTest(8, log, true);
 
     FLAGS_async_log = saved_async_log;
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@brpc.apache.org
For additional commands, e-mail: dev-h...@brpc.apache.org

Reply via email to