chenBright commented on code in PR #2413:
URL: https://github.com/apache/brpc/pull/2413#discussion_r1368919615


##########
src/butil/logging.cc:
##########
@@ -399,8 +404,253 @@ void CloseLogFileUnlocked() {
     log_file = NULL;
 }
 
+void Log2File(const std::string& log) {
+    // We can have multiple threads and/or processes, so try to prevent them
+    // from clobbering each other's writes.
+    // If the client app did not call InitLogging, and the lock has not
+    // been created do it now. We do this on demand, but if two threads try
+    // to do this at the same time, there will be a race condition to create
+    // the lock. This is why InitLogging should be called from the main
+    // thread at the beginning of execution.
+    LoggingLock::Init(LOCK_LOG_FILE, NULL);
+    LoggingLock logging_lock;
+    if (InitializeLogFileHandle()) {
+#if defined(OS_WIN)
+        SetFilePointer(log_file, 0, 0, SEEK_END);
+                DWORD num_written;
+                WriteFile(log_file,
+                          static_cast<const void*>(log.data()),
+                          static_cast<DWORD>(log.size()),
+                          &num_written,
+                          NULL);
+#else
+        fwrite(log.data(), log.size(), 1, log_file);
+        fflush(log_file);
+#endif
+    }
+}
+
 }  // namespace
 
+struct BAIDU_CACHELINE_ALIGNMENT LogRequest {
+    static LogRequest* const UNCONNECTED;
+
+    LogRequest* next{NULL};
+    std::string data;
+};
+
+LogRequest* const LogRequest::UNCONNECTED = (LogRequest*)(intptr_t)-1;
+
+class AsyncLog : public butil::SimpleThread {
+public:
+    static AsyncLog* GetInstance();
+
+    void Log(const std::string& log);
+    void Log(std::string&& log);
+
+private:
+friend struct DefaultSingletonTraits<AsyncLog>;
+
+    AsyncLog();
+    ~AsyncLog() override;
+
+    void LogImpl(LogRequest* log_req);
+
+    void Run() override;
+
+    void LogTask();
+
+    bool IsLogComplete(LogRequest* old_head,
+                       bool singular_node,
+                       LogRequest** new_tail);
+
+    butil::atomic<LogRequest*> _log_head;
+    butil::Mutex _mutex;
+    butil::ConditionVariable _cond;
+    LogRequest* _current_log_request;
+    bool _stop;
+};
+
+AsyncLog* AsyncLog::GetInstance() {
+    return Singleton<AsyncLog,
+                     LeakySingletonTraits<AsyncLog>>::get();
+}
+
+AsyncLog::AsyncLog()
+    : butil::SimpleThread("async_log_thread")
+    , _log_head(NULL)
+    , _cond(&_mutex)
+    , _current_log_request(NULL)
+    , _stop(false) {
+    Start();
+}
+
+AsyncLog::~AsyncLog() {
+    {
+        BAIDU_SCOPED_LOCK(_mutex);
+        _stop = true;
+        _cond.Signal();
+    }
+    Join();
+}
+
+void AsyncLog::Log(const std::string& log) {
+    if (log.empty()) {
+        return;
+    }
+
+    auto log_req = butil::get_object<LogRequest>();
+    if (!log_req) {
+        // Async log failed, fallback to sync log.
+        Log2File(log);
+        return;
+    }
+    log_req->data = log;
+    LogImpl(log_req);
+}
+
+void AsyncLog::Log(std::string&& log) {
+    if (log.empty()) {
+        return;
+    }
+
+    auto log_req = butil::get_object<LogRequest>();
+    if (!log_req) {
+        // Async log failed, fallback to sync log.
+        Log2File(log);
+        return;
+    }
+    log_req->data = std::move(log);
+    LogImpl(log_req);
+}
+
+void AsyncLog::LogImpl(LogRequest* log_req) {
+    log_req->next = LogRequest::UNCONNECTED;
+    LogRequest* const prev_head =
+        _log_head.exchange(log_req, butil::memory_order_release);
+    if (prev_head != NULL) {
+        log_req->next = prev_head;
+        return;
+    }
+    // We've got the right to write.
+    log_req->next = NULL;
+
+    BAIDU_SCOPED_LOCK(_mutex);
+    _current_log_request = log_req;
+    _cond.Signal();
+}
+
+void AsyncLog::Run() {
+    while (true) {
+        BAIDU_SCOPED_LOCK(_mutex);
+        while (!_stop && !_current_log_request) {
+            _cond.Wait();
+        }
+        if (_stop) {
+            break;
+        }
+
+        LogTask();
+        _current_log_request = NULL;
+    }
+}
+
+void AsyncLog::LogTask() {
+    LogRequest* req = _current_log_request;
+    LogRequest* cur_tail = NULL;
+    do {
+        // req was written, skip it.
+        if (req->next != NULL && req->data.empty()) {
+            LogRequest* const saved_req = req;
+            req = req->next;
+            butil::return_object(saved_req);
+        }
+
+        // Log all req to file.
+        for (LogRequest* p = req; p != NULL; p = p->next) {
+            if (p->data.empty()) {
+                continue;
+            }
+            Log2File(p->data);
+            p->data.clear();

Review Comment:
   done



##########
src/butil/logging.cc:
##########
@@ -399,8 +404,253 @@ void CloseLogFileUnlocked() {
     log_file = NULL;
 }
 
+void Log2File(const std::string& log) {
+    // We can have multiple threads and/or processes, so try to prevent them
+    // from clobbering each other's writes.
+    // If the client app did not call InitLogging, and the lock has not
+    // been created do it now. We do this on demand, but if two threads try
+    // to do this at the same time, there will be a race condition to create
+    // the lock. This is why InitLogging should be called from the main
+    // thread at the beginning of execution.
+    LoggingLock::Init(LOCK_LOG_FILE, NULL);
+    LoggingLock logging_lock;
+    if (InitializeLogFileHandle()) {
+#if defined(OS_WIN)
+        SetFilePointer(log_file, 0, 0, SEEK_END);
+                DWORD num_written;
+                WriteFile(log_file,
+                          static_cast<const void*>(log.data()),
+                          static_cast<DWORD>(log.size()),
+                          &num_written,
+                          NULL);
+#else
+        fwrite(log.data(), log.size(), 1, log_file);
+        fflush(log_file);
+#endif
+    }
+}
+
 }  // namespace
 
+struct BAIDU_CACHELINE_ALIGNMENT LogRequest {
+    static LogRequest* const UNCONNECTED;
+
+    LogRequest* next{NULL};
+    std::string data;
+};
+
+LogRequest* const LogRequest::UNCONNECTED = (LogRequest*)(intptr_t)-1;
+
+class AsyncLog : public butil::SimpleThread {
+public:
+    static AsyncLog* GetInstance();
+
+    void Log(const std::string& log);
+    void Log(std::string&& log);
+
+private:
+friend struct DefaultSingletonTraits<AsyncLog>;
+
+    AsyncLog();
+    ~AsyncLog() override;
+
+    void LogImpl(LogRequest* log_req);
+
+    void Run() override;
+
+    void LogTask();
+
+    bool IsLogComplete(LogRequest* old_head,
+                       bool singular_node,
+                       LogRequest** new_tail);
+
+    butil::atomic<LogRequest*> _log_head;
+    butil::Mutex _mutex;
+    butil::ConditionVariable _cond;
+    LogRequest* _current_log_request;
+    bool _stop;
+};
+
+AsyncLog* AsyncLog::GetInstance() {
+    return Singleton<AsyncLog,
+                     LeakySingletonTraits<AsyncLog>>::get();
+}
+
+AsyncLog::AsyncLog()
+    : butil::SimpleThread("async_log_thread")
+    , _log_head(NULL)
+    , _cond(&_mutex)
+    , _current_log_request(NULL)
+    , _stop(false) {
+    Start();
+}
+
+AsyncLog::~AsyncLog() {
+    {
+        BAIDU_SCOPED_LOCK(_mutex);
+        _stop = true;
+        _cond.Signal();
+    }
+    Join();
+}
+
+void AsyncLog::Log(const std::string& log) {
+    if (log.empty()) {
+        return;
+    }
+
+    auto log_req = butil::get_object<LogRequest>();
+    if (!log_req) {
+        // Async log failed, fallback to sync log.
+        Log2File(log);
+        return;
+    }
+    log_req->data = log;
+    LogImpl(log_req);
+}
+
+void AsyncLog::Log(std::string&& log) {
+    if (log.empty()) {
+        return;
+    }
+
+    auto log_req = butil::get_object<LogRequest>();
+    if (!log_req) {
+        // Async log failed, fallback to sync log.
+        Log2File(log);
+        return;
+    }
+    log_req->data = std::move(log);
+    LogImpl(log_req);
+}
+
+void AsyncLog::LogImpl(LogRequest* log_req) {
+    log_req->next = LogRequest::UNCONNECTED;
+    LogRequest* const prev_head =
+        _log_head.exchange(log_req, butil::memory_order_release);
+    if (prev_head != NULL) {
+        log_req->next = prev_head;
+        return;
+    }
+    // We've got the right to write.
+    log_req->next = NULL;
+
+    BAIDU_SCOPED_LOCK(_mutex);
+    _current_log_request = log_req;
+    _cond.Signal();
+}
+
+void AsyncLog::Run() {
+    while (true) {
+        BAIDU_SCOPED_LOCK(_mutex);
+        while (!_stop && !_current_log_request) {
+            _cond.Wait();
+        }
+        if (_stop) {
+            break;
+        }
+
+        LogTask();
+        _current_log_request = NULL;
+    }
+}
+
+void AsyncLog::LogTask() {
+    LogRequest* req = _current_log_request;
+    LogRequest* cur_tail = NULL;
+    do {
+        // req was written, skip it.
+        if (req->next != NULL && req->data.empty()) {
+            LogRequest* const saved_req = req;
+            req = req->next;
+            butil::return_object(saved_req);
+        }
+
+        // Log all req to file.
+        for (LogRequest* p = req; p != NULL; p = p->next) {
+            if (p->data.empty()) {
+                continue;
+            }
+            Log2File(p->data);
+            p->data.clear();
+        }
+
+        // Release WriteRequest until non-empty data or last request.
+        while (req->next != NULL && req->data.empty()) {
+            LogRequest* const saved_req = req;
+            req = req->next;
+            butil::return_object(saved_req);
+        }
+
+        if (NULL == cur_tail) {
+            for (cur_tail = req; cur_tail->next != NULL;
+                 cur_tail = cur_tail->next);
+        }
+        // Return when there's no more WriteRequests and req is completely
+        // written.
+        if (IsLogComplete(cur_tail, (req == cur_tail), &cur_tail)) {
+            if (cur_tail != req) {
+                fprintf(stderr, "cur_tail should equal to req\n");
+            }
+            butil::return_object(req);
+            return;
+        }
+    } while (true);
+}
+
+bool AsyncLog::IsLogComplete(LogRequest* old_head,
+                             bool singular_node,
+                             LogRequest** new_tail) {
+    if (old_head->next) {
+        fprintf(stderr, "old_head->next should be NULL\n");
+    }
+    LogRequest* new_head = old_head;
+    LogRequest* desired = NULL;
+    bool return_when_no_more = true;
+    if (!old_head->data.empty() || !singular_node) {

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@brpc.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@brpc.apache.org
For additional commands, e-mail: dev-h...@brpc.apache.org

Reply via email to