This is an automated email from the ASF dual-hosted git repository.
wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push:
new bdc141c9 Opt performance of async log (#2602)
bdc141c9 is described below
commit bdc141c9e4f287f3b6e6397e09ea821838d273ee
Author: Bright Chen <[email protected]>
AuthorDate: Mon May 27 09:38:03 2024 +0800
Opt performance of async log (#2602)
* Opt performance of async log
* Fix ci
---
src/butil/logging.cc | 174 +++++++++++++++++++++++++++++------------------
test/logging_unittest.cc | 43 ++++++------
2 files changed, 132 insertions(+), 85 deletions(-)
diff --git a/src/butil/logging.cc b/src/butil/logging.cc
index 78436ec6..14c977bd 100644
--- a/src/butil/logging.cc
+++ b/src/butil/logging.cc
@@ -428,12 +428,9 @@ void Log2File(const std::string& log) {
if (InitializeLogFileHandle()) {
#if defined(OS_WIN)
SetFilePointer(log_file, 0, 0, SEEK_END);
- DWORD num_written;
- WriteFile(log_file,
- static_cast<const void*>(log.data()),
- static_cast<DWORD>(log.size()),
- &num_written,
- NULL);
+ DWORD num_written;
+ WriteFile(log_file, static_cast<const void*>(log.data()),
+ static_cast<DWORD>(log.size()), &num_written, NULL);
#else
fwrite(log.data(), log.size(), 1, log_file);
fflush(log_file);
@@ -443,11 +440,41 @@ void Log2File(const std::string& log) {
} // namespace
+#if defined(OS_LINUX) || defined(OS_MACOSX)
+typedef timeval TimeVal;
+#else
+struct TimeVal {
+ time_t tv_sec;
+};
+#endif
+
+TimeVal GetTimestamp() {
+#if defined(OS_LINUX) || defined(OS_MACOSX)
+ timeval tv;
+ gettimeofday(&tv, NULL);
+ return tv;
+#else
+ return { time(NULL) };
+#endif
+}
+
+struct BAIDU_CACHELINE_ALIGNMENT LogInfo {
+ std::string file;
+ std::string func;
+ std::string content;
+ TimeVal timestamp{};
+ int severity{0};
+ int line{0};
+ // If `raw' is false, content has been a complete log.
+ // If raw is true, a complete log consists of all properties of LogInfo.
+ bool raw{false};
+};
+
struct BAIDU_CACHELINE_ALIGNMENT LogRequest {
static LogRequest* const UNCONNECTED;
LogRequest* next{NULL};
- std::string data;
+ LogInfo log_info;
};
LogRequest* const LogRequest::UNCONNECTED = (LogRequest*)(intptr_t)-1;
@@ -456,8 +483,8 @@ class AsyncLogger : public butil::SimpleThread {
public:
static AsyncLogger* GetInstance();
- void Log(const std::string& log);
- void Log(std::string&& log);
+ void Log(const LogInfo& log_info);
+ void Log(LogInfo&& log_info);
void StopAndJoin();
private:
@@ -484,7 +511,7 @@ friend struct DefaultSingletonTraits<AsyncLogger>;
bool IsLogComplete(LogRequest* old_head);
void DoLog(LogRequest* req);
- void DoLog(const std::string& log);
+ void DoLog(const LogInfo& log_info);
butil::atomic<LogRequest*> _log_head;
butil::Mutex _mutex;
@@ -515,8 +542,26 @@ AsyncLogger::~AsyncLogger() {
StopAndJoin();
}
-void AsyncLogger::Log(const std::string& log) {
- if (log.empty()) {
+std::string LogInfoToLogStr(int severity, butil::StringPiece file,
+ int line, butil::StringPiece func,
+ butil::StringPiece content) {
+ // There's a copy here to concatenate prefix and content. Since
+ // DefaultLogSink is hardly used right now, the copy is irrelevant.
+ // A LogSink focused on performance should also be able to handle
+ // non-continuous inputs which is a must to maximize performance.
+ std::ostringstream os;
+ PrintLog(os, severity, file.data(), line, func.data(), content);
+ os << '\n';
+ return os.str();
+}
+
+std::string LogInfo2LogStr(const LogInfo& log_info) {
+ return LogInfoToLogStr(log_info.severity, log_info.file, log_info.line,
+ log_info.func, log_info.content);
+}
+
+void AsyncLogger::Log(const LogInfo& log_info) {
+ if (log_info.content.empty()) {
return;
}
@@ -525,22 +570,22 @@ void AsyncLogger::Log(const std::string& log) {
FLAGS_max_async_log_queue_size;
if (is_full || _stop.load(butil::memory_order_relaxed)) {
// Async logger is full or stopped, fallback to sync log.
- DoLog(log);
+ DoLog(log_info);
return;
}
auto log_req = butil::get_object<LogRequest>();
if (!log_req) {
// Async log failed, fallback to sync log.
- DoLog(log);
+ DoLog(log_info);
return;
}
- log_req->data = log;
+ log_req->log_info = log_info;
LogImpl(log_req);
}
-void AsyncLogger::Log(std::string&& log) {
- if (log.empty()) {
+void AsyncLogger::Log(LogInfo&& log_info) {
+ if (log_info.content.empty()) {
return;
}
@@ -549,17 +594,17 @@ void AsyncLogger::Log(std::string&& log) {
FLAGS_max_async_log_queue_size;
if (is_full || _stop.load(butil::memory_order_relaxed)) {
// Async logger is full or stopped, fallback to sync log.
- DoLog(log);
+ DoLog(log_info);
return;
}
auto log_req = butil::get_object<LogRequest>();
if (!log_req) {
// Async log failed, fallback to sync log.
- DoLog(log);
+ DoLog(log_info);
return;
}
- log_req->data = std::move(log);
+ log_req->log_info = std::move(log_info);
LogImpl(log_req);
}
@@ -632,7 +677,7 @@ void AsyncLogger::Run() {
void AsyncLogger::LogTask(LogRequest* req) {
do {
// req was logged, skip it.
- if (req->next != NULL && req->data.empty()) {
+ if (req->next != NULL && req->log_info.content.empty()) {
LogRequest* const saved_req = req;
req = req->next;
butil::return_object(saved_req);
@@ -642,13 +687,13 @@ void AsyncLogger::LogTask(LogRequest* req) {
while (req->next != NULL) {
LogRequest* const saved_req = req;
req = req->next;
- if (!saved_req->data.empty()) {
+ if (!saved_req->log_info.content.empty()) {
DoLog(saved_req);
}
// Release LogRequests until last request.
butil::return_object(saved_req);
}
- if (!req->data.empty()) {
+ if (!req->log_info.content.empty()) {
DoLog(req);
}
@@ -700,13 +745,17 @@ bool AsyncLogger::IsLogComplete(LogRequest* old_head) {
}
void AsyncLogger::DoLog(LogRequest* req) {
- DoLog(req->data);
- req->data.clear();
+ DoLog(req->log_info);
+ req->log_info.content.clear();
}
-void AsyncLogger::DoLog(const std::string& log) {
- Log2File(log);
- _log_request_count.fetch_sub(1);
+void AsyncLogger::DoLog(const LogInfo& log_info) {
+ if (log_info.raw) {
+ Log2File(LogInfo2LogStr(log_info));
+ } else {
+ Log2File(log_info.content);
+ }
+ _log_request_count.fetch_sub(1, butil::memory_order_relaxed);
}
LoggingSettings::LoggingSettings()
@@ -778,16 +827,10 @@ static void PrintLogSeverity(std::ostream& os, int
severity) {
}
void PrintLogPrefix(std::ostream& os, int severity,
- const char* file, int line,
- const char* func) {
+ butil::StringPiece file, int line,
+ butil::StringPiece func, TimeVal tv) {
PrintLogSeverity(os, severity);
-#if defined(OS_LINUX) || defined(OS_MACOSX)
- timeval tv;
- gettimeofday(&tv, NULL);
time_t t = tv.tv_sec;
-#else
- time_t t = time(NULL);
-#endif
struct tm local_tm = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, NULL};
#if _MSC_VER >= 1400
localtime_s(&local_tm, &t);
@@ -822,7 +865,7 @@ void PrintLogPrefix(std::ostream& os, int severity,
os << ' ' << hostname;
}
os << ' ' << file << ':' << line;
- if (func && *func != '\0') {
+ if (!func.empty()) {
os << " " << func;
}
os << "] ";
@@ -832,12 +875,13 @@ void PrintLogPrefix(std::ostream& os, int severity,
void PrintLogPrefix(std::ostream& os, int severity,
const char* file, int line) {
- PrintLogPrefix(os, severity, file, line, "");
+ PrintLogPrefix(os, severity, file, line, "", GetTimestamp());
}
static void PrintLogPrefixAsJSON(std::ostream& os, int severity,
- const char* file, const char* func,
- int line) {
+ butil::StringPiece file,
+ butil::StringPiece func,
+ int line, TimeVal tv) {
// severity
os << "\"L\":\"";
if (severity < 0) {
@@ -849,13 +893,7 @@ static void PrintLogPrefixAsJSON(std::ostream& os, int
severity,
}
// time
os << "\",\"T\":\"";
-#if defined(OS_LINUX)
- timeval tv;
- gettimeofday(&tv, NULL);
time_t t = tv.tv_sec;
-#else
- time_t t = time(NULL);
-#endif
struct tm local_tm = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, NULL};
#if _MSC_VER >= 1400
localtime_s(&local_tm, &t);
@@ -871,7 +909,7 @@ static void PrintLogPrefixAsJSON(std::ostream& os, int
severity,
<< std::setw(2) << local_tm.tm_hour << ':'
<< std::setw(2) << local_tm.tm_min << ':'
<< std::setw(2) << local_tm.tm_sec;
-#if defined(OS_LINUX)
+#if defined(OS_LINUX) || defined(OS_MACOSX)
os << '.' << std::setw(6) << tv.tv_usec;
#endif
os << "\",";
@@ -889,7 +927,7 @@ static void PrintLogPrefixAsJSON(std::ostream& os, int
severity,
os << "\"host\":\"" << hostname << "\",";
}
os << "\"C\":\"" << file << ':' << line;
- if (func && *func != '\0') {
+ if (!func.empty()) {
os << " " << func;
}
os << "\"";
@@ -922,11 +960,11 @@ inline void OutputLog(std::ostream& os, const
butil::StringPiece& s) {
void PrintLog(std::ostream& os, int severity, const char* file, int line,
const char* func, const butil::StringPiece& content) {
if (!FLAGS_log_as_json) {
- PrintLogPrefix(os, severity, file, line, func);
+ PrintLogPrefix(os, severity, file, line, func, GetTimestamp());
OutputLog(os, content);
} else {
os << '{';
- PrintLogPrefixAsJSON(os, severity, file, func, line);
+ PrintLogPrefixAsJSON(os, severity, file, func, line, GetTimestamp());
bool pair_quote = false;
if (content.empty() || content[0] != '"') {
// not a json, add a 'M' field
@@ -1246,33 +1284,39 @@ public:
bool OnLogMessage(int severity, const char* file,
int line, const char* func,
const butil::StringPiece& content) override {
- // There's a copy here to concatenate prefix and content. Since
- // DefaultLogSink is hardly used right now, the copy is irrelevant.
- // A LogSink focused on performance should also be able to handle
- // non-continuous inputs which is a must to maximize performance.
- std::ostringstream os;
- PrintLog(os, severity, file, line, func, content);
- os << '\n';
- std::string log = os.str();
-
- if ((logging_destination & LOG_TO_SYSTEM_DEBUG_LOG) != 0) {
- fwrite(log.data(), log.size(), 1, stderr);
- fflush(stderr);
- } else if (severity >= kAlwaysPrintErrorLevel) {
+ std::string log;
+ if ((logging_destination & LOG_TO_SYSTEM_DEBUG_LOG) != 0 ||
+ severity >= kAlwaysPrintErrorLevel) {
+ log = LogInfoToLogStr(severity, file, line, func, content);
// When we're only outputting to a log file, above a certain log
level, we
// should still output to stderr so that we can better detect and
diagnose
// problems with unit tests, especially on the buildbots.
fwrite(log.data(), log.size(), 1, stderr);
fflush(stderr);
}
-
// write to log file
if ((logging_destination & LOG_TO_FILE) != 0) {
if ((FLAGS_crash_on_fatal_log && severity == BLOG_FATAL) ||
!FLAGS_async_log) {
+ if (log.empty()) {
+ log = LogInfoToLogStr(severity, file, line, func, content);
+ }
Log2File(log);
} else {
- AsyncLogger::GetInstance()->Log(std::move(log));
+ LogInfo info;
+ if (log.empty()) {
+ info.severity = severity;
+ info.timestamp = GetTimestamp();
+ info.file = file;
+ info.func = func;
+ info.line = line;
+ info.content = content.as_string();
+ info.raw = true;
+ } else {
+ info.content = std::move(log);
+ info.raw = false;
+ }
+ AsyncLogger::GetInstance()->Log(std::move(info));
}
}
return true;
diff --git a/test/logging_unittest.cc b/test/logging_unittest.cc
index 66834b98..6072044b 100644
--- a/test/logging_unittest.cc
+++ b/test/logging_unittest.cc
@@ -17,6 +17,8 @@ DECLARE_bool(crash_on_fatal_log);
DECLARE_int32(v);
DECLARE_bool(log_func_name);
DECLARE_bool(async_log);
+DECLARE_bool(async_log_in_background_always);
+DECLARE_int32(max_async_log_queue_size);
namespace {
@@ -513,14 +515,14 @@ TEST_F(LoggingTest, async_log) {
ASSERT_EQ(0, pthread_create(&threads[i], NULL, test_async_log, &log));
}
- sleep(5);
+ usleep(1000 * 500);
g_stopped = true;
for (int i = 0; i < thread_num; ++i) {
pthread_join(threads[i], NULL);
}
// Wait for async log thread to flush all logs to file.
- sleep(10);
+ sleep(15);
std::ostringstream oss;
std::string cmd = butil::string_printf("grep -c %s %s",
@@ -545,6 +547,8 @@ void* test_log(void* void_arg) {
auto args = (PerfArgs*)void_arg;
args->ready = true;
butil::Timer t;
+ std::string log = *args->log;
+ int counter = 0;
while (!g_stopped) {
if (g_started) {
break;
@@ -554,13 +558,14 @@ void* test_log(void* void_arg) {
t.start();
while (!g_stopped) {
{
- LOG(INFO) << *args->log;
+ LOG(INFO) << log;
test_logging_count.fetch_add(1, butil::memory_order_relaxed);
}
- ++args->counter;
+ ++counter;
}
t.stop();
args->elapse_ns = t.n_elapsed();
+ args->counter = counter;
return NULL;
}
@@ -588,11 +593,12 @@ void PerfTest(int thread_num, const std::string& log,
bool async) {
}
usleep(1000);
}
+ int sleep_s = 2;
g_started = true;
char prof_name[32];
snprintf(prof_name, sizeof(prof_name), "logging_%d.prof",
++g_prof_name_counter);
ProfilerStart(prof_name);
- sleep(5);
+ sleep(sleep_s);
ProfilerStop();
g_stopped = true;
int64_t wait_time = 0;
@@ -606,31 +612,28 @@ void PerfTest(int thread_num, const std::string& log,
bool async) {
<< " log_type=" << (async ? "async" : "sync")
<< " log_size=" << log.size()
<< " count=" << count
- << " average_time=" << wait_time / (double)count
+ << " duration=" << sleep_s << "s"
+ << " qps=" << (int)(count / (double)sleep_s)
+ << " average_time=" << wait_time / (double)count << "us"
<< std::endl;
}
TEST_F(LoggingTest, performance) {
bool saved_async_log = FLAGS_async_log;
+ FLAGS_max_async_log_queue_size =
+ std::numeric_limits<int32_t>::max();
+ FLAGS_async_log_in_background_always = true;
LoggingSettings settings;
settings.logging_dest = LOG_TO_FILE;
+ settings.delete_old = DELETE_OLD_LOG_FILE;
InitLogging(settings);
- std::string log(64, 'a');
- int thread_num = 1;
- PerfTest(thread_num, log, true);
- sleep(10);
- PerfTest(thread_num, log, false);
-
- thread_num = 2;
- PerfTest(thread_num, log, true);
- sleep(10);
- PerfTest(thread_num, log, false);
-
- thread_num = 4;
- PerfTest(thread_num, log, true);
+ std::string log(100, 'a');
+ PerfTest(1, log, false);
+ PerfTest(8, log, false);
+ PerfTest(1, log, true);
sleep(10);
- PerfTest(thread_num, log, false);
+ PerfTest(8, log, true);
FLAGS_async_log = saved_async_log;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]