This is an automated email from the ASF dual-hosted git repository.

guangmingchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git


The following commit(s) were added to refs/heads/master by this push:
     new 923d4137 Support segment large stream messages automatically (#2889)
923d4137 is described below

commit 923d4137fe98a0f43e4df8820a7bcfd372544914
Author: Jenrry You <[email protected]>
AuthorDate: Mon Feb 17 12:58:13 2025 +0800

    Support segment large stream messages automatically (#2889)
---
 docs/cn/streaming_rpc.md             |  3 +-
 docs/en/streaming_rpc.md             |  4 +--
 src/brpc/stream.cpp                  | 64 ++++++++++++++++++++++++++++++------
 test/brpc_streaming_rpc_unittest.cpp | 55 +++++++++++++++++++++++++++++++
 4 files changed, 112 insertions(+), 14 deletions(-)

diff --git a/docs/cn/streaming_rpc.md b/docs/cn/streaming_rpc.md
index c9f9ab36..6bdb2f29 100644
--- a/docs/cn/streaming_rpc.md
+++ b/docs/cn/streaming_rpc.md
@@ -16,8 +16,7 @@ Streaming RPC保证:
 - 全双工。
 - 支持流控。
 - 提供超时提醒
-
-目前的实现还没有自动切割过大的消息,同一个tcp连接上的多个Stream之间可能有[Head-of-line blocking](https://en.wikipedia.org/wiki/Head-of-line_blocking)问题,请尽量避免过大的单个消息,实现自动切割后我们会告知并更新文档。
+- 支持自动切割过大的消息(切割阈值由gflag `stream_write_max_segment_size` 控制,默认512MB),避免[Head-of-line blocking](https://en.wikipedia.org/wiki/Head-of-line_blocking)问题
 
 例子见[example/streaming_echo_c++](https://github.com/apache/brpc/tree/master/example/streaming_echo_c++/)。
 
diff --git a/docs/en/streaming_rpc.md b/docs/en/streaming_rpc.md
index 8e067148..7a41c24d 100644
--- a/docs/en/streaming_rpc.md
+++ b/docs/en/streaming_rpc.md
@@ -16,8 +16,8 @@ Streaming RPC ensures/provides:
 - Full duplex
 - Flow control
 - Notification on timeout
-
-We do not support segment large messages automatically so that multiple Streams on a single TCP connection may lead to [Head-of-line blocking](https://en.wikipedia.org/wiki/Head-of-line_blocking) problem. Please avoid putting huge data into single message until we provide automatic segmentation.
+- Automatic segmentation of large messages (threshold set by the `stream_write_max_segment_size` gflag, 512MB by default)
+  to avoid [Head-of-line blocking](https://en.wikipedia.org/wiki/Head-of-line_blocking) problems.
 
 For examples please refer to [example/streaming_echo_c++](https://github.com/apache/brpc/tree/master/example/streaming_echo_c++/).
 
diff --git a/src/brpc/stream.cpp b/src/brpc/stream.cpp
index 73d64051..68397b57 100644
--- a/src/brpc/stream.cpp
+++ b/src/brpc/stream.cpp
@@ -36,6 +36,9 @@ namespace brpc {
 
 DECLARE_bool(usercode_in_pthread);
 DECLARE_int64(socket_max_streams_unconsumed_bytes);
+DEFINE_uint64(stream_write_max_segment_size, 512 * 1024 * 1024,  // in bytes (512MB)
+              "Stream message exceeding this size will be automatically split into smaller segments");
+BRPC_VALIDATE_GFLAG(stream_write_max_segment_size, PositiveInteger);  // 0 would make the segmenting cutn() loop never terminate
 
 const static butil::IOBuf *TIMEOUT_TASK = (butil::IOBuf*)-1L;
 
@@ -60,6 +63,11 @@ Stream::Stream()
 }
 
 Stream::~Stream() {
+    // Clear pending buffer
+    if (_pending_buf != NULL) {
+        delete _pending_buf;
+        _pending_buf = NULL;
+    }
     CHECK(_host_socket == NULL);
     bthread_mutex_destroy(&_connect_mutex);
     bthread_mutex_destroy(&_congestion_control_mutex);
@@ -154,18 +162,54 @@ ssize_t Stream::CutMessageIntoFileDescriptor(int /*fd*/,
     }
     butil::IOBuf out;
     ssize_t len = 0;
+    ssize_t unwritten_data_size = 0;
     for (size_t i = 0; i < size; ++i) {
-        StreamFrameMeta fm;
-        fm.set_stream_id(_remote_settings.stream_id());
-        fm.set_source_stream_id(id());
-        fm.set_frame_type(FRAME_TYPE_DATA);
-        // TODO: split large data
-        fm.set_has_continuation(false);
-        policy::PackStreamMessage(&out, fm, data_list[i]);
-        len += data_list[i]->length();
-        data_list[i]->clear();
+        butil::IOBuf *data = data_list[i];
+        size_t length = data->length();
+        if (length > FLAGS_stream_write_max_segment_size) {
+            if (unwritten_data_size) {
+                WriteToHostSocket(&out);
+                unwritten_data_size = 0;
+                out.clear();
+            }
+            // segmenting large data into multiple parts
+            butil::IOBuf segment_buf;
+            bool has_continuation = true;
+            while (has_continuation) {
+                data->cutn(&segment_buf, FLAGS_stream_write_max_segment_size);
+                StreamFrameMeta fm;
+                fm.set_stream_id(_remote_settings.stream_id());
+                fm.set_source_stream_id(id());
+                fm.set_frame_type(FRAME_TYPE_DATA);
+                has_continuation = !data->empty();
+                fm.set_has_continuation(has_continuation);
+                policy::PackStreamMessage(&out, fm, &segment_buf);
+                len += segment_buf.length();
+                segment_buf.clear();
+                WriteToHostSocket(&out);
+                out.clear();
+            }
+        } else {
+            if (unwritten_data_size + length > 
FLAGS_stream_write_max_segment_size) {
+                WriteToHostSocket(&out);
+                unwritten_data_size = 0;
+                out.clear();
+            }
+            unwritten_data_size += length;
+            StreamFrameMeta fm;
+            fm.set_stream_id(_remote_settings.stream_id());
+            fm.set_source_stream_id(id());
+            fm.set_frame_type(FRAME_TYPE_DATA);
+            fm.set_has_continuation(false);
+            policy::PackStreamMessage(&out, fm, data_list[i]);
+            len += length;
+            data_list[i]->clear();
+        }
+    }
+
+    if (!out.empty()) {
+        WriteToHostSocket(&out);
     }
-    WriteToHostSocket(&out);
     return len;
 }
 
diff --git a/test/brpc_streaming_rpc_unittest.cpp b/test/brpc_streaming_rpc_unittest.cpp
index df6a37d8..b0dd4a39 100644
--- a/test/brpc_streaming_rpc_unittest.cpp
+++ b/test/brpc_streaming_rpc_unittest.cpp
@@ -24,6 +24,7 @@
 
 #include "brpc/controller.h"
 #include "brpc/channel.h"
+#include "brpc/socket.h"
 #include "brpc/stream_impl.h"
 #include "brpc/policy/streaming_rpc_protocol.h"
 #include "echo.pb.h"
@@ -577,3 +578,57 @@ TEST_F(StreamingRpcTest, server_send_data_before_run_done) {
     ASSERT_FALSE(handler.failed());
     ASSERT_EQ(0, handler.idle_times());
 }
+
+TEST_F(StreamingRpcTest, segment_stream_data_automatically) {
+    GFLAGS_NAMESPACE::FlagSaver flag_saver;  // restores every gflag on all exit paths, incl. failed ASSERTs
+    ASSERT_FALSE(GFLAGS_NAMESPACE::SetCommandLineOption("stream_write_max_segment_size", "1").empty());
+    OrderedInputHandler handler;
+    brpc::StreamOptions opt;
+    opt.handler = &handler;
+    opt.messages_in_batch = 100;
+    brpc::Server server;
+    MyServiceWithStream service(opt);
+    ASSERT_EQ(0, server.AddService(&service, brpc::SERVER_DOESNT_OWN_SERVICE));
+    ASSERT_EQ(0, server.Start(9007, NULL));
+    brpc::Channel channel;
+    ASSERT_EQ(0, channel.Init("127.0.0.1:9007", NULL));
+    brpc::Controller cntl;
+    brpc::StreamId request_stream;
+    brpc::StreamOptions request_stream_options;
+    ASSERT_EQ(0, StreamCreate(&request_stream, cntl, &request_stream_options));
+    brpc::ScopedStream stream_guard(request_stream);
+    test::EchoService_Stub stub(&channel);
+    stub.Echo(&cntl, &request, &response, NULL);
+    ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText() << " request_stream=" << request_stream;
+    const int N = 1000;
+    for (int i = 0; i < N; ++i) {
+        int network = htonl(i);
+        butil::IOBuf out;
+        out.append(&network, sizeof(network));
+        ASSERT_EQ(0, brpc::StreamWrite(request_stream, out)) << "i=" << i;
+    }
+
+    brpc::SocketUniquePtr host_socket_ptr;
+    {  // hold our own reference to the host socket; its stats are read after StreamClose() below
+        brpc::SocketUniquePtr ptr;
+        ASSERT_EQ(0, brpc::Socket::Address(request_stream, &ptr));
+        brpc::Stream *s = (brpc::Stream *)ptr->conn();
+        ASSERT_TRUE(s->_host_socket != NULL);
+        s->_host_socket->ReAddress(&host_socket_ptr);
+    }
+
+    ASSERT_EQ(0, brpc::StreamClose(request_stream));
+    server.Stop(0);
+    server.Join();
+    while (!handler.stopped()) {
+        usleep(100);
+    }
+    const int64_t now_ms = butil::cpuwide_time_ms();
+    host_socket_ptr->UpdateStatsEverySecond(now_ms);
+    brpc::SocketStat stat;
+    host_socket_ptr->GetStat(&stat);
+    ASSERT_LT(N * sizeof(N), stat.out_num_messages_m);  // segment size 1 => each int payload goes out as sizeof(int) frames
+    ASSERT_FALSE(handler.failed());
+    ASSERT_EQ(0, handler.idle_times());
+    ASSERT_EQ(N, handler._expected_next_value);
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to