This is an automated email from the ASF dual-hosted git repository.
chenBright pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push:
new 56445c50 Support LeakSanitizer (#3361)
56445c50 is described below
commit 56445c507626b13c63e7331c8e917e8ab5aa3488
Author: Bright Chen <[email protected]>
AuthorDate: Sun Jun 28 14:04:39 2026 +0800
Support LeakSanitizer (#3361)
---
.github/workflows/ci-linux.yml | 1 +
src/brpc/controller.cpp | 2 +-
src/brpc/global.cpp | 19 +++++++-
src/brpc/rdma/block_pool.cpp | 7 +++
src/brpc/server.cpp | 84 ++++++++++++++++++--------------
src/brpc/socket_map.cpp | 9 ++++
src/brpc/span.cpp | 5 +-
src/bthread/task_group.cpp | 23 +++++----
src/bthread/timer_thread.cpp | 4 +-
src/butil/debug/leak_annotations.h | 11 +++--
src/butil/find_cstr.h | 8 +++
src/butil/lazy_instance.h | 1 -
src/butil/logging.cc | 2 +
src/butil/memory/singleton.h | 11 +----
src/butil/object_pool_inl.h | 51 +++++++++++++++++++
test/CMakeLists.txt | 5 ++
test/brpc_alpn_protocol_unittest.cpp | 8 ++-
test/brpc_event_dispatcher_unittest.cpp | 17 +++++++
test/brpc_http_rpc_protocol_unittest.cpp | 3 ++
test/brpc_input_messenger_unittest.cpp | 4 ++
test/brpc_load_balancer_unittest.cpp | 10 ++--
test/brpc_proto_unittest.cpp | 3 +-
test/brpc_protobuf_json_unittest.cpp | 2 +-
test/brpc_redis_cluster_unittest.cpp | 32 +++++++-----
test/brpc_redis_unittest.cpp | 79 ++++++++++++++++++------------
test/brpc_socket_unittest.cpp | 18 +++++--
test/brpc_ssl_unittest.cpp | 6 +++
test/bthread_dispatcher_unittest.cpp | 36 ++++++++++----
test/bthread_fd_unittest.cpp | 17 ++++---
test/bthread_key_unittest.cpp | 4 +-
test/iobuf_unittest.cpp | 3 ++
test/object_pool_unittest.cpp | 7 ++-
test/run_tests.sh | 3 +-
test/thread_key_unittest.cpp | 11 ++++-
34 files changed, 360 insertions(+), 146 deletions(-)
diff --git a/.github/workflows/ci-linux.yml b/.github/workflows/ci-linux.yml
index 06c58721..03e35f0c 100644
--- a/.github/workflows/ci-linux.yml
+++ b/.github/workflows/ci-linux.yml
@@ -264,5 +264,6 @@ jobs:
|| { echo "ERROR: failed to override protobuf version in MODULE.bazel to ${TEST_PROTOBUF_VERSION}"; exit 1; }
- run: |
bazel test --action_env=CC=clang --config=rdma \
+ --define with_bthread_tracer=true \
--define with_babylon_counter=true \
//test/... --test_arg=--gtest_filter=-RdmaRpcTest.*
diff --git a/src/brpc/controller.cpp b/src/brpc/controller.cpp
index 15c8c918..377d3cf1 100644
--- a/src/brpc/controller.cpp
+++ b/src/brpc/controller.cpp
@@ -186,6 +186,7 @@ void Controller::ResetNonPods() {
if (auto span = _span.lock()) {
Span::Submit(span, butil::cpuwide_time_us());
}
+ _span.reset();
_error_text.clear();
_remote_side = butil::EndPoint();
_local_side = butil::EndPoint();
@@ -240,7 +241,6 @@ void Controller::ResetNonPods() {
void Controller::ResetPods() {
// NOTE: Make the sequence of assignments same with the order that they're
// defined in header. Better for cpu cache and faster for lookup.
- _span.reset();
_flags = 0;
#ifndef BAIDU_INTERNAL
set_pb_bytes_to_base64(true);
diff --git a/src/brpc/global.cpp b/src/brpc/global.cpp
index 90f19cd5..641cb397 100644
--- a/src/brpc/global.cpp
+++ b/src/brpc/global.cpp
@@ -28,6 +28,7 @@
#include <signal.h>
#include "butil/build_config.h" // OS_LINUX
+#include "butil/debug/leak_annotations.h"
// Naming services
#ifdef BAIDU_INTERNAL
#include "brpc/policy/baidu_naming_service.h"
@@ -216,6 +217,14 @@ static int GetRunningServerCount(void*) {
// Update global stuff periodically.
static void* GlobalUpdate(void*) {
+ // This bthread runs for the whole process lifetime and never returns, so
+ // the local objects below live until the process exits and their
+ // destructors never run. They are reachable from this bthread's stack, so
+ // the objects themselves are not reported as leaks, but the heap buffers
+ // they allocate while exposing themselves (variable names, watched path)
+ // would be. Disable leak detection only around their construction and
+ // re-enable it right after.
+ ANNOTATE_MEMORY_LEAK_DISABLE();
// Expose variables.
bvar::PassiveStatus<int64_t> var_iobuf_block_count(
"iobuf_block_count", GetIOBufBlockCount, NULL);
@@ -232,7 +241,9 @@ static void* GlobalUpdate(void*) {
"rpc_server_count", GetRunningServerCount, NULL);
butil::FileWatcher fw;
- if (fw.init_from_not_exist(DUMMY_SERVER_PORT_FILE) < 0) {
+ const int fw_rc = fw.init_from_not_exist(DUMMY_SERVER_PORT_FILE);
+ ANNOTATE_MEMORY_LEAK_ENABLE();
+ if (fw_rc < 0) {
LOG(FATAL) << "Fail to init FileWatcher on `" <<
DUMMY_SERVER_PORT_FILE << "'";
return NULL;
}
@@ -270,7 +281,11 @@ static void* GlobalUpdate(void*) {
}
}
- SocketMapList(&conns);
+ {
+ // See detail above.
+ ANNOTATE_SCOPED_MEMORY_LEAK;
+ SocketMapList(&conns);
+ }
const int64_t now_ms = butil::cpuwide_time_ms();
for (size_t i = 0; i < conns.size(); ++i) {
SocketUniquePtr ptr;
diff --git a/src/brpc/rdma/block_pool.cpp b/src/brpc/rdma/block_pool.cpp
index 36c763ec..d8dbb8ab 100644
--- a/src/brpc/rdma/block_pool.cpp
+++ b/src/brpc/rdma/block_pool.cpp
@@ -615,10 +615,17 @@ void DestroyBlockPool() {
node = tmp;
}
g_info->idle_list[i][j] = NULL;
+ // Release the per-bucket mutexes allocated in InitBlockPool.
+ delete g_info->lock[i][j];
+ g_info->lock[i][j] = NULL;
}
}
delete g_info;
g_info = NULL;
+ delete g_dump_mutex;
+ g_dump_mutex = NULL;
+ delete g_tls_info_mutex;
+ g_tls_info_mutex = NULL;
for (int i = 0; i < g_region_num; ++i) {
if (g_regions[i].start == 0) {
break;
diff --git a/src/brpc/server.cpp b/src/brpc/server.cpp
index 57f665da..6e1f9e88 100644
--- a/src/brpc/server.cpp
+++ b/src/brpc/server.cpp
@@ -500,107 +500,107 @@ Server::~Server() {
int Server::AddBuiltinServices() {
// Firstly add services shown in tabs.
- if (AddBuiltinService(new (std::nothrow) StatusService)) {
+ if (AddBuiltinService(new StatusService)) {
LOG(ERROR) << "Fail to add StatusService";
return -1;
}
- if (AddBuiltinService(new (std::nothrow) VarsService)) {
+ if (AddBuiltinService(new VarsService)) {
LOG(ERROR) << "Fail to add VarsService";
return -1;
}
- if (AddBuiltinService(new (std::nothrow) ConnectionsService)) {
+ if (AddBuiltinService(new ConnectionsService)) {
LOG(ERROR) << "Fail to add ConnectionsService";
return -1;
}
- if (AddBuiltinService(new (std::nothrow) FlagsService)) {
+ if (AddBuiltinService(new FlagsService)) {
LOG(ERROR) << "Fail to add FlagsService";
return -1;
}
- if (AddBuiltinService(new (std::nothrow) RpczService)) {
+ if (AddBuiltinService(new RpczService)) {
LOG(ERROR) << "Fail to add RpczService";
return -1;
}
- if (AddBuiltinService(new (std::nothrow) HotspotsService)) {
+ if (AddBuiltinService(new HotspotsService)) {
LOG(ERROR) << "Fail to add HotspotsService";
return -1;
}
- if (AddBuiltinService(new (std::nothrow) IndexService)) {
+ if (AddBuiltinService(new IndexService)) {
LOG(ERROR) << "Fail to add IndexService";
return -1;
}
// Add other services.
- if (AddBuiltinService(new (std::nothrow) VersionService(this))) {
+ if (AddBuiltinService(new VersionService(this))) {
LOG(ERROR) << "Fail to add VersionService";
return -1;
}
- if (AddBuiltinService(new (std::nothrow) HealthService)) {
+ if (AddBuiltinService(new HealthService)) {
LOG(ERROR) << "Fail to add HealthService";
return -1;
}
- if (AddBuiltinService(new (std::nothrow) ProtobufsService(this))) {
+ if (AddBuiltinService(new ProtobufsService(this))) {
LOG(ERROR) << "Fail to add ProtobufsService";
return -1;
}
- if (AddBuiltinService(new (std::nothrow) BadMethodService)) {
+ if (AddBuiltinService(new BadMethodService)) {
LOG(ERROR) << "Fail to add BadMethodService";
return -1;
}
- if (AddBuiltinService(new (std::nothrow) ListService(this))) {
+ if (AddBuiltinService(new ListService(this))) {
LOG(ERROR) << "Fail to add ListService";
return -1;
}
- if (AddBuiltinService(new (std::nothrow) PrometheusMetricsService)) {
+ if (AddBuiltinService(new PrometheusMetricsService)) {
LOG(ERROR) << "Fail to add MetricsService";
return -1;
}
if (FLAGS_enable_threads_service &&
- AddBuiltinService(new (std::nothrow) ThreadsService)) {
+ AddBuiltinService(new ThreadsService)) {
LOG(ERROR) << "Fail to add ThreadsService";
return -1;
}
- if (AddBuiltinService(new (std::nothrow) MemoryService)) {
+ if (AddBuiltinService(new MemoryService)) {
LOG(ERROR) << "Fail to add MemoryService";
return -1;
}
#if !BRPC_WITH_GLOG
- if (AddBuiltinService(new (std::nothrow) VLogService)) {
+ if (AddBuiltinService(new VLogService)) {
LOG(ERROR) << "Fail to add VLogService";
return -1;
}
#endif
- if (AddBuiltinService(new (std::nothrow) PProfService)) {
+ if (AddBuiltinService(new PProfService)) {
LOG(ERROR) << "Fail to add PProfService";
return -1;
}
if (FLAGS_enable_dir_service &&
- AddBuiltinService(new (std::nothrow) DirService)) {
+ AddBuiltinService(new DirService)) {
LOG(ERROR) << "Fail to add DirService";
return -1;
}
- if (AddBuiltinService(new (std::nothrow) BthreadsService)) {
+ if (AddBuiltinService(new BthreadsService)) {
LOG(ERROR) << "Fail to add BthreadsService";
return -1;
}
- if (AddBuiltinService(new (std::nothrow) IdsService)) {
+ if (AddBuiltinService(new IdsService)) {
LOG(ERROR) << "Fail to add IdsService";
return -1;
}
- if (AddBuiltinService(new (std::nothrow) SocketsService)) {
+ if (AddBuiltinService(new SocketsService)) {
LOG(ERROR) << "Fail to add SocketsService";
return -1;
}
- if (AddBuiltinService(new (std::nothrow) GetFaviconService)) {
+ if (AddBuiltinService(new GetFaviconService)) {
LOG(ERROR) << "Fail to add GetFaviconService";
return -1;
}
- if (AddBuiltinService(new (std::nothrow) GetJsService)) {
+ if (AddBuiltinService(new GetJsService)) {
LOG(ERROR) << "Fail to add GetJsService";
return -1;
}
- if (AddBuiltinService(new (std::nothrow) GrpcHealthCheckService)) {
+ if (AddBuiltinService(new GrpcHealthCheckService)) {
LOG(ERROR) << "Fail to add GrpcHealthCheckService";
return -1;
}
@@ -927,18 +927,20 @@ int Server::StartInternal(const butil::EndPoint& endpoint,
_session_local_data_pool->Reserve(_options.reserved_session_local_data);
}
- // Leak of `_keytable_pool' and others is by design.
- // See comments in Server::Join() for details.
- // Instruct LeakSanitizer to ignore the designated memory leak.
- ANNOTATE_SCOPED_MEMORY_LEAK;
- // Init _keytable_pool always. If the server was stopped before, the pool
- // should be destroyed in Join().
- _keytable_pool = new bthread_keytable_pool_t;
- if (bthread_keytable_pool_init(_keytable_pool) != 0) {
- LOG(ERROR) << "Fail to init _keytable_pool";
- delete _keytable_pool;
- _keytable_pool = NULL;
- return -1;
+ {
+ // Leak of `_keytable_pool' and others is by design.
+ // See comments in Server::Join() for details.
+ // Instruct LeakSanitizer to ignore the designated memory leak.
+ ANNOTATE_SCOPED_MEMORY_LEAK;
+ // Init _keytable_pool always. If the server was stopped before, the pool
+ // should be destroyed in Join().
+ _keytable_pool = new bthread_keytable_pool_t;
+ if (bthread_keytable_pool_init(_keytable_pool) != 0) {
+ LOG(ERROR) << "Fail to init _keytable_pool";
+ delete _keytable_pool;
+ _keytable_pool = NULL;
+ return -1;
+ }
}
if (_options.thread_local_data_factory) {
@@ -1635,7 +1637,15 @@ int Server::AddService(google::protobuf::Service* service,
int Server::AddBuiltinService(google::protobuf::Service* service) {
ServiceOptions options;
options.ownership = SERVER_OWNS_SERVICE;
- return AddServiceInternal(service, true, options);
+ int rc = AddServiceInternal(service, true, options);
+ if (rc != 0) {
+ // AddServiceInternal does not take ownership of `service' on failure:
+ // for builtin services the only failure paths (name/fullname conflict)
+ // return before the service is inserted into any map. Delete it here to
+ // avoid leaking the object allocated by the caller.
+ delete service;
+ }
+ return rc;
}
void Server::RemoveMethodsOf(google::protobuf::Service* service) {
diff --git a/src/brpc/socket_map.cpp b/src/brpc/socket_map.cpp
index 8d934f58..1562e0a3 100644
--- a/src/brpc/socket_map.cpp
+++ b/src/brpc/socket_map.cpp
@@ -22,6 +22,7 @@
#include "butil/time.h"
#include "butil/scoped_lock.h"
#include "butil/logging.h"
+#include "butil/debug/leak_annotations.h"
#include "brpc/log.h"
#include "brpc/protocol.h"
#include "brpc/input_messenger.h"
@@ -384,11 +385,19 @@ void* SocketMap::RunWatchConnections(void* arg) {
}
void SocketMap::WatchConnections() {
+ // This bthread of SocketMap Singleton runs for the whole process lifetime and
+ // never returns, so the local objects below live until the process exits and
+ // their destructors never run. They are reachable from this bthread's stack,
+ // so the objects themselves are not reported as leaks, but the heap buffers
+ // they allocate while exposing themselves (variable names, watched path) would
+ // be. Disable leak detection only around their construction and re-enable it
+ // right after.
std::vector<SocketId> main_sockets;
std::vector<SocketId> pooled_sockets;
std::vector<SocketMapKey> orphan_sockets;
const uint64_t CHECK_INTERVAL_US = 1000000UL;
while (bthread_usleep(CHECK_INTERVAL_US) == 0) {
+ ANNOTATE_SCOPED_MEMORY_LEAK;
// NOTE: save the gflag which may be reloaded at any time.
const int idle_seconds = _options.idle_timeout_second_dynamic ?
*_options.idle_timeout_second_dynamic
diff --git a/src/brpc/span.cpp b/src/brpc/span.cpp
index 3a53f33a..1863f01a 100644
--- a/src/brpc/span.cpp
+++ b/src/brpc/span.cpp
@@ -337,14 +337,11 @@ void Span::ResetServerSpanName(const std::string& full_method_name) {
void Span::submit(int64_t cpuwide_us) {
// Note: this method is not called for client-side spans.
EndAsParent();
- SpanContainer* container = new(std::nothrow) SpanContainer(shared_from_this());
// If memory allocation fails, the server span will not be submitted for persistence.
// The server span will be destroyed later when its shared_ptr refcount drops to zero
// Child spans (held in _client_list) will also be destroyed when
// their refcounts reach zero.
- if (container) {
- container->submit(cpuwide_us);
- }
+ (new SpanContainer(shared_from_this()))->submit(cpuwide_us);
}
void Span::Annotate(const char* fmt, ...) {
diff --git a/src/bthread/task_group.cpp b/src/bthread/task_group.cpp
index 0fd09955..ddfa4738 100644
--- a/src/bthread/task_group.cpp
+++ b/src/bthread/task_group.cpp
@@ -417,10 +417,22 @@ void TaskGroup::task_runner(intptr_t skip_remained) {
<< m->stat.cputime_ns / 1000000.0 << "ms";
}
+ // Clean up span if it exists. This must be done before keytable cleanup
+ // because span cleanup may use bthread local storage (e.g. logging,
+ // which allocates bthread-local stream arrays via bthread_setspecific).
+ // If span cleanup ran after keytable cleanup, such allocations would
+ // re-populate the keytable and never be reclaimed, causing memory leak.
+ LocalStorage* tls_bls_ptr = BAIDU_GET_PTR_VOLATILE_THREAD_LOCAL(tls_bls);
+ if (tls_bls_ptr->rpcz_parent_span && g_rpcz_parent_span_dtor) {
+ g_rpcz_parent_span_dtor(tls_bls_ptr->rpcz_parent_span);
+ tls_bls_ptr = BAIDU_GET_PTR_VOLATILE_THREAD_LOCAL(tls_bls);
+ tls_bls_ptr->rpcz_parent_span = NULL;
+ m->local_storage.rpcz_parent_span = NULL;
+ }
+
// Clean tls variables, must be done before changing version_butex
// otherwise another thread just joined this thread may not see side
// effects of destructing tls variables.
- LocalStorage* tls_bls_ptr = BAIDU_GET_PTR_VOLATILE_THREAD_LOCAL(tls_bls);
KeyTable* kt = tls_bls_ptr->keytable;
if (kt != NULL) {
return_keytable(m->attr.keytable_pool, kt);
@@ -430,15 +442,6 @@ void TaskGroup::task_runner(intptr_t skip_remained) {
m->local_storage.keytable = NULL; // optional
}
- // Clean up span if it exists. This must be done after keytable cleanup
- // because span cleanup may use bthread local storage.
- tls_bls_ptr = BAIDU_GET_PTR_VOLATILE_THREAD_LOCAL(tls_bls);
- if (tls_bls_ptr->rpcz_parent_span && g_rpcz_parent_span_dtor) {
- g_rpcz_parent_span_dtor(tls_bls_ptr->rpcz_parent_span);
- tls_bls_ptr->rpcz_parent_span = NULL;
- m->local_storage.rpcz_parent_span = NULL;
- }
-
// During running the function in TaskMeta and deleting the KeyTable in
// return_KeyTable, the group is probably changed.
g = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
diff --git a/src/bthread/timer_thread.cpp b/src/bthread/timer_thread.cpp
index 813104a4..a7ebfa4c 100644
--- a/src/bthread/timer_thread.cpp
+++ b/src/bthread/timer_thread.cpp
@@ -445,7 +445,9 @@ void TimerThread::run() {
}
void TimerThread::stop_and_join() {
- _stop.store(true, butil::memory_order_relaxed);
+ if (_stop.exchange(true, butil::memory_order_relaxed)) {
+ return;
+ }
if (_started) {
{
BAIDU_SCOPED_LOCK(_mutex);
diff --git a/src/butil/debug/leak_annotations.h b/src/butil/debug/leak_annotations.h
index 47387acb..bcbaa00e 100644
--- a/src/butil/debug/leak_annotations.h
+++ b/src/butil/debug/leak_annotations.h
@@ -26,16 +26,13 @@ extern "C" {
void __lsan_disable();
void __lsan_enable();
void __lsan_ignore_object(const void *p);
-
-// Invoke leak detection immediately. If leaks are found, the process will exit.
-void __lsan_do_leak_check();
} // extern "C"
class ScopedLeakSanitizerDisabler {
public:
ScopedLeakSanitizerDisabler() { __lsan_disable(); }
~ScopedLeakSanitizerDisabler() { __lsan_enable(); }
-private:
+
DISALLOW_COPY_AND_ASSIGN(ScopedLeakSanitizerDisabler);
};
@@ -44,11 +41,17 @@ private:
#define ANNOTATE_LEAKING_OBJECT_PTR(X) __lsan_ignore_object(X)
+// Manually pair these to mark allocations made in between as intentional non-leaks.
+#define ANNOTATE_MEMORY_LEAK_DISABLE() __lsan_disable()
+#define ANNOTATE_MEMORY_LEAK_ENABLE() __lsan_enable()
+
#else
// If neither HeapChecker nor LSan are used, the annotations should be no-ops.
#define ANNOTATE_SCOPED_MEMORY_LEAK ((void)0)
#define ANNOTATE_LEAKING_OBJECT_PTR(X) ((void)(X))
+#define ANNOTATE_MEMORY_LEAK_DISABLE() ((void)0)
+#define ANNOTATE_MEMORY_LEAK_ENABLE() ((void)0)
#endif
diff --git a/src/butil/find_cstr.h b/src/butil/find_cstr.h
index fd997139..103f9e70 100644
--- a/src/butil/find_cstr.h
+++ b/src/butil/find_cstr.h
@@ -24,6 +24,7 @@
#include <map>
#include <algorithm>
#include "butil/thread_local.h"
+#include "butil/debug/leak_annotations.h"
// Find c-string in maps with std::string as keys without memory allocations.
// Example:
@@ -57,6 +58,11 @@ struct StringMapThreadLocalTemp {
}
inline std::string* get_string(const char* key) {
+ // This thread-local string (and any buffer it reallocates) is reclaimed
+ // via thread_atexit when the thread exits. If a thread is still alive at
+ // leak-check time the allocation would be reported; it is a thread-local
+ // cache, so mark its allocations as intentional non-leaks.
+ ANNOTATE_SCOPED_MEMORY_LEAK;
if (!initialized) {
initialized = true;
std::string* tmp = new (buf) std::string(key);
@@ -70,6 +76,8 @@ struct StringMapThreadLocalTemp {
}
inline std::string* get_string(const char* key, size_t length) {
+ // See the note in the other get_string overload.
+ ANNOTATE_SCOPED_MEMORY_LEAK;
if (!initialized) {
initialized = true;
std::string* tmp = new (buf) std::string(key, length);
diff --git a/src/butil/lazy_instance.h b/src/butil/lazy_instance.h
index e1daeb58..b300faf4 100644
--- a/src/butil/lazy_instance.h
+++ b/src/butil/lazy_instance.h
@@ -39,7 +39,6 @@
#include "butil/atomicops.h"
#include "butil/base_export.h"
-#include "butil/basictypes.h"
#include "butil/debug/leak_annotations.h"
#include "butil/logging.h"
#include "butil/memory/aligned_memory.h"
diff --git a/src/butil/logging.cc b/src/butil/logging.cc
index 69ecf182..29d4111e 100644
--- a/src/butil/logging.cc
+++ b/src/butil/logging.cc
@@ -69,6 +69,7 @@ typedef pthread_mutex_t* MutexHandle;
#include "butil/debug/alias.h"
#include "butil/debug/debugger.h"
#include "butil/debug/stack_trace.h"
+#include "butil/debug/leak_annotations.h"
#include "butil/posix/eintr_wrapper.h"
#include "butil/strings/string_util.h"
#include "butil/strings/stringprintf.h"
@@ -622,6 +623,7 @@ void AsyncLogger::Log(LogInfo&& log_info) {
DoLog(log_info);
return;
}
+ ANNOTATE_SCOPED_MEMORY_LEAK;
log_req->log_info = std::move(log_info);
LogImpl(log_req);
}
diff --git a/src/butil/memory/singleton.h b/src/butil/memory/singleton.h
index f76a8317..ff132bc4 100644
--- a/src/butil/memory/singleton.h
+++ b/src/butil/memory/singleton.h
@@ -24,8 +24,6 @@
#include "butil/base_export.h"
#include "butil/memory/aligned_memory.h"
#include "butil/third_party/dynamic_annotations/dynamic_annotations.h"
-#include "butil/threading/thread_restrictions.h"
-#include "butil/debug/leak_annotations.h"
namespace butil {
namespace internal {
@@ -267,13 +265,8 @@ class Singleton {
butil::subtle::Release_Store(
&instance_, reinterpret_cast<butil::subtle::AtomicWord>(newval));
- if (newval != NULL) {
- if (Traits::kRegisterAtExit) {
- butil::AtExitManager::RegisterCallback(OnExit, NULL);
- } else {
- // Instruct LeakSanitizer to ignore the designated memory leak.
- ANNOTATE_LEAKING_OBJECT_PTR(newval);
- }
+ if (newval != NULL && Traits::kRegisterAtExit) {
+ butil::AtExitManager::RegisterCallback(OnExit, NULL);
}
return newval;
diff --git a/src/butil/object_pool_inl.h b/src/butil/object_pool_inl.h
index cc6b411c..c98ec16f 100644
--- a/src/butil/object_pool_inl.h
+++ b/src/butil/object_pool_inl.h
@@ -26,6 +26,7 @@
#include <pthread.h> // pthread_mutex_t
#include <algorithm> // std::max, std::min
#include <vector>
+
#include "butil/atomicops.h" // butil::atomic
#include "butil/macros.h" // BAIDU_CACHELINE_ALIGNMENT
#include "butil/scoped_lock.h" // BAIDU_SCOPED_LOCK
@@ -357,12 +358,62 @@ private:
ObjectPool() {
_free_chunks.reserve(OP_INITIAL_FREE_LIST_SIZE);
pthread_mutex_init(&_free_chunks_mutex, NULL);
+#if defined(BUTIL_USE_ASAN) && \
+ !defined(BAIDU_CLEAR_OBJECT_POOL_AFTER_ALL_THREADS_QUIT)
+ // Objects returned to the pool stay ASan-poisoned (see return_object()).
+ // LeakSanitizer skips poisoned memory when scanning for live pointers
+ // (its `use_poisoned' option is off by default), so heap memory that is
+ // only reachable through pointers stored inside pooled objects (e.g.
+ // std::string buffers owned by cached protobuf messages) would be
+ // falsely reported as leaked at process exit. Un-poison all pooled
+ // objects right before LSan runs. LSan registers its leak check via
+ // atexit() very early during sanitizer init, so this handler (registered
+ // lazily on the first singleton creation) runs before it because atexit
+ // handlers execute in LIFO order.
+ // Not needed when BAIDU_CLEAR_OBJECT_POOL_AFTER_ALL_THREADS_QUIT is
+ // defined: clear_from_destructor_of_local_pool() then destructs and
+ // frees all pooled objects after the last thread quits.
+ if (ObjectPoolWithASanPoison<T>::value) {
+ atexit(unpoison_all_objects_before_leak_check);
+ }
+#endif
}
~ObjectPool() {
pthread_mutex_destroy(&_free_chunks_mutex);
}
+#if defined(BUTIL_USE_ASAN) && \
+ !defined(BAIDU_CLEAR_OBJECT_POOL_AFTER_ALL_THREADS_QUIT)
+ // Un-poison every constructed object still held by the pool so that
+ // LeakSanitizer can follow the pointers inside them. Only un-poisons, does
+ // not destruct: the pool intentionally keeps objects alive for reuse, and
+ // they remain reachable from the singleton, so they are not real leaks.
+ static void unpoison_all_objects_before_leak_check() {
+ if (NULL == _singleton.load(butil::memory_order_consume)) {
+ return;
+ }
+ const size_t ngroup = _ngroup.load(butil::memory_order_acquire);
+ for (size_t i = 0; i < ngroup; ++i) {
+ BlockGroup* bg = _block_groups[i].load(butil::memory_order_consume);
+ if (NULL == bg) {
+ break;
+ }
+ const size_t nblock = std::min(
+ bg->nblock.load(butil::memory_order_relaxed), OP_GROUP_NBLOCK);
+ for (size_t j = 0; j < nblock; ++j) {
+ Block* b = bg->blocks[j].load(butil::memory_order_consume);
+ if (NULL == b) {
+ continue;
+ }
+ for (size_t k = 0; k < b->nitem; ++k) {
+ asan_unpoison_memory_region((T*)&b->items[k]);
+ }
+ }
+ }
+ }
+#endif
+
// Create a Block and append it to right-most BlockGroup.
static Block* add_block(size_t* index) {
Block* const new_block = new(std::nothrow) Block;
diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt
index df8fedce..cd149408 100644
--- a/test/CMakeLists.txt
+++ b/test/CMakeLists.txt
@@ -216,6 +216,11 @@ if(BRPC_WITH_GLOG)
target_link_libraries(brpc-shared-debug ${GLOG_LIB})
endif()
+# AddressSanitizer is incompatible with tcmalloc/gperftools, do not link it when WITH_ASAN is on
+if(WITH_ASAN)
+ set(GPERFTOOLS_LIBRARIES "")
+endif()
+
# test_butil
add_executable(test_butil ${TEST_BUTIL_SOURCES}
${CMAKE_CURRENT_BINARY_DIR}/iobuf.pb.cc)
diff --git a/test/brpc_alpn_protocol_unittest.cpp b/test/brpc_alpn_protocol_unittest.cpp
index 9c150671..3e0fd138 100644
--- a/test/brpc_alpn_protocol_unittest.cpp
+++ b/test/brpc_alpn_protocol_unittest.cpp
@@ -99,13 +99,17 @@ public:
// SSL handshake.
SSL* ssl = brpc::CreateSSLSession(ssl_ctx, 0, cli_fd, false);
EXPECT_NE(nullptr, ssl);
- EXPECT_EQ(1, SSL_do_handshake(ssl));
+ EXPECT_EQ(1, SSL_do_handshake(ssl));
// Get handshake result.
const unsigned char* select_alpn = nullptr;
unsigned int len = 0;
SSL_get0_alpn_selected(ssl, &select_alpn, &len);
- return std::string(reinterpret_cast<const char*>(select_alpn), len);
+ std::string result(reinterpret_cast<const char*>(select_alpn), len);
+
+ SSL_free(ssl);
+ SSL_CTX_free(ssl_ctx);
+ return result;
}
private:
diff --git a/test/brpc_event_dispatcher_unittest.cpp b/test/brpc_event_dispatcher_unittest.cpp
index 5c0aa064..dcca305f 100644
--- a/test/brpc_event_dispatcher_unittest.cpp
+++ b/test/brpc_event_dispatcher_unittest.cpp
@@ -202,6 +202,10 @@ struct BAIDU_CACHELINE_ALIGNMENT SocketExtra : public brpc::SocketUser {
times = 0;
}
+ ~SocketExtra() {
+ free(buf);
+ }
+
void BeforeRecycle(brpc::Socket* m) override {
pthread_mutex_lock(&rel_fd_mutex);
rel_fd.push_back(m->fd());
@@ -293,6 +297,7 @@ void* client_thread(void* arg) {
}
}
}
+ free(buf);
EXPECT_EQ(0, close(m->fd));
return NULL;
}
@@ -320,6 +325,7 @@ TEST_F(EventDispatcherTest, dispatch_tasks) {
pthread_t cth[NCLIENT];
ClientMeta* cm[NCLIENT];
SocketExtra* sm[NCLIENT];
+ brpc::SocketId socket_ids[NCLIENT];
for (size_t i = 0; i < NCLIENT; ++i) {
ASSERT_EQ(0, socketpair(AF_UNIX, SOCK_STREAM, 0, fds + 2 * i));
@@ -334,6 +340,7 @@ TEST_F(EventDispatcherTest, dispatch_tasks) {
options.on_edge_triggered_events = SocketExtra::OnEdgeTriggeredEvents;
ASSERT_EQ(0, brpc::Socket::Create(options, &socket_id));
+ socket_ids[i] = socket_id;
cm[i] = new ClientMeta;
cm[i]->fd = fds[i * 2 + 1];
cm[i]->times = 0;
@@ -387,6 +394,16 @@ TEST_F(EventDispatcherTest, dispatch_tasks) {
#ifdef BUTIL_RESOURCE_POOL_NEED_FREE_ITEM_NUM
ASSERT_EQ(NCLIENT, info.free_item_num - old_info.free_item_num);
#endif
+
+ // Release sockets (SocketExtra::BeforeRecycle deletes the user) and the
+ // per-client metadata to avoid leaking them.
+ for (size_t i = 0; i < NCLIENT; ++i) {
+ brpc::SocketUniquePtr s;
+ if (brpc::Socket::Address(socket_ids[i], &s) == 0) {
+ s->SetFailed();
+ }
+ delete cm[i];
+ }
}
// Unique identifier of a EventPipe.
diff --git a/test/brpc_http_rpc_protocol_unittest.cpp b/test/brpc_http_rpc_protocol_unittest.cpp
index c7022ed2..3f3290bd 100644
--- a/test/brpc_http_rpc_protocol_unittest.cpp
+++ b/test/brpc_http_rpc_protocol_unittest.cpp
@@ -1772,6 +1772,9 @@ TEST_F(HttpTest, http2_goaway_sanity) {
butil::Status st = socket_message->AppendAndDestroySelf(&dummy, _h2_client_sock.get());
ASSERT_EQ(st.error_code(), brpc::ELOGOFF);
ASSERT_TRUE(st.error_data().ends_with("the connection just issued GOAWAY"));
+ // Release the reference held by stream_user_data (which is normally released
+ // by Controller::Call::OnComplete) to avoid leaking the H2UnsentRequest.
+ h2_req->DestroyStreamUserData(_h2_client_sock, &cntl, 0, false);
}
class AfterRecevingGoAway : public ::google::protobuf::Closure {
diff --git a/test/brpc_input_messenger_unittest.cpp b/test/brpc_input_messenger_unittest.cpp
index 812c499a..ae4afb6f 100644
--- a/test/brpc_input_messenger_unittest.cpp
+++ b/test/brpc_input_messenger_unittest.cpp
@@ -143,6 +143,7 @@ void* client_thread(void* arg) {
}
}
}
+ free(buf);
return NULL;
}
@@ -217,5 +218,8 @@ TEST_F(MessengerTest, dispatch_tasks) {
messenger[i].StopAccept(0);
}
sleep(1);
+ for (size_t i = 0; i < NCLIENT; ++i) {
+ delete cm[i];
+ }
LOG(WARNING) << "begin to exit!!!!";
}
diff --git a/test/brpc_load_balancer_unittest.cpp b/test/brpc_load_balancer_unittest.cpp
index 76ad005e..757f9c9b 100644
--- a/test/brpc_load_balancer_unittest.cpp
+++ b/test/brpc_load_balancer_unittest.cpp
@@ -911,9 +911,11 @@ TEST_F(LoadBalancerTest, weighted_round_robin_no_valid_server) {
brpc::ServerId id(8888);
brpc::SocketOptions options;
options.remote_side = dummy;
- options.user = new SaveRecycle;
id.tag = weight[i];
if (i < 2) {
+ // `user` is owned by the Socket; only allocate it when a Socket is
+ // actually created, otherwise it would leak.
+ options.user = new SaveRecycle;
ASSERT_EQ(0, brpc::Socket::Create(options, &id.id));
}
EXPECT_TRUE(wrrlb.AddServer(id));
@@ -1107,14 +1109,14 @@ TEST_F(LoadBalancerTest, revived_from_all_failed_sanity) {
"10.92.115.19:8832",
"10.42.122.201:8833",
};
- brpc::LoadBalancer* lb = NULL;
+ std::unique_ptr<brpc::LoadBalancer> lb;
int rand = butil::fast_rand_less_than(2);
if (rand == 0) {
brpc::policy::RandomizedLoadBalancer rlb;
- lb = rlb.New("min_working_instances=2 hold_seconds=2");
+ lb.reset(rlb.New("min_working_instances=2 hold_seconds=2"));
} else if (rand == 1) {
brpc::policy::RoundRobinLoadBalancer rrlb;
- lb = rrlb.New("min_working_instances=2 hold_seconds=2");
+ lb.reset(rrlb.New("min_working_instances=2 hold_seconds=2"));
}
brpc::SocketUniquePtr ptr[2];
for (size_t i = 0; i < ARRAY_SIZE(servers); ++i) {
diff --git a/test/brpc_proto_unittest.cpp b/test/brpc_proto_unittest.cpp
index 052a0671..88ed40da 100644
--- a/test/brpc_proto_unittest.cpp
+++ b/test/brpc_proto_unittest.cpp
@@ -19,6 +19,7 @@
#include <gtest/gtest.h>
+#include <memory>
#include <google/protobuf/descriptor.h>
#include <google/protobuf/descriptor.pb.h>
#include <google/protobuf/dynamic_message.h>
@@ -58,7 +59,7 @@ TEST(ProtoTest, proto) {
meta.set_correlation_id(123);
std::string data;
ASSERT_TRUE(meta.SerializeToString(&data));
- Message *msg = factory.GetPrototype(new_desc)->New();
+ std::unique_ptr<Message> msg(factory.GetPrototype(new_desc)->New());
ASSERT_TRUE(msg != NULL);
ASSERT_TRUE(msg->ParseFromString(data));
ASSERT_TRUE(msg->SerializeToString(&data));
diff --git a/test/brpc_protobuf_json_unittest.cpp b/test/brpc_protobuf_json_unittest.cpp
index aa73fbd8..b5ad0bb2 100644
--- a/test/brpc_protobuf_json_unittest.cpp
+++ b/test/brpc_protobuf_json_unittest.cpp
@@ -1136,7 +1136,7 @@ TEST_F(ProtobufJsonTest, pb_to_json_control_char_case) {
person->set_id(100);
char ch = 0x01;
- char* name = new char[17];
+ char name[17];
memcpy(name, "baidu ", 6);
name[6] = ch;
char c = 0x08;
diff --git a/test/brpc_redis_cluster_unittest.cpp b/test/brpc_redis_cluster_unittest.cpp
index 3047159c..34a6d9ba 100644
--- a/test/brpc_redis_cluster_unittest.cpp
+++ b/test/brpc_redis_cluster_unittest.cpp
@@ -424,20 +424,26 @@ private:
class ClusterRedisService : public brpc::RedisService {
public:
- explicit ClusterRedisService(NodeData* data) {
- AddCommandHandler("asking", new AskingHandler());
- AddCommandHandler("cluster", new ClusterCommandHandler(data->meta));
-
- KVCommandHandler* handler = new KVCommandHandler(data);
- AddCommandHandler("ping", handler);
- AddCommandHandler("get", handler);
- AddCommandHandler("set", handler);
- AddCommandHandler("del", handler);
- AddCommandHandler("exists", handler);
- AddCommandHandler("unlink", handler);
- AddCommandHandler("eval", handler);
- AddCommandHandler("evalsha", handler);
+ explicit ClusterRedisService(NodeData* data)
+ : _asking_handler(new AskingHandler())
+ , _cluster_handler(new ClusterCommandHandler(data->meta))
+ , _kv_handler(new KVCommandHandler(data)) {
+ AddCommandHandler("asking", _asking_handler.get());
+ AddCommandHandler("cluster", _cluster_handler.get());
+ AddCommandHandler("ping", _kv_handler.get());
+ AddCommandHandler("get", _kv_handler.get());
+ AddCommandHandler("set", _kv_handler.get());
+ AddCommandHandler("del", _kv_handler.get());
+ AddCommandHandler("exists", _kv_handler.get());
+ AddCommandHandler("unlink", _kv_handler.get());
+ AddCommandHandler("eval", _kv_handler.get());
+ AddCommandHandler("evalsha", _kv_handler.get());
}
+
+private:
+ std::unique_ptr<AskingHandler> _asking_handler;
+ std::unique_ptr<ClusterCommandHandler> _cluster_handler;
+ std::unique_ptr<KVCommandHandler> _kv_handler;
};
class Done : public google::protobuf::Closure {
diff --git a/test/brpc_redis_unittest.cpp b/test/brpc_redis_unittest.cpp
index 9597f6ab..39cf5cf5 100644
--- a/test/brpc_redis_unittest.cpp
+++ b/test/brpc_redis_unittest.cpp
@@ -17,6 +17,7 @@
#include <iostream>
+#include <memory>
#include <unordered_map>
#include <butil/time.h>
#include <butil/logging.h>
@@ -1144,24 +1145,26 @@ private:
TEST_F(RedisTest, server_sanity) {
std::string password = GeneratePassword();
+ std::unique_ptr<brpc::policy::RedisAuthenticator> redis_auth_holder(
+ new brpc::policy::RedisAuthenticator(password));
+ RedisServiceImpl* rsimpl = new RedisServiceImpl(password);
+ std::unique_ptr<GetCommandHandler> gh(new GetCommandHandler(rsimpl));
+ std::unique_ptr<SetCommandHandler> sh(new SetCommandHandler(rsimpl));
+ std::unique_ptr<AuthCommandHandler> ah(new AuthCommandHandler(rsimpl));
+ std::unique_ptr<IncrCommandHandler> ih(new IncrCommandHandler(rsimpl));
brpc::Server server;
brpc::ServerOptions server_options;
- RedisServiceImpl* rsimpl = new RedisServiceImpl(password);
- GetCommandHandler *gh = new GetCommandHandler(rsimpl);
- SetCommandHandler *sh = new SetCommandHandler(rsimpl);
- AuthCommandHandler *ah = new AuthCommandHandler(rsimpl);
- IncrCommandHandler *ih = new IncrCommandHandler(rsimpl);
- rsimpl->AddCommandHandler("get", gh);
- rsimpl->AddCommandHandler("set", sh);
- rsimpl->AddCommandHandler("incr", ih);
- rsimpl->AddCommandHandler("auth", ah);
+ rsimpl->AddCommandHandler("get", gh.get());
+ rsimpl->AddCommandHandler("set", sh.get());
+ rsimpl->AddCommandHandler("incr", ih.get());
+ rsimpl->AddCommandHandler("auth", ah.get());
server_options.redis_service = rsimpl;
brpc::PortRange pr(8081, 8900);
ASSERT_EQ(0, server.Start("127.0.0.1", pr, &server_options));
brpc::ChannelOptions options;
options.protocol = brpc::PROTOCOL_REDIS;
- options.auth = new brpc::policy::RedisAuthenticator(password);
+ options.auth = redis_auth_holder.get();
brpc::Channel channel;
ASSERT_EQ(0, channel.Init("127.0.0.1", server.listen_address().port,
&options));
@@ -1238,13 +1241,15 @@ void* incr_thread(void* arg) {
TEST_F(RedisTest, server_concurrency) {
std::string password = GeneratePassword();
int N = 10;
+ std::unique_ptr<brpc::policy::RedisAuthenticator> redis_auth_holder(
+ new brpc::policy::RedisAuthenticator(password));
+ RedisServiceImpl* rsimpl = new RedisServiceImpl(password);
+ std::unique_ptr<AuthCommandHandler> ah(new AuthCommandHandler(rsimpl));
+ std::unique_ptr<IncrCommandHandler> ih(new IncrCommandHandler(rsimpl));
brpc::Server server;
brpc::ServerOptions server_options;
- RedisServiceImpl* rsimpl = new RedisServiceImpl(password);
- AuthCommandHandler *ah = new AuthCommandHandler(rsimpl);
- IncrCommandHandler *ih = new IncrCommandHandler(rsimpl);
- rsimpl->AddCommandHandler("incr", ih);
- rsimpl->AddCommandHandler("auth", ah);
+ rsimpl->AddCommandHandler("incr", ih.get());
+ rsimpl->AddCommandHandler("auth", ah.get());
server_options.redis_service = rsimpl;
brpc::PortRange pr(8081, 8900);
ASSERT_EQ(0, server.Start("0.0.0.0", pr, &server_options));
@@ -1252,7 +1257,7 @@ TEST_F(RedisTest, server_concurrency) {
brpc::ChannelOptions options;
options.protocol = brpc::PROTOCOL_REDIS;
options.connection_type = "pooled";
- options.auth = new brpc::policy::RedisAuthenticator(password);
+ options.auth = redis_auth_holder.get();
std::vector<bthread_t> bths;
std::vector<brpc::Channel*> channels;
for (int i = 0; i < N; ++i) {
@@ -1324,21 +1329,28 @@ public:
TEST_F(RedisTest, server_command_continue) {
std::string password = GeneratePassword();
+ std::unique_ptr<brpc::policy::RedisAuthenticator> redis_auth_holder(
+ new brpc::policy::RedisAuthenticator(password));
+ RedisServiceImpl* rsimpl = new RedisServiceImpl(password);
+ std::unique_ptr<AuthCommandHandler> ah(new AuthCommandHandler(rsimpl));
+ std::unique_ptr<GetCommandHandler> gh(new GetCommandHandler(rsimpl));
+ std::unique_ptr<SetCommandHandler> sh(new SetCommandHandler(rsimpl));
+ std::unique_ptr<IncrCommandHandler> ih(new IncrCommandHandler(rsimpl));
+ std::unique_ptr<MultiCommandHandler> mh(new MultiCommandHandler);
brpc::Server server;
brpc::ServerOptions server_options;
- RedisServiceImpl* rsimpl = new RedisServiceImpl(password);
- rsimpl->AddCommandHandler("auth", new AuthCommandHandler(rsimpl));
- rsimpl->AddCommandHandler("get", new GetCommandHandler(rsimpl));
- rsimpl->AddCommandHandler("set", new SetCommandHandler(rsimpl));
- rsimpl->AddCommandHandler("incr", new IncrCommandHandler(rsimpl));
- rsimpl->AddCommandHandler("multi", new MultiCommandHandler);
+ rsimpl->AddCommandHandler("auth", ah.get());
+ rsimpl->AddCommandHandler("get", gh.get());
+ rsimpl->AddCommandHandler("set", sh.get());
+ rsimpl->AddCommandHandler("incr", ih.get());
+ rsimpl->AddCommandHandler("multi", mh.get());
server_options.redis_service = rsimpl;
brpc::PortRange pr(8081, 8900);
ASSERT_EQ(0, server.Start("127.0.0.1", pr, &server_options));
brpc::ChannelOptions options;
options.protocol = brpc::PROTOCOL_REDIS;
- options.auth = new brpc::policy::RedisAuthenticator(password);
+ options.auth = redis_auth_holder.get();
brpc::Channel channel;
ASSERT_EQ(0, channel.Init("127.0.0.1", server.listen_address().port,
&options));
{
@@ -1401,23 +1413,26 @@ TEST_F(RedisTest, server_command_continue) {
TEST_F(RedisTest, server_handle_pipeline) {
std::string password = GeneratePassword();
+ std::unique_ptr<brpc::policy::RedisAuthenticator> redis_auth_holder(
+ new brpc::policy::RedisAuthenticator(password));
+ RedisServiceImpl* rsimpl = new RedisServiceImpl(password);
+ std::unique_ptr<GetCommandHandler> getch(new GetCommandHandler(rsimpl, true));
+ std::unique_ptr<SetCommandHandler> setch(new SetCommandHandler(rsimpl, true));
+ std::unique_ptr<AuthCommandHandler> authch(new AuthCommandHandler(rsimpl));
+ std::unique_ptr<MultiCommandHandler> multich(new MultiCommandHandler);
brpc::Server server;
brpc::ServerOptions server_options;
- RedisServiceImpl* rsimpl = new RedisServiceImpl(password);
- GetCommandHandler* getch = new GetCommandHandler(rsimpl, true);
- SetCommandHandler* setch = new SetCommandHandler(rsimpl, true);
- AuthCommandHandler* authch = new AuthCommandHandler(rsimpl);
- rsimpl->AddCommandHandler("auth", authch);
- rsimpl->AddCommandHandler("get", getch);
- rsimpl->AddCommandHandler("set", setch);
- rsimpl->AddCommandHandler("multi", new MultiCommandHandler);
+ rsimpl->AddCommandHandler("auth", authch.get());
+ rsimpl->AddCommandHandler("get", getch.get());
+ rsimpl->AddCommandHandler("set", setch.get());
+ rsimpl->AddCommandHandler("multi", multich.get());
server_options.redis_service = rsimpl;
brpc::PortRange pr(8081, 8900);
ASSERT_EQ(0, server.Start("127.0.0.1", pr, &server_options));
brpc::ChannelOptions options;
options.protocol = brpc::PROTOCOL_REDIS;
- options.auth = new brpc::policy::RedisAuthenticator(password);
+ options.auth = redis_auth_holder.get();
brpc::Channel channel;
ASSERT_EQ(0, channel.Init("127.0.0.1", server.listen_address().port,
&options));
diff --git a/test/brpc_socket_unittest.cpp b/test/brpc_socket_unittest.cpp
index 98512874..a283ffcf 100644
--- a/test/brpc_socket_unittest.cpp
+++ b/test/brpc_socket_unittest.cpp
@@ -28,6 +28,7 @@
#include "butil/time.h"
#include "butil/macros.h"
#include "butil/fd_utility.h"
+#include "butil/debug/leak_annotations.h"
#include <butil/fd_guard.h>
#include "bthread/unstable.h"
#include "bthread/task_control.h"
@@ -209,7 +210,9 @@ public:
explicit MyErrorMessage(const butil::Status& st) : _status(st) {}
private:
butil::Status AppendAndDestroySelf(butil::IOBuf*, brpc::Socket*) {
- return _status;
+ butil::Status st = _status;
+ delete this;
+ return st;
};
butil::Status _status;
};
@@ -328,7 +331,9 @@ private:
TEST_F(SocketTest, single_threaded_connect_and_write) {
// FIXME(gejun): Messenger has to be new otherwise quitting may crash.
+ // It is intentionally never deleted; mark it so it is not a reported leak.
brpc::Acceptor* messenger = new brpc::Acceptor;
+ ANNOTATE_LEAKING_OBJECT_PTR(messenger);
const brpc::InputMessageHandler pairs[] = {
{ brpc::policy::ParseHuluMessage,
EchoProcessHuluRequest, NULL, NULL, "dummy_hulu" }
@@ -659,7 +664,9 @@ TEST_F(SocketTest, app_level_health_check) {
TEST_F(SocketTest, health_check) {
// FIXME(gejun): Messenger has to be new otherwise quitting may crash.
+ // It is intentionally never deleted; mark it so it is not a reported leak.
brpc::Acceptor* messenger = new brpc::Acceptor;
+ ANNOTATE_LEAKING_OBJECT_PTR(messenger);
brpc::SocketId id = 8888;
butil::EndPoint point(butil::IP_ANY, 7878);
@@ -976,13 +983,14 @@ void* reader(void* void_arg) {
ssize_t nr = read(arg->fd, buf, LEN);
if (nr < 0) {
printf("Fail to read, %m\n");
- return NULL;
+ break;
} else if (nr == 0) {
printf("Far end closed\n");
- return NULL;
+ break;
}
arg->nread += nr;
}
+ free(buf);
return NULL;
}
@@ -1233,7 +1241,9 @@ TEST_F(SocketTest, keepalive) {
}
TEST_F(SocketTest, keepalive_input_message) {
+ // It is intentionally never deleted; mark it so it is not a reported leak.
brpc::Acceptor* messenger = new brpc::Acceptor;
+ ANNOTATE_LEAKING_OBJECT_PTR(messenger);
int listening_fd = -1;
butil::EndPoint point(butil::IP_ANY, 7878);
for (int i = 0; i < 100; ++i) {
@@ -1424,7 +1434,9 @@ void CheckTCPUserTimeout(int fd, int expect_tcp_user_timeout) {
}
TEST_F(SocketTest, tcp_user_timeout) {
+ // It is intentionally never deleted; mark it so it is not a reported leak.
brpc::Acceptor* messenger = new brpc::Acceptor;
+ ANNOTATE_LEAKING_OBJECT_PTR(messenger);
int listening_fd = -1;
butil::EndPoint point(butil::IP_ANY, 7878);
for (int i = 0; i < 100; ++i) {
diff --git a/test/brpc_ssl_unittest.cpp b/test/brpc_ssl_unittest.cpp
index 00fe705e..6512eea4 100644
--- a/test/brpc_ssl_unittest.cpp
+++ b/test/brpc_ssl_unittest.cpp
@@ -339,6 +339,7 @@ void CheckCert(const char* cname, const char* cert) {
std::vector<std::string> cnames;
brpc::ExtractHostnames(x509, &cnames);
ASSERT_EQ(cert, cnames[0]) << x509;
+ X509_free(x509);
}
std::string GetRawPemString(const char* fname) {
@@ -495,6 +496,11 @@ TEST_F(SSLTest, ssl_perf) {
ASSERT_EQ(0, pthread_create(&spid, NULL, ssl_perf_server , serv_ssl));
ASSERT_EQ(0, pthread_join(cpid, NULL));
ASSERT_EQ(0, pthread_join(spid, NULL));
+
+ SSL_free(cli_ssl);
+ SSL_free(serv_ssl);
+ SSL_CTX_free(cli_ctx);
+ SSL_CTX_free(serv_ctx);
close(clifd);
close(servfd);
}
diff --git a/test/bthread_dispatcher_unittest.cpp b/test/bthread_dispatcher_unittest.cpp
index 669d9c4e..411c392e 100644
--- a/test/bthread_dispatcher_unittest.cpp
+++ b/test/bthread_dispatcher_unittest.cpp
@@ -118,9 +118,18 @@ void* epoll_thread(void* arg) {
while (!server_stop) {
#if defined(OS_LINUX)
- const int n = epoll_wait(em->epfd, e, ARRAY_SIZE(e), -1);
+ // Use a finite timeout so the loop can observe server_stop without
+ // relying on an external fd to wake up epoll_wait.
+ const int n = epoll_wait(em->epfd, e, ARRAY_SIZE(e), 100);
+ if (n == 0) {
+ continue;
+ }
#elif defined(OS_MACOSX)
- const int n = kevent(em->epfd, NULL, 0, e, ARRAY_SIZE(e), NULL);
+ timespec ts = { 0, 100L * 1000L * 1000L };
+ const int n = kevent(em->epfd, NULL, 0, e, ARRAY_SIZE(e), &ts);
+ if (n == 0) {
+ continue;
+ }
#endif
if (server_stop) {
break;
@@ -298,15 +307,9 @@ TEST(DispatcherTest, dispatch_tasks) {
pthread_join(cth[i], NULL);
}
server_stop = true;
+ // epoll_thread polls server_stop with a finite timeout, so it exits on its
+ // own without needing an external fd to wake up epoll_wait.
for (size_t i = 0; i < NEPOLL; ++i) {
-#if defined(OS_LINUX)
- epoll_event evt = { EPOLLOUT, { NULL } };
- ASSERT_EQ(0, epoll_ctl(epfd[i], EPOLL_CTL_ADD, 0, &evt));
-#elif defined(OS_MACOSX)
- struct kevent kqueue_event;
- EV_SET(&kqueue_event, 0, EVFILT_WRITE, EV_ADD | EV_ENABLE, 0, 0, NULL);
- ASSERT_EQ(0, kevent(epfd[i], &kqueue_event, 1, NULL, 0, NULL));
-#endif
#ifdef RUN_EPOLL_IN_BTHREAD
bthread_join(eth[i], NULL);
#else
@@ -315,5 +318,18 @@ TEST(DispatcherTest, dispatch_tasks) {
}
bthread::stop_and_join_epoll_threads();
bthread_usleep(100000);
+
+ for (size_t i = 0; i < NCLIENT; ++i) {
+ free(sm[i]->buf);
+ delete sm[i];
+ delete cm[i];
+ }
+ for (size_t i = 0; i < NEPOLL; ++i) {
+ delete em[i];
+ close(epfd[i]);
+ }
+ for (size_t i = 0; i < 2 * NCLIENT; ++i) {
+ close(fds[i]);
+ }
}
} // namespace
diff --git a/test/bthread_fd_unittest.cpp b/test/bthread_fd_unittest.cpp
index fac6a4f2..1f68ffc4 100644
--- a/test/bthread_fd_unittest.cpp
+++ b/test/bthread_fd_unittest.cpp
@@ -23,6 +23,7 @@
#include <fcntl.h>
#include <gtest/gtest.h>
#include <pthread.h>
+#include <memory>
#include "gperftools_helper.h"
#include "butil/time.h"
#include "butil/macros.h"
@@ -252,7 +253,9 @@ TEST(FDTest, ping_pong) {
#else
pthread_t cth[NCLIENT];
#endif
- ClientMeta* cm[NCLIENT];
+ std::unique_ptr<ClientMeta> cm[NCLIENT];
+ std::unique_ptr<SocketMeta> sm[NCLIENT];
+ std::unique_ptr<EpollMeta> em_arr[NEPOLL];
for (size_t i = 0; i < NEPOLL; ++i) {
#if defined(OS_LINUX)
@@ -266,7 +269,8 @@ TEST(FDTest, ping_pong) {
for (size_t i = 0; i < NCLIENT; ++i) {
ASSERT_EQ(0, socketpair(AF_UNIX, SOCK_STREAM, 0, fds + 2 * i));
//printf("Created fd=%d,%d i=%lu\n", fds[2*i], fds[2*i+1], i);
- SocketMeta* m = new SocketMeta;
+ sm[i].reset(new SocketMeta);
+ SocketMeta* m = sm[i].get();
m->fd = fds[i * 2];
m->epfd = epfd[fmix32(i) % NEPOLL];
ASSERT_EQ(0, fcntl(m->fd, F_SETFL, fcntl(m->fd, F_GETFL, 0) | O_NONBLOCK));
@@ -293,15 +297,15 @@ TEST(FDTest, ping_pong) {
#elif defined(OS_MACOSX)
ASSERT_EQ(0, kevent(m->epfd, &kqueue_event, 1, NULL, 0, NULL));
#endif
- cm[i] = new ClientMeta;
+ cm[i].reset(new ClientMeta);
cm[i]->fd = fds[i * 2 + 1];
cm[i]->count = i;
cm[i]->times = REP;
#ifdef RUN_CLIENT_IN_BTHREAD
butil::make_non_blocking(cm[i]->fd);
- ASSERT_EQ(0, bthread_start_urgent(&cth[i], NULL, client_thread, cm[i]));
+ ASSERT_EQ(0, bthread_start_urgent(&cth[i], NULL, client_thread, cm[i].get()));
#else
- ASSERT_EQ(0, pthread_create(&cth[i], NULL, client_thread, cm[i]));
+ ASSERT_EQ(0, pthread_create(&cth[i], NULL, client_thread, cm[i].get()));
#endif
}
@@ -310,7 +314,8 @@ TEST(FDTest, ping_pong) {
tm.start();
for (size_t i = 0; i < NEPOLL; ++i) {
- EpollMeta *em = new EpollMeta;
+ em_arr[i].reset(new EpollMeta);
+ EpollMeta* em = em_arr[i].get();
em->epfd = epfd[i];
#ifdef RUN_EPOLL_IN_BTHREAD
ASSERT_EQ(0, bthread_start_urgent(&eth[i], epoll_thread, em, NULL);
diff --git a/test/bthread_key_unittest.cpp b/test/bthread_key_unittest.cpp
index 4319fb41..92f4aaca 100644
--- a/test/bthread_key_unittest.cpp
+++ b/test/bthread_key_unittest.cpp
@@ -16,6 +16,7 @@
// under the License.
#include <algorithm> // std::sort
+#include <memory> // std::unique_ptr
#include <gflags/gflags.h>
#include "butil/atomicops.h"
#include <gtest/gtest.h>
@@ -447,7 +448,8 @@ static void usleep_thread_impl(PoolData2* data) {
}
static void* usleep_thread(void* args) {
- usleep_thread_impl((PoolData2*)args);
+ std::unique_ptr<PoolData2> data((PoolData2*)args);
+ usleep_thread_impl(data.get());
return NULL;
}
diff --git a/test/iobuf_unittest.cpp b/test/iobuf_unittest.cpp
index 82112045..18d312eb 100644
--- a/test/iobuf_unittest.cpp
+++ b/test/iobuf_unittest.cpp
@@ -1788,6 +1788,9 @@ TEST_F(IOBufTest, acquire_tls_block) {
b = butil::iobuf::acquire_tls_block();
ASSERT_EQ(0, butil::iobuf::get_tls_block_count());
ASSERT_NE(butil::iobuf::block_cap(b), butil::iobuf::block_size(b));
+ // acquire_tls_block() transfers ownership of a non-full block to the
+ // caller; return it to TLS so it is not leaked.
+ butil::iobuf::release_tls_block_chain(b);
}
TEST_F(IOBufTest, reserve_aligned) {
diff --git a/test/object_pool_unittest.cpp b/test/object_pool_unittest.cpp
index cfc891af..d7fecf5e 100644
--- a/test/object_pool_unittest.cpp
+++ b/test/object_pool_unittest.cpp
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+#include <memory>
#include <gtest/gtest.h>
#include "butil/time.h"
#include "butil/macros.h"
@@ -184,9 +185,13 @@ TEST_F(ObjectPoolTest, get_int) {
tm.stop();
printf("get a int takes %.1fns\n", tm.n_elapsed()/(double)N);
+ std::vector<std::unique_ptr<int>> new_ints;
+ new_ints.reserve(N);
tm.start();
for (size_t i = 0; i < N; ++i) {
- *(new int) = i;
+ int* pi = new int;
+ *pi = i;
+ new_ints.emplace_back(pi);
}
tm.stop();
printf("new a int takes %" PRId64 "ns\n", tm.n_elapsed()/N);
diff --git a/test/run_tests.sh b/test/run_tests.sh
index 510a2c70..ced02976 100755
--- a/test/run_tests.sh
+++ b/test/run_tests.sh
@@ -23,10 +23,11 @@ test_num=0
failed_test=""
rc=0
test_bins="test_butil test_bvar bthread*unittest brpc*unittest"
+export ASAN_OPTIONS="detect_leaks=1:detect_stack_use_after_return=1"
for test_bin in $test_bins; do
test_num=$((test_num + 1))
>&2 echo "[runtest] $test_bin"
- ASAN_OPTIONS="detect_leaks=0:detect_stack_use_after_return=1" ./$test_bin
+ ./$test_bin
# If ASan abort without detailed call stack of new/delete,
# try to disable fast_unwind_on_malloc, which would be a performance killer.
# ASAN_OPTIONS="fast_unwind_on_malloc=0:detect_leaks=0" ./$test_bin
diff --git a/test/thread_key_unittest.cpp b/test/thread_key_unittest.cpp
index 758a6791..06cbabaa 100644
--- a/test/thread_key_unittest.cpp
+++ b/test/thread_key_unittest.cpp
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+#include <memory>
#include <gtest/gtest.h>
#include <gflags/gflags.h>
@@ -230,9 +231,12 @@ void* ThreadKeyFunc(void* arg) {
auto thread_key_arg = (ThreadKeyArg*)arg;
auto thread_keys = thread_key_arg->thread_keys;
std::vector<int> expects(thread_keys.size(), 0);
+ std::vector<std::unique_ptr<int>> owned_data;
+ owned_data.reserve(thread_keys.size());
for (auto key : thread_keys) {
EXPECT_TRUE(butil::thread_getspecific(*key) == NULL);
- EXPECT_EQ(0, butil::thread_setspecific(*key, new int(0)));
+ owned_data.emplace_back(new int(0));
+ EXPECT_EQ(0, butil::thread_setspecific(*key, owned_data.back().get()));
EXPECT_EQ(*(static_cast<int*>(butil::thread_getspecific(*key))), 0);
}
while (!g_stopped) {
@@ -262,14 +266,17 @@ TEST(ThreadLocalTest, thread_key_multi_thread) {
g_stopped = false;
g_deleted = false;
std::vector<ThreadKey*> thread_keys;
+ std::vector<std::unique_ptr<int>> owned_data;
int key_num = 20480;
+ owned_data.reserve(key_num);
for (int i = 0; i < key_num; ++i) {
thread_keys.push_back(new ThreadKey());
ASSERT_EQ(0, butil::thread_key_create(*thread_keys.back(), [](void* data) {
delete static_cast<int*>(data);
}));
ASSERT_TRUE(butil::thread_getspecific(*thread_keys.back()) == NULL);
- ASSERT_EQ(0, butil::thread_setspecific(*thread_keys.back(), new int(0)));
+ owned_data.emplace_back(new int(0));
+ ASSERT_EQ(0, butil::thread_setspecific(*thread_keys.back(), owned_data.back().get()));
ASSERT_EQ(*(static_cast<int*>(butil::thread_getspecific(*thread_keys.back()))), 0);
}
const int thread_num = 8;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]