This is an automated email from the ASF dual-hosted git repository.
liaoxin pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new fed632bf4a2 [fix](move-memtable) check segment num when closing each tablet (#36753) (#37536)
fed632bf4a2 is described below
commit fed632bf4a2dc5e2e41d9c3c972d626f617e8daf
Author: Kaijie Chen <[email protected]>
AuthorDate: Thu Jul 11 20:33:44 2024 +0800
[fix](move-memtable) check segment num when closing each tablet (#36753) (#37536)
cherry-pick #36753 and #37660
---
be/src/io/fs/stream_sink_file_writer.cpp | 53 ++++---
be/src/olap/delta_writer_v2.cpp | 3 +-
be/src/olap/delta_writer_v2.h | 2 +-
be/src/olap/rowset/beta_rowset_writer_v2.h | 2 +
be/src/runtime/load_stream.cpp | 13 +-
be/src/runtime/load_stream.h | 2 +
be/src/vec/sink/delta_writer_v2_pool.cpp | 9 +-
be/src/vec/sink/delta_writer_v2_pool.h | 3 +-
be/src/vec/sink/load_stream_map_pool.cpp | 18 ++-
be/src/vec/sink/load_stream_map_pool.h | 7 +-
be/src/vec/sink/load_stream_stub.cpp | 58 +++++++-
be/src/vec/sink/load_stream_stub.h | 2 +-
be/src/vec/sink/writer/vtablet_writer_v2.cpp | 10 +-
be/test/runtime/load_stream_test.cpp | 152 +++++++++++++++------
be/test/vec/exec/delta_writer_v2_pool_test.cpp | 10 +-
gensrc/proto/internal_service.proto | 1 +
.../test_multi_replica_fault_injection.groovy | 5 +-
17 files changed, 262 insertions(+), 88 deletions(-)
diff --git a/be/src/io/fs/stream_sink_file_writer.cpp
b/be/src/io/fs/stream_sink_file_writer.cpp
index cfc924fad0a..e6007550396 100644
--- a/be/src/io/fs/stream_sink_file_writer.cpp
+++ b/be/src/io/fs/stream_sink_file_writer.cpp
@@ -51,42 +51,26 @@ Status StreamSinkFileWriter::appendv(const Slice* data,
size_t data_cnt) {
<< ", data_length: " << bytes_req;
std::span<const Slice> slices {data, data_cnt};
- size_t stream_index = 0;
+ size_t fault_injection_skipped_streams = 0;
bool ok = false;
- bool skip_stream = false;
Status st;
for (auto& stream : _streams) {
DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_one_replica",
{
- if (stream_index >= 2) {
- skip_stream = true;
+ if (fault_injection_skipped_streams < 1) {
+ fault_injection_skipped_streams++;
+ continue;
}
});
DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_two_replica",
{
- if (stream_index >= 1) {
- skip_stream = true;
+ if (fault_injection_skipped_streams < 2) {
+ fault_injection_skipped_streams++;
+ continue;
}
});
- if (!skip_stream) {
- st = stream->append_data(_partition_id, _index_id, _tablet_id,
_segment_id,
- _bytes_appended, slices);
- }
-
DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_one_replica",
{
- if (stream_index >= 2) {
- st = Status::InternalError("stream sink file writer append
data failed");
- }
- stream_index++;
- skip_stream = false;
- });
-
DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_two_replica",
{
- if (stream_index >= 1) {
- st = Status::InternalError("stream sink file writer append
data failed");
- }
- stream_index++;
- skip_stream = false;
- });
-
DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_all_replica",
{
- st = Status::InternalError("stream sink file writer append data
failed");
- });
+
DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_all_replica",
+ { continue; });
+ st = stream->append_data(_partition_id, _index_id, _tablet_id,
_segment_id, _bytes_appended,
+ slices);
ok = ok || st.ok();
if (!st.ok()) {
LOG(WARNING) << "failed to send segment data to backend " <<
stream->dst_id()
@@ -116,8 +100,23 @@ Status StreamSinkFileWriter::finalize() {
VLOG_DEBUG << "writer finalize, load_id: " << print_id(_load_id) << ",
index_id: " << _index_id
<< ", tablet_id: " << _tablet_id << ", segment_id: " <<
_segment_id;
// TODO(zhengyu): update get_inverted_index_file_size into stat
+ size_t fault_injection_skipped_streams = 0;
bool ok = false;
for (auto& stream : _streams) {
+
DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_one_replica",
{
+ if (fault_injection_skipped_streams < 1) {
+ fault_injection_skipped_streams++;
+ continue;
+ }
+ });
+
DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_two_replica",
{
+ if (fault_injection_skipped_streams < 2) {
+ fault_injection_skipped_streams++;
+ continue;
+ }
+ });
+
DBUG_EXECUTE_IF("StreamSinkFileWriter.appendv.write_segment_failed_all_replica",
+ { continue; });
auto st = stream->append_data(_partition_id, _index_id, _tablet_id,
_segment_id,
_bytes_appended, {}, true);
ok = ok || st.ok();
diff --git a/be/src/olap/delta_writer_v2.cpp b/be/src/olap/delta_writer_v2.cpp
index 378728f025c..e02c8eea70c 100644
--- a/be/src/olap/delta_writer_v2.cpp
+++ b/be/src/olap/delta_writer_v2.cpp
@@ -180,7 +180,7 @@ Status DeltaWriterV2::close() {
return _memtable_writer->close();
}
-Status DeltaWriterV2::close_wait(RuntimeProfile* profile) {
+Status DeltaWriterV2::close_wait(int32_t& num_segments, RuntimeProfile*
profile) {
SCOPED_RAW_TIMER(&_close_wait_time);
std::lock_guard<std::mutex> l(_lock);
DCHECK(_is_init)
@@ -190,6 +190,7 @@ Status DeltaWriterV2::close_wait(RuntimeProfile* profile) {
_update_profile(profile);
}
RETURN_IF_ERROR(_memtable_writer->close_wait(profile));
+ num_segments = _rowset_writer->next_segment_id();
_delta_written_success = true;
return Status::OK();
diff --git a/be/src/olap/delta_writer_v2.h b/be/src/olap/delta_writer_v2.h
index 31b364e1038..0ef564be393 100644
--- a/be/src/olap/delta_writer_v2.h
+++ b/be/src/olap/delta_writer_v2.h
@@ -77,7 +77,7 @@ public:
Status close();
// wait for all memtables to be flushed.
// mem_consumption() should be 0 after this function returns.
- Status close_wait(RuntimeProfile* profile = nullptr);
+ Status close_wait(int32_t& num_segments, RuntimeProfile* profile =
nullptr);
// abandon current memtable and wait for all pending-flushing memtables to
be destructed.
// mem_consumption() should be 0 after this function returns.
diff --git a/be/src/olap/rowset/beta_rowset_writer_v2.h
b/be/src/olap/rowset/beta_rowset_writer_v2.h
index 4b0ab950de4..6d1321bd144 100644
--- a/be/src/olap/rowset/beta_rowset_writer_v2.h
+++ b/be/src/olap/rowset/beta_rowset_writer_v2.h
@@ -131,6 +131,8 @@ public:
int32_t allocate_segment_id() override { return
_segment_creator.allocate_segment_id(); };
+ int32_t next_segment_id() { return _segment_creator.next_segment_id(); };
+
int64_t delete_bitmap_ns() override { return _delete_bitmap_ns; }
int64_t segment_writer_ns() override { return _segment_writer_ns; }
diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp
index 8de15091ec5..1f8c33995b3 100644
--- a/be/src/runtime/load_stream.cpp
+++ b/be/src/runtime/load_stream.cpp
@@ -244,6 +244,11 @@ Status TabletStream::close() {
if (!_failed_st->ok()) {
return *_failed_st;
}
+ if (_next_segid.load() != _num_segments) {
+ return Status::Corruption(
+ "segment num mismatch in tablet {}, expected: {}, actual: {},
load_id: {}", _id,
+ _num_segments, _next_segid.load(), print_id(_load_id));
+ }
Status st = Status::OK();
auto close_func = [this, &mu, &cv, &st]() {
@@ -307,11 +312,17 @@ Status IndexStream::close(const std::vector<PTabletID>&
tablets_to_commit,
SCOPED_TIMER(_close_wait_timer);
// open all need commit tablets
for (const auto& tablet : tablets_to_commit) {
+ if (_id != tablet.index_id()) {
+ continue;
+ }
TabletStreamSharedPtr tablet_stream;
auto it = _tablet_streams_map.find(tablet.tablet_id());
- if (it == _tablet_streams_map.end() && _id == tablet.index_id()) {
+ if (it == _tablet_streams_map.end()) {
RETURN_IF_ERROR(
_init_tablet_stream(tablet_stream, tablet.tablet_id(),
tablet.partition_id()));
+ tablet_stream->add_num_segments(tablet.num_segments());
+ } else {
+ it->second->add_num_segments(tablet.num_segments());
}
}
diff --git a/be/src/runtime/load_stream.h b/be/src/runtime/load_stream.h
index b2635698379..f690882a878 100644
--- a/be/src/runtime/load_stream.h
+++ b/be/src/runtime/load_stream.h
@@ -52,6 +52,7 @@ public:
Status append_data(const PStreamHeader& header, butil::IOBuf* data);
Status add_segment(const PStreamHeader& header, butil::IOBuf* data);
+ void add_num_segments(int64_t num_segments) { _num_segments +=
num_segments; }
Status close();
int64_t id() const { return _id; }
@@ -63,6 +64,7 @@ private:
std::vector<std::unique_ptr<ThreadPoolToken>> _flush_tokens;
std::unordered_map<int64_t, std::unique_ptr<SegIdMapping>> _segids_mapping;
std::atomic<uint32_t> _next_segid;
+ int64_t _num_segments = 0;
bthread::Mutex _lock;
std::shared_ptr<Status> _failed_st;
PUniqueId _load_id;
diff --git a/be/src/vec/sink/delta_writer_v2_pool.cpp
b/be/src/vec/sink/delta_writer_v2_pool.cpp
index 87c18194127..bc5233ac307 100644
--- a/be/src/vec/sink/delta_writer_v2_pool.cpp
+++ b/be/src/vec/sink/delta_writer_v2_pool.cpp
@@ -43,7 +43,8 @@ std::shared_ptr<DeltaWriterV2>
DeltaWriterV2Map::get_or_create(
return writer;
}
-Status DeltaWriterV2Map::close(RuntimeProfile* profile) {
+Status DeltaWriterV2Map::close(std::unordered_map<int64_t, int32_t>&
segments_for_tablet,
+ RuntimeProfile* profile) {
int num_use = --_use_cnt;
if (num_use > 0) {
LOG(INFO) << "keeping DeltaWriterV2Map, load_id=" << _load_id << " ,
use_cnt=" << num_use;
@@ -58,8 +59,10 @@ Status DeltaWriterV2Map::close(RuntimeProfile* profile) {
RETURN_IF_ERROR(writer->close());
}
LOG(INFO) << "close-waiting DeltaWriterV2Map, load_id=" << _load_id;
- for (auto& [_, writer] : _map) {
- RETURN_IF_ERROR(writer->close_wait(profile));
+ for (auto& [tablet_id, writer] : _map) {
+ int32_t num_segments;
+ RETURN_IF_ERROR(writer->close_wait(num_segments, profile));
+ segments_for_tablet[tablet_id] = num_segments;
}
return Status::OK();
}
diff --git a/be/src/vec/sink/delta_writer_v2_pool.h
b/be/src/vec/sink/delta_writer_v2_pool.h
index 912b9216e9f..7e58eea3149 100644
--- a/be/src/vec/sink/delta_writer_v2_pool.h
+++ b/be/src/vec/sink/delta_writer_v2_pool.h
@@ -70,7 +70,8 @@ public:
int64_t tablet_id, std::function<std::unique_ptr<DeltaWriterV2>()>
creator);
// close all delta writers in this DeltaWriterV2Map if there is no other
users
- Status close(RuntimeProfile* profile = nullptr);
+ Status close(std::unordered_map<int64_t, int32_t>& segments_for_tablet,
+ RuntimeProfile* profile = nullptr);
// cancel all delta writers in this DeltaWriterV2Map
void cancel(Status status);
diff --git a/be/src/vec/sink/load_stream_map_pool.cpp
b/be/src/vec/sink/load_stream_map_pool.cpp
index 7a3072ade6e..2fcb8deaeb2 100644
--- a/be/src/vec/sink/load_stream_map_pool.cpp
+++ b/be/src/vec/sink/load_stream_map_pool.cpp
@@ -87,7 +87,9 @@ void LoadStreamMap::save_tablets_to_commit(int64_t dst_id,
const std::vector<PTabletID>&
tablets_to_commit) {
std::lock_guard<std::mutex> lock(_tablets_to_commit_mutex);
auto& tablets = _tablets_to_commit[dst_id];
- tablets.insert(tablets.end(), tablets_to_commit.begin(),
tablets_to_commit.end());
+ for (const auto& tablet : tablets_to_commit) {
+ tablets.emplace(tablet.tablet_id(), tablet);
+ }
}
bool LoadStreamMap::release() {
@@ -103,12 +105,24 @@ bool LoadStreamMap::release() {
Status LoadStreamMap::close_load(bool incremental) {
return for_each_st([this, incremental](int64_t dst_id, const Streams&
streams) -> Status {
+ std::vector<PTabletID> tablets_to_commit;
const auto& tablets = _tablets_to_commit[dst_id];
+ tablets_to_commit.reserve(tablets.size());
+ for (const auto& [tablet_id, tablet] : tablets) {
+ tablets_to_commit.push_back(tablet);
+
tablets_to_commit.back().set_num_segments(_segments_for_tablet[tablet_id]);
+ }
+ bool first = true;
for (auto& stream : streams) {
if (stream->is_incremental() != incremental) {
continue;
}
- RETURN_IF_ERROR(stream->close_load(tablets));
+ if (first) {
+ RETURN_IF_ERROR(stream->close_load(tablets_to_commit));
+ first = false;
+ } else {
+ RETURN_IF_ERROR(stream->close_load({}));
+ }
}
return Status::OK();
});
diff --git a/be/src/vec/sink/load_stream_map_pool.h
b/be/src/vec/sink/load_stream_map_pool.h
index d0f72ab7e00..dcddcdaf8d8 100644
--- a/be/src/vec/sink/load_stream_map_pool.h
+++ b/be/src/vec/sink/load_stream_map_pool.h
@@ -90,6 +90,10 @@ public:
void save_tablets_to_commit(int64_t dst_id, const std::vector<PTabletID>&
tablets_to_commit);
+ void save_segments_for_tablet(const std::unordered_map<int64_t, int32_t>&
segments_for_tablet) {
+ _segments_for_tablet.insert(segments_for_tablet.cbegin(),
segments_for_tablet.cend());
+ }
+
// Return true if the last instance is just released.
bool release();
@@ -109,7 +113,8 @@ private:
std::shared_ptr<IndexToEnableMoW> _enable_unique_mow_for_index;
std::mutex _tablets_to_commit_mutex;
- std::unordered_map<int64_t, std::vector<PTabletID>> _tablets_to_commit;
+ std::unordered_map<int64_t, std::unordered_map<int64_t, PTabletID>>
_tablets_to_commit;
+ std::unordered_map<int64_t, int32_t> _segments_for_tablet;
};
class LoadStreamMapPool {
diff --git a/be/src/vec/sink/load_stream_stub.cpp
b/be/src/vec/sink/load_stream_stub.cpp
index caebb381db6..93f3fd87a85 100644
--- a/be/src/vec/sink/load_stream_stub.cpp
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -207,6 +207,11 @@ Status
LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
Status LoadStreamStub::append_data(int64_t partition_id, int64_t index_id,
int64_t tablet_id,
int64_t segment_id, uint64_t offset,
std::span<const Slice> data,
bool segment_eos) {
+ DBUG_EXECUTE_IF("LoadStreamStub.only_send_segment_0", {
+ if (segment_id != 0) {
+ return Status::OK();
+ }
+ });
PStreamHeader header;
header.set_src_id(_src_id);
*header.mutable_load_id() = _load_id;
@@ -224,6 +229,11 @@ Status LoadStreamStub::append_data(int64_t partition_id,
int64_t index_id, int64
Status LoadStreamStub::add_segment(int64_t partition_id, int64_t index_id,
int64_t tablet_id,
int64_t segment_id, const
SegmentStatistics& segment_stat,
TabletSchemaSPtr flush_schema) {
+ DBUG_EXECUTE_IF("LoadStreamStub.only_send_segment_0", {
+ if (segment_id != 0) {
+ return Status::OK();
+ }
+ });
PStreamHeader header;
header.set_src_id(_src_id);
*header.mutable_load_id() = _load_id;
@@ -368,7 +378,53 @@ Status LoadStreamStub::_send_with_buffer(butil::IOBuf&
buf, bool sync) {
std::lock_guard<decltype(_send_mutex)> send_lock(_send_mutex);
buffer_lock.unlock();
VLOG_DEBUG << "send buf size : " << output.size() << ", sync: " << sync;
- return _send_with_retry(output);
+ auto st = _send_with_retry(output);
+ if (!st.ok()) {
+ _handle_failure(output, st);
+ }
+ return st;
+}
+
+void LoadStreamStub::_handle_failure(butil::IOBuf& buf, Status st) {
+ while (buf.size() > 0) {
+ // step 1: parse header
+ size_t hdr_len = 0;
+ buf.cutn((void*)&hdr_len, sizeof(size_t));
+ butil::IOBuf hdr_buf;
+ PStreamHeader hdr;
+ buf.cutn(&hdr_buf, hdr_len);
+ butil::IOBufAsZeroCopyInputStream wrapper(hdr_buf);
+ hdr.ParseFromZeroCopyStream(&wrapper);
+
+ // step 2: cut data
+ size_t data_len = 0;
+ buf.cutn((void*)&data_len, sizeof(size_t));
+ butil::IOBuf data_buf;
+ buf.cutn(&data_buf, data_len);
+
+ // step 3: handle failure
+ switch (hdr.opcode()) {
+ case PStreamHeader::ADD_SEGMENT:
+ case PStreamHeader::APPEND_DATA: {
+ add_failed_tablet(hdr.tablet_id(), st);
+ } break;
+ case PStreamHeader::CLOSE_LOAD: {
+ brpc::StreamClose(_stream_id);
+ } break;
+ case PStreamHeader::GET_SCHEMA: {
+ // Just log and let wait_for_schema timeout
+ std::ostringstream oss;
+ for (const auto& tablet : hdr.tablets()) {
+ oss << " " << tablet.tablet_id();
+ }
+ LOG(WARNING) << "failed to send GET_SCHEMA request, tablet_id:" <<
oss.str() << ", "
+ << *this;
+ } break;
+ default:
+ LOG(WARNING) << "unexpected stream message " << hdr.opcode() << ",
" << *this;
+ DCHECK(false);
+ }
+ }
}
Status LoadStreamStub::_send_with_retry(butil::IOBuf& buf) {
diff --git a/be/src/vec/sink/load_stream_stub.h
b/be/src/vec/sink/load_stream_stub.h
index 1bf0fac4e38..4e6aad8d1ae 100644
--- a/be/src/vec/sink/load_stream_stub.h
+++ b/be/src/vec/sink/load_stream_stub.h
@@ -207,7 +207,6 @@ public:
_success_tablets.push_back(tablet_id);
}
- // for tests only
void add_failed_tablet(int64_t tablet_id, Status reason) {
std::lock_guard<bthread::Mutex> lock(_failed_tablets_mutex);
_failed_tablets[tablet_id] = reason;
@@ -217,6 +216,7 @@ private:
Status _encode_and_send(PStreamHeader& header, std::span<const Slice> data
= {});
Status _send_with_buffer(butil::IOBuf& buf, bool sync = false);
Status _send_with_retry(butil::IOBuf& buf);
+ void _handle_failure(butil::IOBuf& buf, Status st);
Status _check_cancel() {
if (!_is_cancelled.load()) {
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index a6abe14f4f1..fecbd324c57 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -545,13 +545,18 @@ Status VTabletWriterV2::close(Status exec_status) {
// close DeltaWriters
{
+ std::unordered_map<int64_t, int32_t> segments_for_tablet;
SCOPED_TIMER(_close_writer_timer);
// close all delta writers if this is the last user
- auto st = _delta_writer_for_tablet->close(_profile);
+ auto st = _delta_writer_for_tablet->close(segments_for_tablet,
_profile);
_delta_writer_for_tablet.reset();
if (!st.ok()) {
RETURN_IF_ERROR(_cancel(st));
}
+ // only the last sink closing delta writers will have segment num
+ if (!segments_for_tablet.empty()) {
+
_load_stream_map->save_segments_for_tablet(segments_for_tablet);
+ }
}
_calc_tablets_to_commit();
@@ -661,7 +666,8 @@ void VTabletWriterV2::_calc_tablets_to_commit() {
if (VLOG_DEBUG_IS_ON) {
partition_ids.push_back(tablet.partition_id());
}
- tablets_to_commit.push_back(tablet);
+ PTabletID t(tablet);
+ tablets_to_commit.push_back(t);
}
}
if (VLOG_DEBUG_IS_ON) {
diff --git a/be/test/runtime/load_stream_test.cpp
b/be/test/runtime/load_stream_test.cpp
index 2ee3f86c1a4..f0cc3354d8e 100644
--- a/be/test/runtime/load_stream_test.cpp
+++ b/be/test/runtime/load_stream_test.cpp
@@ -494,19 +494,17 @@ public:
: _heavy_work_pool(4, 32, "load_stream_test_heavy"),
_light_work_pool(4, 32, "load_stream_test_light") {}
- void close_load(MockSinkClient& client, uint32_t sender_id =
NORMAL_SENDER_ID) {
+ void close_load(MockSinkClient& client, const std::vector<PTabletID>&
tablets_to_commit = {},
+ uint32_t sender_id = NORMAL_SENDER_ID) {
butil::IOBuf append_buf;
PStreamHeader header;
header.mutable_load_id()->set_hi(1);
header.mutable_load_id()->set_lo(1);
header.set_opcode(PStreamHeader::CLOSE_LOAD);
header.set_src_id(sender_id);
- /* TODO: fix test with tablets_to_commit
- PTabletID* tablets_to_commit = header.add_tablets();
- tablets_to_commit->set_partition_id(NORMAL_PARTITION_ID);
- tablets_to_commit->set_index_id(NORMAL_INDEX_ID);
- tablets_to_commit->set_tablet_id(NORMAL_TABLET_ID);
- */
+ for (const auto& tablet : tablets_to_commit) {
+ *header.add_tablets() = tablet;
+ }
size_t hdr_len = header.ByteSizeLong();
append_buf.append((char*)&hdr_len, sizeof(size_t));
append_buf.append(header.SerializeAsString());
@@ -677,14 +675,19 @@ TEST_F(LoadStreamMgrTest, one_client_normal) {
write_normal(client);
reset_response_stat();
- close_load(client, ABNORMAL_SENDER_ID);
+ PTabletID tablet;
+ tablet.set_partition_id(NORMAL_PARTITION_ID);
+ tablet.set_index_id(NORMAL_INDEX_ID);
+ tablet.set_tablet_id(NORMAL_TABLET_ID);
+ tablet.set_num_segments(1);
+ close_load(client, {tablet}, ABNORMAL_SENDER_ID);
wait_for_ack(1);
EXPECT_EQ(g_response_stat.num, 1);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);
EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 1);
- close_load(client);
+ close_load(client, {tablet});
wait_for_ack(2);
EXPECT_EQ(g_response_stat.num, 2);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 1);
@@ -735,14 +738,19 @@ TEST_F(LoadStreamMgrTest, one_client_abnormal_index) {
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1);
EXPECT_EQ(g_response_stat.failed_tablet_ids[0], NORMAL_TABLET_ID);
- close_load(client, 1);
+ PTabletID tablet;
+ tablet.set_partition_id(NORMAL_PARTITION_ID);
+ tablet.set_index_id(ABNORMAL_INDEX_ID);
+ tablet.set_tablet_id(NORMAL_TABLET_ID);
+ tablet.set_num_segments(1);
+ close_load(client, {tablet}, 1);
wait_for_ack(2);
EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 1);
EXPECT_EQ(g_response_stat.num, 2);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1);
- close_load(client, 0);
+ close_load(client, {tablet}, 0);
wait_for_ack(3);
EXPECT_EQ(g_response_stat.num, 3);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
@@ -766,17 +774,23 @@ TEST_F(LoadStreamMgrTest, one_client_abnormal_sender) {
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1);
EXPECT_EQ(g_response_stat.failed_tablet_ids[0], NORMAL_TABLET_ID);
- close_load(client, 1);
+ PTabletID tablet;
+ tablet.set_partition_id(NORMAL_PARTITION_ID);
+ tablet.set_index_id(NORMAL_INDEX_ID);
+ tablet.set_tablet_id(NORMAL_TABLET_ID);
+ tablet.set_num_segments(1);
+ close_load(client, {tablet}, 1);
wait_for_ack(2);
EXPECT_EQ(g_response_stat.num, 2);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1);
- close_load(client, 0);
+ // on the final close_load, segment num check will fail
+ close_load(client, {tablet}, 0);
wait_for_ack(3);
EXPECT_EQ(g_response_stat.num, 3);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
- EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1);
+ EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 2);
// server will close stream on CLOSE_LOAD
wait_for_close();
@@ -796,13 +810,18 @@ TEST_F(LoadStreamMgrTest, one_client_abnormal_tablet) {
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1);
EXPECT_EQ(g_response_stat.failed_tablet_ids[0], ABNORMAL_TABLET_ID);
- close_load(client, 1);
+ PTabletID tablet;
+ tablet.set_partition_id(NORMAL_PARTITION_ID);
+ tablet.set_index_id(NORMAL_INDEX_ID);
+ tablet.set_tablet_id(ABNORMAL_TABLET_ID);
+ tablet.set_num_segments(1);
+ close_load(client, {tablet}, 1);
wait_for_ack(2);
EXPECT_EQ(g_response_stat.num, 2);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1);
- close_load(client, 0);
+ close_load(client, {tablet}, 0);
wait_for_ack(3);
EXPECT_EQ(g_response_stat.num, 3);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
@@ -829,21 +848,26 @@ TEST_F(LoadStreamMgrTest,
one_client_one_index_one_tablet_single_segment0_zero_b
0, data, true);
EXPECT_EQ(g_response_stat.num, 0);
+ PTabletID tablet;
+ tablet.set_partition_id(NORMAL_PARTITION_ID);
+ tablet.set_index_id(NORMAL_INDEX_ID);
+ tablet.set_tablet_id(NORMAL_TABLET_ID);
+ tablet.set_num_segments(1);
// CLOSE_LOAD
- close_load(client, 1);
+ close_load(client, {tablet}, 1);
wait_for_ack(1);
EXPECT_EQ(g_response_stat.num, 1);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);
// duplicated close
- close_load(client, 1);
+ close_load(client, {tablet}, 1);
wait_for_ack(2);
EXPECT_EQ(g_response_stat.num, 2);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);
- close_load(client, 0);
+ close_load(client, {tablet}, 0);
wait_for_ack(3);
EXPECT_EQ(g_response_stat.num, 3);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
@@ -870,8 +894,13 @@ TEST_F(LoadStreamMgrTest, close_load_before_recv_eos) {
data.length(), data, false);
EXPECT_EQ(g_response_stat.num, 0);
+ PTabletID tablet;
+ tablet.set_partition_id(NORMAL_PARTITION_ID);
+ tablet.set_index_id(NORMAL_INDEX_ID);
+ tablet.set_tablet_id(NORMAL_TABLET_ID);
+ tablet.set_num_segments(1);
// CLOSE_LOAD before EOS
- close_load(client);
+ close_load(client, {tablet});
wait_for_ack(1);
EXPECT_EQ(g_response_stat.num, 1);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
@@ -885,7 +914,7 @@ TEST_F(LoadStreamMgrTest, close_load_before_recv_eos) {
data.length(), data, true);
// duplicated close, will not be handled
- close_load(client);
+ close_load(client, {tablet});
wait_for_ack(2);
EXPECT_EQ(g_response_stat.num, 1);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
@@ -912,21 +941,26 @@ TEST_F(LoadStreamMgrTest,
one_client_one_index_one_tablet_single_segment0) {
data.length(), data, true);
EXPECT_EQ(g_response_stat.num, 0);
+ PTabletID tablet;
+ tablet.set_partition_id(NORMAL_PARTITION_ID);
+ tablet.set_index_id(NORMAL_INDEX_ID);
+ tablet.set_tablet_id(NORMAL_TABLET_ID);
+ tablet.set_num_segments(1);
// CLOSE_LOAD
- close_load(client, 1);
+ close_load(client, {tablet}, 1);
wait_for_ack(1);
EXPECT_EQ(g_response_stat.num, 1);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);
// duplicated close
- close_load(client, 1);
+ close_load(client, {tablet}, 1);
wait_for_ack(2);
EXPECT_EQ(g_response_stat.num, 2);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);
- close_load(client, 0);
+ close_load(client, {tablet}, 0);
wait_for_ack(3);
EXPECT_EQ(g_response_stat.num, 3);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 1);
@@ -956,21 +990,26 @@ TEST_F(LoadStreamMgrTest,
one_client_one_index_one_tablet_single_segment_without
0, data, false);
EXPECT_EQ(g_response_stat.num, 0);
+ PTabletID tablet;
+ tablet.set_partition_id(NORMAL_PARTITION_ID);
+ tablet.set_index_id(NORMAL_INDEX_ID);
+ tablet.set_tablet_id(NORMAL_TABLET_ID);
+ tablet.set_num_segments(1);
// CLOSE_LOAD
- close_load(client, 1);
+ close_load(client, {tablet}, 1);
wait_for_ack(1);
EXPECT_EQ(g_response_stat.num, 1);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);
// duplicated close
- close_load(client, 1);
+ close_load(client, {tablet}, 1);
wait_for_ack(2);
EXPECT_EQ(g_response_stat.num, 2);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);
- close_load(client, 0);
+ close_load(client, {tablet}, 0);
wait_for_ack(3);
EXPECT_EQ(g_response_stat.num, 3);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
@@ -999,21 +1038,26 @@ TEST_F(LoadStreamMgrTest,
one_client_one_index_one_tablet_single_segment1) {
data.length(), data, true);
EXPECT_EQ(g_response_stat.num, 0);
+ PTabletID tablet;
+ tablet.set_partition_id(NORMAL_PARTITION_ID);
+ tablet.set_index_id(NORMAL_INDEX_ID);
+ tablet.set_tablet_id(NORMAL_TABLET_ID);
+ tablet.set_num_segments(1);
// CLOSE_LOAD
- close_load(client, 1);
+ close_load(client, {tablet}, 1);
wait_for_ack(1);
EXPECT_EQ(g_response_stat.num, 1);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);
// duplicated close
- close_load(client, 1);
+ close_load(client, {tablet}, 1);
wait_for_ack(2);
EXPECT_EQ(g_response_stat.num, 2);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);
- close_load(client, 0);
+ close_load(client, {tablet}, 0);
wait_for_ack(3);
EXPECT_EQ(g_response_stat.num, 3);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
@@ -1046,21 +1090,26 @@ TEST_F(LoadStreamMgrTest,
one_client_one_index_one_tablet_two_segment) {
0, data2, true);
EXPECT_EQ(g_response_stat.num, 0);
+ PTabletID tablet;
+ tablet.set_partition_id(NORMAL_PARTITION_ID);
+ tablet.set_index_id(NORMAL_INDEX_ID);
+ tablet.set_tablet_id(NORMAL_TABLET_ID);
+ tablet.set_num_segments(2);
// CLOSE_LOAD
- close_load(client, 1);
+ close_load(client, {tablet}, 1);
wait_for_ack(1);
EXPECT_EQ(g_response_stat.num, 1);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);
// duplicated close
- close_load(client, 1);
+ close_load(client, {tablet}, 1);
wait_for_ack(2);
EXPECT_EQ(g_response_stat.num, 2);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);
- close_load(client, 0);
+ close_load(client, {tablet}, 0);
wait_for_ack(3);
EXPECT_EQ(g_response_stat.num, 3);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 1);
@@ -1098,21 +1147,32 @@ TEST_F(LoadStreamMgrTest,
one_client_one_index_three_tablet) {
NORMAL_TABLET_ID + 2, 0, 0, data2, true);
EXPECT_EQ(g_response_stat.num, 0);
+
+ PTabletID tablet1;
+ tablet1.set_partition_id(NORMAL_PARTITION_ID);
+ tablet1.set_index_id(NORMAL_INDEX_ID);
+ tablet1.set_tablet_id(NORMAL_TABLET_ID);
+ tablet1.set_num_segments(1);
+ PTabletID tablet2 {tablet1};
+ tablet2.set_tablet_id(NORMAL_TABLET_ID + 1);
+ PTabletID tablet3 {tablet1};
+ tablet3.set_tablet_id(NORMAL_TABLET_ID + 2);
+
// CLOSE_LOAD
- close_load(client, 1);
+ close_load(client, {tablet1, tablet2, tablet3}, 1);
wait_for_ack(1);
EXPECT_EQ(g_response_stat.num, 1);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);
// duplicated close
- close_load(client, 1);
+ close_load(client, {tablet1, tablet2, tablet3}, 1);
wait_for_ack(2);
EXPECT_EQ(g_response_stat.num, 2);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);
- close_load(client, 0);
+ close_load(client, {tablet1, tablet2, tablet3}, 0);
wait_for_ack(3);
EXPECT_EQ(g_response_stat.num, 3);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 3);
@@ -1166,22 +1226,27 @@ TEST_F(LoadStreamMgrTest,
two_client_one_index_one_tablet_three_segment) {
}
EXPECT_EQ(g_response_stat.num, 0);
+ PTabletID tablet;
+ tablet.set_partition_id(NORMAL_PARTITION_ID);
+ tablet.set_index_id(NORMAL_INDEX_ID);
+ tablet.set_tablet_id(NORMAL_TABLET_ID);
+ tablet.set_num_segments(3);
// CLOSE_LOAD
- close_load(clients[1], 1);
+ close_load(clients[1], {tablet}, 1);
wait_for_ack(1);
EXPECT_EQ(g_response_stat.num, 1);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);
// duplicated close
- close_load(clients[1], 1);
+ close_load(clients[1], {tablet}, 1);
wait_for_ack(2);
// stream closed, no response will be sent
EXPECT_EQ(g_response_stat.num, 1);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);
- close_load(clients[0], 0);
+ close_load(clients[0], {tablet}, 0);
wait_for_ack(2);
EXPECT_EQ(g_response_stat.num, 2);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 1);
@@ -1236,8 +1301,13 @@ TEST_F(LoadStreamMgrTest,
two_client_one_close_before_the_other_open) {
}
EXPECT_EQ(g_response_stat.num, 0);
+ PTabletID tablet;
+ tablet.set_partition_id(NORMAL_PARTITION_ID);
+ tablet.set_index_id(NORMAL_INDEX_ID);
+ tablet.set_tablet_id(NORMAL_TABLET_ID);
+ tablet.set_num_segments(3);
// CLOSE_LOAD
- close_load(clients[0], 0);
+ close_load(clients[0], {tablet}, 0);
wait_for_ack(1);
EXPECT_EQ(g_response_stat.num, 1);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
@@ -1254,7 +1324,7 @@ TEST_F(LoadStreamMgrTest,
two_client_one_close_before_the_other_open) {
NORMAL_TABLET_ID, segid, 0, segment_data[i * 3 +
segid], true);
}
- close_load(clients[1], 1);
+ close_load(clients[1], {tablet}, 1);
wait_for_ack(2);
EXPECT_EQ(g_response_stat.num, 2);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 1);
diff --git a/be/test/vec/exec/delta_writer_v2_pool_test.cpp
b/be/test/vec/exec/delta_writer_v2_pool_test.cpp
index a67a701c409..dc86ce8c3a2 100644
--- a/be/test/vec/exec/delta_writer_v2_pool_test.cpp
+++ b/be/test/vec/exec/delta_writer_v2_pool_test.cpp
@@ -42,9 +42,10 @@ TEST_F(DeltaWriterV2PoolTest, test_pool) {
EXPECT_EQ(2, pool.size());
EXPECT_EQ(map, map3);
EXPECT_NE(map, map2);
- EXPECT_TRUE(map->close().ok());
- EXPECT_TRUE(map2->close().ok());
- EXPECT_TRUE(map3->close().ok());
+ std::unordered_map<int64_t, int32_t> sft;
+ EXPECT_TRUE(map->close(sft).ok());
+ EXPECT_TRUE(map2->close(sft).ok());
+ EXPECT_TRUE(map3->close(sft).ok());
EXPECT_EQ(0, pool.size());
}
@@ -72,7 +73,8 @@ TEST_F(DeltaWriterV2PoolTest, test_map) {
EXPECT_EQ(2, map->size());
EXPECT_EQ(writer, writer3);
EXPECT_NE(writer, writer2);
- static_cast<void>(map->close());
+ std::unordered_map<int64_t, int32_t> sft;
+ static_cast<void>(map->close(sft));
EXPECT_EQ(0, pool.size());
}
diff --git a/gensrc/proto/internal_service.proto
b/gensrc/proto/internal_service.proto
index 0a975b81991..14a165a3b9d 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -66,6 +66,7 @@ message PTabletID {
optional int64 partition_id = 1;
optional int64 index_id = 2;
optional int64 tablet_id = 3;
+ optional int64 num_segments = 4;
}
message PTabletInfo {
diff --git
a/regression-test/suites/fault_injection_p0/test_multi_replica_fault_injection.groovy
b/regression-test/suites/fault_injection_p0/test_multi_replica_fault_injection.groovy
index 37d8b4f2610..8080b52ff48 100644
---
a/regression-test/suites/fault_injection_p0/test_multi_replica_fault_injection.groovy
+++
b/regression-test/suites/fault_injection_p0/test_multi_replica_fault_injection.groovy
@@ -91,10 +91,11 @@ suite("test_multi_replica_fault_injection",
"nonConcurrent") {
// success
load_with_injection("StreamSinkFileWriter.appendv.write_segment_failed_one_replica",
"sucess")
// StreamSinkFileWriter appendv write segment failed two replica
-
load_with_injection("StreamSinkFileWriter.appendv.write_segment_failed_two_replica",
"replica num 1 < load required replica num 2")
+
load_with_injection("StreamSinkFileWriter.appendv.write_segment_failed_two_replica",
"add segment failed")
// StreamSinkFileWriter appendv write segment failed all replica
load_with_injection("StreamSinkFileWriter.appendv.write_segment_failed_all_replica",
"failed to send segment data to any replicas")
-
+ // test segment num check when LoadStreamStub missed tail segments
+ load_with_injection("LoadStreamStub.only_send_segment_0", "segment num
mismatch")
sql """ set enable_memtable_on_sink_node=false """
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]