This is an automated email from the ASF dual-hosted git repository.
wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push:
new eac19012 Support bthread_once and bthread singleton (#2520)
eac19012 is described below
commit eac190125d19eee150d886cea6e6dfc7482f8874
Author: Bright Chen <[email protected]>
AuthorDate: Mon Feb 26 10:46:24 2024 +0800
Support bthread_once and bthread singleton (#2520)
---
src/bthread/bthread.h | 9 +++
src/bthread/bthread_once.cpp | 81 +++++++++++++++++++
src/bthread/singleton_on_bthread_once.h | 61 ++++++++++++++
src/bthread/types.h | 23 ++++++
test/bthread_once_unittest.cpp | 136 ++++++++++++++++++++++++++++++++
5 files changed, 310 insertions(+)
diff --git a/src/bthread/bthread.h b/src/bthread/bthread.h
index f91bc9af..a4c05867 100644
--- a/src/bthread/bthread.h
+++ b/src/bthread/bthread.h
@@ -336,6 +336,15 @@ extern void* bthread_getspecific(bthread_key_t key);
// Return current bthread tag
extern bthread_tag_t bthread_self_tag(void);
+// The first call to bthread_once() by any thread in a process, with a given
+// once_control, will call the init_routine() with no arguments. Subsequent
+// calls of bthread_once() with the same once_control will not call the
+// init_routine(). On return from bthread_once(), it is guaranteed that
+// init_routine() has completed. The once_control parameter is used to
+// determine whether the associated initialisation routine has been called.
+// Returns 0 on success, error code otherwise.
+extern int bthread_once(bthread_once_t* once_control, void (*init_routine)());
+
__END_DECLS
#endif // BTHREAD_BTHREAD_H
diff --git a/src/bthread/bthread_once.cpp b/src/bthread/bthread_once.cpp
new file mode 100644
index 00000000..a5751bc7
--- /dev/null
+++ b/src/bthread/bthread_once.cpp
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "bthread/types.h"
+#include "bthread/butex.h"
+
+bthread_once_t::bthread_once_t()
+ : _butex(bthread::butex_create_checked<butil::atomic<int>>()) {
+ _butex->store(UNINITIALIZED, butil::memory_order_relaxed);
+}
+
+bthread_once_t::~bthread_once_t() {
+ bthread::butex_destroy(_butex);
+}
+
+namespace bthread {
+
+int bthread_once_impl(bthread_once_t* once_control, void (*init_routine)()) {
+ butil::atomic<int>* butex = once_control->_butex;
+ // We need acquire memory order for this load because if the value
+ // signals that initialization has finished, we need to see any
+ // data modifications done during initialization.
+ int val = butex->load(butil::memory_order_acquire);
+ if (BAIDU_LIKELY(val == bthread_once_t::INITIALIZED)) {
+ // The initialization has already been done.
+ return 0;
+ }
+ val = bthread_once_t::UNINITIALIZED;
+ if (butex->compare_exchange_strong(val, bthread_once_t::INPROGRESS,
+ butil::memory_order_relaxed,
+ butil::memory_order_relaxed)) {
+ // This (b)thread is the first and the Only one here. Do the
initialization.
+ init_routine();
+ // Mark *once_control as having finished the initialization. We need
+ // release memory order here because we need to synchronize with other
+ // (b)threads that want to use the initialized data.
+ butex->store(bthread_once_t::INITIALIZED, butil::memory_order_release);
+ // Wake up all other (b)threads.
+ bthread::butex_wake_all(butex);
+ return 0;
+ }
+
+ while (true) {
+ // Same as above, we need acquire memory order.
+ val = butex->load(butil::memory_order_acquire);
+ if (BAIDU_LIKELY(val == bthread_once_t::INITIALIZED)) {
+ // The initialization has already been done.
+ return 0;
+ }
+ // Unless your constructor can be very time consuming, it is very
unlikely o hit
+ // this race. When it does, we just wait the thread until the object
has been created.
+ if (bthread::butex_wait(butex, val, NULL) < 0 &&
+ errno != EWOULDBLOCK && errno != EINTR/*note*/) {
+ return errno;
+ }
+ }
+}
+
+} // namespace bthread
+
+__BEGIN_DECLS
+
+int bthread_once(bthread_once_t* once_control, void (*init_routine)()) {
+ return bthread::bthread_once_impl(once_control, init_routine);
+}
+
+__END_DECLS
\ No newline at end of file
diff --git a/src/bthread/singleton_on_bthread_once.h
b/src/bthread/singleton_on_bthread_once.h
new file mode 100644
index 00000000..9ea507d7
--- /dev/null
+++ b/src/bthread/singleton_on_bthread_once.h
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef BRPC_SINGLETON_ON_BTHREAD_ONCE_H
+#define BRPC_SINGLETON_ON_BTHREAD_ONCE_H
+
+#include "bthread/bthread.h"
+
+namespace bthread {
+
+template <typename T>
+class GetLeakySingleton {
+public:
+ static T* _instance;
+ static bthread_once_t* g_create_leaky_singleton_once;
+ static void create_leaky_singleton();
+};
+
+template <typename T>
+T* GetLeakySingleton<T>::_instance = NULL;
+
+template <typename T>
+bthread_once_t* GetLeakySingleton<T>::g_create_leaky_singleton_once
+ = new bthread_once_t;
+
+template <typename T>
+void GetLeakySingleton<T>::create_leaky_singleton() {
+ _instance = new T;
+}
+
+// To get a never-deleted singleton of a type T, just call
+// bthread::get_leaky_singleton<T>(). Most daemon (b)threads
+// or objects that need to be always-on can be created by
+// this function. This function can be called safely not only
+// before main() w/o initialization issues of global variables,
+// but also on bthread with hanging operation.
+template <typename T>
+inline T* get_leaky_singleton() {
+ using LeakySingleton = GetLeakySingleton<T>;
+ bthread_once(LeakySingleton::g_create_leaky_singleton_once,
+ LeakySingleton::create_leaky_singleton);
+ return LeakySingleton::_instance;
+}
+
+} // namespace bthread
+
+#endif // BRPC_SINGLETON_ON_BTHREAD_ONCE_H
diff --git a/src/bthread/types.h b/src/bthread/types.h
index cb39ae3c..d91b85aa 100644
--- a/src/bthread/types.h
+++ b/src/bthread/types.h
@@ -192,6 +192,29 @@ typedef struct {
typedef struct {
} bthread_barrierattr_t;
+#if defined(__cplusplus)
+class bthread_once_t;
+namespace bthread {
+extern int bthread_once_impl(bthread_once_t* once_control, void
(*init_routine)());
+}
+
+class bthread_once_t {
+public:
+friend int bthread::bthread_once_impl(bthread_once_t* once_control, void
(*init_routine)());
+ enum State {
+ UNINITIALIZED = 0,
+ INPROGRESS,
+ INITIALIZED,
+ };
+
+ bthread_once_t();
+ ~bthread_once_t();
+
+private:
+ butil::atomic<int>* _butex;
+};
+#endif
+
typedef struct {
uint64_t value;
} bthread_id_t;
diff --git a/test/bthread_once_unittest.cpp b/test/bthread_once_unittest.cpp
new file mode 100644
index 00000000..618798e8
--- /dev/null
+++ b/test/bthread_once_unittest.cpp
@@ -0,0 +1,136 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+#include "bthread/bthread.h"
+#include "bthread/singleton_on_bthread_once.h"
+#include "bthread/task_control.h"
+
+namespace bthread {
+extern TaskControl* g_task_control;
+}
+
+namespace {
+
+bthread_once_t g_bthread_once_control;
+bool g_bthread_once_started = false;
+butil::atomic<int> g_bthread_once_count(0);
+
+void init_routine() {
+ bthread_usleep(2000 * 1000);
+ g_bthread_once_count.fetch_add(1, butil::memory_order_relaxed);
+}
+
+void bthread_once_task() {
+ bthread_once(&g_bthread_once_control, init_routine);
+ // `init_routine' only be called once.
+ ASSERT_EQ(1, g_bthread_once_count.load(butil::memory_order_relaxed));
+}
+
+void* first_bthread_once_task(void*) {
+ g_bthread_once_started = true;
+ bthread_once_task();
+ return NULL;
+}
+
+
+void* other_bthread_once_task(void*) {
+ bthread_once_task();
+ return NULL;
+}
+
+TEST(BthreadOnceTest, once) {
+ bthread_t bid;
+ ASSERT_EQ(0, bthread_start_background(
+ &bid, NULL, first_bthread_once_task, NULL));
+ while (!g_bthread_once_started) {
+ bthread_usleep(1000);
+ }
+ ASSERT_NE(nullptr, bthread::g_task_control);
+ int concurrency = bthread::g_task_control->concurrency();
+ LOG(INFO) << "concurrency: " << concurrency;
+ ASSERT_GT(concurrency, 0);
+ std::vector<bthread_t> bids(concurrency * 100);
+ for (auto& id : bids) {
+ ASSERT_EQ(0, bthread_start_background(
+ &id, NULL, other_bthread_once_task, NULL));
+ }
+ bthread_once_task();
+
+ for (auto& id : bids) {
+ bthread_join(id, NULL);
+ }
+ bthread_join(bid, NULL);
+}
+
+bool g_bthread_started = false;
+butil::atomic<int> g_bthread_singleton_count(0);
+
+class BthreadSingleton {
+public:
+ BthreadSingleton() {
+ bthread_usleep(2000 * 1000);
+ g_bthread_singleton_count.fetch_add(1, butil::memory_order_relaxed);
+ }
+};
+
+void get_bthread_singleton() {
+ auto instance = bthread::get_leaky_singleton<BthreadSingleton>();
+ ASSERT_NE(nullptr, instance);
+ // Only one BthreadSingleton instance has been created.
+ ASSERT_EQ(1, g_bthread_singleton_count.load(butil::memory_order_relaxed));
+}
+
+void* first_get_bthread_singleton(void*) {
+ g_bthread_started = true;
+ get_bthread_singleton();
+ return NULL;
+}
+
+
+void* get_bthread_singleton(void*) {
+ get_bthread_singleton();
+ return NULL;
+}
+
+// Singleton will definitely not cause deadlock,
+// even if constructor of T will hang the bthread.
+TEST(BthreadOnceTest, singleton) {
+ bthread_t bid;
+ ASSERT_EQ(0, bthread_start_background(
+ &bid, NULL, first_get_bthread_singleton, NULL));
+ while (!g_bthread_started) {
+ bthread_usleep(1000);
+ }
+ ASSERT_NE(nullptr, bthread::g_task_control);
+ int concurrency = bthread::g_task_control->concurrency();
+ LOG(INFO) << "concurrency: " << concurrency;
+ ASSERT_GT(concurrency, 0);
+ std::vector<bthread_t> bids(concurrency * 100);
+ for (auto& id : bids) {
+ ASSERT_EQ(0, bthread_start_background(
+ &id, NULL, get_bthread_singleton, NULL));
+ }
+ get_bthread_singleton();
+
+ for (auto& id : bids) {
+ bthread_join(id, NULL);
+ }
+ bthread_join(bid, NULL);
+}
+
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]