This is an automated email from the ASF dual-hosted git repository.

wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git


The following commit(s) were added to refs/heads/master by this push:
     new 210ebfb3 Support c++20 coroutine (#2121)
210ebfb3 is described below

commit 210ebfb312b5bfff867a4712ee1f01342f03aefa
Author: Weibing Wang <wwb...@163.com>
AuthorDate: Tue Jan 23 12:12:59 2024 +0800

    Support c++20 coroutine (#2121)
    
    * Support c++20 coroutine
    
    * Fix CI UT failed
    
    * Add example coroutine Makefile
    
    * Add usercode_in_coroutine flag
    
    * Add coroutine document
    
    * Add experimental namespace for coroutine
---
 docs/cn/coroutine.md                   | 193 ++++++++++++++++++++
 example/coroutine/Makefile             |  87 +++++++++
 example/coroutine/coroutine_server.cpp | 145 +++++++++++++++
 example/coroutine/echo.proto           |  34 ++++
 src/brpc/controller.cpp                |   3 +-
 src/brpc/coroutine.h                   | 148 ++++++++++++++++
 src/brpc/coroutine_inl.h               | 312 +++++++++++++++++++++++++++++++++
 src/brpc/event_dispatcher.cpp          |   2 +
 src/brpc/input_messenger.cpp           |   3 +-
 src/brpc/socket.cpp                    |   5 +-
 src/butil/containers/stack_container.h |  15 +-
 src/butil/type_traits.h                |   8 +
 test/brpc_coroutine_unittest.cpp       | 221 +++++++++++++++++++++++
 13 files changed, 1171 insertions(+), 5 deletions(-)

diff --git a/docs/cn/coroutine.md b/docs/cn/coroutine.md
new file mode 100644
index 00000000..cdce5b9e
--- /dev/null
+++ b/docs/cn/coroutine.md
@@ -0,0 +1,193 @@
+# C++20 协程支持
+
+bRPC 支持 C++20 协程说明文档。
+
+> 注:该功能是实验性的,请勿在生产环境下使用。
+
+## 使用说明
+
+### 适用场景
+
+C++协程适用于极高并发的场景。由于bthread使用了mmap,存在系统限制,一个进程bthread数量一般最多到万级别,如果采用同步方式用一个bthread来处理一个请求,那么请求的并发度也只能到万级别。如果采用异步方式来写代码,可以达到更高的并发,但又会导致代码难以维护。这时我们就可以使用C++协程,以类似同步的方式来写代码,而达到异步的性能效果。
+
+### 使用前提
+
+1. 需要使用支持c++20的编译器,如gcc 11
+2. 需要编译选项中加上 `-std=c++20`
+
+### 简单示例
+
+以下例子显示了如何在bRPC中启动一个C++20协程,在协程中发起 RPC调用,并等待返回结果。
+
+```cpp
+#include <brpc/channel.h>
+#include <brpc/coroutine.h>
+
+// 协程函数的返回类型,需要是brpc::experimental::Awaitable<T>
+// T是函数返回的实际数据类型
+brpc::experimental::Awaitable<int> RpcCall(brpc::Channel& channel) {
+    EchoRequest request;
+    EchoResponse response;
+    EchoService_Stub stub(&channel);
+    brpc::Controller cntl;
+    brpc::experimental::AwaitableDone done;
+    stub.Echo(&cntl, &request, &response, &done);
+    // 等待RPC返回结果
+    co_await done.awaitable();
+    // 返回数据,注意这里用co_return而不是return
+    // 因为函数返回值类型是brpc::experimental::Awaitable<int>而不是int
+    co_return cntl.ErrorCode();
+}
+
+brpc::experimental::Awaitable<void> CoroutineMain(const char* server) {
+    brpc::Channel channel;
+    channel.Init(server, NULL);
+    // co_await会从Awaitable<int>得到int类型的返回值
+    int code = co_await RpcCall(channel);
+    printf("Rpc result:%d\n", code);
+}
+
+int main() {
+    // 启动协程
+    brpc::experimental::Coroutine coro(CoroutineMain("127.0.0.1:8080"));
+    // 等待协程执行完成
+    coro.join();
+    return 0;
+}
+```
+
+更完整的例子可以查看源码中的`example/coroutine/coroutine_server.cpp`文件。
+
+### 更多用法
+
+1. 在非协程环境下等待一个协程执行完成:
+
+```cpp
+brpc::experimental::Coroutine coro(func(args));
+coro.join();
+```
+
+2. 在非协程环境下等待协程完成并获取返回值:
+
+```cpp
+brpc::experimental::Coroutine coro(func(args)); // func的返回值类型为Awaitable<int>
+int result = coro.join<int>();
+```
+
+3. 在协程环境下等待协程执行完成:
+
+```cpp
+brpc::experimental::Coroutine coro(func(args));
+... // 做一些其它事情
+co_await coro.awaitable();
+```
+
+4. 在协程环境下等待协程执行完成并获取返回值:
+
+```cpp
+brpc::experimental::Coroutine coro(func(args)); // func的返回值类型为Awaitable<int>
+... // 做一些其它事情
+int ret = co_await coro.awaitable<int>();
+```
+
+5. 在协程环境下sleep:
+```cpp
+co_await brpc::experimental::Coroutine::usleep(1000);
+```
+
+### 注意事项
+
+1. 协程不保证一个函数的上下文都在同一个pthread或同一个bthread下执行。在co_await之后,代码所在的pthread或bthread可能发生变化,因此依赖于pthread或bthread的线程局部变量的代码(比如rpcz功能)将无法正确工作。
+2. 不应在协程中使用阻塞bthread(如bthread_join、同步RPC)或阻塞pthread的函数,否则可能导致死锁或者长耗时。
+3. 不要在不必要的地方使用协程,如下面的代码,虽然也能正常工作,但没有意义:
+
+```cpp
+brpc::experimental::Awaitable<int> inplace_func() {
+    co_return 123;
+}
+```
+
+### 实现极致性能
+
+如果确保服务的处理代码都运行在协程之中,并且没有任何阻塞bthread或阻塞pthread操作,则可以开启`usercode_in_coroutine`这个flag。开启这个flag之后,bRPC会简化服务端处理逻辑,减少不必要的bthread开销。在这种情况下,实际的工作线程数量将由event_dispatcher_num控制,而不再是由bthread worker数量控制。
+
+## 实现原理
+
+### C++20协程实现原理
+
+为了方便理解,我们把上面的CoroutineMain函数稍微改写一下,把co_await前后的逻辑分成两部分:
+
+```cpp
+brpc::experimental::Awaitable<void> CoroutineMain(const char* server) {
+    brpc::Channel channel;
+    channel.Init(server, NULL);
+    brpc::experimental::Awaitable<int> awaitable = RpcCall(channel);
+
+    int code = co_await awaitable;
+    printf("Rpc result:%d\n", code);
+}
+```
+
+上面的代码实际上是怎么执行的呢?当你使用co_await关键字的时候,编译器会把co_await后面的步骤转换成一个callback函数,把这个callback传给实际co_await的那个`Awaitable<T>`对象,比如上面的CoroutineMain函数,经过编译器转换后会变成大概如下的逻辑(简化版,实际要比这个复杂得多):
+
+```cpp
+
+brpc::experimental::Awaitable<void> CoroutineMain(const char* server) {
+    // 根据函数返回类型,找到Awaitable<void>的名为promise_type的子类
+    // 在函数的入口,创建一个promise_type类型的对象
+    auto promise = new brpc::experimental::Awaitable<void>::promise_type();
+    // 从promise对象中创建返回Awaitable对象
+    Awaitable<void> ret = promise->get_return_object();
+
+    // co_await之前的逻辑,保持不变
+    brpc::Channel channel;
+    channel.Init(server, NULL);
+    brpc::experimental::Awaitable<int> awaitable = RpcCall(channel);
+
+    // co_await的逻辑,转成一个await_suspend的函数调用,传入一个callback函数
+    awaitable.await_suspend([promise, &awaitable]() {
+        // co_await之后的逻辑,转移到callback函数中
+        int code = awaitable.await_resume();
+        printf("Rpc result:%d\n", code);
+        // 在final_suspend里面,会做一些唤醒调用者、资源释放的工作
+        promise->final_suspend();
+        delete promise;
+    });
+    // 返回Awaitable<void>对象,以便上层函数进行处理
+    return ret;
+}
+```
+
+也就是说,co_await就是一个语法转换器,把看似同步的代码转化成异步调用的代码,仅此而已。至于Awaitable类和promise类的具体实现,编译器就不关心了,这是基础库需要做的。比如在brpc中封装了brpc::experimental::Awaitable类和promise子类,实现了await_suspend/await_resume等逻辑,使协程可以正确的工作起来。
+
+### 原子等待操作
+
+上面我们看到的是一个中间函数,它co_await一个子函数返回的Awaitable对象,然后自己也返回一个Awaitable对象。这样层层调用一定有一个尽头,即原子等待操作,它会返回Awaitable对象,但是它内部不再有co_await/co_return这样的语句了。目前实现了3种原子等待操作,未来可以扩展更多。
+
+1. 等待RPC返回结果: `AwaitableDone::awaitable()`
+2. 等待sleep: `Coroutine::usleep()`
+3. 等待另一个协程完成: `Coroutine::awaitable()`
+
+下面是一个原子等待操作的示例实现,我们需要手动创建一个promise对象,设置set_needs_suspend(),然后发起一个异步调用(如bthread_timer_add),在回调函数里设置好返回值、调用promise->on_done(),最后根据promise返回Awaitable对象即可。
+
+```cpp
+inline Awaitable<int> Coroutine::usleep(int sleep_us) {
+    auto promise = new detail::AwaitablePromise<int>();
+    promise->set_needs_suspend();
+    bthread_timer_t timer;
+    auto abstime = butil::microseconds_from_now(sleep_us);
+    auto cb = [](void* p) {
+        auto promise = static_cast<detail::AwaitablePromise<int>*>(p);
+        promise->set_value(0);
+        promise->on_done();
+    };
+    bthread_timer_add(&timer, abstime, cb, promise);
+    return Awaitable<int>(promise);
+}
+```
+
+### 协程与多线程
+
+上面我们可以看到,协程本质上就是一种callback,和线程没有直接关系。它可以是单线程的,也可以是多线程的,这完全取决于它的原子等待操作里是怎么调用callback的。在bRPC的环境里,callback有可能从另一个pthread或bthread发起,所以协程也是需要考虑多线程问题。比如,有可能在调用co_await语句之前,要等待的事情就已经结束了,对于这种情况co_await应该立即返回。
+
+协程和线程可以一起使用,比如我们可以使用bthread将任务scale到多核,然后在任务内部的子任务用协程来实现异步化。
\ No newline at end of file
diff --git a/example/coroutine/Makefile b/example/coroutine/Makefile
new file mode 100644
index 00000000..2dd5631d
--- /dev/null
+++ b/example/coroutine/Makefile
@@ -0,0 +1,87 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+LINK_SO=1
+NEED_GPERFTOOLS=0
+BRPC_PATH=../..
+include $(BRPC_PATH)/config.mk
+# Notes on the flags:
+# 1. Added -fno-omit-frame-pointer: perf/tcmalloc-profiler use frame pointers by default
+# Compiler flags for the example; -std=c++20 is what enables <coroutine>.
+CXXFLAGS+=$(CPPFLAGS) -std=c++20 -DNDEBUG -O2 -pipe -W -Wall -Wno-unused-parameter -fPIC -fno-omit-frame-pointer
+#CXXFLAGS+= -fsanitize=address
+#STATIC_LINKINGS+= -Wl,-Bstatic -lasan -Wl,-Bdynamic
+ifeq ($(NEED_GPERFTOOLS), 1)
+       CXXFLAGS+=-DBRPC_ENABLE_CPU_PROFILER
+endif
+HDRS+=$(BRPC_PATH)/src
+#HDRS+=$(BRPC_PATH)/output/include
+LIBS+=$(BRPC_PATH)/output/lib
+
+HDRPATHS=$(addprefix -I, $(HDRS))
+LIBPATHS=$(addprefix -L, $(LIBS))
+COMMA=,
+SOPATHS=$(addprefix -Wl$(COMMA)-rpath$(COMMA), $(LIBS))
+
+SERVER_SOURCES = coroutine_server.cpp
+PROTOS = $(wildcard *.proto)
+
+PROTO_OBJS = $(PROTOS:.proto=.pb.o)
+PROTO_GENS = $(PROTOS:.proto=.pb.h) $(PROTOS:.proto=.pb.cc)
+SERVER_OBJS = $(addsuffix .o, $(basename $(SERVER_SOURCES))) 
+
+# Select the link command line per platform. LINK_OPTIONS_SO is used by the
+# coroutine_server rule when LINK_SO is non-empty, LINK_OPTIONS otherwise.
+ifeq ($(SYSTEM),Darwin)
+ ifneq ("$(LINK_SO)", "")
+       STATIC_LINKINGS += -lbrpc
+ else
+       # *.a must be explicitly specified in clang
+       STATIC_LINKINGS += $(BRPC_PATH)/output/lib/libbrpc.a
+ endif
+       LINK_OPTIONS_SO = $^ $(STATIC_LINKINGS) $(DYNAMIC_LINKINGS)
+       LINK_OPTIONS = $^ $(STATIC_LINKINGS) $(DYNAMIC_LINKINGS)
+else ifeq ($(SYSTEM),Linux)
+       STATIC_LINKINGS += -lbrpc
+       LINK_OPTIONS_SO = -Xlinker "-(" $^ -Xlinker "-)" $(STATIC_LINKINGS) $(DYNAMIC_LINKINGS)
+       LINK_OPTIONS = -Xlinker "-(" $^ -Wl,-Bstatic $(STATIC_LINKINGS) -Wl,-Bdynamic -Xlinker "-)" $(DYNAMIC_LINKINGS)
+endif
+
+# Default target: build the example server binary.
+.PHONY:all
+all: coroutine_server
+
+# Remove the binary, the protoc-generated sources/headers and all objects.
+.PHONY:clean
+clean:
+       @echo "> Cleaning"
+       rm -rf coroutine_server $(PROTO_GENS) $(PROTO_OBJS) $(SERVER_OBJS)
+
+# Link the server from the protobuf objects and the server objects, using the
+# shared-library link line when LINK_SO is non-empty.
+coroutine_server:$(PROTO_OBJS) $(SERVER_OBJS)
+       @echo "> Linking $@"
+ifneq ("$(LINK_SO)", "")
+       $(CXX) $(LIBPATHS) $(SOPATHS) $(LINK_OPTIONS_SO) -o $@
+else
+       $(CXX) $(LIBPATHS) $(LINK_OPTIONS) -o $@
+endif
+
+# Generate the C++ protobuf source and header from a .proto file.
+%.pb.cc %.pb.h:%.proto
+       @echo "> Generating $@"
+       $(PROTOC) --cpp_out=. --proto_path=. $(PROTOC_EXTRA_ARGS) $<
+
+# Compile hand-written sources (*.cpp).
+%.o:%.cpp
+       @echo "> Compiling $@"
+       $(CXX) -c $(HDRPATHS) $(CXXFLAGS) $< -o $@
+
+# Compile generated sources (*.cc).
+%.o:%.cc
+       @echo "> Compiling $@"
+       $(CXX) -c $(HDRPATHS) $(CXXFLAGS) $< -o $@
diff --git a/example/coroutine/coroutine_server.cpp b/example/coroutine/coroutine_server.cpp
new file mode 100644
index 00000000..9df50b04
--- /dev/null
+++ b/example/coroutine/coroutine_server.cpp
@@ -0,0 +1,145 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// A server to receive EchoRequest and send back EchoResponse.
+
+#include <gflags/gflags.h>
+#include <butil/logging.h>
+#include <brpc/server.h>
+#include <brpc/channel.h>
+#include <brpc/coroutine.h>
+#include "echo.pb.h"
+
+DEFINE_int32(port, 8000, "TCP Port of this server");
+DEFINE_int32(sleep_us, 1000000, "Server sleep us");
+DEFINE_bool(enable_coroutine, true, "Enable coroutine");
+
+using brpc::experimental::Awaitable;
+using brpc::experimental::AwaitableDone;
+using brpc::experimental::Coroutine;
+
+namespace example {
+// Example service with two methods, each implemented twice: as a C++20
+// coroutine (when -enable_coroutine is true) and as ordinary blocking code.
+//  - Echo:  waits for -sleep_us microseconds, then echoes the request message.
+//  - Proxy: forwards the request to Echo through _channel and relays the
+//           response (or the error text if the forwarded call failed).
+class EchoServiceImpl : public EchoService {
+public:
+    // Initializes _channel to point at this server's own port so that Proxy
+    // can call Echo.
+    EchoServiceImpl() {
+        brpc::ChannelOptions options;
+        // Twice the configured sleep time (in milliseconds) plus 100ms.
+        options.timeout_ms = FLAGS_sleep_us / 1000 * 2 + 100;
+        options.max_retry = 0;
+        CHECK(_channel.Init(butil::EndPoint(butil::IP_ANY, FLAGS_port), &options) == 0);
+    }
+
+    virtual ~EchoServiceImpl() {}
+
+    // RPC entry point of Echo. In coroutine mode a detached coroutine (second
+    // constructor argument `true`) is started and becomes responsible for
+    // running `done`; otherwise the request is handled synchronously here.
+    void Echo(google::protobuf::RpcController* cntl_base,
+              const EchoRequest* request,
+              EchoResponse* response,
+              google::protobuf::Closure* done) override {
+        // brpc::Controller* cntl =
+        //     static_cast<brpc::Controller*>(cntl_base);
+
+        if (FLAGS_enable_coroutine) {
+            Coroutine(EchoAsync(request, response, done), true);
+        } else {
+            brpc::ClosureGuard done_guard(done);
+            bthread_usleep(FLAGS_sleep_us);
+            response->set_message(request->message());
+        }
+    }
+
+    // Coroutine body of Echo. `done` is run by the ClosureGuard when the
+    // coroutine finishes, i.e. after the sleep and after the response is set.
+    // NOTE(review): request/response are presumably kept alive by the
+    // framework until `done` runs -- confirm against the brpc server contract.
+    Awaitable<void> EchoAsync(const EchoRequest* request,
+               EchoResponse* response,
+               google::protobuf::Closure* done) {
+        brpc::ClosureGuard done_guard(done);
+        co_await Coroutine::usleep(FLAGS_sleep_us);
+        response->set_message(request->message());
+    }
+
+    // RPC entry point of Proxy. Same dispatch as Echo(): detached coroutine in
+    // coroutine mode, a synchronous RPC (done == NULL) otherwise.
+    void Proxy(google::protobuf::RpcController* cntl_base,
+               const EchoRequest* request,
+               EchoResponse* response,
+               google::protobuf::Closure* done) override {
+        // brpc::Controller* cntl =
+        //     static_cast<brpc::Controller*>(cntl_base);
+
+        if (FLAGS_enable_coroutine) {
+            Coroutine(ProxyAsync(request, response, done), true);
+        } else {
+            brpc::ClosureGuard done_guard(done);
+            EchoService_Stub stub(&_channel);
+            brpc::Controller cntl;
+            stub.Echo(&cntl, request, response, NULL);
+            if (cntl.Failed()) {
+                response->set_message(cntl.ErrorText());
+            }
+        }
+    }
+
+    // Coroutine body of Proxy: issues the Echo RPC with an AwaitableDone as
+    // its closure and suspends until that closure has been run.
+    Awaitable<void> ProxyAsync(const EchoRequest* request,
+                    EchoResponse* response,
+                    google::protobuf::Closure* done) {
+        brpc::ClosureGuard done_guard(done);
+        EchoService_Stub stub(&_channel);
+        brpc::Controller cntl;
+        AwaitableDone done2;
+        stub.Echo(&cntl, request, response, &done2);
+        co_await done2.awaitable();
+        if (cntl.Failed()) {
+            response->set_message(cntl.ErrorText());
+        }
+    }
+
+private:
+    // Channel to this server itself, used by Proxy/ProxyAsync.
+    brpc::Channel _channel;
+};
+}  // namespace example
+
+int main(int argc, char* argv[]) {
+    bthread_setconcurrency(BTHREAD_MIN_CONCURRENCY);
+
+    // Command-line flags are handled by gflags.
+    GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);
+    if (FLAGS_enable_coroutine) {
+        // All user code of this example runs inside coroutines in this mode.
+        GFLAGS_NS::SetCommandLineOption("usercode_in_coroutine", "true");
+    }
+
+    // One Server instance is enough for this example.
+    brpc::Server server;
+
+    // The service implementation lives on the stack of main().
+    example::EchoServiceImpl echo_service_impl;
+
+    // Because the service is a stack object the server must not delete it,
+    // hence SERVER_DOESNT_OWN_SERVICE (use brpc::SERVER_OWNS_SERVICE for
+    // heap-allocated services that the server should own).
+    const int add_rc = server.AddService(&echo_service_impl,
+                                         brpc::SERVER_DOESNT_OWN_SERVICE);
+    if (add_rc != 0) {
+        LOG(ERROR) << "Fail to add service";
+        return -1;
+    }
+
+    // Bring the server up on the configured port.
+    brpc::ServerOptions options;
+    options.num_threads = BTHREAD_MIN_CONCURRENCY;
+    const int start_rc = server.Start(FLAGS_port, &options);
+    if (start_rc != 0) {
+        LOG(ERROR) << "Fail to start EchoServer";
+        return -1;
+    }
+
+    // Block until Ctrl-C, after which the server is stopped and joined.
+    server.RunUntilAskedToQuit();
+    return 0;
+}
\ No newline at end of file
diff --git a/example/coroutine/echo.proto b/example/coroutine/echo.proto
new file mode 100644
index 00000000..ef5cc8ab
--- /dev/null
+++ b/example/coroutine/echo.proto
@@ -0,0 +1,34 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+syntax="proto2";
+package example;
+
+option cc_generic_services = true;
+
+// Request of both EchoService methods: carries a single text message.
+message EchoRequest {
+      required string message = 1;
+};
+
+// Response of both EchoService methods: carries a single text message.
+message EchoResponse {
+      required string message = 1;
+};
+
+// Service exposed by the coroutine example server. Both methods take an
+// EchoRequest and return an EchoResponse.
+service EchoService {
+      rpc Echo(EchoRequest) returns (EchoResponse);
+      rpc Proxy(EchoRequest) returns (EchoResponse);
+};
diff --git a/src/brpc/controller.cpp b/src/brpc/controller.cpp
index f49a27a9..bfe278ff 100644
--- a/src/brpc/controller.cpp
+++ b/src/brpc/controller.cpp
@@ -125,6 +125,7 @@ const Controller* GetSubControllerOfSelectiveChannel(
     const RPCSender* sender, int index);
 
 DECLARE_bool(usercode_in_pthread);
+DECLARE_bool(usercode_in_coroutine);
 static const int MAX_RETRY_COUNT = 1000;
 static bvar::Adder<int64_t>* g_ncontroller = NULL;
 
@@ -684,7 +685,7 @@ void Controller::OnVersionedRPCReturned(const CompletionInfo& info,
     }
 
 END_OF_RPC:
-    if (new_bthread) {
+    if (new_bthread && !FLAGS_usercode_in_coroutine) {
         // [ Essential for -usercode_in_pthread=true ]
         // When -usercode_in_pthread is on, the reserved threads (set by
         // -usercode_backup_threads) may all block on bthread_id_lock in
diff --git a/src/brpc/coroutine.h b/src/brpc/coroutine.h
new file mode 100644
index 00000000..513f4022
--- /dev/null
+++ b/src/brpc/coroutine.h
@@ -0,0 +1,148 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef BRPC_COROUTINE_H
+#define BRPC_COROUTINE_H
+
+#if __cplusplus >= 202002L
+
+#define BRPC_ENABLE_COROUTINE 1
+
+#include <coroutine>
+#include <functional>
+#include <atomic>
+#include "brpc/callback.h"
+
+namespace brpc {
+namespace experimental {
+
+namespace detail {
+class AwaitablePromiseBase;
+template <typename T>
+class AwaitablePromise;
+}
+
+class AwaitableDone;
+class Coroutine;
+
+// WARN: The bRPC coroutine feature is experimental, DO NOT use in production environment!
+
+// Awaitable<T> is used as coroutine return type, for example:
+//  Awaitable<int> func1() {
+//      co_return 42;
+//  }
+//  Awaitable<std::string> func2() {
+//      int ret = co_await func1();
+//      co_return std::to_string(ret);
+//  }
+//
+// An Awaitable<T> is a thin handle around a pointer to the coroutine's
+// promise object (detail::AwaitablePromise<T>). Its destructor is empty, so
+// the handle itself never frees the promise.
+template <typename T>
+class Awaitable {
+public:
+    // Tells the compiler which promise type to use for coroutines that return
+    // Awaitable<T>.
+    using promise_type = detail::AwaitablePromise<T>;
+
+    ~Awaitable() {}
+
+    // NOTE: compiler will generate calls to these functions automatically,
+    // DO NOT call them manually
+    bool await_ready();
+    template <typename U>
+    void await_suspend(std::coroutine_handle<detail::AwaitablePromise<U> > awaiting);
+    T await_resume();
+
+private:
+friend class detail::AwaitablePromise<T>;
+friend class AwaitableDone;
+friend class Coroutine;
+
+    // Only the promise and the friend helpers above may create an Awaitable,
+    // always from an existing promise.
+    Awaitable() = delete;
+    explicit Awaitable(promise_type* p) : _promise(p) {}
+
+    promise_type* promise() {
+        return _promise;
+    }
+
+    // Not owned by this handle (see the class comment).
+    promise_type* _promise;
+};
+
+// Closure that a coroutine can pass as `done` to an asynchronous RPC and then
+// co_await on. Usage:
+//    AwaitableDone done;
+//    stub.CallMethod(&cntl, &req, &resp, &done);
+//    co_await done.awaitable();
+//
+class AwaitableDone : public google::protobuf::Closure {
+public:
+    AwaitableDone();
+
+    // Invoked when the RPC completes.
+    void Run() override;
+
+    // The object to co_await on in order to wait for Run().
+    Awaitable<void>& awaitable() { return _awaitable; }
+
+private:
+    Awaitable<void> _awaitable;
+};
+
+// Class for management of coroutine
+// 1. To create a new coroutine and wait it finish:
+//  Awaitable<void> func(double val);
+//
+//  int main() {
+//      Coroutine coro(func(1.0));
+//      coro.join();
+//  }
+// 2. To wait a coroutine in another coroutine:
+//  Awaitable<void> another_func() {
+//      Coroutine coro(func(1.0));
+//      co_await coro.awaitable<void>();
+//  }
+// 3. To create a detached coroutine without waiting:
+//  Coroutine coro(func(1.0), true);
+// 4. To sleep in a coroutine:
+//  co_await Coroutine::usleep(100);
+//
+// NOTE: Inside coroutine function, DO NOT call pthread-blocking or
+// bthread-blocking functions (eg. bthread_join(), bthread_usleep(), synchronous RPC),
+// otherwise may cause dead lock or long latency.
+class Coroutine {
+public:
+    // Starts running the coroutine behind `aw` immediately. When `detach` is
+    // false the coroutine can later be waited for with join() or awaitable().
+    template <typename T>
+    Coroutine(Awaitable<T>&& aw, bool detach = false);
+
+    // Joins the coroutine if it is not detached and was never waited for,
+    // then releases the butex.
+    ~Coroutine();
+
+    // A Coroutine owns its butex and, unless detached, is referenced by the
+    // coroutine's completion callback, so it must not be copied.
+    Coroutine(const Coroutine&) = delete;
+    Coroutine& operator=(const Coroutine&) = delete;
+
+    // Blocks the calling thread until the coroutine finishes and returns its
+    // result. T must match the T of the Awaitable<T> given to the constructor
+    // when a result is requested. May be called at most once, and not together
+    // with awaitable().
+    template <typename T = void>
+    T join();
+
+    // Returns an object that another coroutine can co_await to wait for this
+    // coroutine. May be called at most once, and not together with join().
+    template <typename T = void>
+    Awaitable<T> awaitable();
+
+    // Suspends the awaiting coroutine for `sleep_us` microseconds.
+    static Awaitable<int> usleep(int sleep_us);
+
+private:
+    // Promise used by join()/awaitable(); stays nullptr for detached coroutines.
+    detail::AwaitablePromiseBase* _promise{nullptr};
+    // True once join() or awaitable() has been called.
+    bool _waited{false};
+    // Butex that join() waits on; stays nullptr for detached coroutines.
+    std::atomic<int>* _butex{nullptr};
+};
+
+} // namespace experimental
+} // namespace brpc
+
+#include "brpc/coroutine_inl.h"
+
+#endif // __cplusplus >= 202002L
+
+#endif // BRPC_COROUTINE_H
\ No newline at end of file
diff --git a/src/brpc/coroutine_inl.h b/src/brpc/coroutine_inl.h
new file mode 100644
index 00000000..1ff40054
--- /dev/null
+++ b/src/brpc/coroutine_inl.h
@@ -0,0 +1,312 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef BRPC_COROUTINE_INL_H
+#define BRPC_COROUTINE_INL_H
+
+#include "bthread/unstable.h"   // bthread_timer_add
+#include "bthread/butex.h"      // butex_wake/butex_wait
+
+namespace brpc {
+namespace experimental {
+
+namespace detail {
+
+// Type-independent part of the promise object behind every Awaitable<T>.
+// It links a coroutine to its caller (the coroutine that co_awaits it), holds
+// the optional completion callback of a root coroutine and implements the
+// suspend/done handshake used by leaf operations.
+class AwaitablePromiseBase {
+public:
+    AwaitablePromiseBase() {
+    }
+
+    // The promise owns _suspended_or_done through a raw pointer, so copying
+    // would lead to a double delete.
+    AwaitablePromiseBase(const AwaitablePromiseBase&) = delete;
+    AwaitablePromiseBase& operator=(const AwaitablePromiseBase&) = delete;
+
+    virtual ~AwaitablePromiseBase() {
+        delete _suspended_or_done;
+    }
+
+    // Resume / destroy the coroutine this promise belongs to.
+    virtual void resume() = 0;
+    virtual void destroy() = 0;
+
+    // True if set_needs_suspend() has been called on this promise.
+    bool needs_suspend() {
+        return _suspended_or_done != nullptr;
+    }
+
+    // Marks this promise as belonging to a leaf operation by allocating the
+    // handshake flag, initially false.
+    void set_needs_suspend() {
+        _suspended_or_done = new std::atomic<bool>();
+        _suspended_or_done->store(false);
+    }
+
+    // For a Coroutine's leaf function
+    // Its caller will be suspended, waiting for its done.
+    // But the suspend and done are always in different threads.
+    // It may suspend before done, or done before suspend.
+    // So we use an atomic<bool>, after first suspend_or_done() it will become true.
+    // Then the second suspend_or_done(), exchange(true) will returns true.
+    // Then we can safely delete this.
+    void suspend_or_done() {
+        if (_suspended_or_done->exchange(true)) {
+            // Already suspend AND done
+            if (_caller) {
+                // The leaf function has finished, resume its caller.
+                _caller->resume();
+            }
+            delete this;
+        }
+    }
+
+    // The two halves of the handshake; whichever is called second resumes the
+    // caller and deletes the promise.
+    void on_suspend() { suspend_or_done(); }
+    void on_done() { suspend_or_done(); }
+
+    // Sets the callback run from final_suspend().
+    void set_callback(std::function<void()> cb) {
+        _callback = cb;
+    }
+
+    // Sets the promise to resume when this one finishes.
+    void set_caller(AwaitablePromiseBase* caller) {
+        _caller = caller;
+    }
+
+    // When the coroutine function begins, initial_suspend() will be called
+    auto initial_suspend() {
+        // Always suspend the function, later resume() will make it start to run
+        return std::suspend_always{};
+    }
+
+    // When the coroutine function throws unhandled exception, unhandled_exception() will be called
+    void unhandled_exception() {
+        LOG(ERROR) << "Coroutine throws unhandled exception!";
+        std::exit(1);
+    }
+
+    // When the coroutine function ends, final_suspend() will be called
+    auto final_suspend() noexcept {
+        if (_caller) {
+            // The caller is waiting for this function to return
+            // Now it can be resumed
+            _caller->resume();
+        }
+        if (_callback) {
+            _callback();
+            _callback = nullptr;
+        }
+        // Returns suspend_never{} so that the coroutine will be destroyed and the AwaitablePromise be deleted.
+        // DO NOT call destroy() here, which will cause double destruct of RAII objects.
+        // DO NOT call delete this here, which will cause malloc and free not match.
+        return std::suspend_never{};
+    }
+
+private:
+    // For a Coroutine's root function, it needs a callback to notify its waiter
+    std::function<void()> _callback;
+    // For a Coroutine's leaf function, it is always resumed from another thread.
+    // It needs an atomic variable to keep thread safety.
+    // Non-leaf function doesn't need this, so we defined it as an optional pointer.
+    std::atomic<bool>* _suspended_or_done{nullptr};
+    // For a Coroutine's non-root function, it needs to resume its caller when it finished.
+    AwaitablePromiseBase* _caller{nullptr};
+};
+
+// Promise type of coroutines returning Awaitable<T> (T != void). It stores
+// the coroutine's result and the handle used to resume/destroy the coroutine.
+template <typename T>
+class AwaitablePromise : public AwaitablePromiseBase {
+public:
+    // Returns a copy of the stored result.
+    T value() {
+        return _value;
+    }
+
+    // Stores the result; used by leaf operations that are not real coroutines.
+    void set_value(T value) {
+        _value = value;
+    }
+
+    void resume() override {
+        _coro.resume();
+    }
+
+    void destroy() override {
+        _coro.destroy();
+    }
+
+    // When we call a coroutine function, an AwaitablePromise<T> will be created.
+    // Then call its get_return_object() to return an Awaitable<T>.
+    auto get_return_object() {
+        _coro = std::coroutine_handle<AwaitablePromise>::from_promise(*this);
+        return Awaitable<T>(this);
+    }
+
+    // When we call co_return in a function, return_value() will be called.
+    // It returns void, as the co_return expansion requires.
+    void return_value(T v) {
+        _value = v;
+    }
+
+private:
+    // Value-initialized so that it never holds an indeterminate value.
+    T _value{};
+    std::coroutine_handle<AwaitablePromise> _coro;
+};
+
+// Promise type of coroutines returning Awaitable<void>. It carries no result,
+// only the handle used to resume/destroy the coroutine.
+template <>
+class AwaitablePromise<void> : public AwaitablePromiseBase {
+public:
+    void resume() override {
+        _coro.resume();
+    }
+
+    void destroy() override {
+        _coro.destroy();
+    }
+
+    // When we call a coroutine function, an AwaitablePromise<void> will be created.
+    // Then call its get_return_object() to return an Awaitable<void>.
+    auto get_return_object() {
+        _coro = std::coroutine_handle<AwaitablePromise>::from_promise(*this);
+        return Awaitable<void>(this);
+    }
+
+    // Called for `co_return;` and when control flows off the end of the
+    // coroutine body. A coroutine without a result needs return_void() (not
+    // return_value()): without it, flowing off the end is undefined behavior
+    // and `co_return;` does not compile.
+    void return_void() {
+    }
+
+private:
+    std::coroutine_handle<AwaitablePromise> _coro;
+};
+
+} // namespace detail
+
+// When co_await an Awaitable<T>, await_ready() will be called automatically.
+template <typename T>
+inline bool Awaitable<T>::await_ready() {
+    // Always returns false so that the caller will be suspended at the co_await point.
+    return false;
+}
+
+// If await_ready returns false, await_suspend() will be called automatically.
+// `awaiting` is the handle of the coroutine that executes the co_await.
+template <typename T>
+template <typename U>
+inline void Awaitable<T>::await_suspend(std::coroutine_handle<detail::AwaitablePromise<U> > awaiting) {
+    // Remember who to resume once the awaited operation has finished.
+    _promise->set_caller(&awaiting.promise());
+    if (_promise->needs_suspend()) {
+        // Leaf operation: report the "suspended" half of the handshake. If the
+        // operation is already done, this resumes the caller right away.
+        _promise->on_suspend();
+        return;
+    }
+    // Ordinary coroutine: it was suspended at initial_suspend(), start it now.
+    _promise->resume();
+}
+
+// When the caller resumes from co_await, await_resume() will be called to get return value
+template <typename T>
+inline T Awaitable<T>::await_resume() {
+    // Awaitable<void> has nothing to return.
+    if constexpr (!std::is_same<T, void>::value) {
+        return _promise->value();
+    }
+}
+
+// Creates the promise that co_await will wait on and marks it as a leaf
+// operation, i.e. one completed through on_done() instead of a coroutine body.
+inline AwaitableDone::AwaitableDone()
+    : _awaitable(new detail::AwaitablePromise<void>) {
+    detail::AwaitablePromise<void>* const leaf = _awaitable.promise();
+    leaf->set_needs_suspend();
+}
+    
+// Closure callback: reports the "done" half of the handshake to the promise.
+inline void AwaitableDone::Run() {
+    detail::AwaitablePromise<void>* const leaf = _awaitable.promise();
+    leaf->on_done();
+}
+
+// Takes the coroutine behind `aw` and starts running it. Unless `detach` is
+// true, also prepares what join() and awaitable() need: a butex for join()
+// and a separate leaf promise for awaitable().
+template <typename T>
+inline Coroutine::Coroutine(Awaitable<T>&& aw, bool detach) {
+    detail::AwaitablePromise<T>* origin_promise = aw.promise();
+    CHECK(origin_promise);
+
+    if (!detach) {
+        // Create butex for join()
+        _butex = bthread::butex_create_checked<std::atomic<int> >();
+        _butex->store(0);
+
+        // Create AwaitablePromise for awaitable()
+        detail::AwaitablePromise<T>* waiter_promise = new detail::AwaitablePromise<T>();
+        waiter_promise->set_needs_suspend();
+        _promise = waiter_promise;
+
+        // The callback runs when the coroutine finishes, possibly on another
+        // thread. It captures the pointers by value instead of `this`: once
+        // the butex is set, join() may return and this Coroutine object may
+        // be destroyed before the callback has finished.
+        // NOTE(review): butex_wake() may still race with butex_destroy() in
+        // the destructor, as in the original code -- confirm that bthread
+        // tolerates waking a destroyed butex.
+        std::atomic<int>* butex = _butex;
+        auto cb = [butex, waiter_promise, origin_promise]() {
+            if constexpr (!std::is_same<T, void>::value) {
+                waiter_promise->set_value(origin_promise->value());
+            }
+            // wakeup join()
+            butex->store(1);
+            bthread::butex_wake(butex);
+
+            // wakeup co_await on awaitable()
+            waiter_promise->on_done();
+        };
+        origin_promise->set_callback(cb);
+    }
+
+    // Start to run the coroutine
+    origin_promise->resume();
+}
+
+// Waits for a non-detached coroutine that nobody waited for yet, then
+// releases the butex if one was created.
+inline Coroutine::~Coroutine() {
+    const bool joinable = (_promise != nullptr);
+    if (joinable && !_waited) {
+        join();
+    }
+    if (_butex != nullptr) {
+        bthread::butex_destroy(_butex);
+        _butex = nullptr;
+    }
+}
+
+// Blocks until the coroutine has finished and returns its result (nothing for
+// T = void). Must not be used on a detached coroutine and may be called only
+// once, and not after awaitable().
+template <typename T>
+inline T Coroutine::join() {
+    CHECK(_promise != nullptr) << "join() can not be called to detached coroutine!";
+    CHECK(_waited == false) << "awaitable() or join() can only be called once!";
+    _waited = true;
+    // The completion callback stores 1 into the butex before waking it.
+    // Re-check the value in a loop so that an early return of butex_wait()
+    // (e.g. an interrupted wait) cannot make us continue before completion.
+    while (_butex->load() == 0) {
+        bthread::butex_wait(_butex, 0, nullptr);
+    }
+    if constexpr (!std::is_same<T, void>::value) {
+        auto promise = dynamic_cast<detail::AwaitablePromise<T>*>(_promise);
+        CHECK(promise != nullptr) << "join type not match";
+        // Copy the result out first: on_suspend() may delete the promise.
+        T ret = promise->value();
+        _promise->on_suspend();
+        return ret;
+    } else {
+        _promise->on_suspend();
+    }
+}
+
+// Returns an Awaitable<T> that another coroutine can co_await in order to
+// wait for this coroutine. T must match the type used at construction. Must
+// not be used on a detached coroutine and may be called only once, and not
+// after join().
+template <typename T>
+inline Awaitable<T> Coroutine::awaitable() {
+    CHECK(_promise != nullptr) << "awaitable() can not be called to detached coroutine!";
+    CHECK(_waited == false) << "awaitable() or join() can only be called once!";
+    auto promise = dynamic_cast<detail::AwaitablePromise<T>*>(_promise);
+    CHECK(promise != nullptr) << "awaitable type not match";
+    _waited = true;
+    return Awaitable<T>(promise);
+}
+
+// Leaf operation that completes after `sleep_us` microseconds. The awaited
+// value is 0 when the timer fired and -1 when the timer could not be added.
+// NOTE: the caller will be resumed on bthread timer thread,
+// bthread only have one timer thread, this may be performance bottle-neck
+inline Awaitable<int> Coroutine::usleep(int sleep_us) {
+    using SleepPromise = detail::AwaitablePromise<int>;
+
+    SleepPromise* const sleeper = new SleepPromise();
+    sleeper->set_needs_suspend();
+
+    bthread_timer_t timer_id;
+    const auto deadline = butil::microseconds_from_now(sleep_us);
+    // Timer callback: publish the result and report the "done" half of the
+    // handshake.
+    auto on_timer = [](void* arg) {
+        SleepPromise* const fired = static_cast<SleepPromise*>(arg);
+        fired->set_value(0);
+        fired->on_done();
+    };
+    const int rc = bthread_timer_add(&timer_id, deadline, on_timer, sleeper);
+    if (rc != 0) {
+        // No timer will fire, so complete the operation right here.
+        sleeper->set_value(-1);
+        sleeper->on_done();
+    }
+    return Awaitable<int>(sleeper);
+}
+
+} // namespace experimental
+} // namespace brpc
+
+#endif // BRPC_COROUTINE_INL_H
\ No newline at end of file
diff --git a/src/brpc/event_dispatcher.cpp b/src/brpc/event_dispatcher.cpp
index f747206a..689e80da 100644
--- a/src/brpc/event_dispatcher.cpp
+++ b/src/brpc/event_dispatcher.cpp
@@ -33,6 +33,8 @@ DEFINE_int32(event_dispatcher_num, 1, "Number of event dispatcher");
 
 DEFINE_bool(usercode_in_pthread, false, 
             "Call user's callback in pthreads, use bthreads otherwise");
+DEFINE_bool(usercode_in_coroutine, false,  // checked before starting a bthread for user callbacks
+            "User's callback are run in coroutine, no bthread or pthread blocking call");
 
 static EventDispatcher* g_edisp = NULL;
 static pthread_once_t g_edisp_once = PTHREAD_ONCE_INIT;
diff --git a/src/brpc/input_messenger.cpp b/src/brpc/input_messenger.cpp
index e619af74..3f740b1c 100644
--- a/src/brpc/input_messenger.cpp
+++ b/src/brpc/input_messenger.cpp
@@ -66,6 +66,7 @@ DEFINE_int32(socket_keepalive_count, -1,
              "Set number of keepalives of sockets before close if this value 
is positive");
 
 DECLARE_bool(usercode_in_pthread);
+DECLARE_bool(usercode_in_coroutine);
 DECLARE_uint64(max_body_size);
 
 const size_t MSG_SIZE_WINDOW = 10;  // Take last so many message into stat.
@@ -195,7 +196,7 @@ static void QueueMessage(InputMessageBase* to_run_msg,
                           BTHREAD_ATTR_NORMAL) | BTHREAD_NOSIGNAL;
     tmp.keytable_pool = keytable_pool;
     tmp.tag = bthread_self_tag();
-    if (bthread_start_background(
+    if (!FLAGS_usercode_in_coroutine && bthread_start_background(
             &th, &tmp, ProcessInputMessage, to_run_msg) == 0) {
         ++*num_bthread_created;
     } else {
diff --git a/src/brpc/socket.cpp b/src/brpc/socket.cpp
index 2a391f35..9248dd18 100644
--- a/src/brpc/socket.cpp
+++ b/src/brpc/socket.cpp
@@ -100,6 +100,7 @@ DEFINE_int32(connect_timeout_as_unreachable, 3,
              "fails the main socket as well when this socket is pooled.");
 
 DECLARE_int32(health_check_timeout_ms);
+DECLARE_bool(usercode_in_coroutine);
 
 static bool validate_connect_timeout_as_unreachable(const char*, int32_t v) {
     return v >= 2 && v < 1000/*large enough*/;
@@ -2197,7 +2198,9 @@ int Socket::StartInputEvent(SocketId id, uint32_t events,
         bthread_attr_t attr = thread_attr;
         attr.keytable_pool = p->_keytable_pool;
         attr.tag = bthread_self_tag();
-        if (bthread_start_urgent(&tid, &attr, ProcessEvent, p) != 0) {
+        if (FLAGS_usercode_in_coroutine) {
+            ProcessEvent(p);
+        } else if (bthread_start_urgent(&tid, &attr, ProcessEvent, p) != 0) {
             LOG(FATAL) << "Fail to start ProcessEvent";
             ProcessEvent(p);
         }
diff --git a/src/butil/containers/stack_container.h b/src/butil/containers/stack_container.h
index 111e7e5e..5679ab86 100644
--- a/src/butil/containers/stack_container.h
+++ b/src/butil/containers/stack_container.h
@@ -36,8 +36,14 @@ namespace butil {
 template<typename T, size_t stack_capacity>
 class StackAllocator : public std::allocator<T> {
  public:
-  typedef typename std::allocator<T>::pointer pointer;
-  typedef typename std::allocator<T>::size_type size_type;
+#if __cplusplus >= 202002L  // std::allocator<T>::pointer was removed in C++20; take the member types from allocator_traits
+  typedef typename std::allocator_traits<std::allocator<T> > Allocator;
+#else
+  typedef typename std::allocator<T> Allocator;
+#endif
+
+  typedef typename Allocator::pointer pointer;
+  typedef typename Allocator::size_type size_type;
 
   // Backing store for the allocator. The container owner is responsible for
   // maintaining this for as long as any containers using this allocator are
@@ -109,7 +115,12 @@ class StackAllocator : public std::allocator<T> {
       source_->used_stack_buffer_ = true;
       return source_->stack_buffer();
     } else {
+#if __cplusplus >= 202002L
+      (void)hint;
+      return std::allocator<T>::allocate(n);
+#else
       return std::allocator<T>::allocate(n, hint);
+#endif
     }
   }
 
diff --git a/src/butil/type_traits.h b/src/butil/type_traits.h
index 4f67fcca..5f342db3 100644
--- a/src/butil/type_traits.h
+++ b/src/butil/type_traits.h
@@ -92,7 +92,15 @@ template <typename T> struct is_pointer : false_type {};
 template <typename T> struct is_pointer<T*> : true_type {};
 
 #if defined(BUTIL_CXX11_ENABLED)
+
+#if __cplusplus >= 202002L  // std::is_pod is deprecated in C++20; spell it as standard-layout && trivial
+template <class T> struct is_pod
+: integral_constant<bool, (std::is_standard_layout<T>::value &&
+                           std::is_trivial<T>::value)> {};
+#else
 template <class T> struct is_pod : std::is_pod<T> {};
+#endif
+
 #else
 // We can't get is_pod right without compiler help, so fail conservatively.
 // We will assume it's false except for arithmetic types, enumerations,
diff --git a/test/brpc_coroutine_unittest.cpp b/test/brpc_coroutine_unittest.cpp
new file mode 100644
index 00000000..b89c1408
--- /dev/null
+++ b/test/brpc_coroutine_unittest.cpp
@@ -0,0 +1,221 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+#include "brpc/server.h"
+#include "brpc/channel.h"
+#include "brpc/coroutine.h"
+#include "echo.pb.h"
+
+int main(int argc, char* argv[]) {  // Runs the gtest suite when coroutine support is compiled in.
+#ifdef BRPC_ENABLE_COROUTINE
+    testing::InitGoogleTest(&argc, argv);
+    GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);
+    return RUN_ALL_TESTS();
+#else
+    printf("bRPC coroutine is not enabled, please add -std=c++20 to compile options\n");
+    return 0;  // nothing to run in this configuration; exit with status 0
+#endif
+}
+
+#ifdef BRPC_ENABLE_COROUTINE
+
+using brpc::experimental::Awaitable;
+using brpc::experimental::AwaitableDone;
+using brpc::experimental::Coroutine;
+
+// Scope tracer: logs "enter <name>" when constructed and "exit <name>" when destroyed.
+class Trace {
+public:
+    explicit Trace(const std::string& name) : _name(name) {
+        LOG(INFO) << "enter " << _name;
+    }
+    ~Trace() {
+        LOG(INFO) << "exit " << _name;
+    }
+private:
+    std::string _name;
+};
+
+class EchoServiceImpl : public test::EchoService {  // Echo service whose handler runs as a detached coroutine.
+public:
+    EchoServiceImpl() {}
+    ~EchoServiceImpl() override {}
+    void Echo(google::protobuf::RpcController* cntl_base,
+              const test::EchoRequest* request,
+              test::EchoResponse* response,
+              google::protobuf::Closure* done) override {
+        // brpc::Controller* cntl = (brpc::Controller*)cntl_base;
+        // brpc::ClosureGuard done_guard(done);
+        // response->set_message(request->message());
+
+        // Create a detached coroutine, so the current bthread will return at once.
+        Coroutine(EchoAsync(request, response, done), true);
+    }
+
+    Awaitable<void> EchoAsync(const test::EchoRequest* request,
+                              test::EchoResponse* response,
+                              google::protobuf::Closure* done) {
+        Trace t("EchoAsync");
+        // This is important to test RAII object's destruction after coroutine finished
+        brpc::ClosureGuard done_guard(done);
+        if (request->has_sleep_us()) {
+            LOG(INFO) << "sleep " << request->sleep_us() << " us at server side";
+            co_await Coroutine::usleep(request->sleep_us());
+        }
+        response->set_message(request->message());
+    }
+};
+
+class CoroutineTest : public ::testing::Test {  // Empty fixture: it declares no members and no setup work.
+protected:
+    CoroutineTest() {}
+    ~CoroutineTest() override {}
+    void SetUp() override {}
+    void TearDown() override {}
+};
+
+
+static int delay_us = 0;  // Microseconds passed to the blocking usleep() calls in sleep_func()/func(); the test sets it to 10000 for one run.
+
+Awaitable<std::string> inplace_func(const std::string& input) {  // Traces entry/exit and co_returns `input`.
+    Trace t("inplace_func");
+    co_return input;  // NOTE(review): `input` is a reference; presumably the body finishes before the caller's argument dies — confirm
+}
+
+Awaitable<double> inplace_func2() {  // Awaits inplace_func("123"), ignores its result, then co_returns 0.5.
+    Trace t("inplace_func2");
+    co_await inplace_func("123");
+    co_return 0.5;
+}
+
+Awaitable<int> sleep_func() {  // Awaits a 1000us Coroutine::usleep, checks the elapsed time, co_returns 123.
+    Trace t("sleep_func");
+    const int64_t start_us = butil::monotonic_time_us();
+    auto aw = Coroutine::usleep(1000);
+    usleep(delay_us);  // blocking delay between creating the awaitable and awaiting it
+    co_await aw;
+    const int64_t cost = butil::monotonic_time_us() - start_us;  // kept 64-bit, no narrowing to int
+    EXPECT_GE(cost, 1000);
+    LOG(INFO) << "after usleep:" << cost;
+    co_return 123;
+}
+
+Awaitable<float> exception_func() {  // NOTE(review): the body has no co_await/co_yield/co_return, so this
+    Trace t("exception_func");        // is an ordinary function rather than a coroutine; the throw below
+    throw std::string("error");       // leaves the call expression directly, before any Awaitable exists.
+}                                     // TODO confirm the promise's exception path is covered elsewhere.
+
+Awaitable<void> func(brpc::Channel& channel, int* out) {  // Test driver coroutine; stores 456 in *out as its last step.
+    Trace t("func");
+    test::EchoService_Stub stub(&channel);
+    test::EchoRequest request;
+    request.set_message("hello world");
+    test::EchoResponse response;
+    brpc::Controller cntl;
+    // 1. Start a coroutine, then await its result through Coroutine::awaitable<int>().
+    LOG(INFO) << "before start coroutine";
+    Coroutine coro(sleep_func());
+    usleep(delay_us);
+    LOG(INFO) << "before wait coroutine";
+    int ret = co_await coro.awaitable<int>();
+    EXPECT_EQ(ret, 123);
+    LOG(INFO) << "after wait coroutine, ret:" << ret;
+    // 2. Await a callee directly and use its returned value.
+    auto str = co_await inplace_func("hello");
+    EXPECT_EQ("hello", str);
+    // 3. A std::string thrown by the callee is caught around the co_await expression.
+    float num = 0.0f;
+    try {
+        num = co_await exception_func();
+    } catch (const std::string& err) {  // caught by const reference
+        EXPECT_EQ("error", err);
+        num = 1.0f;
+    }
+    EXPECT_EQ(1.0f, num);
+    // 4. Issue an RPC with AwaitableDone as the closure and await its completion.
+    AwaitableDone done;
+    LOG(INFO) << "start echo";
+    stub.Echo(&cntl, &request, &response, &done);
+    LOG(INFO) << "after echo";
+    usleep(delay_us);
+    co_await done.awaitable();
+    LOG(INFO) << "after wait";
+    EXPECT_FALSE(cntl.Failed()) << cntl.ErrorText();
+    EXPECT_EQ("hello world", response.message());
+    // 5. Same RPC with sleep_us = 2000 in the request; the round trip must take at least 2000us.
+    cntl.Reset();
+    request.set_sleep_us(2000);
+    AwaitableDone done2;
+    LOG(INFO) << "start echo2";
+    int64_t s = butil::monotonic_time_us();
+    stub.Echo(&cntl, &request, &response, &done2);
+    LOG(INFO) << "after echo2";
+    co_await done2.awaitable();
+    const int64_t cost = butil::monotonic_time_us() - s;  // kept 64-bit, no narrowing to int
+    LOG(INFO) << "after wait2";
+    EXPECT_GE(cost, 2000);
+    EXPECT_FALSE(cntl.Failed()) << cntl.ErrorText();
+    EXPECT_EQ("hello world", response.message());
+
+    *out = 456;
+}
+
+TEST_F(CoroutineTest, coroutine) {
+    butil::EndPoint ep;
+    ASSERT_EQ(0, str2endpoint("127.0.0.1:8613", &ep));
+
+    brpc::Server server;
+    EchoServiceImpl service;
+    ASSERT_EQ(0, server.AddService(&service, brpc::SERVER_DOESNT_OWN_SERVICE));
+    ASSERT_EQ(0, server.Start(ep, NULL));
+
+    brpc::Channel channel;
+    brpc::ChannelOptions options;
+    ASSERT_EQ(0, channel.Init(ep, &options));
+    // Run the driver coroutine with delay_us == 0.
+    int out = 0;
+    Coroutine coro(func(channel, &out));
+    coro.join();
+    ASSERT_EQ(456, out);
+    // Run it again with delay_us == 10000 so the blocking usleep() calls inside take effect.
+    out = 0;
+    delay_us = 10000;
+    Coroutine coro2(func(channel, &out));
+    coro2.join();
+    ASSERT_EQ(456, out);
+    delay_us = 0;
+    // join<double>() hands back the value the coroutine co_returned.
+    Coroutine coro3(inplace_func2());
+    double d = coro3.join<double>();
+    ASSERT_EQ(0.5, d);
+    // join() without a type argument is used here only to wait for completion.
+    Coroutine coro4(inplace_func("abc"));
+    coro4.join();
+
+    Coroutine coro5(sleep_func());
+    coro5.join();
+    // Coroutines constructed with `true` as second argument are never joined below.
+    Coroutine coro6(inplace_func2(), true);
+    Coroutine coro7(inplace_func("abc"), true);
+    Coroutine coro8(sleep_func(), true);
+    usleep(10000); // wait sleep_func() to complete
+
+    LOG(INFO) << "test case finished";
+}
+
+#endif // BRPC_ENABLE_COROUTINE
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@brpc.apache.org
For additional commands, e-mail: dev-h...@brpc.apache.org


Reply via email to