This is an automated email from the ASF dual-hosted git repository.

huixxi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git


The following commit(s) were added to refs/heads/master by this push:
     new 283ac462 add HTTP server-sent-events(SSE) example in brpc http server
     new aa0f052e Merge pull request #2375 from thorneliu/master
283ac462 is described below

commit 283ac46212f2df9dba0d5a4e57db8e835b2402bd
Author: thorneliu <[email protected]>
AuthorDate: Thu Sep 7 23:07:22 2023 +0800

    add HTTP server-sent-events(SSE) example in brpc http server
---
 docs/cn/http_service.md          |  2 ++
 docs/en/http_service.md          |  2 ++
 example/http_c++/http.proto      |  4 +++
 example/http_c++/http_server.cpp | 56 ++++++++++++++++++++++++++++++++++++++++
 4 files changed, 64 insertions(+)

diff --git a/docs/cn/http_service.md b/docs/cn/http_service.md
index d266c2f5..88bb46fc 100644
--- a/docs/cn/http_service.md
+++ b/docs/cn/http_service.md
@@ -347,6 +347,8 @@ brpc server支持发送超大或无限长的body。方法如下:
 
 3. 发送完毕后确保所有的`butil::intrusive_ptr<brpc::ProgressiveAttachment>`都析构以释放资源。
 
+另外,利用该特性可以轻松实现Server-Sent Events(SSE)服务,从而使客户端能够通过 HTTP 连接从服务器自动接收更新。非常适合构建诸如chatGPT这类实时应用程序,应用例子详见[http_server.cpp](https://github.com/apache/brpc/blob/master/example/http_c++/http_server.cpp)中的HttpSSEServiceImpl。
+
 # 持续接收
 
 目前brpc server不支持在收齐http请求的header部分后就调用服务回调,即brpc server不适合接收超长或无限长的body。
diff --git a/docs/en/http_service.md b/docs/en/http_service.md
index d1f5c044..bf080ecb 100644
--- a/docs/en/http_service.md
+++ b/docs/en/http_service.md
@@ -348,6 +348,8 @@ brpc server is capable of sending large or infinite sized body, in following ste
 
 3. After usage, destruct all `butil::intrusive_ptr<brpc::ProgressiveAttachment>` to release related resources.
 
+In addition, we can easily implement Server-Sent Events(SSE) with this feature, which enables a client to receive automatic updates from a server via a HTTP connection. SSE could be used to build real-time applications such as chatGPT. Please refer to HttpSSEServiceImpl in [http_server.cpp](https://github.com/apache/brpc/blob/master/example/http_c++/http_server.cpp) for more details.
+
 # Progressive receiving
 
 Currently brpc server doesn't support calling the service callback once header part in the http request is parsed. In other words, brpc server is not suitable for receiving large or infinite sized body.
diff --git a/example/http_c++/http.proto b/example/http_c++/http.proto
index 9b5d2c0d..8581294f 100644
--- a/example/http_c++/http.proto
+++ b/example/http_c++/http.proto
@@ -37,3 +37,7 @@ service QueueService {
   rpc stop(HttpRequest) returns (HttpResponse);
   rpc getstats(HttpRequest) returns (HttpResponse);
 };
+
+service HttpSSEService {
+  rpc stream(HttpRequest) returns (HttpResponse);
+};
diff --git a/example/http_c++/http_server.cpp b/example/http_c++/http_server.cpp
index af373ce8..9a1595b9 100644
--- a/example/http_c++/http_server.cpp
+++ b/example/http_c++/http_server.cpp
@@ -151,6 +151,56 @@ public:
     }
 };
 
+class HttpSSEServiceImpl : public HttpSSEService {
+public:
+    HttpSSEServiceImpl() {}
+    virtual ~HttpSSEServiceImpl() {}
+
+    struct PredictJobArgs {
+        std::vector<uint32_t> input_ids;
+        butil::intrusive_ptr<brpc::ProgressiveAttachment> pa;
+    };
+
+    static void* Predict(void* raw_args) {
+        std::unique_ptr<PredictJobArgs> args(static_cast<PredictJobArgs*>(raw_args));
+        if (args->pa == NULL) {
+            LOG(ERROR) << "ProgressiveAttachment is NULL";
+            return NULL;
+        }
+        for (int i = 0; i < 100; ++i) {
+            char buf[48];
+            int len = snprintf(buf, sizeof(buf), "event: foo\ndata: Hello, world! (%d)\n\n", i);
+            args->pa->Write(buf, len);
+
+            // sleep a while to send another part.
+            bthread_usleep(10000 * 10);
+        }
+        return NULL;
+    }
+
+    void stream(google::protobuf::RpcController* cntl_base,
+                const HttpRequest*,
+                HttpResponse*,
+                google::protobuf::Closure* done) {
+        brpc::ClosureGuard done_guard(done);
+        brpc::Controller* cntl =
+            static_cast<brpc::Controller*>(cntl_base);
+
+        // Send the first SSE response
+        cntl->http_response().set_content_type("text/event-stream");
+        cntl->http_response().set_status_code(200);
+        cntl->http_response().SetHeader("Connection", "keep-alive");
+        cntl->http_response().SetHeader("Cache-Control", "no-cache");
+
+        // Send the generated words with progressiveAttachment
+        std::unique_ptr<PredictJobArgs> args(new PredictJobArgs);
+        args->pa = cntl->CreateProgressiveAttachment();
+        args->input_ids = {101, 102};
+        bthread_t th;
+        bthread_start_background(&th, NULL, Predict, args.release());
+    }
+};
+
 }  // namespace example
 
 int main(int argc, char* argv[]) {
@@ -163,6 +213,7 @@ int main(int argc, char* argv[]) {
     example::HttpServiceImpl http_svc;
     example::FileServiceImpl file_svc;
     example::QueueServiceImpl queue_svc;
+    example::HttpSSEServiceImpl sse_svc;
     
     // Add services into server. Notice the second parameter, because the
     // service is put on stack, we don't want server to delete it, otherwise
@@ -185,6 +236,11 @@ int main(int argc, char* argv[]) {
         LOG(ERROR) << "Fail to add queue_svc";
         return -1;
     }
+    if (server.AddService(&sse_svc,
+                          brpc::SERVER_DOESNT_OWN_SERVICE) != 0) {
+        LOG(ERROR) << "Fail to add sse_svc";
+        return -1;
+    }
 
     // Start the server.
     brpc::ServerOptions options;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to