This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 14310ad30b [improvement](move-memtable) wait StreamClose from remote
(#23605)
14310ad30b is described below
commit 14310ad30b569bc65c6eadce862e1599ba32b448
Author: Kaijie Chen <[email protected]>
AuthorDate: Wed Aug 30 18:03:36 2023 +0800
[improvement](move-memtable) wait StreamClose from remote (#23605)
* [fix](move-memtable) wait StreamClose from remote
---
be/src/runtime/load_stream.cpp | 1 +
be/src/vec/sink/vtablet_sink_v2.cpp | 12 +++++------
be/src/vec/sink/vtablet_sink_v2.h | 2 +-
be/test/runtime/load_stream_test.cpp | 41 +++++++++++++++++-------------------
4 files changed, 27 insertions(+), 29 deletions(-)
diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp
index 961f73540a..5abf75476c 100644
--- a/be/src/runtime/load_stream.cpp
+++ b/be/src/runtime/load_stream.cpp
@@ -430,6 +430,7 @@ int LoadStream::on_received_messages(StreamId id,
butil::IOBuf* const messages[]
auto st =
close(hdr.src_id(), tablets_to_commit,
&success_tablet_ids, &failed_tablet_ids);
_report_result(id, st, &success_tablet_ids, &failed_tablet_ids);
+ brpc::StreamClose(id);
} break;
default:
LOG(WARNING) << "unexpected stream message " << hdr.opcode();
diff --git a/be/src/vec/sink/vtablet_sink_v2.cpp
b/be/src/vec/sink/vtablet_sink_v2.cpp
index 196e5ff7e2..243507db71 100644
--- a/be/src/vec/sink/vtablet_sink_v2.cpp
+++ b/be/src/vec/sink/vtablet_sink_v2.cpp
@@ -125,13 +125,13 @@ int
StreamSinkHandler::on_received_messages(brpc::StreamId id, butil::IOBuf* con
<< status;
}
}
-
- _sink->_pending_reports.fetch_add(-1);
}
return 0;
}
-void StreamSinkHandler::on_closed(brpc::StreamId id) {}
+void StreamSinkHandler::on_closed(brpc::StreamId id) {
+ _sink->_pending_streams.fetch_add(-1);
+}
VOlapTableSinkV2::VOlapTableSinkV2(ObjectPool* pool, const RowDescriptor&
row_desc,
const std::vector<TExpr>& texprs, Status*
status)
@@ -298,6 +298,7 @@ Status VOlapTableSinkV2::_init_stream_pool(const NodeInfo&
node_info, StreamPool
cntl.ErrorText());
}
stream_pool.push_back(stream);
+ _pending_streams.fetch_add(1);
}
return Status::OK();
}
@@ -522,10 +523,10 @@ Status VOlapTableSinkV2::close(RuntimeState* state,
Status exec_status) {
{
SCOPED_TIMER(_close_load_timer);
- while (_pending_reports.load() > 0) {
+ while (_pending_streams.load() > 0) {
// TODO: use a better wait
std::this_thread::sleep_for(std::chrono::milliseconds(1));
- LOG(INFO) << "sinkv2 close_wait, pending reports: " <<
_pending_reports.load();
+ LOG(INFO) << "sinkv2 close_wait, pending streams: " <<
_pending_streams.load();
}
}
@@ -587,7 +588,6 @@ Status VOlapTableSinkV2::_close_load(brpc::StreamId stream)
{
size_t header_len = header.ByteSizeLong();
buf.append(reinterpret_cast<uint8_t*>(&header_len), sizeof(header_len));
buf.append(header.SerializeAsString());
- _pending_reports.fetch_add(1);
io::StreamSinkFileWriter::send_with_retry(stream, buf);
return Status::OK();
}
diff --git a/be/src/vec/sink/vtablet_sink_v2.h
b/be/src/vec/sink/vtablet_sink_v2.h
index 8f4ce8f42f..c2c24a26fb 100644
--- a/be/src/vec/sink/vtablet_sink_v2.h
+++ b/be/src/vec/sink/vtablet_sink_v2.h
@@ -226,7 +226,7 @@ private:
size_t _stream_index = 0;
std::shared_ptr<DeltaWriterForTablet> _delta_writer_for_tablet;
- std::atomic<int> _pending_reports {0};
+ std::atomic<int> _pending_streams {0};
std::unordered_map<int64_t, std::vector<int64_t>> _tablet_success_map;
std::unordered_map<int64_t, std::vector<int64_t>> _tablet_failure_map;
diff --git a/be/test/runtime/load_stream_test.cpp
b/be/test/runtime/load_stream_test.cpp
index e3d3868547..00f53b5c5d 100644
--- a/be/test/runtime/load_stream_test.cpp
+++ b/be/test/runtime/load_stream_test.cpp
@@ -679,9 +679,8 @@ TEST_F(LoadStreamMgrTest, one_client_abnormal_load) {
EXPECT_EQ(g_response_stat.num, 2);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 1);
EXPECT_EQ(g_response_stat.success_tablet_ids[0], NORMAL_TABLET_ID);
- EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 1);
- client.disconnect();
+ // server will close stream on CLOSE_LOAD
wait_for_close();
EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0);
}
@@ -708,12 +707,11 @@ TEST_F(LoadStreamMgrTest, one_client_abnormal_index) {
close_load(client, 0);
wait_for_ack(3);
- EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 1);
EXPECT_EQ(g_response_stat.num, 3);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1);
- client.disconnect();
+ // server will close stream on CLOSE_LOAD
wait_for_close();
EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0);
}
@@ -743,7 +741,7 @@ TEST_F(LoadStreamMgrTest, one_client_abnormal_sender) {
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1);
- client.disconnect();
+ // server will close stream on CLOSE_LOAD
wait_for_close();
EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0);
}
@@ -773,12 +771,12 @@ TEST_F(LoadStreamMgrTest, one_client_abnormal_tablet) {
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1);
- client.disconnect();
+ // server will close stream on CLOSE_LOAD
wait_for_close();
EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0);
}
-TEST_F(LoadStreamMgrTest,
one_client_one_index_one_tablet_signle_segment0_zero_bytes) {
+TEST_F(LoadStreamMgrTest,
one_client_one_index_one_tablet_single_segment0_zero_bytes) {
MockSinkClient client;
auto st = client.connect_stream();
EXPECT_TRUE(st.ok());
@@ -814,12 +812,12 @@ TEST_F(LoadStreamMgrTest,
one_client_one_index_one_tablet_signle_segment0_zero_b
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1);
EXPECT_EQ(g_response_stat.failed_tablet_ids[0], NORMAL_TABLET_ID);
- client.disconnect();
+ // server will close stream on CLOSE_LOAD
wait_for_close();
EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0);
}
-TEST_F(LoadStreamMgrTest, one_client_one_index_one_tablet_signle_segment0) {
+TEST_F(LoadStreamMgrTest, one_client_one_index_one_tablet_single_segment0) {
MockSinkClient client;
auto st = client.connect_stream();
EXPECT_TRUE(st.ok());
@@ -860,12 +858,12 @@ TEST_F(LoadStreamMgrTest,
one_client_one_index_one_tablet_signle_segment0) {
auto written_data = read_data(NORMAL_TXN_ID, NORMAL_PARTITION_ID,
NORMAL_TABLET_ID, 0);
EXPECT_EQ(written_data, data + data);
- client.disconnect();
+ // server will close stream on CLOSE_LOAD
wait_for_close();
EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0);
}
-TEST_F(LoadStreamMgrTest,
one_client_one_index_one_tablet_signle_segment_without_eos) {
+TEST_F(LoadStreamMgrTest,
one_client_one_index_one_tablet_single_segment_without_eos) {
MockSinkClient client;
auto st = client.connect_stream();
EXPECT_TRUE(st.ok());
@@ -901,12 +899,12 @@ TEST_F(LoadStreamMgrTest,
one_client_one_index_one_tablet_signle_segment_without
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1);
EXPECT_EQ(g_response_stat.failed_tablet_ids[0], NORMAL_TABLET_ID);
- client.disconnect();
+ // server will close stream on CLOSE_LOAD
wait_for_close();
EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0);
}
-TEST_F(LoadStreamMgrTest, one_client_one_index_one_tablet_signle_segment1) {
+TEST_F(LoadStreamMgrTest, one_client_one_index_one_tablet_single_segment1) {
MockSinkClient client;
auto st = client.connect_stream();
EXPECT_TRUE(st.ok());
@@ -944,7 +942,7 @@ TEST_F(LoadStreamMgrTest,
one_client_one_index_one_tablet_signle_segment1) {
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1);
EXPECT_EQ(g_response_stat.failed_tablet_ids[0], NORMAL_TABLET_ID);
- client.disconnect();
+ // server will close stream on CLOSE_LOAD
wait_for_close();
EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0);
}
@@ -997,7 +995,7 @@ TEST_F(LoadStreamMgrTest,
one_client_one_index_one_tablet_two_segment) {
written_data = read_data(NORMAL_TXN_ID, NORMAL_PARTITION_ID,
NORMAL_TABLET_ID, 1);
EXPECT_EQ(written_data, data2);
- client.disconnect();
+ // server will close stream on CLOSE_LOAD
wait_for_close();
EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0);
}
@@ -1058,7 +1056,7 @@ TEST_F(LoadStreamMgrTest,
one_client_one_index_three_tablet) {
written_data = read_data(NORMAL_TXN_ID, NORMAL_PARTITION_ID,
NORMAL_TABLET_ID + 2, 0);
EXPECT_EQ(written_data, data2);
- client.disconnect();
+ // server will close stream on CLOSE_LOAD
wait_for_close();
EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0);
}
@@ -1100,13 +1098,14 @@ TEST_F(LoadStreamMgrTest,
two_client_one_index_one_tablet_three_segment) {
// duplicated close
close_load(clients[1], 1);
wait_for_ack(2);
- EXPECT_EQ(g_response_stat.num, 2);
+ // stream closed, no response will be sent
+ EXPECT_EQ(g_response_stat.num, 1);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);
close_load(clients[0], 0);
- wait_for_ack(3);
- EXPECT_EQ(g_response_stat.num, 3);
+ wait_for_ack(2);
+ EXPECT_EQ(g_response_stat.num, 2);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 1);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.success_tablet_ids[0], NORMAL_TABLET_ID);
@@ -1130,9 +1129,7 @@ TEST_F(LoadStreamMgrTest,
two_client_one_index_one_tablet_three_segment) {
EXPECT_EQ(written_data, segment_data[sender_id * 3 + i]);
}
- for (int i = 0; i < 2; i++) {
- clients[i].disconnect();
- }
+ // server will close stream on CLOSE_LOAD
wait_for_close();
EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]