This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 51cb15d032d [improve](move-memtable) cancel load immediately when back
pressure in delta writer v2 (#29280)
51cb15d032d is described below
commit 51cb15d032d929da9301be85dac61d7fd568cb1b
Author: HHoflittlefish777 <[email protected]>
AuthorDate: Sat Dec 30 10:45:06 2023 +0800
[improve](move-memtable) cancel load immediately when back pressure in
delta writer v2 (#29280)
---
be/src/olap/delta_writer_v2.cpp | 19 ++--
be/src/olap/delta_writer_v2.h | 7 +-
be/src/vec/sink/writer/vtablet_writer_v2.cpp | 2 +-
be/test/vec/exec/delta_writer_v2_pool_test.cpp | 10 +-
..._writer_v2_back_pressure_fault_injection.groovy | 106 +++++++++++++++++++++
5 files changed, 132 insertions(+), 12 deletions(-)
diff --git a/be/src/olap/delta_writer_v2.cpp b/be/src/olap/delta_writer_v2.cpp
index 643f1837982..5987c63e658 100644
--- a/be/src/olap/delta_writer_v2.cpp
+++ b/be/src/olap/delta_writer_v2.cpp
@@ -65,16 +65,18 @@ namespace doris {
using namespace ErrorCode;
std::unique_ptr<DeltaWriterV2> DeltaWriterV2::open(
- WriteRequest* req, const std::vector<std::shared_ptr<LoadStreamStub>>&
streams) {
+ WriteRequest* req, const std::vector<std::shared_ptr<LoadStreamStub>>&
streams,
+ RuntimeState* state) {
std::unique_ptr<DeltaWriterV2> writer(
- new DeltaWriterV2(req, streams, StorageEngine::instance()));
+ new DeltaWriterV2(req, streams, StorageEngine::instance(), state));
return writer;
}
DeltaWriterV2::DeltaWriterV2(WriteRequest* req,
const
std::vector<std::shared_ptr<LoadStreamStub>>& streams,
- StorageEngine* storage_engine)
- : _req(*req),
+ StorageEngine* storage_engine, RuntimeState*
state)
+ : _state(state),
+ _req(*req),
_tablet_schema(new TabletSchema),
_memtable_writer(new MemTableWriter(*req)),
_streams(streams) {}
@@ -158,8 +160,13 @@ Status DeltaWriterV2::write(const vectorized::Block*
block, const std::vector<ui
}
{
SCOPED_RAW_TIMER(&_wait_flush_limit_time);
- while (_memtable_writer->flush_running_count() >=
- config::memtable_flush_running_count_limit) {
+ auto memtable_flush_running_count_limit =
config::memtable_flush_running_count_limit;
+ DBUG_EXECUTE_IF("DeltaWriterV2.write.back_pressure",
+ { memtable_flush_running_count_limit = 0; });
+ while (_memtable_writer->flush_running_count() >=
memtable_flush_running_count_limit) {
+ if (_state->is_cancelled()) {
+ return Status::Cancelled(_state->cancel_reason());
+ }
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
}
diff --git a/be/src/olap/delta_writer_v2.h b/be/src/olap/delta_writer_v2.h
index f896977d842..7db79680ee8 100644
--- a/be/src/olap/delta_writer_v2.h
+++ b/be/src/olap/delta_writer_v2.h
@@ -64,7 +64,8 @@ class Block;
class DeltaWriterV2 {
public:
static std::unique_ptr<DeltaWriterV2> open(
- WriteRequest* req, const
std::vector<std::shared_ptr<LoadStreamStub>>& streams);
+ WriteRequest* req, const
std::vector<std::shared_ptr<LoadStreamStub>>& streams,
+ RuntimeState* state);
~DeltaWriterV2();
@@ -88,7 +89,7 @@ public:
private:
DeltaWriterV2(WriteRequest* req, const
std::vector<std::shared_ptr<LoadStreamStub>>& streams,
- StorageEngine* storage_engine);
+ StorageEngine* storage_engine, RuntimeState* state);
void _build_current_tablet_schema(int64_t index_id,
const OlapTableSchemaParam*
table_schema_param,
@@ -96,6 +97,8 @@ private:
void _update_profile(RuntimeProfile* profile);
+ RuntimeState* _state = nullptr;
+
bool _is_init = false;
bool _is_cancelled = false;
WriteRequest _req;
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index 4972a397f70..8ded964950b 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -438,7 +438,7 @@ Status
VTabletWriterV2::_write_memtable(std::shared_ptr<vectorized::Block> block
break;
}
}
- return DeltaWriterV2::open(&req, streams);
+ return DeltaWriterV2::open(&req, streams, _state);
});
{
SCOPED_TIMER(_wait_mem_limit_timer);
diff --git a/be/test/vec/exec/delta_writer_v2_pool_test.cpp
b/be/test/vec/exec/delta_writer_v2_pool_test.cpp
index d44fd17a761..a40724cf9ff 100644
--- a/be/test/vec/exec/delta_writer_v2_pool_test.cpp
+++ b/be/test/vec/exec/delta_writer_v2_pool_test.cpp
@@ -56,9 +56,13 @@ TEST_F(DeltaWriterV2PoolTest, test_map) {
auto map = pool.get_or_create(load_id);
EXPECT_EQ(1, pool.size());
WriteRequest req;
- auto writer = map->get_or_create(100, [&req]() { return
DeltaWriterV2::open(&req, {}); });
- auto writer2 = map->get_or_create(101, [&req]() { return
DeltaWriterV2::open(&req, {}); });
- auto writer3 = map->get_or_create(100, [&req]() { return
DeltaWriterV2::open(&req, {}); });
+ RuntimeState state;
+ auto writer = map->get_or_create(
+ 100, [&req, &state]() { return DeltaWriterV2::open(&req, {},
&state); });
+ auto writer2 = map->get_or_create(
+ 101, [&req, &state]() { return DeltaWriterV2::open(&req, {},
&state); });
+ auto writer3 = map->get_or_create(
+ 100, [&req, &state]() { return DeltaWriterV2::open(&req, {},
&state); });
EXPECT_EQ(2, map->size());
EXPECT_EQ(writer, writer3);
EXPECT_NE(writer, writer2);
diff --git
a/regression-test/suites/fault_injection_p0/test_delta_writer_v2_back_pressure_fault_injection.groovy
b/regression-test/suites/fault_injection_p0/test_delta_writer_v2_back_pressure_fault_injection.groovy
new file mode 100644
index 00000000000..ea9e9ffb8bb
--- /dev/null
+++
b/regression-test/suites/fault_injection_p0/test_delta_writer_v2_back_pressure_fault_injection.groovy
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+import org.apache.doris.regression.util.Http
+
+suite("test_delta_writer_v2_back_pressure_fault_injection", "nonConcurrent") {
+ sql """ set enable_memtable_on_sink_node=true """
+ sql """ DROP TABLE IF EXISTS `baseall` """
+ sql """ DROP TABLE IF EXISTS `test` """
+ sql """
+ CREATE TABLE IF NOT EXISTS `baseall` (
+ `k0` boolean null comment "",
+ `k1` tinyint(4) null comment "",
+ `k2` smallint(6) null comment "",
+ `k3` int(11) null comment "",
+ `k4` bigint(20) null comment "",
+ `k5` decimal(9, 3) null comment "",
+ `k6` char(5) null comment "",
+ `k10` date null comment "",
+ `k11` datetime null comment "",
+ `k7` varchar(20) null comment "",
+ `k8` double max null comment "",
+ `k9` float sum null comment "",
+ `k12` string replace null comment "",
+ `k13` largeint(40) replace null comment ""
+ ) engine=olap
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "1")
+ """
+ sql """
+ CREATE TABLE IF NOT EXISTS `test` (
+ `k0` boolean null comment "",
+ `k1` tinyint(4) null comment "",
+ `k2` smallint(6) null comment "",
+ `k3` int(11) null comment "",
+ `k4` bigint(20) null comment "",
+ `k5` decimal(9, 3) null comment "",
+ `k6` char(5) null comment "",
+ `k10` date null comment "",
+ `k11` datetime null comment "",
+ `k7` varchar(20) null comment "",
+ `k8` double max null comment "",
+ `k9` float sum null comment "",
+ `k12` string replace_if_not_null null comment "",
+ `k13` largeint(40) replace null comment ""
+ ) engine=olap
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "1")
+ """
+
+ GetDebugPoint().clearDebugPointsForAllBEs()
+ streamLoad {
+ table "baseall"
+ db "regression_test_fault_injection_p0"
+ set 'column_separator', ','
+ file "baseall.txt"
+ }
+
+ try {
+
GetDebugPoint().enableDebugPointForAllBEs("DeltaWriterV2.write.back_pressure")
+ def thread1 = new Thread({
+ try {
+ def res = sql "insert into test select * from baseall where k1
<= 3"
+ logger.info(res.toString())
+ } catch(Exception e) {
+ logger.info(e.getMessage())
+ assertTrue(e.getMessage().contains("Communications link
failure"))
+ } finally {
+
GetDebugPoint().disableDebugPointForAllBEs("DeltaWriterV2.write.back_pressure")
+ }
+ })
+ thread1.start()
+
+ sleep(1000)
+
+ def processList = sql "show processlist"
+ logger.info(processList.toString())
+ processList.each { item ->
+ logger.info(item[1].toString())
+ logger.info(item[11].toString())
+ if (item[11].toString() == "insert into test select * from baseall
where k1 <= 3".toString()){
+ def res = sql "kill ${item[1]}"
+ logger.info(res.toString())
+ }
+ }
+ } catch(Exception e) {
+ logger.info(e.getMessage())
+ }
+
+ sql """ DROP TABLE IF EXISTS `baseall` """
+ sql """ DROP TABLE IF EXISTS `test` """
+ sql """ set enable_memtable_on_sink_node=false """
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]