This is an automated email from the ASF dual-hosted git repository.
wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push:
new 210ebfb3 Support c++20 coroutine (#2121)
210ebfb3 is described below
commit 210ebfb312b5bfff867a4712ee1f01342f03aefa
Author: Weibing Wang <[email protected]>
AuthorDate: Tue Jan 23 12:12:59 2024 +0800
Support c++20 coroutine (#2121)
* Support c++20 coroutine
* Fix CI UT failed
* Add example coroutine Makefile
* Add usercode_in_coroutine flag
* Add coroutine document
* Add experimental namespace for coroutine
---
docs/cn/coroutine.md | 193 ++++++++++++++++++++
example/coroutine/Makefile | 87 +++++++++
example/coroutine/coroutine_server.cpp | 145 +++++++++++++++
example/coroutine/echo.proto | 34 ++++
src/brpc/controller.cpp | 3 +-
src/brpc/coroutine.h | 148 ++++++++++++++++
src/brpc/coroutine_inl.h | 312 +++++++++++++++++++++++++++++++++
src/brpc/event_dispatcher.cpp | 2 +
src/brpc/input_messenger.cpp | 3 +-
src/brpc/socket.cpp | 5 +-
src/butil/containers/stack_container.h | 15 +-
src/butil/type_traits.h | 8 +
test/brpc_coroutine_unittest.cpp | 221 +++++++++++++++++++++++
13 files changed, 1171 insertions(+), 5 deletions(-)
diff --git a/docs/cn/coroutine.md b/docs/cn/coroutine.md
new file mode 100644
index 00000000..cdce5b9e
--- /dev/null
+++ b/docs/cn/coroutine.md
@@ -0,0 +1,193 @@
+# C++20 协程支持
+
+bRPC 支持 C++20 协程说明文档。
+
+> 注:该功能是实验性的,请勿在生产环境下使用。
+
+## 使用说明
+
+### 适用场景
+
+C++协程适用于极高并发的场景。由于bthread使用了mmap,存在系统限制,一个进程bthread数量一般最多到万级别,如果采用同步方式用一个bthread来处理一个请求,那么请求的并发度也只能到万级别。如果采用异步方式来写代码,可以达到更高的并发,但又会导致代码难以维护。这时我们就可以使用C++协程,以类似同步的方式来写代码,而达到异步的性能效果。
+
+### 使用前提
+
+1. 需要使用支持c++20的编译器,如gcc 11
+2. 需要编译选项中加上 `-std=c++20`
+
+### 简单示例
+
+以下例子显示了如何在bRPC中启动一个C++20协程,在协程中发起 RPC调用,并等待返回结果。
+
+```cpp
+#include <brpc/channel.h>
+#include <brpc/coroutine.h>
+
+// 协程函数的返回类型,需要是brpc::experimental::Awaitable<T>
+// T是函数返回的实际数据类型
+brpc::experimental::Awaitable<int> RpcCall(brpc::Channel& channel) {
+ EchoRequest request;
+ EchoResponse response;
+ EchoService_Stub stub(&channel);
+ brpc::Controller cntl;
+ brpc::experimental::AwaitableDone done;
+ stub.Echo(&cntl, &request, &response, &done);
+ // 等待RPC返回结果
+ co_await done.awaitable();
+ // 返回数据,注意这里用co_return而不是return
+ // 因为函数返回值类型是brpc::experimental::Awaitable<int>而不是int
+ co_return cntl.ErrorCode();
+}
+
+brpc::experimental::Awaitable<void> CoroutineMain(const char* server) {
+ brpc::Channel channel;
+ channel.Init(server, NULL);
+ // co_await会从Awaitable<int>得到int类型的返回值
+ int code = co_await RpcCall(channel);
+ printf("Rpc result:%d\n", code);
+}
+
+int main() {
+ // 启动协程
+ brpc::experimental::Coroutine coro(CoroutineMain("127.0.0.1:8080"));
+ // 等待协程执行完成
+ coro.join();
+ return 0;
+}
+```
+
+更完整的例子可以查看源码中的`example/coroutine/coroutine_server.cpp`文件。
+
+### 更多用法
+
+1. 在非协程环境下等待一个协程执行完成:
+
+```cpp
+brpc::experimental::Coroutine coro(func(args));
+coro.join();
+```
+
+2. 在非协程环境下等待协程完成并获取返回值:
+
+```cpp
+brpc::experimental::Coroutine coro(func(args)); // func的返回值类型为Awaitable<int>
+int result = coro.join<int>();
+```
+
+3. 在协程环境下等待协程执行完成:
+
+```cpp
+brpc::experimental::Coroutine coro(func(args));
+... // 做一些其它事情
+co_await coro.awaitable();
+```
+
+4. 在协程环境下等待协程执行完成并获取返回值:
+
+```cpp
+brpc::experimental::Coroutine coro(func(args)); // func的返回值类型为Awaitable<int>
+... // 做一些其它事情
+int ret = co_await coro.awaitable<int>();
+```
+
+5. 在协程环境下sleep:
+```cpp
+co_await brpc::experimental::Coroutine::usleep(1000);
+```
+
+### 注意事项
+
+1. 协程不保证一个函数的上下文都在同一个pthread或同一个bthread下执行。在co_await之后,代码所在的pthread或bthread可能发生变化,因此依赖于pthread或bthread的线程局部变量的代码(比如rpcz功能)将无法正确工作。
+2. 不应在协程中使用阻塞bthread(如bthread_join、同步RPC)或阻塞pthread的函数,否则可能导致死锁或者长耗时。
+3. 不要在不必要的地方使用协程,如下面的代码,虽然也能正常工作,但没有意义:
+
+```cpp
+brpc::experimental::Awaitable<int> inplace_func() {
+ co_return 123;
+}
+```
+
+### 实现极致性能
+
+如果确保服务的处理代码都运行在协程之中,并且没有任何阻塞bthread或阻塞pthread操作,则可以开启`usercode_in_coroutine`这个flag。开启这个flag之后,bRPC会简化服务端处理逻辑,减少不必要的bthread开销。在这种情况下,实际的工作线程数量将由event_dispatcher_num控制,而不再是由bthread worker数量控制。
+
+## 实现原理
+
+### C++20协程实现原理
+
+为了方便理解,我们把上面的CoroutineMain函数稍微改写一下,把co_await前后的逻辑分成两部分:
+
+```cpp
+brpc::experimental::Awaitable<void> CoroutineMain(const char* server) {
+ brpc::Channel channel;
+ channel.Init(server, NULL);
+ brpc::experimental::Awaitable<int> awaitable = RpcCall(channel);
+
+ int code = co_await awaitable;
+ printf("Rpc result:%d\n", code);
+}
+```
+
+上面的代码实际上是怎么执行的呢?当你使用co_await关键字的时候,编译器会把co_await后面的步骤转换成一个callback函数,把这个callback传给实际co_await的那个`Awaitable<T>`对象,比如上面的CoroutineMain函数,经过编译器转换后会变成大概如下的逻辑(简化版,实际要比这个复杂得多):
+
+```cpp
+
+brpc::experimental::Awaitable<void> CoroutineMain(const char* server) {
+ // 根据函数返回类型,找到Awaitable<void>的名为promise_type的子类
+ // 在函数的入口,创建一个promise_type类型的对象
+ auto promise = new brpc::experimental::Awaitable<void>::promise_type();
+ // 从promise对象中创建返回Awaitable对象
+ Awaitable<void> ret = promise->get_return_object();
+
+ // co_await之前的逻辑,保持不变
+ brpc::Channel channel;
+ channel.Init(server, NULL);
+ brpc::experimental::Awaitable<int> awaitable = RpcCall(channel);
+
+ // co_await的逻辑,转成一个await_suspend的函数调用,传入一个callback函数
+ awaitable.await_suspend([promise, &awaitable]() {
+ // co_await之后的逻辑,转移到callback函数中
+ int code = awaitable.await_resume();
+ printf("Rpc result:%d\n", code);
+ // 在final_suspend里面,会做一些唤醒调用者、资源释放的工作
+ promise->final_suspend();
+ delete promise;
+ });
+ // 返回Awaitable<void>对象,以便上层函数进行处理
+ return ret;
+}
+```
+
+也就是说,co_await就是一个语法转换器,把看似同步的代码转化成异步调用的代码,仅此而已。至于Awaitable类和promise类的具体实现,编译器就不关心了,这是基础库需要做的。比如在brpc中封装了brpc::experimental::Awaitable类和promise子类,实现了await_suspend/await_resume等逻辑,使协程可以正确的工作起来。
+
+### 原子等待操作
+
+上面我们看到的是一个中间函数,它co_await一个子函数返回的Awaitable对象,然后自己也返回一个Awaitable对象。这样层层调用一定有一个尽头,即原子等待操作,它会返回Awaitable对象,但是它内部不再有co_await/co_return这样的语句了。目前实现了3种原子等待操作,未来可以扩展更多。
+
+1. 等待RPC返回结果: `AwaitableDone::awaitable()`
+2. 等待sleep: `Coroutine::usleep()`
+3. 等待另一个协程完成: `Coroutine::awaitable()`
+
+下面是一个原子等待操作的示例实现,我们需要手动创建一个promise对象,设置set_needs_suspend(),然后发起一个异步调用(如bthread_timer_add),在回调函数里设置好返回值、调用promise->on_done(),最后根据promise返回Awaitable对象即可。
+
+```cpp
+inline Awaitable<int> Coroutine::usleep(int sleep_us) {
+ auto promise = new detail::AwaitablePromise<int>();
+ promise->set_needs_suspend();
+ bthread_timer_t timer;
+ auto abstime = butil::microseconds_from_now(sleep_us);
+ auto cb = [](void* p) {
+ auto promise = static_cast<detail::AwaitablePromise<int>*>(p);
+ promise->set_value(0);
+ promise->on_done();
+ };
+ bthread_timer_add(&timer, abstime, cb, promise);
+ return Awaitable<int>(promise);
+}
+```
+
+### 协程与多线程
+
+上面我们可以看到,协程本质上就是一种callback,和线程没有直接关系。它可以是单线程的,也可以是多线程的,这完全取决于它的原子等待操作里是怎么调用callback的。在bRPC的环境里,callback有可能从另一个pthread或bthread发起,所以协程也是需要考虑多线程问题。比如,有可能在调用co_await语句之前,要等待的事情就已经结束了,对于这种情况co_await应该立即返回。
+
+协程和线程可以一起使用,比如我们可以使用bthread将任务scale到多核,然后在任务内部的子任务用协程来实现异步化。
\ No newline at end of file
diff --git a/example/coroutine/Makefile b/example/coroutine/Makefile
new file mode 100644
index 00000000..2dd5631d
--- /dev/null
+++ b/example/coroutine/Makefile
@@ -0,0 +1,87 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+LINK_SO=1
+NEED_GPERFTOOLS=0
+BRPC_PATH=../..
+include $(BRPC_PATH)/config.mk
+# Notes on the flags:
+# 1. Added -fno-omit-frame-pointer: perf/tcmalloc-profiler use frame pointers by default
+CXXFLAGS+=$(CPPFLAGS) -std=c++20 -DNDEBUG -O2 -pipe -W -Wall
-Wno-unused-parameter -fPIC -fno-omit-frame-pointer
+#CXXFLAGS+= -fsanitize=address
+#STATIC_LINKINGS+= -Wl,-Bstatic -lasan -Wl,-Bdynamic
+ifeq ($(NEED_GPERFTOOLS), 1)
+ CXXFLAGS+=-DBRPC_ENABLE_CPU_PROFILER
+endif
+HDRS+=$(BRPC_PATH)/src
+#HDRS+=$(BRPC_PATH)/output/include
+LIBS+=$(BRPC_PATH)/output/lib
+
+HDRPATHS=$(addprefix -I, $(HDRS))
+LIBPATHS=$(addprefix -L, $(LIBS))
+COMMA=,
+SOPATHS=$(addprefix -Wl$(COMMA)-rpath$(COMMA), $(LIBS))
+
+SERVER_SOURCES = coroutine_server.cpp
+PROTOS = $(wildcard *.proto)
+
+PROTO_OBJS = $(PROTOS:.proto=.pb.o)
+PROTO_GENS = $(PROTOS:.proto=.pb.h) $(PROTOS:.proto=.pb.cc)
+SERVER_OBJS = $(addsuffix .o, $(basename $(SERVER_SOURCES)))
+
+ifeq ($(SYSTEM),Darwin)
+ ifneq ("$(LINK_SO)", "")
+ STATIC_LINKINGS += -lbrpc
+ else
+ # *.a must be explicitly specified in clang
+ STATIC_LINKINGS += $(BRPC_PATH)/output/lib/libbrpc.a
+ endif
+ LINK_OPTIONS_SO = $^ $(STATIC_LINKINGS) $(DYNAMIC_LINKINGS)
+ LINK_OPTIONS = $^ $(STATIC_LINKINGS) $(DYNAMIC_LINKINGS)
+else ifeq ($(SYSTEM),Linux)
+ STATIC_LINKINGS += -lbrpc
+ LINK_OPTIONS_SO = -Xlinker "-(" $^ -Xlinker "-)" $(STATIC_LINKINGS)
$(DYNAMIC_LINKINGS)
+ LINK_OPTIONS = -Xlinker "-(" $^ -Wl,-Bstatic $(STATIC_LINKINGS)
-Wl,-Bdynamic -Xlinker "-)" $(DYNAMIC_LINKINGS)
+endif
+
+.PHONY:all
+all: coroutine_server
+
+.PHONY:clean
+clean:
+ @echo "> Cleaning"
+ rm -rf coroutine_server $(PROTO_GENS) $(PROTO_OBJS) $(SERVER_OBJS)
+
+coroutine_server:$(PROTO_OBJS) $(SERVER_OBJS)
+ @echo "> Linking $@"
+ifneq ("$(LINK_SO)", "")
+ $(CXX) $(LIBPATHS) $(SOPATHS) $(LINK_OPTIONS_SO) -o $@
+else
+ $(CXX) $(LIBPATHS) $(LINK_OPTIONS) -o $@
+endif
+
+%.pb.cc %.pb.h:%.proto
+ @echo "> Generating $@"
+ $(PROTOC) --cpp_out=. --proto_path=. $(PROTOC_EXTRA_ARGS) $<
+
+%.o:%.cpp
+ @echo "> Compiling $@"
+ $(CXX) -c $(HDRPATHS) $(CXXFLAGS) $< -o $@
+
+%.o:%.cc
+ @echo "> Compiling $@"
+ $(CXX) -c $(HDRPATHS) $(CXXFLAGS) $< -o $@
diff --git a/example/coroutine/coroutine_server.cpp
b/example/coroutine/coroutine_server.cpp
new file mode 100644
index 00000000..9df50b04
--- /dev/null
+++ b/example/coroutine/coroutine_server.cpp
@@ -0,0 +1,145 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// A server to receive EchoRequest and send back EchoResponse.
+
+#include <gflags/gflags.h>
+#include <butil/logging.h>
+#include <brpc/server.h>
+#include <brpc/channel.h>
+#include <brpc/coroutine.h>
+#include "echo.pb.h"
+
+DEFINE_int32(port, 8000, "TCP Port of this server");
+DEFINE_int32(sleep_us, 1000000, "Server sleep us");
+DEFINE_bool(enable_coroutine, true, "Enable coroutine");
+
+using brpc::experimental::Awaitable;
+using brpc::experimental::AwaitableDone;
+using brpc::experimental::Coroutine;
+
+namespace example {
+// Echo service used by the coroutine example. Each RPC has two code paths,
+// selected by -enable_coroutine: a detached C++20 coroutine, or a plain
+// blocking implementation running in the calling bthread.
+class EchoServiceImpl : public EchoService {
+public:
+    // Initializes the channel that Proxy() uses; it points at FLAGS_port, i.e.
+    // the port this example server listens on.
+    EchoServiceImpl() {
+        brpc::ChannelOptions options;
+        options.max_retry = 0;
+        // Twice the server-side sleep (converted to ms) plus 100ms of slack.
+        options.timeout_ms = FLAGS_sleep_us / 1000 * 2 + 100;
+        CHECK(_channel.Init(butil::EndPoint(butil::IP_ANY, FLAGS_port), &options) == 0);
+    }
+
+    virtual ~EchoServiceImpl() {}
+
+    // Sleeps for -sleep_us and then copies the request message into the response.
+    void Echo(google::protobuf::RpcController* cntl_base,
+              const EchoRequest* request,
+              EchoResponse* response,
+              google::protobuf::Closure* done) override {
+        if (!FLAGS_enable_coroutine) {
+            // Blocking path: `done` runs when the guard leaves scope.
+            brpc::ClosureGuard done_guard(done);
+            bthread_usleep(FLAGS_sleep_us);
+            response->set_message(request->message());
+            return;
+        }
+        // Coroutine path: start a detached coroutine; it runs `done` itself.
+        Coroutine(EchoAsync(request, response, done), true);
+    }
+
+    // Coroutine body of Echo(). `done` is run when the coroutine finishes.
+    Awaitable<void> EchoAsync(const EchoRequest* request,
+                              EchoResponse* response,
+                              google::protobuf::Closure* done) {
+        brpc::ClosureGuard done_guard(done);
+        co_await Coroutine::usleep(FLAGS_sleep_us);
+        response->set_message(request->message());
+    }
+
+    // Forwards the request to Echo through _channel. If the inner call fails,
+    // its error text is returned as the response message.
+    void Proxy(google::protobuf::RpcController* cntl_base,
+               const EchoRequest* request,
+               EchoResponse* response,
+               google::protobuf::Closure* done) override {
+        if (!FLAGS_enable_coroutine) {
+            // Blocking path: synchronous inner RPC (done == NULL).
+            brpc::ClosureGuard done_guard(done);
+            EchoService_Stub stub(&_channel);
+            brpc::Controller cntl;
+            stub.Echo(&cntl, request, response, NULL);
+            if (cntl.Failed()) {
+                response->set_message(cntl.ErrorText());
+            }
+            return;
+        }
+        // Coroutine path: start a detached coroutine; it runs `done` itself.
+        Coroutine(ProxyAsync(request, response, done), true);
+    }
+
+    // Coroutine body of Proxy(): issues the inner RPC asynchronously and
+    // suspends until its completion closure has been run.
+    Awaitable<void> ProxyAsync(const EchoRequest* request,
+                               EchoResponse* response,
+                               google::protobuf::Closure* done) {
+        brpc::ClosureGuard done_guard(done);
+        EchoService_Stub stub(&_channel);
+        brpc::Controller cntl;
+        AwaitableDone rpc_done;
+        stub.Echo(&cntl, request, response, &rpc_done);
+        co_await rpc_done.awaitable();
+        if (cntl.Failed()) {
+            response->set_message(cntl.ErrorText());
+        }
+    }
+
+private:
+    // Channel used by Proxy()/ProxyAsync() to call Echo.
+    brpc::Channel _channel;
+};
+} // namespace example
+
+// Entry point of the example: parses flags, registers the echo service and
+// serves on -port until the process is asked to quit.
+int main(int argc, char* argv[]) {
+    bthread_setconcurrency(BTHREAD_MIN_CONCURRENCY);
+
+    // Parse gflags first; the coroutine switch below depends on them.
+    GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);
+    if (FLAGS_enable_coroutine) {
+        // Turn on the matching bRPC flag for the coroutine code path.
+        GFLAGS_NS::SetCommandLineOption("usercode_in_coroutine", "true");
+    }
+
+    // One server is enough for this example.
+    brpc::Server server;
+
+    // The service lives on the stack, so the server must not own (delete) it;
+    // hence SERVER_DOESNT_OWN_SERVICE instead of SERVER_OWNS_SERVICE.
+    example::EchoServiceImpl echo_service_impl;
+    const int add_rc = server.AddService(&echo_service_impl,
+                                         brpc::SERVER_DOESNT_OWN_SERVICE);
+    if (add_rc != 0) {
+        LOG(ERROR) << "Fail to add service";
+        return -1;
+    }
+
+    // Start serving.
+    brpc::ServerOptions options;
+    options.num_threads = BTHREAD_MIN_CONCURRENCY;
+    const int start_rc = server.Start(FLAGS_port, &options);
+    if (start_rc != 0) {
+        LOG(ERROR) << "Fail to start EchoServer";
+        return -1;
+    }
+
+    // Block until Ctrl-C is pressed, then Stop() and Join() the server.
+    server.RunUntilAskedToQuit();
+    return 0;
+}
\ No newline at end of file
diff --git a/example/coroutine/echo.proto b/example/coroutine/echo.proto
new file mode 100644
index 00000000..ef5cc8ab
--- /dev/null
+++ b/example/coroutine/echo.proto
@@ -0,0 +1,34 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+syntax="proto2";
+package example;
+
+option cc_generic_services = true;
+
+// Request of both RPCs: carries a single required text message.
+message EchoRequest {
+ required string message = 1;
+};
+
+// Response of both RPCs: carries a single required text message.
+message EchoResponse {
+ required string message = 1;
+};
+
+// Example service with two RPCs that share the same request/response types.
+service EchoService {
+ rpc Echo(EchoRequest) returns (EchoResponse);
+ rpc Proxy(EchoRequest) returns (EchoResponse);
+};
diff --git a/src/brpc/controller.cpp b/src/brpc/controller.cpp
index f49a27a9..bfe278ff 100644
--- a/src/brpc/controller.cpp
+++ b/src/brpc/controller.cpp
@@ -125,6 +125,7 @@ const Controller* GetSubControllerOfSelectiveChannel(
const RPCSender* sender, int index);
DECLARE_bool(usercode_in_pthread);
+DECLARE_bool(usercode_in_coroutine);
static const int MAX_RETRY_COUNT = 1000;
static bvar::Adder<int64_t>* g_ncontroller = NULL;
@@ -684,7 +685,7 @@ void Controller::OnVersionedRPCReturned(const
CompletionInfo& info,
}
END_OF_RPC:
- if (new_bthread) {
+ if (new_bthread && !FLAGS_usercode_in_coroutine) {
// [ Essential for -usercode_in_pthread=true ]
// When -usercode_in_pthread is on, the reserved threads (set by
// -usercode_backup_threads) may all block on bthread_id_lock in
diff --git a/src/brpc/coroutine.h b/src/brpc/coroutine.h
new file mode 100644
index 00000000..513f4022
--- /dev/null
+++ b/src/brpc/coroutine.h
@@ -0,0 +1,148 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef BRPC_COROUTINE_H
+#define BRPC_COROUTINE_H
+
+#if __cplusplus >= 202002L
+
+#define BRPC_ENABLE_COROUTINE 1
+
+#include <coroutine>
+#include <functional>
+#include <atomic>
+#include "brpc/callback.h"
+
+namespace brpc {
+namespace experimental {
+
+namespace detail {
+class AwaitablePromiseBase;
+template <typename T>
+class AwaitablePromise;
+}
+
+class AwaitableDone;
+class Coroutine;
+
+// WARN: The bRPC coroutine feature is experimental, DO NOT use it in production environments!
+
+// Awaitable<T> is the return type of a coroutine function; T is the type of
+// the value the coroutine hands back with co_return. For example:
+//   Awaitable<int> func1() {
+//       co_return 42;
+//   }
+//   Awaitable<std::string> func2() {
+//       int ret = co_await func1();
+//       co_return std::to_string(ret);
+//   }
+template <typename T>
+class Awaitable {
+public:
+    // Promise type used for coroutines whose return type is Awaitable<T>.
+    using promise_type = detail::AwaitablePromise<T>;
+
+    ~Awaitable() {}
+
+    // Awaiter interface.
+    // NOTE: the compiler generates the calls to these functions when an
+    // Awaitable<T> is co_await-ed, DO NOT call them manually.
+    bool await_ready();
+    template <typename U>
+    void await_suspend(std::coroutine_handle<detail::AwaitablePromise<U> > awaiting);
+    T await_resume();
+
+private:
+    friend class Coroutine;
+    friend class AwaitableDone;
+    friend class detail::AwaitablePromise<T>;
+
+    // Constructible only by the friends above, and only from a promise.
+    Awaitable() = delete;
+    Awaitable(promise_type* p) : _promise(p) {}
+
+    promise_type* promise() {
+        return _promise;
+    }
+
+    // Not deleted by this class (the destructor is empty).
+    promise_type* _promise;
+};
+
+// A protobuf Closure that a coroutine can co_await in order to wait for an
+// asynchronous RPC call. Usage:
+//   AwaitableDone done;
+//   stub.CallMethod(&cntl, &req, &resp, &done);
+//   co_await done.awaitable();
+//
+class AwaitableDone : public google::protobuf::Closure {
+public:
+    AwaitableDone();
+
+    // Closure entry point, to be invoked when the awaited call completes.
+    void Run() override;
+
+    // The object to co_await. It is a member of this closure and is returned
+    // by reference.
+    Awaitable<void>& awaitable() {
+        return _awaitable;
+    }
+
+private:
+    Awaitable<void> _awaitable;
+};
+
+// Class for management of coroutine
+// 1. To create a new coroutine and wait for it to finish:
+//      Awaitable<void> func(double val);
+//
+//      int main() {
+//          Coroutine coro(func(1.0));
+//          coro.join();
+//      }
+// 2. To wait for a coroutine in another coroutine:
+//      Awaitable<void> another_func() {
+//          Coroutine coro(func(1.0));
+//          co_await coro.awaitable<void>();
+//      }
+// 3. To create a detached coroutine without waiting:
+//      Coroutine coro(func(1.0), true);
+// 4. To sleep in a coroutine:
+//      co_await Coroutine::usleep(100);
+//
+// NOTE: Inside a coroutine function, DO NOT call pthread-blocking or
+// bthread-blocking functions (e.g. bthread_join(), bthread_usleep(),
+// synchronous RPC), otherwise it may cause deadlock or long latency.
+class Coroutine {
+public:
+    // Starts running the coroutine behind `aw` immediately.
+    // If `detach` is true, nothing is set up for waiting: join() and
+    // awaitable() must not be called on a detached coroutine.
+    template <typename T>
+    Coroutine(Awaitable<T>&& aw, bool detach = false);
+
+    // Joins a non-detached coroutine that has not been waited for yet, then
+    // destroys the butex.
+    ~Coroutine();
+
+    // Not copyable: this object holds `_butex` and `_promise` as raw pointers
+    // and the destructor destroys the butex, so a copy would destroy it twice.
+    Coroutine(const Coroutine&) = delete;
+    Coroutine& operator=(const Coroutine&) = delete;
+
+    // Blocks the calling thread until the coroutine finishes and returns its
+    // result. T must match the T of the Awaitable<T> passed to the constructor
+    // (or be void to discard the result). May be called at most once, and not
+    // together with awaitable().
+    template <typename T = void>
+    T join();
+
+    // Returns an object that another coroutine can co_await to wait for this
+    // coroutine. May be called at most once, and not together with join().
+    template <typename T = void>
+    Awaitable<T> awaitable();
+
+    // Suspends the awaiting coroutine for `sleep_us` microseconds.
+    static Awaitable<int> usleep(int sleep_us);
+
+private:
+    // Promise behind awaitable(); nullptr for a detached coroutine.
+    detail::AwaitablePromiseBase* _promise{nullptr};
+    // True once join() or awaitable() has been called.
+    bool _waited{false};
+    // Butex that join() waits on; nullptr for a detached coroutine.
+    std::atomic<int>* _butex{nullptr};
+};
+
+} // namespace experimental
+} // namespace brpc
+
+#include "brpc/coroutine_inl.h"
+
+#endif // __cplusplus >= 202002L
+
+#endif // BRPC_COROUTINE_H
\ No newline at end of file
diff --git a/src/brpc/coroutine_inl.h b/src/brpc/coroutine_inl.h
new file mode 100644
index 00000000..1ff40054
--- /dev/null
+++ b/src/brpc/coroutine_inl.h
@@ -0,0 +1,312 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef BRPC_COROUTINE_INL_H
+#define BRPC_COROUTINE_INL_H
+
+#include "bthread/unstable.h" // bthread_timer_add
+#include "bthread/butex.h" // butex_wake/butex_wait
+
+namespace brpc {
+namespace experimental {
+
+namespace detail {
+
+// Type-independent part of the coroutine promise. It keeps track of who has to
+// be notified when the coroutine finishes (a caller coroutine and/or a
+// callback) and, for leaf operations, of the suspend/done handshake.
+class AwaitablePromiseBase {
+public:
+    AwaitablePromiseBase() {}
+
+    virtual ~AwaitablePromiseBase() {
+        delete _suspended_or_done;
+    }
+
+    virtual void resume() = 0;
+    virtual void destroy() = 0;
+
+    // True if set_needs_suspend() has been called on this promise.
+    bool needs_suspend() {
+        return nullptr != _suspended_or_done;
+    }
+
+    // Marks this promise as a leaf operation by allocating the handshake flag.
+    void set_needs_suspend() {
+        _suspended_or_done = new std::atomic<bool>(false);
+    }
+
+    // For a Coroutine's leaf function:
+    // Its caller will be suspended, waiting for it to be done, but "suspend"
+    // and "done" are always issued from different threads, in either order.
+    // So an atomic<bool> is used: the first suspend_or_done() flips it to true
+    // and does nothing else; in the second call exchange(true) returns true,
+    // which means both events have happened, so the caller can be resumed and
+    // this promise can safely be deleted.
+    void suspend_or_done() {
+        const bool other_side_arrived = _suspended_or_done->exchange(true);
+        if (!other_side_arrived) {
+            return;
+        }
+        // Already suspended AND done.
+        if (_caller != nullptr) {
+            // The leaf function has finished, resume its caller.
+            _caller->resume();
+        }
+        delete this;
+    }
+
+    void on_suspend() { suspend_or_done(); }
+    void on_done() { suspend_or_done(); }
+
+    void set_callback(std::function<void()> cb) {
+        _callback = cb;
+    }
+
+    void set_caller(AwaitablePromiseBase* caller) {
+        _caller = caller;
+    }
+
+    // Called when the coroutine function begins.
+    auto initial_suspend() {
+        // Always suspend the function; a later resume() makes it start to run.
+        return std::suspend_always{};
+    }
+
+    // Called when the coroutine function throws an unhandled exception:
+    // logs the error and terminates the process.
+    void unhandled_exception() {
+        LOG(ERROR) << "Coroutine throws unhandled exception!";
+        std::exit(1);
+    }
+
+    // Called when the coroutine function ends.
+    auto final_suspend() noexcept {
+        if (_caller != nullptr) {
+            // The caller is waiting for this function to return,
+            // now it can be resumed.
+            _caller->resume();
+        }
+        if (_callback) {
+            _callback();
+            _callback = nullptr;
+        }
+        // Returns suspend_never{} so that the coroutine will be destroyed and
+        // the AwaitablePromise be deleted.
+        // DO NOT call destroy() here, which will cause double destruction of
+        // RAII objects.
+        // DO NOT `delete this` here, which will cause malloc and free not to
+        // match.
+        return std::suspend_never{};
+    }
+
+private:
+    // For a Coroutine's root function: the callback that notifies its waiter.
+    std::function<void()> _callback;
+    // For a Coroutine's leaf function, which is always resumed from another
+    // thread and therefore needs an atomic variable to keep thread safety.
+    // A non-leaf function doesn't need this, so it is an optional pointer.
+    std::atomic<bool>* _suspended_or_done{nullptr};
+    // For a Coroutine's non-root function: the caller to resume when it has
+    // finished.
+    AwaitablePromiseBase* _caller{nullptr};
+};
+
+// Promise of a coroutine (or leaf operation) that produces a value of type T.
+template <typename T>
+class AwaitablePromise : public AwaitablePromiseBase {
+public:
+    // Returns a copy of the stored result.
+    T value() {
+        return _value;
+    }
+
+    // Stores the result; used by leaf operations that have no co_return.
+    void set_value(T value) {
+        _value = value;
+    }
+
+    void resume() override {
+        _coro.resume();
+    }
+
+    void destroy() override {
+        _coro.destroy();
+    }
+
+    // When we call a coroutine function, an AwaitablePromise<T> will be created.
+    // Then its get_return_object() is called to return an Awaitable<T>.
+    auto get_return_object() {
+        _coro = std::coroutine_handle<AwaitablePromise>::from_promise(*this);
+        return Awaitable<T>(this);
+    }
+
+    // When we call `co_return v` in a coroutine function, return_value(v) will
+    // be called. The language requires this call to be an expression of type
+    // void, so nothing is returned.
+    void return_value(T v) {
+        _value = v;
+    }
+
+private:
+    // Value-initialized so that value() never reads an indeterminate object.
+    T _value{};
+    // Handle of the coroutine frame; stays a null handle for promises that are
+    // created directly with `new` instead of by a coroutine call.
+    std::coroutine_handle<AwaitablePromise> _coro;
+};
+
+// Promise of a coroutine (or leaf operation) that produces no value.
+template <>
+class AwaitablePromise<void> : public AwaitablePromiseBase {
+public:
+    void resume() override {
+        _coro.resume();
+    }
+
+    void destroy() override {
+        _coro.destroy();
+    }
+
+    // When we call a coroutine function, an AwaitablePromise<void> will be created.
+    // Then its get_return_object() is called to return an Awaitable<void>.
+    auto get_return_object() {
+        _coro = std::coroutine_handle<AwaitablePromise>::from_promise(*this);
+        return Awaitable<void>(this);
+    }
+
+    // When a coroutine function executes `co_return;` or flows off its end,
+    // return_void() will be called. For a coroutine without a result the
+    // promise has to provide return_void() (not return_value()): without it
+    // `co_return;` does not compile and flowing off the end of the coroutine
+    // is undefined behavior.
+    void return_void() {
+    }
+
+private:
+    // Handle of the coroutine frame; stays a null handle for promises that are
+    // created directly with `new` instead of by a coroutine call.
+    std::coroutine_handle<AwaitablePromise> _coro;
+};
+
+} // namespace detail
+
+// First step of `co_await awaitable`, called automatically.
+template <typename T>
+inline bool Awaitable<T>::await_ready() {
+    // Never ready: the caller is always suspended at the co_await point and
+    // await_suspend() takes over from there.
+    return false;
+}
+
+// Second step of `co_await awaitable`, called automatically because
+// await_ready() returned false. `awaiting` is the coroutine being suspended.
+template <typename T>
+template <typename U>
+inline void Awaitable<T>::await_suspend(
+        std::coroutine_handle<detail::AwaitablePromise<U> > awaiting) {
+    // Remember who has to be resumed once the awaited operation finishes.
+    _promise->set_caller(&awaiting.promise());
+    if (!_promise->needs_suspend()) {
+        // An ordinary coroutine function: start running it now.
+        _promise->resume();
+    } else {
+        // A leaf operation: only report that the caller is now suspended.
+        _promise->on_suspend();
+    }
+}
+
+// Last step of `co_await awaitable`: called when the caller resumes, its
+// return value becomes the value of the co_await expression.
+template <typename T>
+inline T Awaitable<T>::await_resume() {
+    if constexpr (std::is_same<T, void>::value) {
+        return;
+    } else {
+        return _promise->value();
+    }
+}
+
+// Creates the promise behind awaitable() and marks it as a leaf operation,
+// i.e. one that is completed by on_done() rather than by running a coroutine.
+inline AwaitableDone::AwaitableDone()
+    : _awaitable(new detail::AwaitablePromise<void>) {
+    detail::AwaitablePromise<void>* leaf_promise = _awaitable.promise();
+    leaf_promise->set_needs_suspend();
+}
+
+// Closure entry point: reports the "done" event to the promise behind
+// awaitable().
+inline void AwaitableDone::Run() {
+    detail::AwaitablePromise<void>* leaf_promise = _awaitable.promise();
+    leaf_promise->on_done();
+}
+
+// Starts the coroutine behind `aw`.
+// Unless `detach` is true, this also creates the butex used by join() and the
+// promise used by awaitable(), and installs a completion callback on the
+// coroutine's own promise that feeds both of them.
+template <typename T>
+inline Coroutine::Coroutine(Awaitable<T>&& aw, bool detach) {
+    detail::AwaitablePromise<T>* origin_promise = aw.promise();
+    CHECK(origin_promise);
+
+    if (!detach) {
+        // Create butex for join()
+        std::atomic<int>* butex = bthread::butex_create_checked<std::atomic<int> >();
+        butex->store(0);
+        _butex = butex;
+
+        // Create AwaitablePromise for awaitable()
+        detail::AwaitablePromise<T>* waiter_promise = new detail::AwaitablePromise<T>();
+        waiter_promise->set_needs_suspend();
+        _promise = waiter_promise;
+
+        // The callback captures the pointers by value instead of `this`:
+        // as soon as the butex value becomes 1 a joiner on another thread may
+        // return from join() and destroy this Coroutine object, so the members
+        // must not be read through `this` afterwards. Keeping the typed
+        // pointer also makes a dynamic_cast unnecessary.
+        auto cb = [butex, waiter_promise, origin_promise]() {
+            if constexpr (!std::is_same<T, void>::value) {
+                waiter_promise->set_value(origin_promise->value());
+            }
+            // wakeup join()
+            butex->store(1);
+            bthread::butex_wake(butex);
+
+            // wakeup co_await on awaitable()
+            waiter_promise->on_done();
+        };
+        origin_promise->set_callback(cb);
+    }
+
+    // Start to run the coroutine
+    origin_promise->resume();
+}
+
+// A non-detached coroutine that nobody has waited for is joined here, so the
+// destructor blocks until it finishes. The butex is destroyed afterwards.
+inline Coroutine::~Coroutine() {
+    const bool never_waited = (_promise != nullptr) && !_waited;
+    if (never_waited) {
+        join();
+    }
+    if (_butex != nullptr) {
+        bthread::butex_destroy(_butex);
+        _butex = nullptr;
+    }
+}
+
+// Blocks the calling thread until the coroutine has finished, then returns its
+// result (nothing when T is void).
+// Must not be called on a detached coroutine, and only once per Coroutine
+// (counting awaitable() as well). T has to match the coroutine's result type,
+// or be void.
+template <typename T>
+inline T Coroutine::join() {
+    CHECK(_promise != nullptr) << "join() can not be called to detached coroutine!";
+    CHECK(_waited == false) << "awaitable() or join() can only be called once!";
+    _waited = true;
+    // The completion callback sets the butex to 1 before waking us. Re-check
+    // the value in a loop: butex_wait() may return before the value has
+    // changed (e.g. when the wait is interrupted), and continuing then would
+    // read the result before it has been set.
+    while (_butex->load() == 0) {
+        bthread::butex_wait(_butex, 0, nullptr);
+    }
+    if constexpr (!std::is_same<T, void>::value) {
+        auto promise = dynamic_cast<detail::AwaitablePromise<T>*>(_promise);
+        CHECK(promise != nullptr) << "join type not match";
+        // Copy the result out first: on_suspend() may delete the promise.
+        T ret = promise->value();
+        _promise->on_suspend();
+        return ret;
+    } else {
+        _promise->on_suspend();
+    }
+}
+
+// Returns an Awaitable<T> that another coroutine can co_await to wait for this
+// coroutine. Must not be called on a detached coroutine, and only once per
+// Coroutine (counting join() as well). T has to match the coroutine's result
+// type.
+template <typename T>
+inline Awaitable<T> Coroutine::awaitable() {
+    CHECK(_promise != nullptr) << "awaitable() can not be called to detached coroutine!";
+    CHECK(_waited == false) << "awaitable() or join() can only be called once!";
+    // _promise is stored as the base class; recover the typed promise.
+    detail::AwaitablePromise<T>* typed_promise =
+        dynamic_cast<detail::AwaitablePromise<T>*>(_promise);
+    CHECK(typed_promise != nullptr) << "awaitable type not match";
+    _waited = true;
+    return Awaitable<T>(typed_promise);
+}
+
+// Leaf operation that completes after `sleep_us` microseconds. The awaited
+// value is 0 when the timer fired and -1 when the timer could not be added.
+// NOTE: the caller will be resumed on the bthread timer thread; bthread only
+// has one timer thread, so this may be a performance bottleneck.
+inline Awaitable<int> Coroutine::usleep(int sleep_us) {
+    auto* sleep_promise = new detail::AwaitablePromise<int>();
+    sleep_promise->set_needs_suspend();
+
+    // Timer callback: publish the result and report completion.
+    auto on_timer = [](void* arg) {
+        auto* fired = static_cast<detail::AwaitablePromise<int>*>(arg);
+        fired->set_value(0);
+        fired->on_done();
+    };
+
+    bthread_timer_t timer_id;
+    const auto deadline = butil::microseconds_from_now(sleep_us);
+    const int rc = bthread_timer_add(&timer_id, deadline, on_timer, sleep_promise);
+    if (rc != 0) {
+        // No timer was added, so complete the operation right away with an error.
+        sleep_promise->set_value(-1);
+        sleep_promise->on_done();
+    }
+    return Awaitable<int>(sleep_promise);
+}
+
+} // namespace experimental
+} // namespace brpc
+
+#endif // BRPC_COROUTINE_INL_H
\ No newline at end of file
diff --git a/src/brpc/event_dispatcher.cpp b/src/brpc/event_dispatcher.cpp
index f747206a..689e80da 100644
--- a/src/brpc/event_dispatcher.cpp
+++ b/src/brpc/event_dispatcher.cpp
@@ -33,6 +33,8 @@ DEFINE_int32(event_dispatcher_num, 1, "Number of event
dispatcher");
DEFINE_bool(usercode_in_pthread, false,
"Call user's callback in pthreads, use bthreads otherwise");
+DEFINE_bool(usercode_in_coroutine, false,
+ "User's callback are run in coroutine, no bthread or pthread
blocking call");
static EventDispatcher* g_edisp = NULL;
static pthread_once_t g_edisp_once = PTHREAD_ONCE_INIT;
diff --git a/src/brpc/input_messenger.cpp b/src/brpc/input_messenger.cpp
index e619af74..3f740b1c 100644
--- a/src/brpc/input_messenger.cpp
+++ b/src/brpc/input_messenger.cpp
@@ -66,6 +66,7 @@ DEFINE_int32(socket_keepalive_count, -1,
"Set number of keepalives of sockets before close if this value
is positive");
DECLARE_bool(usercode_in_pthread);
+DECLARE_bool(usercode_in_coroutine);
DECLARE_uint64(max_body_size);
const size_t MSG_SIZE_WINDOW = 10; // Take last so many message into stat.
@@ -195,7 +196,7 @@ static void QueueMessage(InputMessageBase* to_run_msg,
BTHREAD_ATTR_NORMAL) | BTHREAD_NOSIGNAL;
tmp.keytable_pool = keytable_pool;
tmp.tag = bthread_self_tag();
- if (bthread_start_background(
+ if (!FLAGS_usercode_in_coroutine && bthread_start_background(
&th, &tmp, ProcessInputMessage, to_run_msg) == 0) {
++*num_bthread_created;
} else {
diff --git a/src/brpc/socket.cpp b/src/brpc/socket.cpp
index 2a391f35..9248dd18 100644
--- a/src/brpc/socket.cpp
+++ b/src/brpc/socket.cpp
@@ -100,6 +100,7 @@ DEFINE_int32(connect_timeout_as_unreachable, 3,
"fails the main socket as well when this socket is pooled.");
DECLARE_int32(health_check_timeout_ms);
+DECLARE_bool(usercode_in_coroutine);
static bool validate_connect_timeout_as_unreachable(const char*, int32_t v) {
return v >= 2 && v < 1000/*large enough*/;
@@ -2197,7 +2198,9 @@ int Socket::StartInputEvent(SocketId id, uint32_t events,
bthread_attr_t attr = thread_attr;
attr.keytable_pool = p->_keytable_pool;
attr.tag = bthread_self_tag();
- if (bthread_start_urgent(&tid, &attr, ProcessEvent, p) != 0) {
+ if (FLAGS_usercode_in_coroutine) {
+ ProcessEvent(p);
+ } else if (bthread_start_urgent(&tid, &attr, ProcessEvent, p) != 0) {
LOG(FATAL) << "Fail to start ProcessEvent";
ProcessEvent(p);
}
diff --git a/src/butil/containers/stack_container.h b/src/butil/containers/stack_container.h
index 111e7e5e..5679ab86 100644
--- a/src/butil/containers/stack_container.h
+++ b/src/butil/containers/stack_container.h
@@ -36,8 +36,14 @@ namespace butil {
template<typename T, size_t stack_capacity>
class StackAllocator : public std::allocator<T> {
public:
- typedef typename std::allocator<T>::pointer pointer;
- typedef typename std::allocator<T>::size_type size_type;
+#if __cplusplus >= 202002L
+ typedef typename std::allocator_traits<std::allocator<T> > Allocator;
+#else
+ typedef typename std::allocator<T> Allocator;
+#endif
+
+ typedef typename Allocator::pointer pointer;
+ typedef typename Allocator::size_type size_type;
// Backing store for the allocator. The container owner is responsible for
// maintaining this for as long as any containers using this allocator are
@@ -109,7 +115,12 @@ class StackAllocator : public std::allocator<T> {
source_->used_stack_buffer_ = true;
return source_->stack_buffer();
} else {
+#if __cplusplus >= 202002L
+ (void)hint;
+ return std::allocator<T>::allocate(n);
+#else
return std::allocator<T>::allocate(n, hint);
+#endif
}
}
diff --git a/src/butil/type_traits.h b/src/butil/type_traits.h
index 4f67fcca..5f342db3 100644
--- a/src/butil/type_traits.h
+++ b/src/butil/type_traits.h
@@ -92,7 +92,15 @@ template <typename T> struct is_pointer : false_type {};
template <typename T> struct is_pointer<T*> : true_type {};
#if defined(BUTIL_CXX11_ENABLED)
+
+#if __cplusplus >= 202002L
+template <class T> struct is_pod
+: integral_constant<bool, (std::is_standard_layout<T>::value &&
+ std::is_trivial<T>::value)> {};
+#else
template <class T> struct is_pod : std::is_pod<T> {};
+#endif
+
#else
// We can't get is_pod right without compiler help, so fail conservatively.
// We will assume it's false except for arithmetic types, enumerations,
diff --git a/test/brpc_coroutine_unittest.cpp b/test/brpc_coroutine_unittest.cpp
new file mode 100644
index 00000000..b89c1408
--- /dev/null
+++ b/test/brpc_coroutine_unittest.cpp
@@ -0,0 +1,221 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+#include "brpc/server.h"
+#include "brpc/channel.h"
+#include "brpc/coroutine.h"
+#include "echo.pb.h"
+
+// Test entry point. When BRPC_ENABLE_COROUTINE is defined it initializes
+// gtest and gflags and runs every registered test; otherwise it prints a
+// hint about the required compiler option and exits with status 0.
+int main(int argc, char* argv[]) {
+#ifdef BRPC_ENABLE_COROUTINE
+    testing::InitGoogleTest(&argc, argv);
+    GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);
+    return RUN_ALL_TESTS();
+#else
+    printf("bRPC coroutine is not enabled, please add -std=c++20 to compile options\n");
+    return 0;
+#endif
+}
+
+#ifdef BRPC_ENABLE_COROUTINE
+
+using brpc::experimental::Awaitable;
+using brpc::experimental::AwaitableDone;
+using brpc::experimental::Coroutine;
+
+// Scope tracer: logs "enter <name>" when constructed and "exit <name>" when
+// destroyed, so the log shows when each traced scope starts and ends.
+class Trace {
+public:
+    // explicit: a std::string must never silently convert into a Trace.
+    explicit Trace(const std::string& name) : _name(name) {
+        LOG(INFO) << "enter " << _name;
+    }
+    ~Trace() {
+        LOG(INFO) << "exit " << _name;
+    }
+private:
+    std::string _name;  // label repeated in the "exit" message
+};
+
+// Echo service whose handler body lives in a coroutine (EchoAsync) instead
+// of running synchronously inside Echo().
+class EchoServiceImpl : public test::EchoService {
+public:
+    EchoServiceImpl() {}
+    virtual ~EchoServiceImpl() {}
+
+    // RPC entry point: hands the call over to EchoAsync() and returns.
+    // `done` is not run here; EchoAsync's ClosureGuard takes care of it.
+    void Echo(google::protobuf::RpcController* cntl_base,
+              const test::EchoRequest* request,
+              test::EchoResponse* response,
+              google::protobuf::Closure* done) override {
+        // Create a detached coroutine, so the current bthread will return at once.
+        Coroutine(EchoAsync(request, response, done), true);
+    }
+
+    // Coroutine body of the handler. If the request carries sleep_us it
+    // first awaits Coroutine::usleep() for that long, then copies the
+    // request message into the response.
+    Awaitable<void> EchoAsync(const test::EchoRequest* request,
+                              test::EchoResponse* response,
+                              google::protobuf::Closure* done) {
+        Trace t("EchoAsync");
+        // This is important to test RAII object's destruction after coroutine finished
+        brpc::ClosureGuard done_guard(done);
+        if (request->has_sleep_us()) {
+            LOG(INFO) << "sleep " << request->sleep_us() << " us at server side";
+            co_await Coroutine::usleep(request->sleep_us());
+        }
+        response->set_message(request->message());
+    }
+};
+
+// gtest fixture for the coroutine tests. It holds no state, and its
+// set-up / tear-down hooks do nothing.
+class CoroutineTest : public ::testing::Test {
+protected:
+    CoroutineTest() = default;
+    ~CoroutineTest() override = default;
+    void SetUp() override {}
+    void TearDown() override {}
+};
+
+
+// Blocking delay, in microseconds, that the test coroutines pass to usleep()
+// between starting an asynchronous operation and awaiting it. The test case
+// raises it to 10000 for its second run of func() and then resets it to 0.
+static int delay_us = 0;
+
+// Coroutine that awaits nothing and yields `input` back to the caller.
+//
+// `input` is taken by value so the argument is owned by the coroutine
+// itself. A reference parameter would be bound to the caller's temporary
+// (every call site passes a string literal) and could dangle if the
+// coroutine ever outlived that full-expression.
+Awaitable<std::string> inplace_func(std::string input) {
+    Trace t("inplace_func");
+    co_return input;
+}
+
+// Coroutine that awaits inplace_func("123"), ignores the string it
+// produces, and then yields 0.5.
+Awaitable<double> inplace_func2() {
+    Trace scope("inplace_func2");
+    (void)co_await inplace_func("123");
+    co_return 0.5;
+}
+
+// Coroutine that starts a 1000us Coroutine::usleep(), blocks the calling
+// thread for delay_us, then awaits the sleep and checks that at least
+// 1000us have elapsed since entry. Yields 123.
+Awaitable<int> sleep_func() {
+    Trace t("sleep_func");
+    const int64_t start_us = butil::monotonic_time_us();
+    auto aw = Coroutine::usleep(1000);
+    // Blocking delay between creating the awaitable and awaiting it.
+    usleep(delay_us);
+    co_await aw;
+    // Keep the difference 64-bit instead of narrowing it to int.
+    const int64_t cost = butil::monotonic_time_us() - start_us;
+    EXPECT_GE(cost, 1000);
+    LOG(INFO) << "after usleep:" << cost;
+    co_return 123;
+}
+
+// Always throws std::string("error"); it never produces a float.
+// NOTE(review): the body contains no co_await/co_return, so as written this
+// is an ordinary function and the exception leaves the call expression
+// directly rather than going through a coroutine promise. Confirm that this
+// is the scenario the test intends to cover.
+Awaitable<float> exception_func() {
+    Trace scope("exception_func");
+    throw std::string("error");
+}
+
+// Main test coroutine. In order it:
+//   1. starts sleep_func() as a Coroutine and awaits its result (123);
+//   2. awaits inplace_func("hello") and checks the echoed string;
+//   3. checks that the std::string thrown by exception_func() is caught here;
+//   4. issues an Echo RPC through `channel` and awaits its AwaitableDone;
+//   5. issues a second Echo RPC with sleep_us=2000 and checks that it took
+//      at least 2000us.
+// On reaching the end it stores 456 into *out so the caller can tell that
+// the whole body ran.
+Awaitable<void> func(brpc::Channel& channel, int* out) {
+    Trace t("func");
+    test::EchoService_Stub stub(&channel);
+    test::EchoRequest request;
+    request.set_message("hello world");
+    test::EchoResponse response;
+    brpc::Controller cntl;
+
+    LOG(INFO) << "before start coroutine";
+    Coroutine coro(sleep_func());
+    usleep(delay_us);
+    LOG(INFO) << "before wait coroutine";
+    int ret = co_await coro.awaitable<int>();
+    EXPECT_EQ(ret, 123);
+    LOG(INFO) << "after wait coroutine, ret:" << ret;
+
+    auto str = co_await inplace_func("hello");
+    EXPECT_EQ("hello", str);
+
+    float num = 0.0;
+    try {
+        num = co_await exception_func();
+    } catch (const std::string& err) {
+        // Caught by const reference under its own name, so the exception is
+        // not copied and the outer `str` is not shadowed.
+        EXPECT_EQ("error", err);
+        num = 1.0;
+    }
+    EXPECT_EQ(1.0, num);
+
+    AwaitableDone done;
+    LOG(INFO) << "start echo";
+    stub.Echo(&cntl, &request, &response, &done);
+    LOG(INFO) << "after echo";
+    usleep(delay_us);
+    co_await done.awaitable();
+    LOG(INFO) << "after wait";
+    EXPECT_FALSE(cntl.Failed()) << cntl.ErrorText();
+    EXPECT_EQ("hello world", response.message());
+
+    cntl.Reset();
+    request.set_sleep_us(2000);
+    AwaitableDone done2;
+    LOG(INFO) << "start echo2";
+    const int64_t start_us = butil::monotonic_time_us();
+    stub.Echo(&cntl, &request, &response, &done2);
+    LOG(INFO) << "after echo2";
+    co_await done2.awaitable();
+    // Keep the difference 64-bit instead of narrowing it to int.
+    const int64_t cost = butil::monotonic_time_us() - start_us;
+    LOG(INFO) << "after wait2";
+    EXPECT_GE(cost, 2000);
+    EXPECT_FALSE(cntl.Failed()) << cntl.ErrorText();
+    EXPECT_EQ("hello world", response.message());
+
+    *out = 456;
+}
+
+// End-to-end test: starts an Echo server on 127.0.0.1:8613, then runs the
+// test coroutines against it, first joined and finally detached.
+TEST_F(CoroutineTest, coroutine) {
+    butil::EndPoint ep;
+    ASSERT_EQ(0, str2endpoint("127.0.0.1:8613", &ep));
+
+    brpc::Server server;
+    EchoServiceImpl service;
+    // Fail right here if registration fails, rather than later with a
+    // confusing RPC error.
+    ASSERT_EQ(0, server.AddService(&service, brpc::SERVER_DOESNT_OWN_SERVICE));
+    ASSERT_EQ(0, server.Start(ep, NULL));
+
+    brpc::Channel channel;
+    brpc::ChannelOptions options;
+    ASSERT_EQ(0, channel.Init(ep, &options));
+
+    // First run: no blocking delay between starting and awaiting operations.
+    int out = 0;
+    Coroutine coro(func(channel, &out));
+    coro.join();
+    ASSERT_EQ(456, out);
+
+    // Second run: block for 10ms between starting and awaiting operations.
+    out = 0;
+    delay_us = 10000;
+    Coroutine coro2(func(channel, &out));
+    coro2.join();
+    ASSERT_EQ(456, out);
+    delay_us = 0;
+
+    Coroutine coro3(inplace_func2());
+    double d = coro3.join<double>();
+    ASSERT_EQ(0.5, d);
+
+    Coroutine coro4(inplace_func("abc"));
+    coro4.join();
+
+    Coroutine coro5(sleep_func());
+    coro5.join();
+
+    // Same coroutines started with the second constructor argument set to
+    // true and never joined.
+    Coroutine coro6(inplace_func2(), true);
+    Coroutine coro7(inplace_func("abc"), true);
+    Coroutine coro8(sleep_func(), true);
+    usleep(10000);  // wait sleep_func() to complete
+
+    LOG(INFO) << "test case finished";
+}
+
+#endif // BRPC_ENABLE_COROUTINE
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]