This is an automated email from the ASF dual-hosted git repository. jamesge pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-brpc.git
commit 86acfa893d8f7776a7085701cf9ae5721434e2b6 Author: jamesge <jge...@gmail.com> AuthorDate: Thu Nov 26 14:11:20 2020 +0800 add Controller.SessionKV() to record and print session-level KV; Add LOGD/I/W/E/F to print contextual log; Add flag -log_as_json to print logs as valid JSON --- src/brpc/controller.cpp | 83 +++++++++++++++++++++++++- src/brpc/controller.h | 27 +++++++++ src/brpc/{session_log.h => kvmap.h} | 21 +++---- src/butil/logging.cc | 115 ++++++++++++++++++++++++++++++++---- src/butil/logging.h | 1 + test/brpc_controller_unittest.cpp | 55 +++++++++++++++++ 6 files changed, 278 insertions(+), 24 deletions(-) diff --git a/src/brpc/controller.cpp b/src/brpc/controller.cpp index ace380f..5c061ee 100644 --- a/src/brpc/controller.cpp +++ b/src/brpc/controller.cpp @@ -79,7 +79,9 @@ BAIDU_REGISTER_ERRNO(brpc::EITP, "Bad Itp response"); namespace brpc { -DEFINE_bool(graceful_quit_on_sigterm, false, "Register SIGTERM handle func to quit graceful"); +DEFINE_bool(graceful_quit_on_sigterm, false, + "Register SIGTERM handle func to quit graceful"); +DEFINE_string(request_id_header, "x-request-id", "The http header to mark a session"); const IdlNames idl_single_req_single_res = { "req", "res" }; const IdlNames idl_single_req_multi_res = { "req", "" }; @@ -128,6 +130,7 @@ Controller::Controller() { Controller::~Controller() { *g_ncontroller << -1; + FlushSessionKV(LOG_STREAM(INFO)); ResetNonPods(); } @@ -1485,4 +1488,82 @@ google::protobuf::Closure* DoNothing() { return butil::get_leaky_singleton<DoNothingClosure>(); } +KVMap& Controller::SessionKV() { + if (_session_kv == nullptr) { + _session_kv.reset(new KVMap); + } + return *_session_kv.get(); +} + +#define BRPC_SESSION_END_MSG "Session ends" +#define BRPC_REQ_ID "@rid" +#define BRPC_KV_SEP ":" + +void Controller::FlushSessionKV(std::ostream& os) { + if (_session_kv == nullptr || _session_kv->Count() == 0) { + return; + } + + const std::string* pRID = nullptr; + if (_http_request) { + pRID = _http_request->GetHeader(FLAGS_request_id_header); + } + + if (logging::FLAGS_log_as_json) { + os << "\"M\":\"" BRPC_SESSION_END_MSG "\""; + if (pRID) { + os << ",\"" BRPC_REQ_ID "\":\"" << *pRID << '"'; + } + for (auto it = _session_kv->Begin(); it != _session_kv->End(); ++it) { + os << ",\"" << it->first << "\":\"" << it->second << '"'; + } + } else { + os << BRPC_SESSION_END_MSG; + if (pRID) { + os << " " BRPC_REQ_ID BRPC_KV_SEP << *pRID; + } + for (auto it = _session_kv->Begin(); it != _session_kv->End(); ++it) { + os << ' ' << it->first << BRPC_KV_SEP << it->second; + } + } +} + +Controller::LogPostfixDummy::~LogPostfixDummy() { + *osptr << postfix; +} + +std::ostream& operator<<(std::ostream& os, const Controller::LogPostfixDummy& p) { + const_cast<brpc::Controller::LogPostfixDummy&>(p).osptr = &os; + if (logging::FLAGS_log_as_json) { + os << "\"M\":\""; + } + return os; +} + + +Controller::LogPostfixDummy Controller::LogPostfix() const { + Controller::LogPostfixDummy result; + std::string& p = result.postfix; + if (logging::FLAGS_log_as_json) { + p.push_back('"'); + } + const std::string* pRID = nullptr; + if (_http_request) { + pRID = _http_request->GetHeader(FLAGS_request_id_header); + if (pRID) { + if (logging::FLAGS_log_as_json) { + p.append(",\"" BRPC_REQ_ID "\":\""); + p.append(*pRID); + p.push_back('"'); + } else { + p.reserve(5 + pRID->size()); + p.append(" " BRPC_REQ_ID BRPC_KV_SEP); + p.append(*pRID); + } + + } + } + return result; +} + } // namespace brpc diff --git a/src/brpc/controller.h b/src/brpc/controller.h index 19a352e..fc39e52 100755 --- a/src/brpc/controller.h +++ b/src/brpc/controller.h @@ -43,6 +43,7 @@ #include "brpc/progressive_attachment.h" // ProgressiveAttachment #include "brpc/progressive_reader.h" // ProgressiveReader #include "brpc/grpc.h" +#include "brpc/kvmap.h" // EAUTH is defined in MAC #ifndef EAUTH @@ -482,6 +483,19 @@ public: const butil::IOBuf& request_attachment() const { return _request_attachment; } const butil::IOBuf& response_attachment() const { return _response_attachment; } + // Get the object to write key/value which will be flushed into + // LOG(INFO) when this controller is deleted. + KVMap& SessionKV(); + + // Contextual prefixes for LOGD/LOGI/LOGW/LOGE/LOGF macros + struct LogPostfixDummy { + LogPostfixDummy() : osptr(nullptr) {} + ~LogPostfixDummy(); + std::string postfix; + std::ostream* osptr; + }; + LogPostfixDummy LogPostfix() const; + // Return true if the remote side creates a stream. bool has_remote_stream() { return _remote_stream_settings != NULL; } @@ -660,6 +674,9 @@ private: std::string& protocol_param() { return _thrift_method_name; } const std::string& protocol_param() const { return _thrift_method_name; } + // Flush this->SessionKV() into `os' + void FlushSessionKV(std::ostream& os); + private: // NOTE: align and group fields to make Controller as compact as possible. @@ -739,6 +756,8 @@ private: HttpHeader* _http_request; HttpHeader* _http_response; + std::unique_ptr<KVMap> _session_kv; + // Fields with large size but low access frequency butil::IOBuf _request_attachment; butil::IOBuf _response_attachment; @@ -787,7 +806,15 @@ bool IsAskedToQuit(); // Send Ctrl-C to current process. void AskToQuit(); +std::ostream& operator<<(std::ostream& os, const Controller::LogPostfixDummy& p); + } // namespace brpc +// Print contextual logs +#define LOGD(cntl) LOG(DEBUG) << (cntl)->LogPostfix() +#define LOGI(cntl) LOG(INFO) << (cntl)->LogPostfix() +#define LOGW(cntl) LOG(WARNING) << (cntl)->LogPostfix() +#define LOGE(cntl) LOG(ERROR) << (cntl)->LogPostfix() +#define LOGF(cntl) LOG(FATAL) << (cntl)->LogPostfix() #endif // BRPC_CONTROLLER_H diff --git a/src/brpc/session_log.h b/src/brpc/kvmap.h similarity index 85% rename from src/brpc/session_log.h rename to src/brpc/kvmap.h index 049232a..1fd7041 100644 --- a/src/brpc/session_log.h +++ b/src/brpc/kvmap.h @@ -15,28 +15,23 @@ // specific language governing permissions and limitations // under the License. -#ifndef BRPC_SESSION_LOG_H -#define BRPC_SESSION_LOG_H +#ifndef BRPC_KVMAP_H +#define BRPC_KVMAP_H #include "butil/containers/flat_map.h" namespace brpc { -class SessionLog { +// Remember Key/Values in string +class KVMap { public: - class Formatter { - public: - virtual ~Formatter() {} - virtual void Print(std::ostream&, const SessionLog&) = 0; - }; - typedef butil::FlatMap<std::string, std::string> Map; typedef Map::const_iterator Iterator; - SessionLog() {} + KVMap() {} - // Exchange internal fields with another SessionLog. - void Swap(SessionLog &rhs) { _entries.swap(rhs._entries); } + // Exchange internal fields with another KVMap. + void Swap(KVMap &rhs) { _entries.swap(rhs._entries); } // Reset internal fields as if they're just default-constructed. void Clear() { _entries.clear(); } @@ -77,4 +72,4 @@ private: } // namespace brpc -#endif // BRPC_SESSION_LOG_H +#endif // BRPC_KVMAP_H diff --git a/src/butil/logging.cc b/src/butil/logging.cc index 39b2a37..26e6259 100644 --- a/src/butil/logging.cc +++ b/src/butil/logging.cc @@ -127,7 +127,7 @@ DEFINE_string(vmodule, "", "per-module verbose level." " (that is, name ignoring .cpp/.h)." " LOG_LEVEL overrides any value given by --v."); -DEFINE_bool(log_process_id, false, "Log process id"); +DEFINE_bool(log_pid, false, "Log process id"); DEFINE_int32(minloglevel, 0, "Any log at or above this level will be " "displayed. Anything below this level will be silently ignored. " @@ -139,6 +139,8 @@ DEFINE_bool(log_hostname, false, "Add host after pid in each log so" DEFINE_bool(log_year, false, "Log year in datetime part in each log"); +DEFINE_bool(log_as_json, false, "Print log as a valid JSON"); + namespace { LoggingDestination logging_destination = LOG_DEFAULT; @@ -453,7 +455,7 @@ void SetLogAssertHandler(LogAssertHandler handler) { const char* const log_severity_names[LOG_NUM_SEVERITIES] = { "INFO", "NOTICE", "WARNING", "ERROR", "FATAL" }; -inline void log_severity_name(std::ostream& os, int severity) { +static void PrintLogSeverity(std::ostream& os, int severity) { if (severity < 0) { // Add extra space to separate from following datetime. os << 'V' << -severity << ' '; @@ -464,9 +466,9 @@ inline void log_severity_name(std::ostream& os, int severity) { } } -void print_log_prefix(std::ostream& os, - int severity, const char* file, int line) { - log_severity_name(os, severity); +static void PrintLogPrefix( + std::ostream& os, int severity, const char* file, int line) { + PrintLogSeverity(os, severity); #if defined(OS_LINUX) timeval tv; gettimeofday(&tv, NULL); @@ -492,7 +494,7 @@ void print_log_prefix(std::ostream& os, #if defined(OS_LINUX) os << '.' << std::setw(6) << tv.tv_usec; #endif - if (FLAGS_log_process_id) { + if (FLAGS_log_pid) { os << ' ' << std::setfill(' ') << std::setw(5) << CurrentProcessId(); } os << ' ' << std::setfill(' ') << std::setw(5) @@ -508,6 +510,62 @@ void print_log_prefix(std::ostream& os, os.fill(prev_fill); } +static void PrintLogPrefixAsJSON( + std::ostream& os, int severity, const char* file, int line) { + // severity + os << "\"L\":\""; + if (severity < 0) { + os << 'V' << -severity; + } else if (severity < LOG_NUM_SEVERITIES) { + os << log_severity_names[severity][0]; + } else { + os << 'U'; + } + // time + os << "\",\"T\":\""; +#if defined(OS_LINUX) + timeval tv; + gettimeofday(&tv, NULL); + time_t t = tv.tv_sec; +#else + time_t t = time(NULL); +#endif + struct tm local_tm = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, NULL}; +#if _MSC_VER >= 1400 + localtime_s(&local_tm, &t); +#else + localtime_r(&t, &local_tm); +#endif + const char prev_fill = os.fill('0'); + if (FLAGS_log_year) { + os << std::setw(4) << local_tm.tm_year + 1900; + } + os << std::setw(2) << local_tm.tm_mon + 1 + << std::setw(2) << local_tm.tm_mday << ' ' + << std::setw(2) << local_tm.tm_hour << ':' + << std::setw(2) << local_tm.tm_min << ':' + << std::setw(2) << local_tm.tm_sec; +#if defined(OS_LINUX) + os << '.' << std::setw(6) << tv.tv_usec; +#endif + os << "\","; + os.fill(prev_fill); + + if (FLAGS_log_pid) { + os << "\"pid\":\"" << CurrentProcessId() << "\","; + } + os << "\"tid\":\"" << butil::PlatformThread::CurrentId() << "\","; + if (FLAGS_log_hostname) { + butil::StringPiece hostname(butil::my_hostname()); + if (hostname.ends_with(".baidu.com")) { // make it shorter + hostname.remove_suffix(10); + } + os << "\"host\":\"" << hostname << "\","; + } + os << "\"C\":\"" << file << ':' << line << "\""; +} + + // A log message handler that gets notified of every log message we process. class DoublyBufferedLogSink : public butil::DoublyBufferedData<LogSink*> { public: @@ -612,13 +670,32 @@ void DisplayDebugMessageInDialog(const std::string& str) { bool StringSink::OnLogMessage(int severity, const char* file, int line, const butil::StringPiece& content) { std::ostringstream prefix_os; - print_log_prefix(prefix_os, severity, file, line); + bool pair_quote = false; + if (FLAGS_log_as_json) { + prefix_os << '{'; + PrintLogPrefixAsJSON(prefix_os, severity, file, line); + if (content.empty() || content[0] != '"') { + // not a json, add 'M' field + prefix_os << ",\"M\":\""; + pair_quote = true; + } else { + prefix_os << ','; + } + } else { + PrintLogPrefix(prefix_os, severity, file, line); + } const std::string prefix = prefix_os.str(); { butil::AutoLock lock_guard(_lock); reserve(size() + prefix.size() + content.size()); append(prefix); append(content.data(), content.size()); + if (FLAGS_log_as_json) { + if (pair_quote) { + push_back('"'); + } + push_back('}'); + } } return true; } @@ -772,9 +849,27 @@ public: // A LogSink focused on performance should also be able to handle // non-continuous inputs which is a must to maximize performance. std::ostringstream os; - print_log_prefix(os, severity, file, line); - os.write(content.data(), content.size()); - os << '\n'; + if (!FLAGS_log_as_json) { + PrintLogPrefix(os, severity, file, line); + os.write(content.data(), content.size()); + os << '\n'; + } else { + os << '{'; + PrintLogPrefixAsJSON(os, severity, file, line); + bool pair_quote = false; + if (content.empty() || content[0] != '"') { + // not a json, add a 'M' field + os << ",\"M\":\""; + pair_quote = true; + } else { + os << ','; + } + os.write(content.data(), content.size()); + if (pair_quote) { + os << '"'; + } + os << "}\n"; + } std::string log = os.str(); if ((logging_destination & LOG_TO_SYSTEM_DEBUG_LOG) != 0) { diff --git a/src/butil/logging.h b/src/butil/logging.h index f92cd78..20754b0 100644 --- a/src/butil/logging.h +++ b/src/butil/logging.h @@ -409,6 +409,7 @@ const LogSeverity BLOG_0 = BLOG_ERROR; #define VLOG_IS_ON(verbose_level) BAIDU_VLOG_IS_ON(verbose_level, __FILE__) DECLARE_int32(v); +DECLARE_bool(log_as_json); extern const int VLOG_UNINITIALIZED; diff --git a/test/brpc_controller_unittest.cpp b/test/brpc_controller_unittest.cpp index ea628ef..bf79b97 100644 --- a/test/brpc_controller_unittest.cpp +++ b/test/brpc_controller_unittest.cpp @@ -21,6 +21,7 @@ #include <gtest/gtest.h> #include <google/protobuf/stubs/common.h> +#include "butil/logging.h" #include "butil/time.h" #include "butil/macros.h" #include "brpc/socket.h" @@ -72,3 +73,57 @@ TEST_F(ControllerTest, notify_on_destruction) { delete cntl; ASSERT_TRUE(cancel); } + +/* +class MyFormatter : public brpc::SessionLog::Formatter { + void Print(std::ostream& os, const brpc::SessionLog& log) override { + for (auto it = log.Begin(); it != log.End(); ++it) { + os << '"' << it->first << "\":\"" << it->second << "\","; + } + } +}; +*/ + +static bool endsWith(const std::string& s1, const butil::StringPiece& s2) { + if (s1.size() < s2.size()) { + return false; + } + return memcmp(s1.data() + s1.size() - s2.size(), s2.data(), s2.size()) == 0; +} +static bool startsWith(const std::string& s1, const butil::StringPiece& s2) { + if (s1.size() < s2.size()) { + return false; + } + return memcmp(s1.data(), s2.data(), s2.size()) == 0; +} + +TEST_F(ControllerTest, SessionKV) { + logging::FLAGS_log_as_json = false; + logging::StringSink sink1; + auto oldSink = logging::SetLogSink(&sink1); + //brpc::SetGlobalSessionLogFormatter(new MyFormatter); + { + brpc::Controller cntl; + cntl.set_log_id(123); // not working now + cntl.SessionKV().Set("Apple", 1); + cntl.SessionKV().Set("Baidu", "22"); + cntl.SessionKV().Set("Cisco", 33.3); + + LOGW(&cntl) << "My WARNING Log"; + ASSERT_TRUE(endsWith(sink1, "] My WARNING Log")) << sink1; + ASSERT_TRUE(startsWith(sink1, "W")) << sink1; + sink1.clear(); + + cntl.http_request().SetHeader("x-request-id", "abcdEFG-456"); + LOGE(&cntl) << "My ERROR Log"; + ASSERT_TRUE(endsWith(sink1, "] My ERROR Log @rid:abcdEFG-456")) << sink1; + ASSERT_TRUE(startsWith(sink1, "E")) << sink1; + sink1.clear(); + + logging::FLAGS_log_as_json = true; + } + ASSERT_TRUE(endsWith(sink1, R"(,"M":"Session ends","@rid":"abcdEFG-456","Baidu":"22","Cisco":"33.300000","Apple":"1"})")) << sink1; + ASSERT_TRUE(startsWith(sink1, R"({"L":"I",)")) << sink1; + + logging::SetLogSink(oldSink); +} --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@brpc.apache.org For additional commands, e-mail: dev-h...@brpc.apache.org