This is an automated email from the ASF dual-hosted git repository.

wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git


The following commit(s) were added to refs/heads/master by this push:
     new d94c88f6 Optimize keytablelist implementation (#2768)
d94c88f6 is described below

commit d94c88f6b29bebdc274ea6f20e19dbc74eb7e868
Author: Jingyuan <52315061+mjy-h...@users.noreply.github.com>
AuthorDate: Mon Oct 7 14:57:15 2024 +0800

    Optimize keytablelist implementation (#2768)
    
    * Optimize keytablelist implementation
    
    * fix
    
    * add some ut and small fix
    
    * add size in bthread_keytable_pool_t
---
 src/bthread/key.cpp           | 219 ++++++++++++++++++++++++++++++++++--------
 src/bthread/types.h           |   1 +
 src/bthread/unstable.h        |   4 +
 test/bthread_key_unittest.cpp | 160 +++++++++++++++++++++++++++++-
 4 files changed, 344 insertions(+), 40 deletions(-)

diff --git a/src/bthread/key.cpp b/src/bthread/key.cpp
index 433e1e14..5720a675 100644
--- a/src/bthread/key.cpp
+++ b/src/bthread/key.cpp
@@ -20,21 +20,35 @@
 // Date: Sun Aug  3 12:46:15 CST 2014
 
 #include <pthread.h>
-#include "butil/macros.h"
+#include <gflags/gflags.h>
+
+#include "bthread/errno.h"       // EAGAIN
+#include "bthread/task_group.h"  // TaskGroup
 #include "butil/atomicops.h"
+#include "butil/macros.h"
 #include "butil/thread_key.h"
+#include "butil/thread_local.h"
 #include "bvar/passive_status.h"
-#include "bthread/errno.h"                       // EAGAIN
-#include "bthread/task_group.h"                  // TaskGroup
 
 // Implement bthread_key_t related functions
 
 namespace bthread {
 
+DEFINE_uint32(key_table_list_size, 5000,
+              "The maximum length of the KeyTableList. Once this value is "
+              "exceeded, a portion of the KeyTables will be moved to the "
+              "global free_keytables list.");
+
+DEFINE_uint32(borrow_from_globle_size, 100,
+              "The maximum number of KeyTables retrieved in a single operation 
"
+              "from the global free_keytables when no KeyTable exists in the "
+              "current thread's keytable_list.");
+
+EXTERN_BAIDU_VOLATILE_THREAD_LOCAL(TaskGroup*, tls_task_group);
+
 class KeyTable;
 
 // defined in task_group.cpp
-extern __thread TaskGroup* tls_task_group;
 extern __thread LocalStorage tls_bls;
 static __thread bool tls_ever_created_keytable = false;
 
@@ -52,7 +66,7 @@ static const uint32_t KEY_2NDLEVEL_SIZE = 32;
 static const uint32_t KEY_1STLEVEL_SIZE = 31;
 
 // Max tls in one thread, currently the value is 992 which should be enough
-// for most projects throughout baidu. 
+// for most projects throughout baidu.
 static const uint32_t KEYS_MAX = KEY_2NDLEVEL_SIZE * KEY_1STLEVEL_SIZE;
 
 // destructors/version of TLS.
@@ -94,7 +108,7 @@ public:
                 // Set the position to NULL before calling dtor which may set
                 // the position again.
                 _data[i].ptr = NULL;
-                
+
                 KeyInfo info = bthread::s_key_info[offset + i];
                 if (info.dtor && _data[i].version == info.version) {
                     info.dtor(p, info.dtor_args);
@@ -205,17 +219,20 @@ private:
     SubKeyTable* _subs[KEY_1STLEVEL_SIZE];
 };
 
-struct KeyTableList {
-    KeyTableList() {
-        keytable = NULL;
+class BAIDU_CACHELINE_ALIGNMENT KeyTableList {
+public:
+    KeyTableList() :
+        _head(NULL), _tail(NULL), _length(0) {
     }
+
     ~KeyTableList() {
-        bthread::TaskGroup* g = bthread::tls_task_group;
-        bthread::KeyTable* old_kt = bthread::tls_bls.keytable;
+        TaskGroup* g = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
+        KeyTable* old_kt = tls_bls.keytable;
+        KeyTable* keytable = _head;
         while (keytable) {
-            bthread::KeyTable* kt = keytable;
+            KeyTable* kt = keytable;
             keytable = kt->next;
-            bthread::tls_bls.keytable = kt;
+            tls_bls.keytable = kt;
             if (g) {
                 g->current_task()->local_storage.keytable = kt;
             }
@@ -223,35 +240,127 @@ struct KeyTableList {
             if (old_kt == kt) {
                 old_kt = NULL;
             }
-            g = bthread::tls_task_group;
+            g = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
         }
-        bthread::tls_bls.keytable = old_kt;
-        if(g) {
+        tls_bls.keytable = old_kt;
+        if (g) {
             g->current_task()->local_storage.keytable = old_kt;
         }
     }
-    KeyTable* keytable;
+
+    void append(KeyTable* keytable) {
+        if (keytable == NULL) {
+            return;
+        }
+        if (_head == NULL) {
+            _head = _tail = keytable;
+        } else {
+            _tail->next = keytable;
+            _tail = keytable;
+        }
+        keytable->next = NULL;
+        _length++;
+    }
+
+    KeyTable* remove_front() {
+        if (_head == NULL) {
+            return NULL;
+        }
+        KeyTable* temp = _head;
+        _head = _head->next;
+        _length--;
+        if (_head == NULL) {
+            _tail = NULL;
+        }
+        return temp;
+    }
+
+    int move_first_n_to_target(KeyTable** target, uint32_t size) {
+        if (size > _length || _head == NULL) {
+            return 0;
+        }
+
+        KeyTable* current = _head;
+        KeyTable* prev = NULL;
+        uint32_t count = 0;
+        while (current != NULL && count < size) {
+            prev = current;
+            current = current->next;
+            count++;
+        }
+        if (prev != NULL) {
+            prev->next = NULL;
+            if (*target == NULL) {
+                *target = _head;
+            } else {
+                (*target)->next = _head;
+            }
+            _head = current;
+            _length -= count;
+            if (_head == NULL) {
+                _tail = NULL;
+            }
+        }
+        return count;
+    }
+
+    inline uint32_t get_length() {
+        return _length;
+    }
+
+    // Only for test
+    inline bool check_length() {
+        KeyTable* current = _head;
+        uint32_t count = 0;
+        while (current != NULL) {
+            current = current->next;
+            count++;
+        }
+        return count == _length;
+    }
+
+private:
+    KeyTable* _head;
+    KeyTable* _tail;
+    uint32_t _length;
 };
 
-static KeyTable* borrow_keytable(bthread_keytable_pool_t* pool) {
+KeyTable* borrow_keytable(bthread_keytable_pool_t* pool) {
     if (pool != NULL && (pool->list || pool->free_keytables)) {
         KeyTable* p;
         pthread_rwlock_rdlock(&pool->rwlock);
         auto list = (butil::ThreadLocal<bthread::KeyTableList>*)pool->list;
-        if (list && list->get()->keytable) {
-            p = list->get()->keytable;
-            list->get()->keytable = p->next;
-            pthread_rwlock_unlock(&pool->rwlock);
-            return p;
+        if (list) {
+            p = list->get()->remove_front();
+            if (p) {
+                pthread_rwlock_unlock(&pool->rwlock);
+                return p;
+            }
         }
         pthread_rwlock_unlock(&pool->rwlock);
         if (pool->free_keytables) {
             pthread_rwlock_wrlock(&pool->rwlock);
             p = (KeyTable*)pool->free_keytables;
-            if (p) {
-                pool->free_keytables = p->next;
+            if (list) {
+                for (uint32_t i = 0; i < FLAGS_borrow_from_globle_size; ++i) {
+                    if (p) {
+                        pool->free_keytables = p->next;
+                        list->get()->append(p);
+                        p = (KeyTable*)pool->free_keytables;
+                        --pool->size;
+                    } else {
+                        break;
+                    }
+                }
+                KeyTable* result = list->get()->remove_front();
                 pthread_rwlock_unlock(&pool->rwlock);
-                return p;
+                return result;
+            } else {
+                if (p) {
+                    pool->free_keytables = p->next;
+                    pthread_rwlock_unlock(&pool->rwlock);
+                    return p;
+                }
             }
             pthread_rwlock_unlock(&pool->rwlock);
         }
@@ -276,8 +385,17 @@ void return_keytable(bthread_keytable_pool_t* pool, 
KeyTable* kt) {
         return;
     }
     auto list = (butil::ThreadLocal<bthread::KeyTableList>*)pool->list;
-    kt->next = list->get()->keytable;
-    list->get()->keytable = kt;
+    list->get()->append(kt);
+    if (list->get()->get_length() > FLAGS_key_table_list_size) {
+        pthread_rwlock_unlock(&pool->rwlock);
+        pthread_rwlock_wrlock(&pool->rwlock);
+        if (!pool->destroyed) {
+            int out = list->get()->move_first_n_to_target(
+                (KeyTable**)(&pool->free_keytables),
+                FLAGS_key_table_list_size / 2);
+            pool->size += out;
+        }
+    }
     pthread_rwlock_unlock(&pool->rwlock);
 }
 
@@ -327,6 +445,7 @@ int bthread_keytable_pool_init(bthread_keytable_pool_t* 
pool) {
     pthread_rwlock_init(&pool->rwlock, NULL);
     pool->list = new butil::ThreadLocal<bthread::KeyTableList>();
     pool->free_keytables = NULL;
+    pool->size = 0;
     pool->destroyed = 0;
     return 0;
 }
@@ -339,6 +458,7 @@ int bthread_keytable_pool_destroy(bthread_keytable_pool_t* 
pool) {
     bthread::KeyTable* saved_free_keytables = NULL;
     pthread_rwlock_wrlock(&pool->rwlock);
     pool->destroyed = 1;
+    pool->size = 0;
     delete (butil::ThreadLocal<bthread::KeyTableList>*)pool->list;
     saved_free_keytables = (bthread::KeyTable*)pool->free_keytables;
     pool->list = NULL;
@@ -346,7 +466,8 @@ int bthread_keytable_pool_destroy(bthread_keytable_pool_t* 
pool) {
     pthread_rwlock_unlock(&pool->rwlock);
 
     // Cheat get/setspecific and destroy the keytables.
-    bthread::TaskGroup* g = bthread::tls_task_group;
+    bthread::TaskGroup* g =
+        bthread::BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
     bthread::KeyTable* old_kt = bthread::tls_bls.keytable;
     while (saved_free_keytables) {
         bthread::KeyTable* kt = saved_free_keytables;
@@ -356,7 +477,7 @@ int bthread_keytable_pool_destroy(bthread_keytable_pool_t* 
pool) {
             g->current_task()->local_storage.keytable = kt;
         }
         delete kt;
-        g = bthread::tls_task_group;
+        g = bthread::BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
     }
     bthread::tls_bls.keytable = old_kt;
     if (g) {
@@ -374,15 +495,34 @@ int 
bthread_keytable_pool_getstat(bthread_keytable_pool_t* pool,
         LOG(ERROR) << "Param[pool] or Param[stat] is NULL";
         return EINVAL;
     }
-    pthread_rwlock_rdlock(&pool->rwlock);
-    size_t count = 0;
-    bthread::KeyTable* p = (bthread::KeyTable*)pool->free_keytables;
-    for (; p; p = p->next, ++count) {}
-    stat->nfree = count;
+    pthread_rwlock_wrlock(&pool->rwlock);
+    stat->nfree = pool->size;
     pthread_rwlock_unlock(&pool->rwlock);
     return 0;
 }
 
+int get_thread_local_keytable_list_length(bthread_keytable_pool_t* pool) {
+    if (pool == NULL) {
+        LOG(ERROR) << "Param[pool] is NULL";
+        return EINVAL;
+    }
+    int length = 0;
+    pthread_rwlock_rdlock(&pool->rwlock);
+    if (pool->destroyed) {
+        pthread_rwlock_unlock(&pool->rwlock);
+        return length;
+    }
+    auto list = (butil::ThreadLocal<bthread::KeyTableList>*)pool->list;
+    if (list) {
+        length = (int)(list->get()->get_length());
+        if (!list->get()->check_length()) {
+            LOG(ERROR) << "Length is not equal";
+        }
+    }
+    pthread_rwlock_unlock(&pool->rwlock);
+    return length;
+}
+
 // TODO: this is not strict `reserve' because we only check #free.
 // Currently there's no way to track KeyTables that may be returned
 // to the pool in future.
@@ -418,6 +558,7 @@ void bthread_keytable_pool_reserve(bthread_keytable_pool_t* 
pool,
         }
         kt->next = (bthread::KeyTable*)pool->free_keytables;
         pool->free_keytables = kt;
+        ++pool->size;
         pthread_rwlock_unlock(&pool->rwlock);
         if (data == NULL) {
             break;
@@ -467,10 +608,10 @@ int bthread_key_delete(bthread_key_t key) {
                 ++bthread::s_key_info[key.index].version;
             }
             bthread::s_key_info[key.index].dtor = NULL;
-            bthread::s_key_info[key.index].dtor_args = NULL;        
+            bthread::s_key_info[key.index].dtor_args = NULL;
             bthread::s_free_keys[bthread::nfreekey++] = key.index;
             return 0;
-        } 
+        }
     }
     CHECK(false) << "bthread_key_delete is called on invalid " << key;
     return EINVAL;
@@ -489,7 +630,7 @@ int bthread_setspecific(bthread_key_t key, void* data) {
             return ENOMEM;
         }
         bthread::tls_bls.keytable = kt;
-        bthread::TaskGroup* const g = bthread::tls_task_group;
+        bthread::TaskGroup* const g = 
bthread::BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
         if (g) {
             g->current_task()->local_storage.keytable = kt;
         } else {
@@ -510,7 +651,7 @@ void* bthread_getspecific(bthread_key_t key) {
     if (kt) {
         return kt->get_data(key);
     }
-    bthread::TaskGroup* const g = bthread::tls_task_group;
+    bthread::TaskGroup* const g = 
bthread::BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
     if (g) {
         bthread::TaskMeta* const task = g->current_task();
         kt = bthread::borrow_keytable(task->attr.keytable_pool);
diff --git a/src/bthread/types.h b/src/bthread/types.h
index ff99bb1c..0124fcbd 100644
--- a/src/bthread/types.h
+++ b/src/bthread/types.h
@@ -88,6 +88,7 @@ typedef struct {
     pthread_rwlock_t rwlock;
     void* list;
     void* free_keytables;
+    size_t size;
     int destroyed;
 } bthread_keytable_pool_t;
 
diff --git a/src/bthread/unstable.h b/src/bthread/unstable.h
index f5bdeecb..dde5e6f6 100644
--- a/src/bthread/unstable.h
+++ b/src/bthread/unstable.h
@@ -128,6 +128,10 @@ extern int 
bthread_keytable_pool_destroy(bthread_keytable_pool_t*);
 extern int bthread_keytable_pool_getstat(bthread_keytable_pool_t* pool,
                                          bthread_keytable_pool_stat_t* stat);
 
+// [RPC INTERNAL]
+// Return thread local keytable list length if exist.
+extern int get_thread_local_keytable_list_length(bthread_keytable_pool_t* 
pool);
+
 // [RPC INTERNAL]
 // Reserve at most `nfree' keytables with `key' pointing to data created by
 // ctor(args).
diff --git a/test/bthread_key_unittest.cpp b/test/bthread_key_unittest.cpp
index c01ae7fe..a7f11e76 100644
--- a/test/bthread_key_unittest.cpp
+++ b/test/bthread_key_unittest.cpp
@@ -16,14 +16,33 @@
 // under the License.
 
 #include <algorithm>                         // std::sort
+#include <gflags/gflags.h>
 #include "butil/atomicops.h"
 #include <gtest/gtest.h>
+#include "butil/thread_key.h"
 #include "butil/time.h"
 #include "butil/macros.h"
 #include "butil/scoped_lock.h"
 #include "butil/logging.h"
 #include "bthread/bthread.h"
 #include "bthread/unstable.h"
+using namespace bthread;
+namespace bthread {
+DECLARE_uint32(key_table_list_size);
+DECLARE_uint32(borrow_from_globle_size);
+class KeyTable;
+// defined in bthread/key.cpp
+extern void return_keytable(bthread_keytable_pool_t*, KeyTable*);
+extern KeyTable* borrow_keytable(bthread_keytable_pool_t*);
+}  // namespace bthread
+
+int main(int argc, char* argv[]) {
+    bthread::FLAGS_key_table_list_size = 20;
+    bthread::FLAGS_borrow_from_globle_size = 20;
+    testing::InitGoogleTest(&argc, argv);
+    GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);
+    return RUN_ALL_TESTS();
+}
 
 extern "C" {
 int bthread_keytable_pool_size(bthread_keytable_pool_t* pool) {
@@ -407,6 +426,144 @@ TEST(KeyTest, using_pool) {
     ASSERT_EQ(0, bthread_key_delete(key));
 }
 
+bthread_keytable_pool_t test_pool;
+struct PoolData2 {
+    bthread_key_t key;
+    bthread_attr_t attr;
+};
+
+static void pool_dtor2(void* tls) {
+    delete static_cast<PoolData2*>(tls);
+}
+
+static void usleep_thread_impl(PoolData2* data) {
+    if (NULL == bthread_getspecific(data->key)) {
+        PoolData2* data_new = new PoolData2();
+        ASSERT_EQ(0, bthread_setspecific(data->key, data_new));
+    }
+    bthread_usleep(1000L);
+    int length = get_thread_local_keytable_list_length(&test_pool);
+    ASSERT_LE((size_t)length, bthread::FLAGS_key_table_list_size);
+}
+
+static void* usleep_thread(void* args) {
+    usleep_thread_impl((PoolData2*)args);
+    return NULL;
+}
+
+static void launch_many_bthreads(PoolData2* data) {
+    std::vector<bthread_t> tids;
+    tids.reserve(25000);
+    for (size_t i = 0; i < 25000; ++i) {
+        bthread_t t0;
+        PoolData2* data_tmp = new PoolData2();
+        data_tmp->key = data->key;
+        ASSERT_EQ(0, bthread_start_background(&t0, &(data->attr), 
usleep_thread, data_tmp));
+        tids.push_back(t0);
+    }
+
+    usleep(3 * 1000 * 1000L);
+    for (size_t i = 0; i < tids.size(); ++i) {
+        bthread_join(tids[i], NULL);
+    }
+}
+
+static void* run_launch_many_bthreads(void* args) {
+    PoolData2* data = (PoolData2*)args;
+    launch_many_bthreads(data);
+    return NULL;
+}
+
+TEST(KeyTest, frequently_borrow_keytable_when_using_pool) {
+    PoolData2 data;
+    ASSERT_EQ(0, bthread_key_create(&data.key, pool_dtor2));
+
+    ASSERT_EQ(0, bthread_keytable_pool_init(&test_pool));
+    ASSERT_EQ(0, bthread_keytable_pool_size(&test_pool));
+
+    ASSERT_EQ(0, bthread_attr_init(&data.attr));
+    data.attr.keytable_pool = &test_pool;
+
+    bthread_t bth;
+    ASSERT_EQ(0, bthread_start_urgent(&bth, &data.attr, 
run_launch_many_bthreads, &data));
+    ASSERT_EQ(0, bthread_join(bth, NULL));
+    std::cout << "Free keytable size is "
+              << bthread_keytable_pool_size(&test_pool)
+              << " use keytable size is 25000" << std::endl;
+    ASSERT_EQ(0, bthread_keytable_pool_destroy(&test_pool));
+    ASSERT_EQ(0, bthread_key_delete(data.key));
+}
+
+std::vector<bthread::KeyTable*> table_list;
+pthread_mutex_t table_mutex = PTHREAD_MUTEX_INITIALIZER;
+
+static void return_thread_impl() {
+    for (int i = 0; i < 32768; i++) {
+        {
+            BAIDU_SCOPED_LOCK(table_mutex);
+            if (table_list.size() > 0) {
+                bthread::KeyTable* keytable = table_list[0];
+                table_list.erase(table_list.begin());
+                bthread::return_keytable(&test_pool, keytable);
+            }
+        }
+        int length = get_thread_local_keytable_list_length(&test_pool);
+        ASSERT_LE((size_t)length, bthread::FLAGS_key_table_list_size);
+    }
+}
+
+static void* return_thread(void*) {
+    return_thread_impl();
+    return NULL;
+}
+
+static void borrow_thread_impl() {
+    for (int i = 0; i < 32768; i++) {
+        bthread::KeyTable* keytable = bthread::borrow_keytable(&test_pool);
+        BAIDU_SCOPED_LOCK(table_mutex);
+        table_list.push_back(keytable);
+    }
+}
+
+static void* borrow_thread(void*) {
+    borrow_thread_impl();
+    return NULL;
+}
+
+TEST(KeyTest, borrow_and_return_keytable_when_using_pool) {
+    ASSERT_EQ(0, bthread_keytable_pool_init(&test_pool));
+    ASSERT_EQ(0, bthread_keytable_pool_size(&test_pool));
+
+    bthread_attr_t attr;
+    ASSERT_EQ(0, bthread_attr_init(&attr));
+    attr.keytable_pool = &test_pool;
+
+    bthread_t borrow_bth[8];
+    bthread_t return_bth[8];
+    for (size_t i = 0; i < arraysize(borrow_bth); ++i) {
+        ASSERT_EQ(0, bthread_start_background(&borrow_bth[i], &attr,
+                                              borrow_thread, NULL));
+    }
+    for (size_t i = 0; i < arraysize(return_bth); ++i) {
+        ASSERT_EQ(0, bthread_start_background(&return_bth[i], &attr,
+                                              return_thread, NULL));
+    }
+
+    for (size_t i = 0; i < arraysize(borrow_bth); ++i) {
+        ASSERT_EQ(0, bthread_join(borrow_bth[i], NULL));
+    }
+    for (size_t i = 0; i < arraysize(return_bth); ++i) {
+        ASSERT_EQ(0, bthread_join(return_bth[i], NULL));
+    }
+
+    for (size_t i = 0; i < table_list.size(); i++) {
+        bthread::return_keytable(&test_pool, table_list[i]);
+    }
+    table_list.clear();
+
+    ASSERT_EQ(0, bthread_keytable_pool_destroy(&test_pool));
+}
+
 // NOTE: lid is short for 'lock in dtor'.
 butil::atomic<size_t> lid_seq(1);
 std::vector<size_t> lid_seqs;
@@ -454,9 +611,10 @@ TEST(KeyTest, use_bthread_mutex_in_dtor) {
     std::sort(lid_seqs.begin(), lid_seqs.end());
     ASSERT_EQ(lid_seqs.end(), std::unique(lid_seqs.begin(), lid_seqs.end()));
     ASSERT_EQ(arraysize(th) + arraysize(bth) - 1,
-                *(lid_seqs.end() - 1) - *lid_seqs.begin());
+              *(lid_seqs.end() - 1) - *lid_seqs.begin());
 
     ASSERT_EQ(0, bthread_key_delete(key));
+    ASSERT_EQ(0, bthread_mutex_destroy(&mu));
 }
 
 }  // namespace


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@brpc.apache.org
For additional commands, e-mail: dev-h...@brpc.apache.org

Reply via email to