This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 0c302e7ebf4 [performance](move-memtable) only call _select_streams
when necessary (#35576)
0c302e7ebf4 is described below
commit 0c302e7ebf422e2a3d21cff71f0be967acc4669c
Author: Kaijie Chen <[email protected]>
AuthorDate: Wed Jun 19 09:45:43 2024 +0800
[performance](move-memtable) only call _select_streams when necessary
(#35576)
Only call `_select_streams` when creating a delta writer, rather than for every tablet on every `write()` call.
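Before: `VTabletWriterV2::write` called `_select_streams` for every tablet of every block, even when the tablet's delta writer already existed.

After: `_select_streams` runs only inside the creator callback passed to `get_or_create`, i.e. only when a new delta writer has to be created.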

---
be/src/vec/sink/delta_writer_v2_pool.cpp | 4 +++-
be/src/vec/sink/writer/vtablet_writer_v2.cpp | 12 ++++++++----
be/src/vec/sink/writer/vtablet_writer_v2.h | 2 +-
.../fault_injection_p0/test_writer_v2_fault_injection.groovy | 4 ++--
4 files changed, 14 insertions(+), 8 deletions(-)
diff --git a/be/src/vec/sink/delta_writer_v2_pool.cpp
b/be/src/vec/sink/delta_writer_v2_pool.cpp
index cfb2b5294c7..87c18194127 100644
--- a/be/src/vec/sink/delta_writer_v2_pool.cpp
+++ b/be/src/vec/sink/delta_writer_v2_pool.cpp
@@ -37,7 +37,9 @@ std::shared_ptr<DeltaWriterV2>
DeltaWriterV2Map::get_or_create(
return _map.at(tablet_id);
}
std::shared_ptr<DeltaWriterV2> writer = creator();
- _map[tablet_id] = writer;
+ if (writer != nullptr) {
+ _map[tablet_id] = writer;
+ }
return writer;
}
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index 3c9c581b49d..3c8dede657f 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -416,17 +416,21 @@ Status VTabletWriterV2::write(Block& input_block) {
// For each tablet, send its input_rows from block to delta writer
for (const auto& [tablet_id, rows] : rows_for_tablet) {
- Streams streams;
- RETURN_IF_ERROR(_select_streams(tablet_id, rows.partition_id,
rows.index_id, streams));
- RETURN_IF_ERROR(_write_memtable(block, tablet_id, rows, streams));
+ RETURN_IF_ERROR(_write_memtable(block, tablet_id, rows));
}
return Status::OK();
}
Status VTabletWriterV2::_write_memtable(std::shared_ptr<vectorized::Block>
block, int64_t tablet_id,
- const Rows& rows, const Streams&
streams) {
+ const Rows& rows) {
auto delta_writer = _delta_writer_for_tablet->get_or_create(tablet_id,
[&]() {
+ Streams streams;
+ auto st = _select_streams(tablet_id, rows.partition_id, rows.index_id,
streams);
+ if (!st.ok()) [[unlikely]] {
+ LOG(WARNING) << st << ", load_id=" << print_id(_load_id);
+ return std::unique_ptr<DeltaWriterV2>(nullptr);
+ }
WriteRequest req {
.tablet_id = tablet_id,
.txn_id = _txn_id,
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.h
b/be/src/vec/sink/writer/vtablet_writer_v2.h
index 5a9890cdb49..ff31e1552dd 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.h
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.h
@@ -140,7 +140,7 @@ private:
RowsForTablet& rows_for_tablet);
Status _write_memtable(std::shared_ptr<vectorized::Block> block, int64_t
tablet_id,
- const Rows& rows, const Streams& streams);
+ const Rows& rows);
Status _select_streams(int64_t tablet_id, int64_t partition_id, int64_t
index_id,
Streams& streams);
diff --git
a/regression-test/suites/fault_injection_p0/test_writer_v2_fault_injection.groovy
b/regression-test/suites/fault_injection_p0/test_writer_v2_fault_injection.groovy
index e6e5758b2b3..eaf87127abc 100644
---
a/regression-test/suites/fault_injection_p0/test_writer_v2_fault_injection.groovy
+++
b/regression-test/suites/fault_injection_p0/test_writer_v2_fault_injection.groovy
@@ -88,11 +88,11 @@ suite("test_writer_v2_fault_injection", "nonConcurrent") {
// VTabletWriterV2 tablet_location is null
load_with_injection("VTabletWriterV2._build_tablet_node_mapping.tablet_location_null",
"unknown tablet location")
// VTabletWriterV2 location is null
- load_with_injection("VTabletWriterV2._select_streams.location_null",
"unknown tablet location")
+ load_with_injection("VTabletWriterV2._select_streams.location_null",
"failed to open DeltaWriter for tablet")
// VTabletWriterV2 cancel
load_with_injection("VTabletWriterV2.close.cancel", "load cancel")
// DeltaWriterV2 stream_size is 0
load_with_injection("DeltaWriterV2.init.stream_size", "failed to find
tablet schema")
sql """ set enable_memtable_on_sink_node=false """
-}
\ No newline at end of file
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]