This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new bcd01c5f242 branch-2.1-pick: [Fix](merge-on-write) should calculate
delete bitmaps between segments before skip if tablet is in `NOT_READY` state in
flush phase (#48056) (#48089)
bcd01c5f242 is described below
commit bcd01c5f2425c8bd9a00392e6c893b5e77d0fb2e
Author: bobhan1 <[email protected]>
AuthorDate: Fri Feb 21 19:53:17 2025 +0800
branch-2.1-pick: [Fix](merge-on-write) should calculate delete bitmaps
between segments before skip if tablet is in `NOT_READY` state in flush phase
(#48056) (#48089)
pick https://github.com/apache/doris/pull/48056
---
be/src/olap/memtable.cpp | 2 +
be/src/olap/rowset_builder.cpp | 15 +-
.../test_skip_calc_between_segments.out | Bin 0 -> 175 bytes
.../org/apache/doris/regression/suite/Suite.groovy | 68 +++++++++
.../test_skip_calc_between_segments.groovy | 153 +++++++++++++++++++++
5 files changed, 231 insertions(+), 7 deletions(-)
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index ec64406eb31..856cba6a1f5 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -35,6 +35,7 @@
#include "runtime/exec_env.h"
#include "runtime/thread_context.h"
#include "tablet_meta.h"
+#include "util/debug_points.h"
#include "util/runtime_profile.h"
#include "util/stopwatch.hpp"
#include "vec/aggregate_functions/aggregate_function_reader.h"
@@ -486,6 +487,7 @@ void MemTable::shrink_memtable_by_agg() {
}
bool MemTable::need_flush() const {
+ DBUG_EXECUTE_IF("MemTable.need_flush", { return true; });
auto max_size = config::write_buffer_size;
if (_is_partial_update) {
auto update_columns_size = _num_columns;
diff --git a/be/src/olap/rowset_builder.cpp b/be/src/olap/rowset_builder.cpp
index c668df4bd33..ddb55775965 100644
--- a/be/src/olap/rowset_builder.cpp
+++ b/be/src/olap/rowset_builder.cpp
@@ -255,13 +255,6 @@ Status RowsetBuilder::submit_calc_delete_bitmap_task() {
}
std::lock_guard<std::mutex> l(_lock);
SCOPED_TIMER(_submit_delete_bitmap_timer);
- // tablet is under alter process. The delete bitmap will be calculated
after conversion.
- if (tablet()->tablet_state() == TABLET_NOTREADY) {
- LOG(INFO) << "tablet is under alter process, delete bitmap will be
calculated later, "
- "tablet_id: "
- << tablet()->tablet_id() << " txn_id: " << _req.txn_id;
- return Status::OK();
- }
auto* beta_rowset = reinterpret_cast<BetaRowset*>(_rowset.get());
std::vector<segment_v2::SegmentSharedPtr> segments;
RETURN_IF_ERROR(beta_rowset->load_segments(&segments));
@@ -271,6 +264,14 @@ Status RowsetBuilder::submit_calc_delete_bitmap_task() {
tablet()->calc_delete_bitmap_between_segments(_rowset,
segments, _delete_bitmap));
}
+ // tablet is under alter process. The delete bitmap will be calculated
after conversion.
+ if (_tablet->tablet_state() == TABLET_NOTREADY) {
+ LOG(INFO) << "tablet is under alter process, delete bitmap will be
calculated later, "
+ "tablet_id: "
+ << _tablet->tablet_id() << " txn_id: " << _req.txn_id;
+ return Status::OK();
+ }
+
// For partial update, we need to fill in the entire row of data, during
the calculation
// of the delete bitmap. This operation is resource-intensive, and we need
to minimize
// the number of times it occurs. Therefore, we skip this operation here.
diff --git
a/regression-test/data/fault_injection_p0/test_skip_calc_between_segments.out
b/regression-test/data/fault_injection_p0/test_skip_calc_between_segments.out
new file mode 100644
index 00000000000..783cb8c5f8f
Binary files /dev/null and
b/regression-test/data/fault_injection_p0/test_skip_calc_between_segments.out
differ
diff --git
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
index c9e057cd7d0..3cd7f89ba09 100644
---
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
+++
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
@@ -1543,6 +1543,74 @@ class Suite implements GroovyInterceptable {
}
}
+ def get_be_param = { paramName ->
+ def ipList = [:]
+ def portList = [:]
+ def backendId_to_params = [:]
+ getBackendIpHttpPort(ipList, portList)
+ for (String id in ipList.keySet()) {
+ def beIp = ipList.get(id)
+ def bePort = portList.get(id)
+ // get the config value from be
+ def (code, out, err) = curl("GET",
String.format("http://%s:%s/api/show_config?conf_item=%s", beIp, bePort,
paramName))
+ assert code == 0
+ assert out.contains(paramName)
+ // parsing
+ def resultList = parseJson(out)[0]
+ assert resultList.size() == 4
+ // get original value
+ def paramValue = resultList[2]
+ backendId_to_params.put(id, paramValue)
+ }
+ logger.info("backendId_to_params: ${backendId_to_params}".toString())
+ return backendId_to_params
+ }
+
+ def set_be_param = { paramName, paramValue ->
+ def ipList = [:]
+ def portList = [:]
+ getBackendIpHttpPort(ipList, portList)
+ for (String id in ipList.keySet()) {
+ def beIp = ipList.get(id)
+ def bePort = portList.get(id)
+ logger.info("set be_id ${id} ${paramName} to
${paramValue}".toString())
+ def (code, out, err) = curl("POST",
String.format("http://%s:%s/api/update_config?%s=%s", beIp, bePort, paramName,
paramValue))
+ assert out.contains("OK")
+ }
+ }
+
+ def set_original_be_param = { paramName, backendId_to_params ->
+ def ipList = [:]
+ def portList = [:]
+ getBackendIpHttpPort(ipList, portList)
+ for (String id in ipList.keySet()) {
+ def beIp = ipList.get(id)
+ def bePort = portList.get(id)
+ def paramValue = backendId_to_params.get(id)
+ logger.info("set be_id ${id} ${paramName} to
${paramValue}".toString())
+ def (code, out, err) = curl("POST",
String.format("http://%s:%s/api/update_config?%s=%s", beIp, bePort, paramName,
paramValue))
+ assert out.contains("OK")
+ }
+ }
+
+ void setBeConfigTemporary(Map<String, Object> tempConfig, Closure
actionSupplier) {
+ Map<String, Map<String, String>> originConf = Maps.newHashMap()
+ tempConfig.each{ k, v ->
+ originConf.put(k, get_be_param(k))
+ }
+ try {
+ tempConfig.each{ k, v -> set_be_param(k, v)}
+ actionSupplier()
+ } catch (Exception e) {
+ logger.info(e.getMessage())
+ throw e
+ } finally {
+ originConf.each { k, confs ->
+ set_original_be_param(k, confs)
+ }
+ }
+ }
+
void waiteCreateTableFinished(String tableName) {
Thread.sleep(2000);
String showCreateTable = "SHOW CREATE TABLE ${tableName}"
diff --git
a/regression-test/suites/fault_injection_p0/test_skip_calc_between_segments.groovy
b/regression-test/suites/fault_injection_p0/test_skip_calc_between_segments.groovy
new file mode 100644
index 00000000000..a549bbec562
--- /dev/null
+++
b/regression-test/suites/fault_injection_p0/test_skip_calc_between_segments.groovy
@@ -0,0 +1,153 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_skip_calc_between_segments", "nonConcurrent") {
+
+ def table1 = "test_skip_calc_between_segments"
+ sql "DROP TABLE IF EXISTS ${table1} FORCE;"
+ sql """ CREATE TABLE IF NOT EXISTS ${table1} (
+ `k1` int NOT NULL,
+ `c1` int,
+ `c2` int
+ )UNIQUE KEY(k1)
+ DISTRIBUTED BY HASH(k1) BUCKETS 1
+ PROPERTIES (
+ "enable_unique_key_merge_on_write" = "true",
+ "disable_auto_compaction" = "true",
+ "replication_num" = "1"); """
+
+ sql "insert into ${table1} values(99999,99999,99999);"
+ sql "insert into ${table1} values(88888,88888,88888);"
+ sql "insert into ${table1} values(77777,77777,77777);"
+ sql "sync;"
+ qt_sql "select * from ${table1} order by k1;"
+
+ def block_sc = {
+
GetDebugPoint().enableDebugPointForAllBEs("SchemaChangeJob::_do_process_alter_tablet.block")
+ }
+
+ def unblock_sc = {
+
GetDebugPoint().disableDebugPointForAllBEs("SchemaChangeJob::_do_process_alter_tablet.block")
+ }
+
+ def block_publish = {
+
GetDebugPoint().enableDebugPointForAllBEs("EnginePublishVersionTask::execute.enable_spin_wait")
+
GetDebugPoint().enableDebugPointForAllBEs("EnginePublishVersionTask::execute.block")
+ }
+
+ def unblock_publish = {
+
GetDebugPoint().disableDebugPointForAllBEs("EnginePublishVersionTask::execute.enable_spin_wait")
+
GetDebugPoint().disableDebugPointForAllBEs("EnginePublishVersionTask::execute.block")
+ }
+
+ def checkSegmentNum = { rowsetNum, lastRowsetSegmentNum ->
+ def tablets = sql_return_maparray """ show tablets from ${table1}; """
+ logger.info("tablets: ${tablets}")
+ assertEquals(1, tablets.size())
+ String compactionUrl = tablets[0]["CompactionStatus"]
+ def (code, out, err) = curl("GET", compactionUrl)
+ logger.info("Show tablets status: code=" + code + ", out=" + out + ",
err=" + err)
+ assertEquals(code, 0)
+ def tabletJson = parseJson(out.trim())
+ assert tabletJson.rowsets instanceof List
+ assert tabletJson.rowsets.size() == rowsetNum + 1
+ def rowset = tabletJson.rowsets.get(tabletJson.rowsets.size() - 1)
+ logger.info("rowset: ${rowset}")
+ int start_index = rowset.indexOf("]")
+ int end_index = rowset.indexOf("DATA")
+ def segmentNumStr = rowset.substring(start_index + 1, end_index).trim()
+ logger.info("segmentNumStr: ${segmentNumStr}")
+ assert lastRowsetSegmentNum == Integer.parseInt(segmentNumStr)
+ }
+
+ // to cause multi segments
+ def customBeConfig = [
+ doris_scanner_row_bytes : 1
+ ]
+
+ setBeConfigTemporary(customBeConfig) {
+ try {
+ // batch_size is 4164 in csv_reader.cpp
+ // _batch_size is 8192 in vtablet_writer.cpp
+
+ // to cause multi segments
+ GetDebugPoint().enableDebugPointForAllBEs("MemTable.need_flush")
+
+ block_publish()
+
+ // block sc to let load skip to calculate delete bitmap in flush
and commit phase
+ block_sc()
+
+ sql "alter table ${table1} modify column c1 varchar(100);"
+
+ Thread.sleep(3000)
+
+ def t1 = Thread.start {
+ // load data that will have multi segments and there are
duplicate keys between segments
+ String content = ""
+ (1..4096).each {
+ content += "${it},${it},${it}\n"
+ }
+ content += content
+ streamLoad {
+ table "${table1}"
+ set 'column_separator', ','
+ inputStream new ByteArrayInputStream(content.getBytes())
+ time 30000 // limit inflight 10s
+
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ assertEquals(8192, json.NumberTotalRows)
+ assertEquals(0, json.NumberFilteredRows)
+ }
+ }
+ }
+
+
+ Thread.sleep(2000)
+
+ // let sc finish and wait for tablet state to be RUNNING
+ unblock_sc()
+ waitForSchemaChangeDone {
+ sql """ SHOW ALTER TABLE COLUMN WHERE TableName='${table1}'
ORDER BY createtime DESC LIMIT 1 """
+ time 1000
+ }
+ logger.info("wait for schema change done")
+
+ Thread.sleep(500)
+
+ unblock_publish()
+
+ t1.join()
+ // ensure that we really write multi segments
+ checkSegmentNum(4, 3)
+
+ qt_sql "select count() from (select k1,count() as cnt from
${table1} group by k1 having cnt > 1) A;"
+
+ } catch(Exception e) {
+ logger.info(e.getMessage())
+ throw e
+ } finally {
+ GetDebugPoint().clearDebugPointsForAllBEs()
+ GetDebugPoint().clearDebugPointsForAllFEs()
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]