This is an automated email from the ASF dual-hosted git repository.
zhangchen pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 0c55edd3bf6 branch-2.1: [opt](compaction) Don't check missed rows in
cumu compaction if input rowsets are not in tablet (#45279) (#45304)
0c55edd3bf6 is described below
commit 0c55edd3bf60d45815861267d3855d673b0e196f
Author: bobhan1 <[email protected]>
AuthorDate: Thu Dec 12 11:14:30 2024 +0800
branch-2.1: [opt](compaction) Don't check missed rows in cumu compaction if
input rowsets are not in tablet (#45279) (#45304)
pick https://github.com/apache/doris/pull/45279
---
be/src/olap/compaction.cpp | 24 +++-
be/src/olap/cumulative_compaction.cpp | 14 ++
be/src/olap/cumulative_compaction_policy.cpp | 16 +++
be/src/olap/schema_change.cpp | 2 +
be/src/olap/tablet.cpp | 9 ++
be/src/olap/tablet.h | 1 +
.../test_compaction_on_sc_new_tablet.out | 25 ++++
.../test_compaction_on_sc_new_tablet.groovy | 149 +++++++++++++++++++++
8 files changed, 239 insertions(+), 1 deletion(-)
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index 4b2ac6df119..88a85c4206f 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -972,8 +972,30 @@ Status Compaction::modify_rowsets(const
Merger::Statistics* stats) {
&output_rowset_delete_bitmap);
if (missed_rows) {
missed_rows_size = missed_rows->size();
+ // Suppose a heavy schema change process on BE converting tablet A
to tablet B.
+ // 1. during schema change double write, new loads write [X-Y] on
tablet B.
+ // 2. rowsets with version [a],[a+1],...,[b-1],[b] on tablet B are
picked for cumu compaction(X<=a<b<=Y).(cumu compaction
+ // on new tablet during schema change double write is allowed
after https://github.com/apache/doris/pull/16470)
+            // 3. schema change removes all rowsets on tablet B before version
Z(b<=Z<=Y) before it begins to convert historical rowsets.
+ // 4. schema change finishes.
+            // 5. cumu compaction begins on new tablet with version
[a],...,[b]. If there are duplicate keys between these rowsets,
+ // the compaction check will fail because these rowsets have
skipped calculating delete bitmap in commit phase and
+ // publish phase because tablet B is in NOT_READY state when
writing.
+
+ // Considering that the cumu compaction will fail finally in this
situation because `Tablet::modify_rowsets` will check if rowsets in
+ // `to_delete`(_input_rowsets) still exist in tablet's
`_rs_version_map`, we can just skip to check missed rows here.
+ bool need_to_check_missed_rows = true;
+ {
+ std::shared_lock rlock(_tablet->get_header_lock());
+ need_to_check_missed_rows =
+ std::all_of(_input_rowsets.begin(),
_input_rowsets.end(),
+ [&](const RowsetSharedPtr& rowset) {
+ return
_tablet->rowset_exists_unlocked(rowset);
+ });
+ }
+
if (_tablet->tablet_state() == TABLET_RUNNING && stats != nullptr
&&
- stats->merged_rows != missed_rows_size) {
+ stats->merged_rows != missed_rows_size &&
need_to_check_missed_rows) {
std::stringstream ss;
ss << "cumulative compaction: the merged rows(" <<
stats->merged_rows
<< ") is not equal to missed rows(" << missed_rows_size
diff --git a/be/src/olap/cumulative_compaction.cpp
b/be/src/olap/cumulative_compaction.cpp
index 04504432f19..8f598134905 100644
--- a/be/src/olap/cumulative_compaction.cpp
+++ b/be/src/olap/cumulative_compaction.cpp
@@ -66,6 +66,20 @@ Status CumulativeCompaction::prepare_compact() {
}
Status CumulativeCompaction::execute_compact_impl() {
+ DBUG_EXECUTE_IF("CumulativeCompaction::execute_compact.block", {
+ auto target_tablet_id = dp->param<int64_t>("tablet_id", -1);
+ if (target_tablet_id == _tablet->tablet_id()) {
+ LOG(INFO) << "start debug block "
+ << "CumulativeCompaction::execute_compact.block";
+ while (DebugPoints::instance()->is_enable(
+ "CumulativeCompaction::execute_compact.block")) {
+ std::this_thread::sleep_for(std::chrono::milliseconds(200));
+ }
+ LOG(INFO) << "end debug block "
+ << "CumulativeCompaction::execute_compact.block";
+ }
+ })
+
std::unique_lock<std::mutex>
lock(_tablet->get_cumulative_compaction_lock(), std::try_to_lock);
if (!lock.owns_lock()) {
return Status::Error<TRY_LOCK_FAILED, false>(
diff --git a/be/src/olap/cumulative_compaction_policy.cpp
b/be/src/olap/cumulative_compaction_policy.cpp
index ee7a2b1812a..c812a12b656 100644
--- a/be/src/olap/cumulative_compaction_policy.cpp
+++ b/be/src/olap/cumulative_compaction_policy.cpp
@@ -28,6 +28,7 @@
#include "olap/olap_common.h"
#include "olap/tablet.h"
#include "olap/tablet_meta.h"
+#include "util/debug_points.h"
namespace doris {
@@ -246,6 +247,21 @@ int
SizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
const int64_t max_compaction_score, const int64_t min_compaction_score,
std::vector<RowsetSharedPtr>* input_rowsets, Version*
last_delete_version,
size_t* compaction_score, bool allow_delete) {
+
DBUG_EXECUTE_IF("SizeBasedCumulativeCompactionPolicy::pick_input_rowsets.set_input_rowsets",
{
+ auto target_tablet_id = dp->param<int64_t>("tablet_id", -1);
+ if (target_tablet_id == tablet->tablet_id()) {
+ auto start_version = dp->param<int64_t>("start_version", -1);
+ auto end_version = dp->param<int64_t>("end_version", -1);
+ for (auto& rowset : candidate_rowsets) {
+ if (rowset->start_version() >= start_version &&
+ rowset->end_version() <= end_version) {
+ input_rowsets->push_back(rowset);
+ }
+ }
+ }
+ return input_rowsets->size();
+ })
+
size_t promotion_size = tablet->cumulative_promotion_size();
auto max_version = tablet->max_version().first;
int transient_size = 0;
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index b0701edf09d..b5cc293bdff 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -819,6 +819,8 @@ Status
SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
return_columns[i] = i;
}
+ DBUG_EXECUTE_IF("SchemaChangeJob::_do_process_alter_tablet.block",
DBUG_BLOCK);
+
// begin to find deltas to convert from base tablet to new tablet so that
// obtain base tablet and new tablet's push lock and header write lock to
prevent loading data
{
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 5d20ba13c39..5e25e402fa5 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -537,6 +537,15 @@ Status Tablet::add_rowset(RowsetSharedPtr rowset) {
return Status::OK();
}
+bool Tablet::rowset_exists_unlocked(const RowsetSharedPtr& rowset) {
+ if (auto it = _rs_version_map.find(rowset->version()); it ==
_rs_version_map.end()) {
+ return false;
+ } else if (rowset->rowset_id() != it->second->rowset_id()) {
+ return false;
+ }
+ return true;
+}
+
Status Tablet::modify_rowsets(std::vector<RowsetSharedPtr>& to_add,
std::vector<RowsetSharedPtr>& to_delete, bool
check_delete) {
// the compaction process allow to compact the single version, eg:
version[4-4].
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index fc2d7a21cab..e22213ed79a 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -168,6 +168,7 @@ public:
// MUST hold EXCLUSIVE `_meta_lock`.
Status modify_rowsets(std::vector<RowsetSharedPtr>& to_add,
std::vector<RowsetSharedPtr>& to_delete, bool
check_delete = false);
+ bool rowset_exists_unlocked(const RowsetSharedPtr& rowset);
// _rs_version_map and _stale_rs_version_map should be protected by
_meta_lock
// The caller must call hold _meta_lock when call this two function.
diff --git
a/regression-test/data/fault_injection_p0/test_compaction_on_sc_new_tablet.out
b/regression-test/data/fault_injection_p0/test_compaction_on_sc_new_tablet.out
new file mode 100644
index 00000000000..e7188943a10
--- /dev/null
+++
b/regression-test/data/fault_injection_p0/test_compaction_on_sc_new_tablet.out
@@ -0,0 +1,25 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !sql --
+1 1 1 1
+2 2 2 2
+3 3 3 3
+4 4 4 4
+5 5 5 5
+6 6 6 6
+7 7 7 7
+8 8 8 8
+9 9 9 9
+10 10 10 10
+
+-- !sql --
+1 9 9 9
+2 2 2 2
+3 3 3 3
+4 4 4 4
+5 5 5 5
+6 6 6 6
+7 7 7 7
+8 8 8 8
+9 9 9 9
+10 10 10 10
+
diff --git
a/regression-test/suites/fault_injection_p0/test_compaction_on_sc_new_tablet.groovy
b/regression-test/suites/fault_injection_p0/test_compaction_on_sc_new_tablet.groovy
new file mode 100644
index 00000000000..2f3c44ef2dd
--- /dev/null
+++
b/regression-test/suites/fault_injection_p0/test_compaction_on_sc_new_tablet.groovy
@@ -0,0 +1,149 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.junit.Assert
+import java.util.concurrent.TimeUnit
+import org.awaitility.Awaitility
+import org.apache.doris.regression.suite.ClusterOptions
+
+suite("test_compaction_on_sc_new_tablet", "docker") {
+ def options = new ClusterOptions()
+ options.setFeNum(1)
+ options.setBeNum(1)
+ options.enableDebugPoints()
+ options.cloudMode = false
+ options.beConfigs += [
+ 'enable_java_support=false',
+ 'enable_mow_compaction_correctness_check_core=true'
+ ]
+ docker(options) {
+ try {
+ GetDebugPoint().clearDebugPointsForAllFEs()
+ GetDebugPoint().clearDebugPointsForAllBEs()
+ def table1 = "test_compaction_on_sc_new_tablet"
+ sql "DROP TABLE IF EXISTS ${table1} FORCE;"
+ sql """ CREATE TABLE IF NOT EXISTS ${table1} (
+ `k` int,
+ `c1` int,
+ `c2` int,
+ `c3` int
+ ) UNIQUE KEY(k)
+ DISTRIBUTED BY HASH(k) BUCKETS 1
+ PROPERTIES (
+ "disable_auto_compaction" = "true",
+ "replication_num" = "1",
+ "enable_unique_key_merge_on_write" = "true"); """
+
+ // [2-11]
+ for (int i = 1; i <= 10; i++) {
+ sql "insert into ${table1} values($i,$i,$i,$i);"
+ }
+ qt_sql "select * from ${table1} order by k;"
+
+
+ def beNodes = sql_return_maparray("show backends;")
+ def tabletStats = sql_return_maparray("show tablets from
${table1};")
+ logger.info("tabletStats: \n${tabletStats}")
+ def tabletStat = tabletStats.get(0)
+ def tabletBackendId = tabletStat.BackendId
+ def tabletId = tabletStat.TabletId
+ def version = tabletStat.Version
+ def tabletBackend;
+ for (def be : beNodes) {
+ if (be.BackendId == tabletBackendId) {
+ tabletBackend = be
+ break;
+ }
+ }
+ logger.info("tablet ${tabletId} is on backend
${tabletBackend.Host} with backendId=${tabletBackend.BackendId},
version=${version}");
+
+ // blocking the schema change process before it gains max version
+
GetDebugPoint().enableDebugPointForAllBEs("SchemaChangeJob::_do_process_alter_tablet.block")
+ Thread.sleep(2000)
+
+ sql "alter table ${table1} modify column c1 varchar(100);"
+
+ Thread.sleep(4000)
+
+ // double write [11-22]
+ for (int i = 20; i <= 30; i++) {
+ sql "insert into ${table1} values(1,9,9,9);"
+ }
+
+ tabletStats = sql_return_maparray("show tablets from ${table1};")
+ logger.info("tabletStats: \n${tabletStats}")
+ assertEquals(2, tabletStats.size())
+
+ def oldTabletStat
+ def newTabletStat
+ for (def stat: tabletStats) {
+ if (!stat.TabletId.equals(tabletId)) {
+ newTabletStat = stat
+ } else {
+ oldTabletStat = stat
+ }
+ }
+ logger.info("old tablet=[tablet_id=${oldTabletStat.TabletId},
version=${oldTabletStat.Version}]")
+ logger.info("new tablet=[tablet_id=${newTabletStat.TabletId},
version=${newTabletStat.Version}]")
+
+
+ // trigger cumu compaction on new tablet
+ int start_version = 15
+ int end_version = 17
+ // block compaction process on new tablet
+
GetDebugPoint().enableDebugPointForAllBEs("CumulativeCompaction::execute_compact.block",
[tablet_id: "${newTabletStat.TabletId}"])
+            // manually set cumu compaction's input rowsets on new tablet
+
GetDebugPoint().enableDebugPointForAllBEs("SizeBasedCumulativeCompactionPolicy::pick_input_rowsets.set_input_rowsets",
+ [tablet_id:"${newTabletStat.TabletId}",
start_version:"${start_version}", end_version:"${end_version}"])
+
+ Thread.sleep(2000)
+
+ logger.info("trigger compaction [15-17] on new tablet
${newTabletStat.TabletId}")
+ def (code, out, err) =
be_run_cumulative_compaction(tabletBackend.Host, tabletBackend.HttpPort,
newTabletStat.TabletId)
+ logger.info("Run compaction: code=" + code + ", out=" + out + ",
err=" + err)
+ Assert.assertEquals(code, 0)
+ def compactJson = parseJson(out.trim())
+ Assert.assertEquals("success", compactJson.status.toLowerCase())
+
+ // make the schema change run to complete and wait for it
+
GetDebugPoint().disableDebugPointForAllBEs("SchemaChangeJob::_do_process_alter_tablet.block")
+ waitForSchemaChangeDone {
+ sql """ SHOW ALTER TABLE COLUMN WHERE TableName='${table1}'
ORDER BY createtime DESC LIMIT 1 """
+ time 2000
+ }
+
+ Thread.sleep(2000)
+
+ // make the cumu compaction run to complete and wait for it
+
GetDebugPoint().disableDebugPointForAllBEs("CumulativeCompaction::execute_compact.block")
+
+
+            // BE should skip checking merged rows in cumu compaction,
otherwise it will cause a coredump
+            // because rowsets [11-22] in the new tablet skipped delete bitmap
calculation because the tablet was in NOT_READY state
+ Thread.sleep(7000)
+
+ qt_sql "select * from ${table1} order by k;"
+
+ } catch(Exception e) {
+ logger.info(e.getMessage())
+ throw e
+ } finally {
+ GetDebugPoint().clearDebugPointsForAllFEs()
+ GetDebugPoint().clearDebugPointsForAllBEs()
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]