This is an automated email from the ASF dual-hosted git repository.
wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push:
new d94c88f6 Optimize keytablelist implementation (#2768)
d94c88f6 is described below
commit d94c88f6b29bebdc274ea6f20e19dbc74eb7e868
Author: Jingyuan <[email protected]>
AuthorDate: Mon Oct 7 14:57:15 2024 +0800
Optimize keytablelist implementation (#2768)
* Optimize keytablelist implementation
* fix
* add some ut and small fix
* add size in bthread_keytable_pool_t
---
src/bthread/key.cpp | 219 ++++++++++++++++++++++++++++++++++--------
src/bthread/types.h | 1 +
src/bthread/unstable.h | 4 +
test/bthread_key_unittest.cpp | 160 +++++++++++++++++++++++++++++-
4 files changed, 344 insertions(+), 40 deletions(-)
diff --git a/src/bthread/key.cpp b/src/bthread/key.cpp
index 433e1e14..5720a675 100644
--- a/src/bthread/key.cpp
+++ b/src/bthread/key.cpp
@@ -20,21 +20,35 @@
// Date: Sun Aug 3 12:46:15 CST 2014
#include <pthread.h>
-#include "butil/macros.h"
+#include <gflags/gflags.h>
+
+#include "bthread/errno.h" // EAGAIN
+#include "bthread/task_group.h" // TaskGroup
#include "butil/atomicops.h"
+#include "butil/macros.h"
#include "butil/thread_key.h"
+#include "butil/thread_local.h"
#include "bvar/passive_status.h"
-#include "bthread/errno.h" // EAGAIN
-#include "bthread/task_group.h" // TaskGroup
// Implement bthread_key_t related functions
namespace bthread {
+DEFINE_uint32(key_table_list_size, 5000,
+ "The maximum length of the KeyTableList. Once this value is "
+ "exceeded, a portion of the KeyTables will be moved to the "
+ "global free_keytables list.");
+
+DEFINE_uint32(borrow_from_globle_size, 100,
+ "The maximum number of KeyTables retrieved in a single operation
"
+ "from the global free_keytables when no KeyTable exists in the "
+ "current thread's keytable_list.");
+
+EXTERN_BAIDU_VOLATILE_THREAD_LOCAL(TaskGroup*, tls_task_group);
+
class KeyTable;
// defined in task_group.cpp
-extern __thread TaskGroup* tls_task_group;
extern __thread LocalStorage tls_bls;
static __thread bool tls_ever_created_keytable = false;
@@ -52,7 +66,7 @@ static const uint32_t KEY_2NDLEVEL_SIZE = 32;
static const uint32_t KEY_1STLEVEL_SIZE = 31;
// Max tls in one thread, currently the value is 992 which should be enough
-// for most projects throughout baidu.
+// for most projects throughout baidu.
static const uint32_t KEYS_MAX = KEY_2NDLEVEL_SIZE * KEY_1STLEVEL_SIZE;
// destructors/version of TLS.
@@ -94,7 +108,7 @@ public:
// Set the position to NULL before calling dtor which may set
// the position again.
_data[i].ptr = NULL;
-
+
KeyInfo info = bthread::s_key_info[offset + i];
if (info.dtor && _data[i].version == info.version) {
info.dtor(p, info.dtor_args);
@@ -205,17 +219,20 @@ private:
SubKeyTable* _subs[KEY_1STLEVEL_SIZE];
};
-struct KeyTableList {
- KeyTableList() {
- keytable = NULL;
+class BAIDU_CACHELINE_ALIGNMENT KeyTableList {
+public:
+ KeyTableList() :
+ _head(NULL), _tail(NULL), _length(0) {
}
+
~KeyTableList() {
- bthread::TaskGroup* g = bthread::tls_task_group;
- bthread::KeyTable* old_kt = bthread::tls_bls.keytable;
+ TaskGroup* g = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
+ KeyTable* old_kt = tls_bls.keytable;
+ KeyTable* keytable = _head;
while (keytable) {
- bthread::KeyTable* kt = keytable;
+ KeyTable* kt = keytable;
keytable = kt->next;
- bthread::tls_bls.keytable = kt;
+ tls_bls.keytable = kt;
if (g) {
g->current_task()->local_storage.keytable = kt;
}
@@ -223,35 +240,127 @@ struct KeyTableList {
if (old_kt == kt) {
old_kt = NULL;
}
- g = bthread::tls_task_group;
+ g = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
}
- bthread::tls_bls.keytable = old_kt;
- if(g) {
+ tls_bls.keytable = old_kt;
+ if (g) {
g->current_task()->local_storage.keytable = old_kt;
}
}
- KeyTable* keytable;
+
+ void append(KeyTable* keytable) {
+ if (keytable == NULL) {
+ return;
+ }
+ if (_head == NULL) {
+ _head = _tail = keytable;
+ } else {
+ _tail->next = keytable;
+ _tail = keytable;
+ }
+ keytable->next = NULL;
+ _length++;
+ }
+
+ KeyTable* remove_front() {
+ if (_head == NULL) {
+ return NULL;
+ }
+ KeyTable* temp = _head;
+ _head = _head->next;
+ _length--;
+ if (_head == NULL) {
+ _tail = NULL;
+ }
+ return temp;
+ }
+
+ int move_first_n_to_target(KeyTable** target, uint32_t size) {
+ if (size > _length || _head == NULL) {
+ return 0;
+ }
+
+ KeyTable* current = _head;
+ KeyTable* prev = NULL;
+ uint32_t count = 0;
+ while (current != NULL && count < size) {
+ prev = current;
+ current = current->next;
+ count++;
+ }
+ if (prev != NULL) {
+ prev->next = NULL;
+ if (*target == NULL) {
+ *target = _head;
+ } else {
+ (*target)->next = _head;
+ }
+ _head = current;
+ _length -= count;
+ if (_head == NULL) {
+ _tail = NULL;
+ }
+ }
+ return count;
+ }
+
+ inline uint32_t get_length() {
+ return _length;
+ }
+
+ // Only for test
+ inline bool check_length() {
+ KeyTable* current = _head;
+ uint32_t count = 0;
+ while (current != NULL) {
+ current = current->next;
+ count++;
+ }
+ return count == _length;
+ }
+
+private:
+ KeyTable* _head;
+ KeyTable* _tail;
+ uint32_t _length;
};
-static KeyTable* borrow_keytable(bthread_keytable_pool_t* pool) {
+KeyTable* borrow_keytable(bthread_keytable_pool_t* pool) {
if (pool != NULL && (pool->list || pool->free_keytables)) {
KeyTable* p;
pthread_rwlock_rdlock(&pool->rwlock);
auto list = (butil::ThreadLocal<bthread::KeyTableList>*)pool->list;
- if (list && list->get()->keytable) {
- p = list->get()->keytable;
- list->get()->keytable = p->next;
- pthread_rwlock_unlock(&pool->rwlock);
- return p;
+ if (list) {
+ p = list->get()->remove_front();
+ if (p) {
+ pthread_rwlock_unlock(&pool->rwlock);
+ return p;
+ }
}
pthread_rwlock_unlock(&pool->rwlock);
if (pool->free_keytables) {
pthread_rwlock_wrlock(&pool->rwlock);
p = (KeyTable*)pool->free_keytables;
- if (p) {
- pool->free_keytables = p->next;
+ if (list) {
+ for (uint32_t i = 0; i < FLAGS_borrow_from_globle_size; ++i) {
+ if (p) {
+ pool->free_keytables = p->next;
+ list->get()->append(p);
+ p = (KeyTable*)pool->free_keytables;
+ --pool->size;
+ } else {
+ break;
+ }
+ }
+ KeyTable* result = list->get()->remove_front();
pthread_rwlock_unlock(&pool->rwlock);
- return p;
+ return result;
+ } else {
+ if (p) {
+ pool->free_keytables = p->next;
+ pthread_rwlock_unlock(&pool->rwlock);
+ return p;
+ }
}
pthread_rwlock_unlock(&pool->rwlock);
}
@@ -276,8 +385,17 @@ void return_keytable(bthread_keytable_pool_t* pool,
KeyTable* kt) {
return;
}
auto list = (butil::ThreadLocal<bthread::KeyTableList>*)pool->list;
- kt->next = list->get()->keytable;
- list->get()->keytable = kt;
+ list->get()->append(kt);
+ if (list->get()->get_length() > FLAGS_key_table_list_size) {
+ pthread_rwlock_unlock(&pool->rwlock);
+ pthread_rwlock_wrlock(&pool->rwlock);
+ if (!pool->destroyed) {
+ int out = list->get()->move_first_n_to_target(
+ (KeyTable**)(&pool->free_keytables),
+ FLAGS_key_table_list_size / 2);
+ pool->size += out;
+ }
+ }
pthread_rwlock_unlock(&pool->rwlock);
}
@@ -327,6 +445,7 @@ int bthread_keytable_pool_init(bthread_keytable_pool_t*
pool) {
pthread_rwlock_init(&pool->rwlock, NULL);
pool->list = new butil::ThreadLocal<bthread::KeyTableList>();
pool->free_keytables = NULL;
+ pool->size = 0;
pool->destroyed = 0;
return 0;
}
@@ -339,6 +458,7 @@ int bthread_keytable_pool_destroy(bthread_keytable_pool_t*
pool) {
bthread::KeyTable* saved_free_keytables = NULL;
pthread_rwlock_wrlock(&pool->rwlock);
pool->destroyed = 1;
+ pool->size = 0;
delete (butil::ThreadLocal<bthread::KeyTableList>*)pool->list;
saved_free_keytables = (bthread::KeyTable*)pool->free_keytables;
pool->list = NULL;
@@ -346,7 +466,8 @@ int bthread_keytable_pool_destroy(bthread_keytable_pool_t*
pool) {
pthread_rwlock_unlock(&pool->rwlock);
// Cheat get/setspecific and destroy the keytables.
- bthread::TaskGroup* g = bthread::tls_task_group;
+ bthread::TaskGroup* g =
+ bthread::BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
bthread::KeyTable* old_kt = bthread::tls_bls.keytable;
while (saved_free_keytables) {
bthread::KeyTable* kt = saved_free_keytables;
@@ -356,7 +477,7 @@ int bthread_keytable_pool_destroy(bthread_keytable_pool_t*
pool) {
g->current_task()->local_storage.keytable = kt;
}
delete kt;
- g = bthread::tls_task_group;
+ g = bthread::BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
}
bthread::tls_bls.keytable = old_kt;
if (g) {
@@ -374,15 +495,34 @@ int
bthread_keytable_pool_getstat(bthread_keytable_pool_t* pool,
LOG(ERROR) << "Param[pool] or Param[stat] is NULL";
return EINVAL;
}
- pthread_rwlock_rdlock(&pool->rwlock);
- size_t count = 0;
- bthread::KeyTable* p = (bthread::KeyTable*)pool->free_keytables;
- for (; p; p = p->next, ++count) {}
- stat->nfree = count;
+ pthread_rwlock_wrlock(&pool->rwlock);
+ stat->nfree = pool->size;
pthread_rwlock_unlock(&pool->rwlock);
return 0;
}
+int get_thread_local_keytable_list_length(bthread_keytable_pool_t* pool) {
+ if (pool == NULL) {
+ LOG(ERROR) << "Param[pool] is NULL";
+ return EINVAL;
+ }
+ int length = 0;
+ pthread_rwlock_rdlock(&pool->rwlock);
+ if (pool->destroyed) {
+ pthread_rwlock_unlock(&pool->rwlock);
+ return length;
+ }
+ auto list = (butil::ThreadLocal<bthread::KeyTableList>*)pool->list;
+ if (list) {
+ length = (int)(list->get()->get_length());
+ if (!list->get()->check_length()) {
+ LOG(ERROR) << "Length is not equal";
+ }
+ }
+ pthread_rwlock_unlock(&pool->rwlock);
+ return length;
+}
+
// TODO: this is not strict `reserve' because we only check #free.
// Currently there's no way to track KeyTables that may be returned
// to the pool in future.
@@ -418,6 +558,7 @@ void bthread_keytable_pool_reserve(bthread_keytable_pool_t*
pool,
}
kt->next = (bthread::KeyTable*)pool->free_keytables;
pool->free_keytables = kt;
+ ++pool->size;
pthread_rwlock_unlock(&pool->rwlock);
if (data == NULL) {
break;
@@ -467,10 +608,10 @@ int bthread_key_delete(bthread_key_t key) {
++bthread::s_key_info[key.index].version;
}
bthread::s_key_info[key.index].dtor = NULL;
- bthread::s_key_info[key.index].dtor_args = NULL;
+ bthread::s_key_info[key.index].dtor_args = NULL;
bthread::s_free_keys[bthread::nfreekey++] = key.index;
return 0;
- }
+ }
}
CHECK(false) << "bthread_key_delete is called on invalid " << key;
return EINVAL;
@@ -489,7 +630,7 @@ int bthread_setspecific(bthread_key_t key, void* data) {
return ENOMEM;
}
bthread::tls_bls.keytable = kt;
- bthread::TaskGroup* const g = bthread::tls_task_group;
+ bthread::TaskGroup* const g =
bthread::BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
if (g) {
g->current_task()->local_storage.keytable = kt;
} else {
@@ -510,7 +651,7 @@ void* bthread_getspecific(bthread_key_t key) {
if (kt) {
return kt->get_data(key);
}
- bthread::TaskGroup* const g = bthread::tls_task_group;
+ bthread::TaskGroup* const g =
bthread::BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
if (g) {
bthread::TaskMeta* const task = g->current_task();
kt = bthread::borrow_keytable(task->attr.keytable_pool);
diff --git a/src/bthread/types.h b/src/bthread/types.h
index ff99bb1c..0124fcbd 100644
--- a/src/bthread/types.h
+++ b/src/bthread/types.h
@@ -88,6 +88,7 @@ typedef struct {
pthread_rwlock_t rwlock;
void* list;
void* free_keytables;
+ size_t size;
int destroyed;
} bthread_keytable_pool_t;
diff --git a/src/bthread/unstable.h b/src/bthread/unstable.h
index f5bdeecb..dde5e6f6 100644
--- a/src/bthread/unstable.h
+++ b/src/bthread/unstable.h
@@ -128,6 +128,10 @@ extern int
bthread_keytable_pool_destroy(bthread_keytable_pool_t*);
extern int bthread_keytable_pool_getstat(bthread_keytable_pool_t* pool,
bthread_keytable_pool_stat_t* stat);
+// [RPC INTERNAL]
+// Return thread local keytable list length if exist.
+extern int get_thread_local_keytable_list_length(bthread_keytable_pool_t*
pool);
+
// [RPC INTERNAL]
// Reserve at most `nfree' keytables with `key' pointing to data created by
// ctor(args).
diff --git a/test/bthread_key_unittest.cpp b/test/bthread_key_unittest.cpp
index c01ae7fe..a7f11e76 100644
--- a/test/bthread_key_unittest.cpp
+++ b/test/bthread_key_unittest.cpp
@@ -16,14 +16,33 @@
// under the License.
#include <algorithm> // std::sort
+#include <gflags/gflags.h>
#include "butil/atomicops.h"
#include <gtest/gtest.h>
+#include "butil/thread_key.h"
#include "butil/time.h"
#include "butil/macros.h"
#include "butil/scoped_lock.h"
#include "butil/logging.h"
#include "bthread/bthread.h"
#include "bthread/unstable.h"
+using namespace bthread;
+namespace bthread {
+DECLARE_uint32(key_table_list_size);
+DECLARE_uint32(borrow_from_globle_size);
+class KeyTable;
+// defined in bthread/key.cpp
+extern void return_keytable(bthread_keytable_pool_t*, KeyTable*);
+extern KeyTable* borrow_keytable(bthread_keytable_pool_t*);
+} // namespace bthread
+
+int main(int argc, char* argv[]) {
+ bthread::FLAGS_key_table_list_size = 20;
+ bthread::FLAGS_borrow_from_globle_size = 20;
+ testing::InitGoogleTest(&argc, argv);
+ GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);
+ return RUN_ALL_TESTS();
+}
extern "C" {
int bthread_keytable_pool_size(bthread_keytable_pool_t* pool) {
@@ -407,6 +426,144 @@ TEST(KeyTest, using_pool) {
ASSERT_EQ(0, bthread_key_delete(key));
}
+bthread_keytable_pool_t test_pool;
+struct PoolData2 {
+ bthread_key_t key;
+ bthread_attr_t attr;
+};
+
+static void pool_dtor2(void* tls) {
+ delete static_cast<PoolData2*>(tls);
+}
+
+static void usleep_thread_impl(PoolData2* data) {
+ if (NULL == bthread_getspecific(data->key)) {
+ PoolData2* data_new = new PoolData2();
+ ASSERT_EQ(0, bthread_setspecific(data->key, data_new));
+ }
+ bthread_usleep(1000L);
+ int length = get_thread_local_keytable_list_length(&test_pool);
+ ASSERT_LE((size_t)length, bthread::FLAGS_key_table_list_size);
+}
+
+static void* usleep_thread(void* args) {
+ usleep_thread_impl((PoolData2*)args);
+ return NULL;
+}
+
+static void launch_many_bthreads(PoolData2* data) {
+ std::vector<bthread_t> tids;
+ tids.reserve(25000);
+ for (size_t i = 0; i < 25000; ++i) {
+ bthread_t t0;
+ PoolData2* data_tmp = new PoolData2();
+ data_tmp->key = data->key;
+ ASSERT_EQ(0, bthread_start_background(&t0, &(data->attr),
usleep_thread, data_tmp));
+ tids.push_back(t0);
+ }
+
+ usleep(3 * 1000 * 1000L);
+ for (size_t i = 0; i < tids.size(); ++i) {
+ bthread_join(tids[i], NULL);
+ }
+}
+
+static void* run_launch_many_bthreads(void* args) {
+ PoolData2* data = (PoolData2*)args;
+ launch_many_bthreads(data);
+ return NULL;
+}
+
+TEST(KeyTest, frequently_borrow_keytable_when_using_pool) {
+ PoolData2 data;
+ ASSERT_EQ(0, bthread_key_create(&data.key, pool_dtor2));
+
+ ASSERT_EQ(0, bthread_keytable_pool_init(&test_pool));
+ ASSERT_EQ(0, bthread_keytable_pool_size(&test_pool));
+
+ ASSERT_EQ(0, bthread_attr_init(&data.attr));
+ data.attr.keytable_pool = &test_pool;
+
+ bthread_t bth;
+ ASSERT_EQ(0, bthread_start_urgent(&bth, &data.attr,
run_launch_many_bthreads, &data));
+ ASSERT_EQ(0, bthread_join(bth, NULL));
+ std::cout << "Free keytable size is "
+ << bthread_keytable_pool_size(&test_pool)
+ << " use keytable size is 25000" << std::endl;
+ ASSERT_EQ(0, bthread_keytable_pool_destroy(&test_pool));
+ ASSERT_EQ(0, bthread_key_delete(data.key));
+}
+
+std::vector<bthread::KeyTable*> table_list;
+pthread_mutex_t table_mutex = PTHREAD_MUTEX_INITIALIZER;
+
+static void return_thread_impl() {
+ for (int i = 0; i < 32768; i++) {
+ {
+ BAIDU_SCOPED_LOCK(table_mutex);
+ if (table_list.size() > 0) {
+ bthread::KeyTable* keytable = table_list[0];
+ table_list.erase(table_list.begin());
+ bthread::return_keytable(&test_pool, keytable);
+ }
+ }
+ int length = get_thread_local_keytable_list_length(&test_pool);
+ ASSERT_LE((size_t)length, bthread::FLAGS_key_table_list_size);
+ }
+}
+
+static void* return_thread(void*) {
+ return_thread_impl();
+ return NULL;
+}
+
+static void borrow_thread_impl() {
+ for (int i = 0; i < 32768; i++) {
+ bthread::KeyTable* keytable = bthread::borrow_keytable(&test_pool);
+ BAIDU_SCOPED_LOCK(table_mutex);
+ table_list.push_back(keytable);
+ }
+}
+
+static void* borrow_thread(void*) {
+ borrow_thread_impl();
+ return NULL;
+}
+
+TEST(KeyTest, borrow_and_return_keytable_when_using_pool) {
+ ASSERT_EQ(0, bthread_keytable_pool_init(&test_pool));
+ ASSERT_EQ(0, bthread_keytable_pool_size(&test_pool));
+
+ bthread_attr_t attr;
+ ASSERT_EQ(0, bthread_attr_init(&attr));
+ attr.keytable_pool = &test_pool;
+
+ bthread_t borrow_bth[8];
+ bthread_t return_bth[8];
+ for (size_t i = 0; i < arraysize(borrow_bth); ++i) {
+ ASSERT_EQ(0, bthread_start_background(&borrow_bth[i], &attr,
+ borrow_thread, NULL));
+ }
+ for (size_t i = 0; i < arraysize(return_bth); ++i) {
+ ASSERT_EQ(0, bthread_start_background(&return_bth[i], &attr,
+ return_thread, NULL));
+ }
+
+ for (size_t i = 0; i < arraysize(borrow_bth); ++i) {
+ ASSERT_EQ(0, bthread_join(borrow_bth[i], NULL));
+ }
+ for (size_t i = 0; i < arraysize(return_bth); ++i) {
+ ASSERT_EQ(0, bthread_join(return_bth[i], NULL));
+ }
+
+ for (size_t i = 0; i < table_list.size(); i++) {
+ bthread::return_keytable(&test_pool, table_list[i]);
+ }
+ table_list.clear();
+
+ ASSERT_EQ(0, bthread_keytable_pool_destroy(&test_pool));
+}
+
// NOTE: lid is short for 'lock in dtor'.
butil::atomic<size_t> lid_seq(1);
std::vector<size_t> lid_seqs;
@@ -454,9 +611,10 @@ TEST(KeyTest, use_bthread_mutex_in_dtor) {
std::sort(lid_seqs.begin(), lid_seqs.end());
ASSERT_EQ(lid_seqs.end(), std::unique(lid_seqs.begin(), lid_seqs.end()));
ASSERT_EQ(arraysize(th) + arraysize(bth) - 1,
- *(lid_seqs.end() - 1) - *lid_seqs.begin());
+ *(lid_seqs.end() - 1) - *lid_seqs.begin());
ASSERT_EQ(0, bthread_key_delete(key));
+ ASSERT_EQ(0, bthread_mutex_destroy(&mu));
}
} // namespace
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]