jenrryyou commented on code in PR #2754:
URL: https://github.com/apache/brpc/pull/2754#discussion_r1764340657
##########
src/brpc/policy/baidu_rpc_protocol.cpp:
##########
@@ -266,7 +273,7 @@ void SendRpcResponse(int64_t correlation_id,
// Response_stream can be INVALID_STREAM_ID when error occurs.
if (SendStreamData(sock, &res_buf,
accessor.remote_stream_settings()->stream_id(),
- accessor.response_stream()) != 0) {
+ accessor.response_streams()[0]) != 0) {
Review Comment:
> 是不是用前面232行的`StreamId response_stream_id = response_stream_ids[0]`更好一点?
同上面的回复,我尝试加个判断
##########
src/brpc/policy/baidu_rpc_protocol.cpp:
##########
@@ -226,11 +228,16 @@ void SendRpcResponse(int64_t correlation_id,
meta.set_attachment_size(attached_size);
}
SocketUniquePtr stream_ptr;
- if (response_stream_id != INVALID_STREAM_ID) {
+ if (!response_stream_ids.empty()) {
+ StreamId response_stream_id = response_stream_ids[0];
Review Comment:
除了baidu_rpc_protocol.cpp
672行(这里后面判断了INVALID_STREAM_ID,没有其他地方会设置INVALID_STREAM_ID了(之前controller初始化stream_id会用INVALID_STREAM_ID),不过为了防止以后有人设置,我还是加个判断
> 还需要判断INVALID_STREAM_ID吗?
##########
src/brpc/stream.cpp:
##########
@@ -759,37 +771,73 @@ int StreamClose(StreamId stream_id) {
int StreamCreate(StreamId *request_stream, Controller &cntl,
const StreamOptions* options) {
- if (cntl._request_stream != INVALID_STREAM_ID) {
+ if (request_stream == NULL) {
+ LOG(ERROR) << "request_stream is NULL";
+ return -1;
+ }
+ StreamIds request_streams;
+ StreamCreate(request_streams, 1, cntl, options);
+ *request_stream = request_streams[0];
+ return 0;
+}
+
+int StreamCreate(StreamIds& request_streams, int request_stream_size,
Controller & cntl,
+ const StreamOptions* options) {
+ if (!cntl._request_streams.empty()) {
LOG(ERROR) << "Can't create request stream more than once";
return -1;
}
- if (request_stream == NULL) {
- LOG(ERROR) << "request_stream is NULL";
+ if (!request_streams.empty()) {
+ LOG(ERROR) << "request_streams should be empty";
return -1;
}
- StreamId stream_id;
StreamOptions opt;
if (options != NULL) {
opt = *options;
}
- if (Stream::Create(opt, NULL, &stream_id) != 0) {
- LOG(ERROR) << "Fail to create stream";
- return -1;
+ for (auto i = 0; i < request_stream_size; ++i) {
+ StreamId stream_id;
+ if (Stream::Create(opt, NULL, &stream_id) != 0) {
+ // Close already created streams
+ Stream::SetFailed(request_streams, 0 , "Fail to create stream at
%d index", i);
+ LOG(ERROR) << "Fail to create stream";
+ return -1;
+ }
+ cntl._request_streams.push_back(stream_id);
+ request_streams.push_back(stream_id);
}
- cntl._request_stream = stream_id;
- *request_stream = stream_id;
return 0;
}
int StreamAccept(StreamId* response_stream, Controller &cntl,
const StreamOptions* options) {
+ if (response_stream == NULL) {
+ LOG(ERROR) << "response_stream is NULL";
+ return -1;
+ }
+ StreamIds response_streams;
+ int res = StreamAccept(response_streams, cntl, options);
+ if(res != 0) {
+ return res;
+ }
+ if(response_streams.size() != 1) {
+ Stream::SetFailed(response_streams, 0, "Logic error");
+ LOG(ERROR) << "accept more than one response_stream";
+ return -1;
+ }
Review Comment:
> 1.
819行accept成功,并设置到cntl._response_streams了,但是这里返回了失败给用户,即服务端accept失败,客户端成功?
> 2. 这个接口处于不可用的状态了吧?要调StreamAccept(StreamIds& ...)才能保证一定成功。
理论上819成功,后面不可能失败了,这里只是为了防止用户用错接口(用批量stream create,结果rpc处理的时候用了单个stream
accept接口)。不过确实会存在你说的情况,我看下怎么报错更一致一些
##########
src/brpc/controller.cpp:
##########
@@ -1370,34 +1370,54 @@ void* Controller::session_local_data() {
}
void Controller::HandleStreamConnection(Socket *host_socket) {
- if (_request_stream == INVALID_STREAM_ID) {
+ if (_request_streams.empty()) {
CHECK(!has_remote_stream());
return;
}
- SocketUniquePtr ptr;
+ size_t stream_num = _request_streams.size();
+ std::vector<SocketUniquePtr> ptrs(stream_num);
if (!FailedInline()) {
- if (Socket::Address(_request_stream, &ptr) != 0) {
- if (!FailedInline()) {
- SetFailed(EREQUEST, "Request stream=%" PRIu64 " was closed
before responded",
- _request_stream);
- }
- } else if (_remote_stream_settings == NULL) {
+ if (_remote_stream_settings == NULL) {
if (!FailedInline()) {
SetFailed(EREQUEST, "The server didn't accept the stream");
}
+ } else {
+ for (size_t i = 0; i < stream_num; ++i) {
+ if (Socket::Address(_request_streams[i], &ptrs[i]) != 0) {
+ if (!FailedInline()) {
+ SetFailed(EREQUEST, "Request stream=%" PRIu64 " was
closed before responded",
+ _request_streams[i]);
+ break;
+ }
Review Comment:
> 一个closed就导致全部失败吗?
目前是这么设计的,同一个RPC注册的一批要全部成功,任意一个失败都要全部关闭
##########
src/brpc/stream.cpp:
##########
@@ -759,37 +771,73 @@ int StreamClose(StreamId stream_id) {
int StreamCreate(StreamId *request_stream, Controller &cntl,
const StreamOptions* options) {
- if (cntl._request_stream != INVALID_STREAM_ID) {
+ if (request_stream == NULL) {
+ LOG(ERROR) << "request_stream is NULL";
+ return -1;
+ }
+ StreamIds request_streams;
+ StreamCreate(request_streams, 1, cntl, options);
+ *request_stream = request_streams[0];
+ return 0;
Review Comment:
单个是批量的一种情况(为了兼容老接口),单个复用批量接口比较自然一些。批量调用单个的话有一些特殊的逻辑要处理下,而且要额外传入这次要创建的序号(方便知道是extra_stream里的第几个)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]