This is an automated email from the ASF dual-hosted git repository.
wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push:
new 24fc31ea span for new bthread (#2519)
24fc31ea is described below
commit 24fc31eaa56d8b522975ca41ed012faf6cf77438
Author: Yang,Liming <[email protected]>
AuthorDate: Mon Feb 26 10:46:43 2024 +0800
span for new bthread (#2519)
---
example/rpcz_echo_c++/CMakeLists.txt | 141 +++++++++++++++++++++++++++++
example/rpcz_echo_c++/client.cpp | 96 ++++++++++++++++++++
example/rpcz_echo_c++/echo.proto | 33 +++++++
example/rpcz_echo_c++/server.cpp | 171 +++++++++++++++++++++++++++++++++++
src/brpc/builtin/rpcz_service.cpp | 19 +++-
src/brpc/global.cpp | 8 ++
src/brpc/span.cpp | 80 +++++++++++++---
src/brpc/span.h | 14 ++-
src/brpc/span.proto | 1 +
src/bthread/bthread.cpp | 9 ++
src/bthread/task_group.cpp | 13 ++-
src/bthread/unstable.h | 3 +
test/brpc_alpn_protocol_unittest.cpp | 2 +-
test/bthread_unittest.cpp | 51 ++++++++++-
14 files changed, 616 insertions(+), 25 deletions(-)
diff --git a/example/rpcz_echo_c++/CMakeLists.txt
b/example/rpcz_echo_c++/CMakeLists.txt
new file mode 100644
index 00000000..53c16690
--- /dev/null
+++ b/example/rpcz_echo_c++/CMakeLists.txt
@@ -0,0 +1,141 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+cmake_minimum_required(VERSION 2.8.10)
+project(rpcz_echo_c++ C CXX)
+
+option(LINK_SO "Whether examples are linked dynamically" OFF)
+
+# Locate the brpc build output: search two directories above this project for
+# the first directory ending in "output/include", take its parent, and use it
+# as the prefix for the find_path()/find_library() calls that follow.
+# The shell pipeline must stay on a single line; a line break inside the
+# string would end the `find` command before its -regex argument.
+execute_process(
+    COMMAND bash -c "find ${PROJECT_SOURCE_DIR}/../.. -type d -regex \".*output/include$\" | head -n1 | xargs dirname | tr -d '\n'"
+    OUTPUT_VARIABLE OUTPUT_PATH
+)
+
+set(CMAKE_PREFIX_PATH ${OUTPUT_PATH})
+
+include(FindThreads)
+include(FindProtobuf)
+protobuf_generate_cpp(PROTO_SRC PROTO_HEADER echo.proto)
+# include PROTO_HEADER
+include_directories(${CMAKE_CURRENT_BINARY_DIR})
+
+# Search for libthrift* by best effort. If it is not found and brpc is
+# compiled with thrift protocol enabled, a link error would be reported.
+find_library(THRIFT_LIB NAMES thrift)
+if (NOT THRIFT_LIB)
+ set(THRIFT_LIB "")
+endif()
+
+find_path(BRPC_INCLUDE_PATH NAMES brpc/server.h)
+if(LINK_SO)
+ find_library(BRPC_LIB NAMES brpc)
+else()
+ find_library(BRPC_LIB NAMES libbrpc.a brpc)
+endif()
+if((NOT BRPC_INCLUDE_PATH) OR (NOT BRPC_LIB))
+ message(FATAL_ERROR "Fail to find brpc")
+endif()
+include_directories(${BRPC_INCLUDE_PATH})
+
+find_path(GFLAGS_INCLUDE_PATH gflags/gflags.h)
+find_library(GFLAGS_LIBRARY NAMES gflags libgflags)
+if((NOT GFLAGS_INCLUDE_PATH) OR (NOT GFLAGS_LIBRARY))
+ message(FATAL_ERROR "Fail to find gflags")
+endif()
+include_directories(${GFLAGS_INCLUDE_PATH})
+
+# Detect the C++ namespace used by the installed gflags: take the name from
+# the first "namespace <name> {" line of gflags_declare.h. When that name is
+# the GFLAGS_NAMESPACE macro itself, read the macro's #define instead.
+execute_process(
+    COMMAND bash -c "grep \"namespace [_A-Za-z0-9]\\+ {\" ${GFLAGS_INCLUDE_PATH}/gflags/gflags_declare.h | head -1 | awk '{print $2}' | tr -d '\n'"
+    OUTPUT_VARIABLE GFLAGS_NS
+)
+# The expansion is quoted so the condition stays well-formed when grep matched
+# nothing and GFLAGS_NS is empty (unquoted, if() would see no left operand).
+if("${GFLAGS_NS}" STREQUAL "GFLAGS_NAMESPACE")
+    execute_process(
+        COMMAND bash -c "grep \"#define GFLAGS_NAMESPACE [_A-Za-z0-9]\\+\" ${GFLAGS_INCLUDE_PATH}/gflags/gflags_declare.h | head -1 | awk '{print $3}' | tr -d '\n'"
+        OUTPUT_VARIABLE GFLAGS_NS
+    )
+endif()
+if(CMAKE_SYSTEM_NAME STREQUAL "Darwin")
+ include(CheckFunctionExists)
+ CHECK_FUNCTION_EXISTS(clock_gettime HAVE_CLOCK_GETTIME)
+ if(NOT HAVE_CLOCK_GETTIME)
+ set(DEFINE_CLOCK_GETTIME "-DNO_CLOCK_GETTIME_IN_MAC")
+ endif()
+endif()
+
+# Preprocessor definitions: the optional clock_gettime workaround plus the
+# detected gflags namespace, which the sources reference as GFLAGS_NS.
+set(CMAKE_CPP_FLAGS "${DEFINE_CLOCK_GETTIME} -DGFLAGS_NS=${GFLAGS_NS}")
+# Compiler flags for every target; kept on one line so the value contains no
+# embedded newline.
+set(CMAKE_CXX_FLAGS "${CMAKE_CPP_FLAGS} -DNDEBUG -O2 -D__const__=__unused__ -pipe -W -Wall -Wno-unused-parameter -fPIC -fno-omit-frame-pointer")
+
+# Build as C++11. CMake 3.1.3 and newer use the CMAKE_CXX_STANDARD variables;
+# older versions get -std=c++11 appended by hand for GNU and Clang only.
+if(NOT CMAKE_VERSION VERSION_LESS "3.1.3")
+    set(CMAKE_CXX_STANDARD 11)
+    set(CMAKE_CXX_STANDARD_REQUIRED ON)
+elseif(CMAKE_CXX_COMPILER_ID STREQUAL "GNU" OR CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
+    set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")
+endif()
+
+find_path(LEVELDB_INCLUDE_PATH NAMES leveldb/db.h)
+find_library(LEVELDB_LIB NAMES leveldb)
+if ((NOT LEVELDB_INCLUDE_PATH) OR (NOT LEVELDB_LIB))
+ message(FATAL_ERROR "Fail to find leveldb")
+endif()
+include_directories(${LEVELDB_INCLUDE_PATH})
+
+if(CMAKE_SYSTEM_NAME STREQUAL "Darwin")
+ set(OPENSSL_ROOT_DIR
+ "/usr/local/opt/openssl" # Homebrew installed OpenSSL
+ )
+endif()
+
+find_package(OpenSSL)
+include_directories(${OPENSSL_INCLUDE_DIR})
+
+set(DYNAMIC_LIB
+ ${CMAKE_THREAD_LIBS_INIT}
+ ${GFLAGS_LIBRARY}
+ ${PROTOBUF_LIBRARIES}
+ ${LEVELDB_LIB}
+ ${OPENSSL_CRYPTO_LIBRARY}
+ ${OPENSSL_SSL_LIBRARY}
+ ${THRIFT_LIB}
+ dl
+ )
+
+if(CMAKE_SYSTEM_NAME STREQUAL "Darwin")
+ set(DYNAMIC_LIB ${DYNAMIC_LIB}
+ pthread
+ "-framework CoreFoundation"
+ "-framework CoreGraphics"
+ "-framework CoreData"
+ "-framework CoreText"
+ "-framework Security"
+ "-framework Foundation"
+ "-Wl,-U,_MallocExtension_ReleaseFreeMemory"
+ "-Wl,-U,_ProfilerStart"
+ "-Wl,-U,_ProfilerStop"
+ "-Wl,-U,__Z13GetStackTracePPvii")
+endif()
+
+add_executable(rpcz_echo_client client.cpp ${PROTO_SRC} ${PROTO_HEADER})
+add_executable(rpcz_echo_server server.cpp ${PROTO_SRC} ${PROTO_HEADER})
+
+target_link_libraries(rpcz_echo_client ${BRPC_LIB} ${DYNAMIC_LIB})
+target_link_libraries(rpcz_echo_server ${BRPC_LIB} ${DYNAMIC_LIB})
+
diff --git a/example/rpcz_echo_c++/client.cpp b/example/rpcz_echo_c++/client.cpp
new file mode 100644
index 00000000..ad80de0f
--- /dev/null
+++ b/example/rpcz_echo_c++/client.cpp
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// A client sending requests to server every 1 second.
+
+#include <gflags/gflags.h>
+#include <butil/logging.h>
+#include <butil/time.h>
+#include <brpc/channel.h>
+#include "echo.pb.h"
+
+namespace brpc {
+DECLARE_bool(enable_rpcz);
+}
+// Command-line flags of the example client. Each help string is kept on one
+// line because a string literal cannot contain a raw line break.
+DEFINE_string(attachment, "", "Carry this along with requests");
+DEFINE_string(protocol, "baidu_std", "Protocol type. Defined in src/brpc/options.proto");
+DEFINE_string(connection_type, "", "Connection type. Available values: single, pooled, short");
+DEFINE_string(server, "0.0.0.0:8000", "IP Address of server");
+DEFINE_string(load_balancer, "", "The algorithm for load balancing");
+DEFINE_int32(timeout_ms, 100, "RPC timeout in milliseconds");
+DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)");
+DEFINE_int32(interval_ms, 1000, "Milliseconds between consecutive requests");
+
+int main(int argc, char* argv[]) {
+ // Parse gflags. We recommend you to use gflags as well.
+ GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);
+
+ // brpc::FLAGS_enable_rpcz = true;
+ // A Channel represents a communication line to a Server. Notice that
+ // Channel is thread-safe and can be shared by all threads in your program.
+ brpc::Channel channel;
+
+ // Initialize the channel, NULL means using default options.
+ brpc::ChannelOptions options;
+ options.protocol = FLAGS_protocol;
+ options.connection_type = FLAGS_connection_type;
+ options.timeout_ms = FLAGS_timeout_ms/*milliseconds*/;
+ options.max_retry = FLAGS_max_retry;
+ if (channel.Init(FLAGS_server.c_str(), FLAGS_load_balancer.c_str(),
&options) != 0) {
+ LOG(ERROR) << "Fail to initialize channel";
+ return -1;
+ }
+
+ // Normally, you should not call a Channel directly, but instead construct
+ // a stub Service wrapping it. stub can be shared by all threads as well.
+ example::EchoService_Stub stub(&channel);
+
+ // Send a request and wait for the response every 1 second.
+ int log_id = 0;
+ while (!brpc::IsAskedToQuit()) {
+ // We will receive response synchronously, safe to put variables
+ // on stack.
+ example::EchoRequest request;
+ example::EchoResponse response;
+ brpc::Controller cntl;
+
+ request.set_message("hello world");
+
+ cntl.set_log_id(log_id ++); // set by user
+ // Set attachment which is wired to network directly instead of
+ // being serialized into protobuf messages.
+ cntl.request_attachment().append(FLAGS_attachment);
+
+ // Because `done'(last parameter) is NULL, this function waits until
+ // the response comes back or error occurs(including timedout).
+ stub.Echo(&cntl, &request, &response, NULL);
+ if (!cntl.Failed()) {
+ LOG(INFO) << "Received response from " << cntl.remote_side()
+ << " to " << cntl.local_side()
+ << ": " << response.message() << " (attached="
+ << cntl.response_attachment() << ")"
+ << " latency=" << cntl.latency_us() << "us";
+ } else {
+ LOG(WARNING) << cntl.ErrorText();
+ }
+ usleep(FLAGS_interval_ms * 1000L);
+ }
+
+ LOG(INFO) << "EchoClient is going to quit";
+ return 0;
+}
+
diff --git a/example/rpcz_echo_c++/echo.proto b/example/rpcz_echo_c++/echo.proto
new file mode 100644
index 00000000..2b39627f
--- /dev/null
+++ b/example/rpcz_echo_c++/echo.proto
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+syntax="proto2";
+package example;
+
+option cc_generic_services = true;
+
+message EchoRequest {
+ required string message = 1;
+};
+
+message EchoResponse {
+ required string message = 1;
+};
+
+service EchoService {
+ rpc Echo(EchoRequest) returns (EchoResponse);
+};
diff --git a/example/rpcz_echo_c++/server.cpp b/example/rpcz_echo_c++/server.cpp
new file mode 100644
index 00000000..0f5f7f26
--- /dev/null
+++ b/example/rpcz_echo_c++/server.cpp
@@ -0,0 +1,171 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// A server to receive EchoRequest and send back EchoResponse.
+
+#include <brpc/channel.h>
+#include <brpc/server.h>
+#include <butil/logging.h>
+#include <gflags/gflags.h>
+#include <json2pb/pb_to_json.h>
+#include "echo.pb.h"
+
+// Flags controlling how this server listens and replies.
+DEFINE_bool(echo_attachment, true, "Echo attachment as well");
+DEFINE_int32(port, 8000, "TCP Port of this server");
+DEFINE_string(listen_addr, "",
+              "Server listen address, may be IPV4/IPV6/UDS."
+              " If this is set, the flag port will be ignored");
+DEFINE_int32(idle_timeout_s, -1,
+             "Connection will be closed if there is no "
+             "read/write operations during the last `idle_timeout_s'");
+// Flags for the outgoing channel that RunThreadFunc opens from the spawned
+// bthreads. Each help string is kept on one line because a string literal
+// cannot contain a raw line break.
+DEFINE_string(server, "0.0.0.0:8001", "IP Address of server");
+DEFINE_string(protocol, "baidu_std", "Protocol type. Defined in src/brpc/options.proto");
+DEFINE_string(connection_type, "", "Connection type. Available values: single, pooled, short");
+DEFINE_int32(timeout_ms, 100, "RPC timeout in milliseconds");
+DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)");
+// Your implementation of example::EchoService
+// Notice that implementing brpc::Describable grants the ability to put
+// additional information in /status.
+namespace example {
+
+static const bthread_attr_t BTHREAD_ATTR_NORMAL_WITH_SPAN = {
+ BTHREAD_STACKTYPE_NORMAL, BTHREAD_INHERIT_SPAN, NULL, BTHREAD_TAG_INVALID};
+
+void* RunThreadFunc(void*) {
+ TRACEPRINTF("RunThreadFunc %lu", bthread_self());
+ // brpc::FLAGS_enable_rpcz = true;
+ // A Channel represents a communication line to a Server. Notice that
+ // Channel is thread-safe and can be shared by all threads in your program.
+ brpc::Channel channel;
+ // Initialize the channel, NULL means using default options.
+ brpc::ChannelOptions options;
+ options.protocol = FLAGS_protocol;
+ options.connection_type = FLAGS_connection_type;
+ options.timeout_ms = FLAGS_timeout_ms /*milliseconds*/;
+ options.max_retry = FLAGS_max_retry;
+ if (channel.Init(FLAGS_server.c_str(), "", &options) != 0) {
+ LOG(ERROR) << "Fail to initialize channel";
+ return nullptr;
+ }
+ example::EchoService_Stub stub(&channel);
+ // We will receive response synchronously, safe to put variables
+ // on stack.
+ example::EchoRequest request;
+ example::EchoResponse response;
+ brpc::Controller cntl;
+ request.set_message("hello world");
+
+ // Because `done'(last parameter) is NULL, this function waits until
+ // the response comes back or error occurs(including timedout).
+ stub.Echo(&cntl, &request, &response, NULL);
+ if (!cntl.Failed()) {
+ LOG(INFO) << "Received response from " << cntl.remote_side() << " to "
<< cntl.local_side()
+ << ": " << response.message() << " (attached=" <<
cntl.response_attachment()
+ << ")"
+ << " latency=" << cntl.latency_us() << "us";
+ } else {
+ LOG(WARNING) << cntl.ErrorText();
+ }
+
+ return nullptr;
+}
+
+// Echo service used to demonstrate span inheritance across bthreads: every
+// request starts two bthreads with BTHREAD_ATTR_NORMAL_WITH_SPAN, waits for
+// them, and then echoes the request message (and optionally the attachment).
+class EchoServiceImpl : public EchoService {
+public:
+    EchoServiceImpl() {}
+    virtual ~EchoServiceImpl() {}
+    virtual void Echo(google::protobuf::RpcController* cntl_base,
+                      const EchoRequest* request,
+                      EchoResponse* response,
+                      google::protobuf::Closure* done) {
+        // This object helps you to call done->Run() in RAII style. If you need
+        // to process the request asynchronously, pass done_guard.release().
+        brpc::ClosureGuard done_guard(done);
+
+        bthread_list_t list;
+        bthread_list_init(&list, 0, 0);
+        for (int i = 0; i < 2; ++i) {
+            bthread_t tid;
+            // Only track bthreads that were really started; on failure `tid'
+            // is left unset and must not be joined.
+            if (bthread_start_background(&tid, &BTHREAD_ATTR_NORMAL_WITH_SPAN,
+                                         RunThreadFunc, nullptr) != 0) {
+                LOG(ERROR) << "Fail to start bthread";
+                continue;
+            }
+            bthread_list_add(&list, tid);
+        }
+        bthread_list_join(&list);
+        // Release the list's internal storage; otherwise it leaks per request.
+        bthread_list_destroy(&list);
+
+        TRACEPRINTF("Handle request");
+
+        brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
+
+        // The purpose of following logs is to help you to understand
+        // how clients interact with servers more intuitively. You should
+        // remove these logs in performance-sensitive servers.
+        LOG(INFO) << "Received request[log_id=" << cntl->log_id() << "] from "
+                  << cntl->remote_side() << " to " << cntl->local_side() << ": "
+                  << request->message() << " (attached="
+                  << cntl->request_attachment() << ")";
+
+        // Fill response.
+        response->set_message(request->message());
+
+        // You can compress the response by setting Controller, but be aware
+        // that compression may be costly, evaluate before turning on.
+        // cntl->set_response_compress_type(brpc::COMPRESS_TYPE_GZIP);
+
+        if (FLAGS_echo_attachment) {
+            // Set attachment which is wired to network directly instead of
+            // being serialized into protobuf messages.
+            cntl->response_attachment().append(cntl->request_attachment());
+        }
+    }
+};
+} // namespace example
+
+int main(int argc, char* argv[]) {
+ // Parse gflags. We recommend you to use gflags as well.
+ GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);
+
+ // Generally you only need one Server.
+ brpc::Server server;
+
+ // Instance of your service.
+ example::EchoServiceImpl echo_service_impl;
+
+ // Add the service into server. Notice the second parameter, because the
+ // service is put on stack, we don't want server to delete it, otherwise
+ // use brpc::SERVER_OWNS_SERVICE.
+ if (server.AddService(&echo_service_impl, brpc::SERVER_DOESNT_OWN_SERVICE)
!= 0) {
+ LOG(ERROR) << "Fail to add service";
+ return -1;
+ }
+
+ butil::EndPoint point;
+ if (!FLAGS_listen_addr.empty()) {
+ if (butil::str2endpoint(FLAGS_listen_addr.c_str(), &point) < 0) {
+ LOG(ERROR) << "Invalid listen address:" << FLAGS_listen_addr;
+ return -1;
+ }
+ } else {
+ point = butil::EndPoint(butil::IP_ANY, FLAGS_port);
+ }
+ // Start the server.
+ brpc::ServerOptions options;
+ options.idle_timeout_sec = FLAGS_idle_timeout_s;
+ if (server.Start(point, &options) != 0) {
+ LOG(ERROR) << "Fail to start EchoServer";
+ return -1;
+ }
+
+ // Wait until Ctrl-C is pressed, then Stop() and Join() the server.
+ server.RunUntilAskedToQuit();
+ return 0;
+}
diff --git a/src/brpc/builtin/rpcz_service.cpp
b/src/brpc/builtin/rpcz_service.cpp
index 3625c068..b9e05637 100644
--- a/src/brpc/builtin/rpcz_service.cpp
+++ b/src/brpc/builtin/rpcz_service.cpp
@@ -309,6 +309,17 @@ static void PrintClientSpan(std::ostream& os,const
RpczSpan& span,
PrintClientSpan(os, span, &last_time, NULL, use_html);
}
+// Prints the annotations of a bthread span. An extractor over the span's own
+// info string is handed to PrintAnnotations, preceded by `server_extr' when
+// the caller supplies one. `use_html' is not used by this function.
+static void PrintBthreadSpan(std::ostream& os, const RpczSpan& span, int64_t* last_time,
+                             SpanInfoExtractor* server_extr, bool use_html) {
+    SpanInfoExtractor bthread_extr(span.info().c_str());
+    SpanInfoExtractor* extractors[2];
+    int count = 0;
+    if (server_extr != NULL) {
+        extractors[count] = server_extr;
+        ++count;
+    }
+    extractors[count] = &bthread_extr;
+    ++count;
+    PrintAnnotations(os, std::numeric_limits<int64_t>::max(), last_time,
+                     extractors, count);
+}
static void PrintServerSpan(std::ostream& os, const RpczSpan& span,
bool use_html) {
@@ -351,8 +362,12 @@ static void PrintServerSpan(std::ostream& os, const
RpczSpan& span,
const int nclient = span.client_spans_size();
for (int i = 0; i < nclient; ++i) {
- PrintClientSpan(os, span.client_spans(i), &last_time,
- &server_extr, use_html);
+ auto& client_span = span.client_spans(i);
+ if (client_span.type() == SPAN_TYPE_CLIENT) {
+ PrintClientSpan(os, client_span, &last_time, &server_extr,
use_html);
+ } else {
+ PrintBthreadSpan(os, client_span, &last_time, &server_extr,
use_html);
+ }
}
if (PrintAnnotationsAndRealTimeSpan(
diff --git a/src/brpc/global.cpp b/src/brpc/global.cpp
index fbd669e7..121b9d97 100644
--- a/src/brpc/global.cpp
+++ b/src/brpc/global.cpp
@@ -50,6 +50,11 @@
#include "brpc/policy/hasher.h"
#include "brpc/policy/dynpart_load_balancer.h"
+
+// Span
+#include "brpc/span.h"
+#include "bthread/unstable.h"
+
// Compress handlers
#include "brpc/compress.h"
#include "brpc/policy/gzip_compress.h"
@@ -329,6 +334,9 @@ static void GlobalInitializeOrDieImpl() {
// Make GOOGLE_LOG print to comlog device
SetLogHandler(&BaiduStreamingLogHandler);
+ // Set bthread create span function
+ bthread_set_create_span_func(CreateBthreadSpan);
+
// Setting the variable here does not work, the profiler probably check
// the variable before main() for only once.
// setenv("TCMALLOC_SAMPLE_PARAMETER", "524288", 0);
diff --git a/src/brpc/span.cpp b/src/brpc/span.cpp
index 6e0e64af..356a7cd0 100644
--- a/src/brpc/span.cpp
+++ b/src/brpc/span.cpp
@@ -121,6 +121,7 @@ Span* Span::CreateClientSpan(const std::string&
full_method_name,
span->_start_send_real_us = 0;
span->_sent_real_us = 0;
span->_next_client = NULL;
+ span->_client_list = NULL;
span->_tls_next = NULL;
span->_full_method_name = full_method_name;
span->_info.clear();
@@ -129,8 +130,8 @@ Span* Span::CreateClientSpan(const std::string&
full_method_name,
span->_trace_id = parent->trace_id();
span->_parent_span_id = parent->span_id();
span->_local_parent = parent;
- span->_next_client = parent->_next_client;
- parent->_next_client = span;
+ span->_next_client = parent->_client_list;
+ parent->_client_list = span;
} else {
span->_trace_id = GenerateTraceId();
span->_parent_span_id = 0;
@@ -140,6 +141,47 @@ Span* Span::CreateClientSpan(const std::string&
full_method_name,
return span;
}
+Span* Span::CreateBthreadSpan(const std::string& full_method_name,
+ int64_t base_real_us) {
+ Span* parent = (Span*)bthread::tls_bls.rpcz_parent_span;
+ if (parent == NULL) {
+ return NULL;
+ }
+ Span* span = butil::get_object<Span>(Forbidden());
+ if (__builtin_expect(span == NULL, 0)) {
+ return NULL;
+ }
+ span->_log_id = 0;
+ span->_base_cid = INVALID_BTHREAD_ID;
+ span->_ending_cid = INVALID_BTHREAD_ID;
+ span->_type = SPAN_TYPE_BTHREAD;
+ span->_async = false;
+ span->_protocol = PROTOCOL_UNKNOWN;
+ span->_error_code = 0;
+ span->_request_size = 0;
+ span->_response_size = 0;
+ span->_base_real_us = base_real_us;
+ span->_received_real_us = 0;
+ span->_start_parse_real_us = 0;
+ span->_start_callback_real_us = 0;
+ span->_start_send_real_us = 0;
+ span->_sent_real_us = 0;
+ span->_next_client = NULL;
+ span->_client_list = NULL;
+ span->_tls_next = NULL;
+ span->_full_method_name = full_method_name;
+ span->_info.clear();
+
+ span->_trace_id = parent->trace_id();
+ span->_parent_span_id = parent->span_id();
+ span->_local_parent = parent;
+ span->_next_client = parent->_client_list;
+ parent->_client_list = span;
+
+ span->_span_id = GenerateSpanId();
+ return span;
+}
+
inline const std::string& unknown_span_name() {
// thread-safe in gcc.
static std::string s_unknown_method_name = "unknown_method";
@@ -173,6 +215,7 @@ Span* Span::CreateServerSpan(
span->_start_send_real_us = 0;
span->_sent_real_us = 0;
span->_next_client = NULL;
+ span->_client_list = NULL;
span->_tls_next = NULL;
span->_full_method_name = (!full_method_name.empty() ?
full_method_name : unknown_span_name());
@@ -195,15 +238,20 @@ void Span::ResetServerSpanName(const std::string&
full_method_name) {
void Span::destroy() {
EndAsParent();
- Span* p = _next_client;
- while (p) {
- Span* p_next = p->_next_client;
- p->_info.clear();
- butil::return_object(p);
- p = p_next;
+ traversal(this, [](Span* r) {
+ r->_info.clear();
+ butil::return_object(r);
+ });
+}
+
+// Visits the span tree rooted at `r' in post-order: every span reachable
+// through _client_list/_next_client is passed to `f' before `r' itself.
+// Does nothing when `r' is NULL.
+void Span::traversal(Span* r, const std::function<void(Span*)>& f) const {
+    if (r == NULL) {
+        return;
}
- _info.clear();
- butil::return_object(this);
+    for (Span* p = r->_client_list; p != NULL;) {
+        // Read the sibling link first: `f' may recycle `p' (destroy() returns
+        // each visited span to the object pool), after which its fields must
+        // not be touched.
+        Span* const next = p->_next_client;
+        traversal(p, f);
+        p = next;
+    }
+    f(r);
}
void Span::Annotate(const char* fmt, ...) {
@@ -243,8 +291,8 @@ void Span::AnnotateCStr(const char* info, size_t length) {
size_t Span::CountClientSpans() const {
size_t n = 0;
- for (Span* p = _next_client; p; p = p->_next_client, ++n);
- return n;
+ traversal(const_cast<Span*>(this), [&](Span*) { ++n; });
+ return n - 1;
}
int64_t Span::GetStartRealTimeUs() const {
@@ -577,9 +625,13 @@ leveldb::Status SpanDB::Index(const Span* span,
std::string* value_buf) {
value_proto.add_client_spans();
}
size_t i = 0;
- for (const Span* p = span->_next_client; p; p = p->_next_client, ++i) {
+ span->traversal(const_cast<Span*>(span), [&](Span* p) {
+ if (span == p) {
+ return;
+ }
Span2Proto(p, value_proto.mutable_client_spans(client_span_count - i -
1));
- }
+ ++i;
+ });
if (!value_proto.SerializeToString(value_buf)) {
return leveldb::Status::InvalidArgument(
leveldb::Slice("Fail to serialize RpczSpan"));
diff --git a/src/brpc/span.h b/src/brpc/span.h
index 43ede3d5..44273405 100644
--- a/src/brpc/span.h
+++ b/src/brpc/span.h
@@ -69,6 +69,10 @@ public:
static Span* CreateClientSpan(const std::string& full_method_name,
int64_t base_real_us);
+ // Create a span to track start bthread
+ static Span* CreateBthreadSpan(const std::string& full_method_name,
+ int64_t base_real_us);
+
static void Submit(Span* span, int64_t cpuwide_time_us);
// Set tls parent.
@@ -82,7 +86,7 @@ public:
void Annotate(const std::string& info);
// When length <= 0, use strlen instead.
void AnnotateCStr(const char* cstr, size_t length);
-
+
// #child spans, Not O(1)
size_t CountClientSpans() const;
@@ -142,6 +146,7 @@ private:
void dump_and_destroy(size_t round_index);
void destroy();
+ void traversal(Span*, const std::function<void(Span*)>&) const;
bvar::CollectorSpeedLimit* speed_limit();
bvar::CollectorPreprocessor* preprocessor();
@@ -179,6 +184,7 @@ private:
Span* _local_parent;
Span* _next_client;
+ Span* _client_list;
Span* _tls_next;
};
@@ -233,6 +239,12 @@ inline bool IsTraceable(bool is_upstream_traced) {
(FLAGS_enable_rpcz && bvar::is_collectable(&g_span_sl));
}
+// Parameterless wrapper around Span::CreateBthreadSpan: creates a span named
+// "Bthread" whose base real time is the current wall-clock time minus the
+// current cpuwide time, and returns it as an opaque pointer.
+inline void* CreateBthreadSpan() {
+    const int64_t cpuwide_now_us = butil::cpuwide_time_us();
+    const int64_t realtime_now_us = butil::gettimeofday_us();
+    return Span::CreateBthreadSpan("Bthread", realtime_now_us - cpuwide_now_us);
+}
+
} // namespace brpc
diff --git a/src/brpc/span.proto b/src/brpc/span.proto
index a77a9cf6..d37a53e2 100644
--- a/src/brpc/span.proto
+++ b/src/brpc/span.proto
@@ -25,6 +25,7 @@ option java_outer_classname="Span";
enum SpanType {
SPAN_TYPE_SERVER = 0;
SPAN_TYPE_CLIENT = 1;
+ SPAN_TYPE_BTHREAD = 2;
}
// We don't unify RpczSpan and TracingSpan as one because the former one needs
diff --git a/src/bthread/bthread.cpp b/src/bthread/bthread.cpp
index 608c1b58..07233e60 100644
--- a/src/bthread/bthread.cpp
+++ b/src/bthread/bthread.cpp
@@ -83,6 +83,7 @@ TaskControl* g_task_control = NULL;
extern BAIDU_THREAD_LOCAL TaskGroup* tls_task_group;
extern void (*g_worker_startfn)();
extern void (*g_tagged_worker_startfn)(bthread_tag_t);
+extern void* (*g_create_span_func)();
inline TaskControl* get_task_control() {
return g_task_control;
@@ -489,6 +490,14 @@ int bthread_set_tagged_worker_startfn(void
(*start_fn)(bthread_tag_t)) {
return 0;
}
+// Installs `func' as the global create-span hook.
+// Returns 0 on success, EINVAL when `func' is NULL (the hook is left as is).
+int bthread_set_create_span_func(void* (*func)()) {
+    if (func != NULL) {
+        bthread::g_create_span_func = func;
+        return 0;
+    }
+    return EINVAL;
+}
+
void bthread_stop_world() {
bthread::TaskControl* c = bthread::get_task_control();
if (c != NULL) {
diff --git a/src/bthread/task_group.cpp b/src/bthread/task_group.cpp
index 46bd2460..45bd89ed 100644
--- a/src/bthread/task_group.cpp
+++ b/src/bthread/task_group.cpp
@@ -76,6 +76,15 @@ const size_t OFFSET_TABLE[] = {
#include "bthread/offset_inl.list"
};
+void* (*g_create_span_func)() = NULL;
+
+// Returns the span pointer for a bthread that is about to start: the result
+// of the installed create-span hook, or the current thread's
+// rpcz_parent_span when no hook is installed.
+void* run_create_span_func() {
+    return g_create_span_func != NULL ? g_create_span_func()
+                                      : tls_bls.rpcz_parent_span;
+}
+
int TaskGroup::get_attr(bthread_t tid, bthread_attr_t* out) {
TaskMeta* const m = address_meta(tid);
if (m != NULL) {
@@ -383,7 +392,7 @@ int TaskGroup::start_foreground(TaskGroup** pg,
m->attr = using_attr;
m->local_storage = LOCAL_STORAGE_INIT;
if (using_attr.flags & BTHREAD_INHERIT_SPAN) {
- m->local_storage.rpcz_parent_span = tls_bls.rpcz_parent_span;
+ m->local_storage.rpcz_parent_span = run_create_span_func();
}
m->cpuwide_start_ns = start_ns;
m->stat = EMPTY_STAT;
@@ -443,7 +452,7 @@ int TaskGroup::start_background(bthread_t* __restrict th,
m->attr = using_attr;
m->local_storage = LOCAL_STORAGE_INIT;
if (using_attr.flags & BTHREAD_INHERIT_SPAN) {
- m->local_storage.rpcz_parent_span = tls_bls.rpcz_parent_span;
+ m->local_storage.rpcz_parent_span = run_create_span_func();
}
m->cpuwide_start_ns = start_ns;
m->stat = EMPTY_STAT;
diff --git a/src/bthread/unstable.h b/src/bthread/unstable.h
index 5922cc2f..f5bdeecb 100644
--- a/src/bthread/unstable.h
+++ b/src/bthread/unstable.h
@@ -91,6 +91,9 @@ extern int bthread_set_worker_startfn(void (*start_fn)());
// Add a startup function with tag
extern int bthread_set_tagged_worker_startfn(void (*start_fn)(bthread_tag_t));
+// Add a create span function
+extern int bthread_set_create_span_func(void* (*func)());
+
// Stop all bthread and worker pthreads.
// You should avoid calling this function which may cause bthread after main()
// suspend indefinitely.
diff --git a/test/brpc_alpn_protocol_unittest.cpp
b/test/brpc_alpn_protocol_unittest.cpp
index 7884b3fe..3040355a 100644
--- a/test/brpc_alpn_protocol_unittest.cpp
+++ b/test/brpc_alpn_protocol_unittest.cpp
@@ -31,7 +31,7 @@ DEFINE_string(listen_addr, "0.0.0.0:8011", "Server listen
address.");
int main(int argc, char* argv[]) {
testing::InitGoogleTest(&argc, argv);
- google::ParseCommandLineFlags(&argc, &argv, true);
+ GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);
return RUN_ALL_TESTS();
}
diff --git a/test/bthread_unittest.cpp b/test/bthread_unittest.cpp
index 72b8da3f..5ed8aba8 100644
--- a/test/bthread_unittest.cpp
+++ b/test/bthread_unittest.cpp
@@ -513,7 +513,7 @@ TEST_F(BthreadTest, bthread_usleep) {
}
static const bthread_attr_t BTHREAD_ATTR_NORMAL_WITH_SPAN =
-{ BTHREAD_STACKTYPE_NORMAL, BTHREAD_INHERIT_SPAN, NULL };
+{ BTHREAD_STACKTYPE_NORMAL, BTHREAD_INHERIT_SPAN, NULL, BTHREAD_TAG_INVALID };
void* test_parent_span(void* p) {
uint64_t *q = (uint64_t *)p;
@@ -522,6 +522,32 @@ void* test_parent_span(void* p) {
return NULL;
}
+void* test_grandson_parent_span(void* p) {
+ uint64_t* q = (uint64_t*)p;
+ *q = (uint64_t)(bthread::tls_bls.rpcz_parent_span);
+ LOG(INFO) << "parent span id in thread is " << *q;
+ return NULL;
+}
+
+void* test_son_parent_span(void* p) {
+ uint64_t* q = (uint64_t*)p;
+ *q = (uint64_t)(bthread::tls_bls.rpcz_parent_span);
+ LOG(INFO) << "parent span id in thread is " << *q;
+ bthread_t th;
+ uint64_t multi_p;
+ bthread_start_urgent(&th, &BTHREAD_ATTR_NORMAL_WITH_SPAN,
test_grandson_parent_span, &multi_p);
+ bthread_join(th, NULL);
+ return NULL;
+}
+
+static uint64_t targets[] = {0xBADBEB0UL, 0xBADBEB1UL, 0xBADBEB2UL, 0xBADBEB3UL};
+// Fake create-span hook for the test: hands out the values of `targets' in
+// call order. The hook stays installed once set, so the index wraps around
+// instead of running past the end of the array on later calls.
+void* create_span_func() {
+    static std::atomic<size_t> index(0);
+    const size_t idx = index.fetch_add(1) % (sizeof(targets) / sizeof(targets[0]));
+    LOG(INFO) << "Bthread create span " << targets[idx];
+    return (void*)targets[idx];
+}
+
TEST_F(BthreadTest, test_span) {
uint64_t p1 = 0;
uint64_t p2 = 0;
@@ -531,17 +557,32 @@ TEST_F(BthreadTest, test_span) {
bthread::tls_bls.rpcz_parent_span = (void*)target;
bthread_t th1;
- ASSERT_EQ(0, bthread_start_urgent(&th1, &BTHREAD_ATTR_NORMAL_WITH_SPAN,
- test_parent_span, &p1));
+ ASSERT_EQ(0, bthread_start_urgent(&th1, &BTHREAD_ATTR_NORMAL_WITH_SPAN,
test_parent_span, &p1));
ASSERT_EQ(0, bthread_join(th1, NULL));
bthread_t th2;
- ASSERT_EQ(0, bthread_start_background(&th2, NULL,
- test_parent_span, &p2));
+ ASSERT_EQ(0, bthread_start_background(&th2, NULL, test_parent_span, &p2));
ASSERT_EQ(0, bthread_join(th2, NULL));
ASSERT_EQ(p1, target);
ASSERT_NE(p2, target);
+
+ LOG(INFO) << "Test bthread create span";
+
+ bthread_set_create_span_func(create_span_func);
+
+ bthread_t multi_th1;
+ bthread_t multi_th2;
+ uint64_t multi_p1;
+ uint64_t multi_p2;
+ ASSERT_EQ(0, bthread_start_background(&multi_th1,
&BTHREAD_ATTR_NORMAL_WITH_SPAN,
+ test_son_parent_span, &multi_p1));
+ ASSERT_EQ(0, bthread_start_background(&multi_th2,
&BTHREAD_ATTR_NORMAL_WITH_SPAN,
+ test_son_parent_span, &multi_p2));
+ ASSERT_EQ(0, bthread_join(multi_th1, NULL));
+ ASSERT_EQ(0, bthread_join(multi_th2, NULL));
+ ASSERT_EQ(multi_p1, targets[0]);
+ ASSERT_EQ(multi_p2, targets[1]);
}
void* dummy_thread(void*) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]