This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit ffd9da44a2a1803bcab1d2c4895eea9eed9f6e44 Author: Kaijie Chen <c...@apache.org> AuthorDate: Fri Apr 19 10:13:18 2024 +0800 [fix](move-memtable) fix commit may fail due to duplicated reports (#32403) --- be/src/vec/sink/load_stream_stub.h | 12 ++ be/src/vec/sink/writer/vtablet_writer_v2.cpp | 86 +++++--- be/src/vec/sink/writer/vtablet_writer_v2.h | 7 + be/test/vec/sink/vtablet_writer_v2_test.cpp | 239 +++++++++++++++++++++ .../test_commit_info_fault_injection.groovy | 99 +++++++++ 5 files changed, 414 insertions(+), 29 deletions(-) diff --git a/be/src/vec/sink/load_stream_stub.h b/be/src/vec/sink/load_stream_stub.h index 5ebec9f9d78..8ef40b84145 100644 --- a/be/src/vec/sink/load_stream_stub.h +++ b/be/src/vec/sink/load_stream_stub.h @@ -200,6 +200,18 @@ public: std::string to_string(); + // for tests only + void add_success_tablet(int64_t tablet_id) { + std::lock_guard<bthread::Mutex> lock(_success_tablets_mutex); + _success_tablets.push_back(tablet_id); + } + + // for tests only + void add_failed_tablet(int64_t tablet_id, Status reason) { + std::lock_guard<bthread::Mutex> lock(_failed_tablets_mutex); + _failed_tablets[tablet_id] = reason; + } + private: Status _encode_and_send(PStreamHeader& header, std::span<const Slice> data = {}); Status _send_with_buffer(butil::IOBuf& buf, bool sync = false); diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp index 1f1756b5a16..21a87c150b8 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp +++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp @@ -574,39 +574,28 @@ Status VTabletWriterV2::close(Status exec_status) { // calculate and submit commit info if (is_last_sink) { - std::unordered_map<int64_t, int> failed_tablets; - std::unordered_map<int64_t, Status> failed_reason; - std::vector<TTabletCommitInfo> tablet_commit_infos; - - _load_stream_map->for_each([&](int64_t dst_id, const Streams& streams) { - std::unordered_set<int64_t> known_tablets; - for (const auto& stream : streams) { - for (auto [tablet_id, reason] : stream->failed_tablets()) { - if (known_tablets.contains(tablet_id)) { - continue; - } - known_tablets.insert(tablet_id); - failed_tablets[tablet_id]++; - failed_reason[tablet_id] = reason; - } - for (auto tablet_id : stream->success_tablets()) { - if (known_tablets.contains(tablet_id)) { - continue; - } - known_tablets.insert(tablet_id); - TTabletCommitInfo commit_info; - commit_info.tabletId = tablet_id; - commit_info.backendId = dst_id; - tablet_commit_infos.emplace_back(std::move(commit_info)); + DBUG_EXECUTE_IF("VTabletWriterV2.close.add_failed_tablet", { + auto streams = _load_stream_map->at(_tablets_for_node.begin()->first); + int64_t tablet_id = -1; + for (auto& stream : *streams) { + const auto& tablets = stream->success_tablets(); + if (tablets.size() > 0) { + tablet_id = tablets[0]; + break; } } + if (tablet_id != -1) { + LOG(INFO) << "fault injection: adding failed tablet_id: " << tablet_id; + streams->front()->add_failed_tablet(tablet_id, + Status::InternalError("fault injection")); + } else { + LOG(INFO) << "fault injection: failed to inject failed tablet_id"; + } }); - for (auto [tablet_id, replicas] : failed_tablets) { - if (replicas > (_num_replicas - 1) / 2) { - return failed_reason.at(tablet_id); - } - } + std::vector<TTabletCommitInfo> tablet_commit_infos; + RETURN_IF_ERROR( + _create_commit_info(tablet_commit_infos, _load_stream_map, _num_replicas)); _state->tablet_commit_infos().insert( _state->tablet_commit_infos().end(), std::make_move_iterator(tablet_commit_infos.begin()), @@ -659,4 +648,43 @@ void VTabletWriterV2::_calc_tablets_to_commit() { } } +Status VTabletWriterV2::_create_commit_info(std::vector<TTabletCommitInfo>& tablet_commit_infos, + std::shared_ptr<LoadStreamMap> load_stream_map, + int num_replicas) { + std::unordered_map<int64_t, int> failed_tablets; + std::unordered_map<int64_t, Status> failed_reason; + load_stream_map->for_each([&](int64_t dst_id, const Streams& streams) { + std::unordered_set<int64_t> known_tablets; + for (const auto& stream : streams) { + for (auto [tablet_id, reason] : stream->failed_tablets()) { + if (known_tablets.contains(tablet_id)) { + continue; + } + known_tablets.insert(tablet_id); + failed_tablets[tablet_id]++; + failed_reason[tablet_id] = reason; + } + for (auto tablet_id : stream->success_tablets()) { + if (known_tablets.contains(tablet_id)) { + continue; + } + known_tablets.insert(tablet_id); + TTabletCommitInfo commit_info; + commit_info.tabletId = tablet_id; + commit_info.backendId = dst_id; + tablet_commit_infos.emplace_back(std::move(commit_info)); + } + } + }); + + for (auto [tablet_id, replicas] : failed_tablets) { + if (replicas > (num_replicas - 1) / 2) { + LOG(INFO) << "tablet " << tablet_id + << " failed on majority backends: " << failed_reason[tablet_id]; + return failed_reason.at(tablet_id); + } + } + return Status::OK(); +} + } // namespace doris::vectorized diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.h b/be/src/vec/sink/writer/vtablet_writer_v2.h index 20952229bbb..e3d31fb32b9 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.h +++ b/be/src/vec/sink/writer/vtablet_writer_v2.h @@ -114,6 +114,13 @@ public: Status on_partitions_created(TCreatePartitionResult* result); +#ifndef BE_TEST +private: +#endif + static Status _create_commit_info(std::vector<TTabletCommitInfo>& tablet_commit_infos, + std::shared_ptr<LoadStreamMap> load_stream_map, + int num_replicas); + private: Status _init_row_distribution(); diff --git a/be/test/vec/sink/vtablet_writer_v2_test.cpp b/be/test/vec/sink/vtablet_writer_v2_test.cpp new file mode 100644 index 00000000000..6289896c75f --- /dev/null +++ b/be/test/vec/sink/vtablet_writer_v2_test.cpp @@ -0,0 +1,239 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/sink/writer/vtablet_writer_v2.h" + +#include <gtest/gtest.h> + +#include "vec/sink/load_stream_map_pool.h" +#include "vec/sink/load_stream_stub.h" + +namespace doris { + +class TestVTabletWriterV2 : public ::testing::Test { +public: + TestVTabletWriterV2() = default; + ~TestVTabletWriterV2() = default; + static void SetUpTestSuite() {} + static void TearDownTestSuite() {} +}; + +const int64_t src_id = 1000; + +static void add_stream(std::shared_ptr<LoadStreamMap> load_stream_map, int64_t node_id, + std::vector<int64_t> success_tablets, + std::unordered_map<int64_t, Status> failed_tablets) { + auto stub = load_stream_map->get_or_create(node_id); + for (const auto& tablet_id : success_tablets) { + stub->at(0)->add_success_tablet(tablet_id); + } + for (const auto& [tablet_id, reason] : failed_tablets) { + stub->at(0)->add_failed_tablet(tablet_id, reason); + } +} + +TEST_F(TestVTabletWriterV2, one_replica) { + UniqueId load_id; + std::vector<TTabletCommitInfo> tablet_commit_infos; + std::shared_ptr<LoadStreamMap> load_stream_map = + std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr); + const int num_replicas = 1; + add_stream(load_stream_map, 1001, {1, 2}, {}); + auto st = vectorized::VTabletWriterV2::_create_commit_info(tablet_commit_infos, load_stream_map, + num_replicas); + ASSERT_TRUE(st.ok()); + ASSERT_EQ(tablet_commit_infos.size(), 2); +} + +TEST_F(TestVTabletWriterV2, one_replica_fail) { + UniqueId load_id; + std::vector<TTabletCommitInfo> tablet_commit_infos; + std::shared_ptr<LoadStreamMap> load_stream_map = + std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr); + const int num_replicas = 1; + add_stream(load_stream_map, 1001, {1}, {{2, Status::InternalError("test")}}); + auto st = vectorized::VTabletWriterV2::_create_commit_info(tablet_commit_infos, load_stream_map, + num_replicas); + ASSERT_EQ(st, Status::InternalError("test")); +} + +TEST_F(TestVTabletWriterV2, two_replica) { + UniqueId load_id; + std::vector<TTabletCommitInfo> tablet_commit_infos; + std::shared_ptr<LoadStreamMap> load_stream_map = + std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr); + const int num_replicas = 2; + add_stream(load_stream_map, 1001, {1, 2}, {}); + add_stream(load_stream_map, 1002, {1, 2}, {}); + auto st = vectorized::VTabletWriterV2::_create_commit_info(tablet_commit_infos, load_stream_map, + num_replicas); + ASSERT_TRUE(st.ok()); + ASSERT_EQ(tablet_commit_infos.size(), 4); +} + +TEST_F(TestVTabletWriterV2, two_replica_fail) { + UniqueId load_id; + std::vector<TTabletCommitInfo> tablet_commit_infos; + std::shared_ptr<LoadStreamMap> load_stream_map = + std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr); + const int num_replicas = 2; + add_stream(load_stream_map, 1001, {1}, {{2, Status::InternalError("test")}}); + add_stream(load_stream_map, 1002, {1, 2}, {}); + auto st = vectorized::VTabletWriterV2::_create_commit_info(tablet_commit_infos, load_stream_map, + num_replicas); + ASSERT_EQ(st, Status::InternalError("test")); +} + +TEST_F(TestVTabletWriterV2, normal) { + UniqueId load_id; + std::vector<TTabletCommitInfo> tablet_commit_infos; + std::shared_ptr<LoadStreamMap> load_stream_map = + std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr); + const int num_replicas = 3; + add_stream(load_stream_map, 1001, {1, 2}, {}); + add_stream(load_stream_map, 1002, {1, 2}, {}); + add_stream(load_stream_map, 1003, {1, 2}, {}); + auto st = vectorized::VTabletWriterV2::_create_commit_info(tablet_commit_infos, load_stream_map, + num_replicas); + ASSERT_TRUE(st.ok()); + ASSERT_EQ(tablet_commit_infos.size(), 6); +} + +TEST_F(TestVTabletWriterV2, miss_one) { + UniqueId load_id; + std::vector<TTabletCommitInfo> tablet_commit_infos; + std::shared_ptr<LoadStreamMap> load_stream_map = + std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr); + const int num_replicas = 3; + add_stream(load_stream_map, 1001, {1, 2}, {}); + add_stream(load_stream_map, 1002, {1}, {}); + add_stream(load_stream_map, 1003, {1, 2}, {}); + auto st = vectorized::VTabletWriterV2::_create_commit_info(tablet_commit_infos, load_stream_map, + num_replicas); + ASSERT_TRUE(st.ok()); + ASSERT_EQ(tablet_commit_infos.size(), 5); +} + +TEST_F(TestVTabletWriterV2, miss_two) { + UniqueId load_id; + std::vector<TTabletCommitInfo> tablet_commit_infos; + std::shared_ptr<LoadStreamMap> load_stream_map = + std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr); + const int num_replicas = 3; + add_stream(load_stream_map, 1001, {1, 2}, {}); + add_stream(load_stream_map, 1002, {1}, {}); + add_stream(load_stream_map, 1003, {1}, {}); + auto st = vectorized::VTabletWriterV2::_create_commit_info(tablet_commit_infos, load_stream_map, + num_replicas); + ASSERT_TRUE(st.ok()); + ASSERT_EQ(tablet_commit_infos.size(), 4); +} + +TEST_F(TestVTabletWriterV2, fail_one) { + UniqueId load_id; + std::vector<TTabletCommitInfo> tablet_commit_infos; + std::shared_ptr<LoadStreamMap> load_stream_map = + std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr); + const int num_replicas = 3; + add_stream(load_stream_map, 1001, {1, 2}, {}); + add_stream(load_stream_map, 1002, {1}, {{2, Status::InternalError("test")}}); + add_stream(load_stream_map, 1003, {1, 2}, {}); + auto st = vectorized::VTabletWriterV2::_create_commit_info(tablet_commit_infos, load_stream_map, + num_replicas); + ASSERT_TRUE(st.ok()); + ASSERT_EQ(tablet_commit_infos.size(), 5); +} + +TEST_F(TestVTabletWriterV2, fail_one_duplicate) { + UniqueId load_id; + std::vector<TTabletCommitInfo> tablet_commit_infos; + std::shared_ptr<LoadStreamMap> load_stream_map = + std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr); + const int num_replicas = 3; + add_stream(load_stream_map, 1001, {1, 2}, {}); + add_stream(load_stream_map, 1002, {1}, {{2, Status::InternalError("test")}}); + add_stream(load_stream_map, 1002, {1}, {{2, Status::InternalError("test")}}); + add_stream(load_stream_map, 1003, {1, 2}, {}); + auto st = vectorized::VTabletWriterV2::_create_commit_info(tablet_commit_infos, load_stream_map, + num_replicas); + // Duplicate tablets from same node should be ignored + ASSERT_TRUE(st.ok()); + ASSERT_EQ(tablet_commit_infos.size(), 5); +} + +TEST_F(TestVTabletWriterV2, fail_two_diff_tablet_same_node) { + UniqueId load_id; + std::vector<TTabletCommitInfo> tablet_commit_infos; + std::shared_ptr<LoadStreamMap> load_stream_map = + std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr); + const int num_replicas = 3; + add_stream(load_stream_map, 1001, {1, 2}, {}); + add_stream(load_stream_map, 1002, {}, + {{1, Status::InternalError("test")}, {2, Status::InternalError("test")}}); + add_stream(load_stream_map, 1003, {1, 2}, {}); + auto st = vectorized::VTabletWriterV2::_create_commit_info(tablet_commit_infos, load_stream_map, + num_replicas); + ASSERT_TRUE(st.ok()); + ASSERT_EQ(tablet_commit_infos.size(), 4); +} + +TEST_F(TestVTabletWriterV2, fail_two_diff_tablet_diff_node) { + UniqueId load_id; + std::vector<TTabletCommitInfo> tablet_commit_infos; + std::shared_ptr<LoadStreamMap> load_stream_map = + std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr); + const int num_replicas = 3; + add_stream(load_stream_map, 1001, {1, 2}, {}); + add_stream(load_stream_map, 1002, {1}, {{2, Status::InternalError("test")}}); + add_stream(load_stream_map, 1003, {2}, {{1, Status::InternalError("test")}}); + auto st = vectorized::VTabletWriterV2::_create_commit_info(tablet_commit_infos, load_stream_map, + num_replicas); + ASSERT_TRUE(st.ok()); + ASSERT_EQ(tablet_commit_infos.size(), 4); +} + +TEST_F(TestVTabletWriterV2, fail_two_same_tablet) { + UniqueId load_id; + std::vector<TTabletCommitInfo> tablet_commit_infos; + std::shared_ptr<LoadStreamMap> load_stream_map = + std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr); + const int num_replicas = 3; + add_stream(load_stream_map, 1001, {1, 2}, {}); + add_stream(load_stream_map, 1002, {1}, {{2, Status::InternalError("test")}}); + add_stream(load_stream_map, 1003, {1}, {{2, Status::InternalError("test")}}); + auto st = vectorized::VTabletWriterV2::_create_commit_info(tablet_commit_infos, load_stream_map, + num_replicas); + // BE should detect and abort commit if majority of replicas failed + ASSERT_EQ(st, Status::InternalError("test")); +} + +TEST_F(TestVTabletWriterV2, fail_two_miss_one_same_tablet) { + UniqueId load_id; + std::vector<TTabletCommitInfo> tablet_commit_infos; + std::shared_ptr<LoadStreamMap> load_stream_map = + std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr); + const int num_replicas = 3; + add_stream(load_stream_map, 1001, {1}, {}); + add_stream(load_stream_map, 1002, {1}, {{2, Status::InternalError("test")}}); + add_stream(load_stream_map, 1003, {1}, {{2, Status::InternalError("test")}}); + auto st = vectorized::VTabletWriterV2::_create_commit_info(tablet_commit_infos, load_stream_map, + num_replicas); + // BE should detect and abort commit if majority of replicas failed + ASSERT_EQ(st, Status::InternalError("test")); +} + +} // namespace doris diff --git a/regression-test/suites/fault_injection_p0/test_commit_info_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_commit_info_fault_injection.groovy new file mode 100644 index 00000000000..be769149df4 --- /dev/null +++ b/regression-test/suites/fault_injection_p0/test_commit_info_fault_injection.groovy @@ -0,0 +1,99 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.codehaus.groovy.runtime.IOGroovyMethods +import org.apache.doris.regression.util.Http + +suite("test_commit_info_fault_injection", "nonConcurrent") { + def res = sql"show backends;" + logger.info(res.toString()) + def beNums = 0; + res.each { item -> + beNums++; + logger.info(item.toString()) + } + if (beNums == 3){ + result = sql "show VARIABLES like \'enable_memtable_on_sink_node\'" + log.info(result.toString()) + original_status = result[0][1] + + sql """ set enable_memtable_on_sink_node=true """ + sql """ + CREATE TABLE IF NOT EXISTS `baseall` ( + `k0` boolean null comment "", + `k1` tinyint(4) null comment "", + `k2` smallint(6) null comment "", + `k3` int(11) null comment "", + `k4` bigint(20) null comment "", + `k5` decimal(9, 3) null comment "", + `k6` char(5) null comment "", + `k10` date null comment "", + `k11` datetime null comment "", + `k7` varchar(20) null comment "", + `k8` double max null comment "", + `k9` float sum null comment "", + `k12` string replace null comment "", + `k13` largeint(40) replace null comment "" + ) engine=olap + DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "3") + """ + sql """ + CREATE TABLE IF NOT EXISTS `test` ( + `k0` boolean null comment "", + `k1` tinyint(4) null comment "", + `k2` smallint(6) null comment "", + `k3` int(11) null comment "", + `k4` bigint(20) null comment "", + `k5` decimal(9, 3) null comment "", + `k6` char(5) null comment "", + `k10` date null comment "", + `k11` datetime null comment "", + `k7` varchar(20) null comment "", + `k8` double max null comment "", + `k9` float sum null comment "", + `k12` string replace_if_not_null null comment "", + `k13` largeint(40) replace null comment "" + ) engine=olap + DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "3") + """ + + GetDebugPoint().clearDebugPointsForAllBEs() + streamLoad { + table "baseall" + db "regression_test_fault_injection_p0" + set 'column_separator', ',' + file "baseall.txt" + } + + def load_with_injection = { injection, error_msg-> + try { + GetDebugPoint().enableDebugPointForAllBEs(injection) + sql "insert into test select * from baseall where k1 <= 3" + } catch(Exception e) { + logger.info(e.getMessage()) + assertTrue(e.getMessage().contains(error_msg)) + } finally { + GetDebugPoint().disableDebugPointForAllBEs(injection) + } + } + + // One replica commit failed, load should success + load_with_injection("VTabletWriterV2.close.add_failed_tablet", "sucess") + + sql """set enable_memtable_on_sink_node = ${original_status}""" + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org