This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 8fc26d510c3 branch-3.0: [opt](compaction) Don't check missed rows in
cumu compaction if input rowsets are not in tablet #45279 (#45303)
8fc26d510c3 is described below
commit 8fc26d510c3f80114933db623143fdd767848b6f
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Dec 20 00:37:24 2024 +0800
branch-3.0: [opt](compaction) Don't check missed rows in cumu compaction if
input rowsets are not in tablet #45279 (#45303)
Cherry-picked from #45279
Co-authored-by: bobhan1 <[email protected]>
---
be/src/olap/compaction.cpp | 25 +++-
be/src/olap/cumulative_compaction.cpp | 14 ++
be/src/olap/cumulative_compaction_policy.cpp | 16 +++
be/src/olap/schema_change.cpp | 2 +
be/src/olap/tablet.cpp | 9 ++
be/src/olap/tablet.h | 1 +
.../test_compaction_on_sc_new_tablet.out | 25 ++++
.../test_compaction_on_sc_new_tablet.groovy | 149 +++++++++++++++++++++
8 files changed, 240 insertions(+), 1 deletion(-)
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index 34e52ccb5b0..5f490e99034 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -1014,8 +1014,31 @@ Status CompactionMixin::modify_rowsets() {
if
(!_tablet->tablet_meta()->tablet_schema()->cluster_key_idxes().empty()) {
merged_missed_rows_size += _stats.filtered_rows;
}
+
+ // Suppose a heavy schema change process on BE converting tablet A
to tablet B.
+ // 1. during schema change double write, new loads write [X-Y] on
tablet B.
+ // 2. rowsets with version [a],[a+1],...,[b-1],[b] on tablet B are
picked for cumu compaction(X<=a<b<=Y).(cumu compaction
+ // on new tablet during schema change double write is allowed
after https://github.com/apache/doris/pull/16470)
+ // 3. schema change remove all rowsets on tablet B before version
Z(b<=Z<=Y) before it begins to convert historical rowsets.
+ // 4. schema change finishes.
+ // 5. cumu compaction begins on new tablet with version
[a],...,[b]. If there are duplicate keys between these rowsets,
+ // the compaction check will fail because these rowsets have
skipped to calculate delete bitmap in commit phase and
+ // publish phase because tablet B is in NOT_READY state when
writing.
+
+ // Considering that the cumu compaction will fail finally in this
situation because `Tablet::modify_rowsets` will check if rowsets in
+ // `to_delete`(_input_rowsets) still exist in tablet's
`_rs_version_map`, we can just skip to check missed rows here.
+ bool need_to_check_missed_rows = true;
+ {
+ std::shared_lock rlock(_tablet->get_header_lock());
+ need_to_check_missed_rows =
+ std::all_of(_input_rowsets.begin(),
_input_rowsets.end(),
+ [&](const RowsetSharedPtr& rowset) {
+ return
tablet()->rowset_exists_unlocked(rowset);
+ });
+ }
+
if (_tablet->tablet_state() == TABLET_RUNNING &&
- merged_missed_rows_size != missed_rows_size) {
+ merged_missed_rows_size != missed_rows_size &&
need_to_check_missed_rows) {
std::stringstream ss;
ss << "cumulative compaction: the merged rows(" <<
_stats.merged_rows
<< "), filtered rows(" << _stats.filtered_rows
diff --git a/be/src/olap/cumulative_compaction.cpp
b/be/src/olap/cumulative_compaction.cpp
index b961c694ede..fe9e5204f4c 100644
--- a/be/src/olap/cumulative_compaction.cpp
+++ b/be/src/olap/cumulative_compaction.cpp
@@ -100,6 +100,20 @@ Status CumulativeCompaction::prepare_compact() {
}
Status CumulativeCompaction::execute_compact() {
+ DBUG_EXECUTE_IF("CumulativeCompaction::execute_compact.block", {
+ auto target_tablet_id = dp->param<int64_t>("tablet_id", -1);
+ if (target_tablet_id == _tablet->tablet_id()) {
+ LOG(INFO) << "start debug block "
+ << "CumulativeCompaction::execute_compact.block";
+ while (DebugPoints::instance()->is_enable(
+ "CumulativeCompaction::execute_compact.block")) {
+ std::this_thread::sleep_for(std::chrono::milliseconds(200));
+ }
+ LOG(INFO) << "end debug block "
+ << "CumulativeCompaction::execute_compact.block";
+ }
+ })
+
std::unique_lock<std::mutex>
lock(tablet()->get_cumulative_compaction_lock(), std::try_to_lock);
if (!lock.owns_lock()) {
return Status::Error<TRY_LOCK_FAILED, false>(
diff --git a/be/src/olap/cumulative_compaction_policy.cpp
b/be/src/olap/cumulative_compaction_policy.cpp
index ee7a2b1812a..c812a12b656 100644
--- a/be/src/olap/cumulative_compaction_policy.cpp
+++ b/be/src/olap/cumulative_compaction_policy.cpp
@@ -28,6 +28,7 @@
#include "olap/olap_common.h"
#include "olap/tablet.h"
#include "olap/tablet_meta.h"
+#include "util/debug_points.h"
namespace doris {
@@ -246,6 +247,21 @@ int
SizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
const int64_t max_compaction_score, const int64_t min_compaction_score,
std::vector<RowsetSharedPtr>* input_rowsets, Version*
last_delete_version,
size_t* compaction_score, bool allow_delete) {
+
DBUG_EXECUTE_IF("SizeBasedCumulativeCompactionPolicy::pick_input_rowsets.set_input_rowsets",
{
+ auto target_tablet_id = dp->param<int64_t>("tablet_id", -1);
+ if (target_tablet_id == tablet->tablet_id()) {
+ auto start_version = dp->param<int64_t>("start_version", -1);
+ auto end_version = dp->param<int64_t>("end_version", -1);
+ for (auto& rowset : candidate_rowsets) {
+ if (rowset->start_version() >= start_version &&
+ rowset->end_version() <= end_version) {
+ input_rowsets->push_back(rowset);
+ }
+ }
+ }
+ return input_rowsets->size();
+ })
+
size_t promotion_size = tablet->cumulative_promotion_size();
auto max_version = tablet->max_version().first;
int transient_size = 0;
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index 46191443506..ae449532182 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -838,6 +838,8 @@ Status SchemaChangeJob::_do_process_alter_tablet(const
TAlterTabletReqV2& reques
return_columns[i] = i;
}
+ DBUG_EXECUTE_IF("SchemaChangeJob::_do_process_alter_tablet.block",
DBUG_BLOCK);
+
// begin to find deltas to convert from base tablet to new tablet so that
// obtain base tablet and new tablet's push lock and header write lock to
prevent loading data
{
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 5cf09398972..d1d48b91ff3 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -512,6 +512,15 @@ Status Tablet::add_rowset(RowsetSharedPtr rowset) {
return Status::OK();
}
+bool Tablet::rowset_exists_unlocked(const RowsetSharedPtr& rowset) {
+ if (auto it = _rs_version_map.find(rowset->version()); it ==
_rs_version_map.end()) {
+ return false;
+ } else if (rowset->rowset_id() != it->second->rowset_id()) {
+ return false;
+ }
+ return true;
+}
+
Status Tablet::modify_rowsets(std::vector<RowsetSharedPtr>& to_add,
std::vector<RowsetSharedPtr>& to_delete, bool
check_delete) {
// the compaction process allow to compact the single version, eg:
version[4-4].
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index a38b5806174..ca774a618fc 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -173,6 +173,7 @@ public:
// MUST hold EXCLUSIVE `_meta_lock`.
Status modify_rowsets(std::vector<RowsetSharedPtr>& to_add,
std::vector<RowsetSharedPtr>& to_delete, bool
check_delete = false);
+ bool rowset_exists_unlocked(const RowsetSharedPtr& rowset);
Status add_inc_rowset(const RowsetSharedPtr& rowset);
/// Delete stale rowset by timing. This delete policy uses now() minutes
diff --git
a/regression-test/data/fault_injection_p0/test_compaction_on_sc_new_tablet.out
b/regression-test/data/fault_injection_p0/test_compaction_on_sc_new_tablet.out
new file mode 100644
index 00000000000..e7188943a10
--- /dev/null
+++
b/regression-test/data/fault_injection_p0/test_compaction_on_sc_new_tablet.out
@@ -0,0 +1,25 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !sql --
+1 1 1 1
+2 2 2 2
+3 3 3 3
+4 4 4 4
+5 5 5 5
+6 6 6 6
+7 7 7 7
+8 8 8 8
+9 9 9 9
+10 10 10 10
+
+-- !sql --
+1 9 9 9
+2 2 2 2
+3 3 3 3
+4 4 4 4
+5 5 5 5
+6 6 6 6
+7 7 7 7
+8 8 8 8
+9 9 9 9
+10 10 10 10
+
diff --git
a/regression-test/suites/fault_injection_p0/test_compaction_on_sc_new_tablet.groovy
b/regression-test/suites/fault_injection_p0/test_compaction_on_sc_new_tablet.groovy
new file mode 100644
index 00000000000..2f3c44ef2dd
--- /dev/null
+++
b/regression-test/suites/fault_injection_p0/test_compaction_on_sc_new_tablet.groovy
@@ -0,0 +1,149 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.junit.Assert
+import java.util.concurrent.TimeUnit
+import org.awaitility.Awaitility
+import org.apache.doris.regression.suite.ClusterOptions
+
+suite("test_compaction_on_sc_new_tablet", "docker") {
+ def options = new ClusterOptions()
+ options.setFeNum(1)
+ options.setBeNum(1)
+ options.enableDebugPoints()
+ options.cloudMode = false
+ options.beConfigs += [
+ 'enable_java_support=false',
+ 'enable_mow_compaction_correctness_check_core=true'
+ ]
+ docker(options) {
+ try {
+ GetDebugPoint().clearDebugPointsForAllFEs()
+ GetDebugPoint().clearDebugPointsForAllBEs()
+ def table1 = "test_compaction_on_sc_new_tablet"
+ sql "DROP TABLE IF EXISTS ${table1} FORCE;"
+ sql """ CREATE TABLE IF NOT EXISTS ${table1} (
+ `k` int,
+ `c1` int,
+ `c2` int,
+ `c3` int
+ ) UNIQUE KEY(k)
+ DISTRIBUTED BY HASH(k) BUCKETS 1
+ PROPERTIES (
+ "disable_auto_compaction" = "true",
+ "replication_num" = "1",
+ "enable_unique_key_merge_on_write" = "true"); """
+
+ // [2-11]
+ for (int i = 1; i <= 10; i++) {
+ sql "insert into ${table1} values($i,$i,$i,$i);"
+ }
+ qt_sql "select * from ${table1} order by k;"
+
+
+ def beNodes = sql_return_maparray("show backends;")
+ def tabletStats = sql_return_maparray("show tablets from
${table1};")
+ logger.info("tabletStats: \n${tabletStats}")
+ def tabletStat = tabletStats.get(0)
+ def tabletBackendId = tabletStat.BackendId
+ def tabletId = tabletStat.TabletId
+ def version = tabletStat.Version
+ def tabletBackend;
+ for (def be : beNodes) {
+ if (be.BackendId == tabletBackendId) {
+ tabletBackend = be
+ break;
+ }
+ }
+ logger.info("tablet ${tabletId} is on backend
${tabletBackend.Host} with backendId=${tabletBackend.BackendId},
version=${version}");
+
+ // blocking the schema change process before it gains max version
+
GetDebugPoint().enableDebugPointForAllBEs("SchemaChangeJob::_do_process_alter_tablet.block")
+ Thread.sleep(2000)
+
+ sql "alter table ${table1} modify column c1 varchar(100);"
+
+ Thread.sleep(4000)
+
+ // double write [11-22]
+ for (int i = 20; i <= 30; i++) {
+ sql "insert into ${table1} values(1,9,9,9);"
+ }
+
+ tabletStats = sql_return_maparray("show tablets from ${table1};")
+ logger.info("tabletStats: \n${tabletStats}")
+ assertEquals(2, tabletStats.size())
+
+ def oldTabletStat
+ def newTabletStat
+ for (def stat: tabletStats) {
+ if (!stat.TabletId.equals(tabletId)) {
+ newTabletStat = stat
+ } else {
+ oldTabletStat = stat
+ }
+ }
+ logger.info("old tablet=[tablet_id=${oldTabletStat.TabletId},
version=${oldTabletStat.Version}]")
+ logger.info("new tablet=[tablet_id=${newTabletStat.TabletId},
version=${newTabletStat.Version}]")
+
+
+ // trigger cumu compaction on new tablet
+ int start_version = 15
+ int end_version = 17
+ // block compaction process on new tablet
+
GetDebugPoint().enableDebugPointForAllBEs("CumulativeCompaction::execute_compact.block",
[tablet_id: "${newTabletStat.TabletId}"])
+ // manually set cumu compaction's input rowsets on new tablet
+
GetDebugPoint().enableDebugPointForAllBEs("SizeBasedCumulativeCompactionPolicy::pick_input_rowsets.set_input_rowsets",
+ [tablet_id:"${newTabletStat.TabletId}",
start_version:"${start_version}", end_version:"${end_version}"])
+
+ Thread.sleep(2000)
+
+ logger.info("trigger compaction [15-17] on new tablet
${newTabletStat.TabletId}")
+ def (code, out, err) =
be_run_cumulative_compaction(tabletBackend.Host, tabletBackend.HttpPort,
newTabletStat.TabletId)
+ logger.info("Run compaction: code=" + code + ", out=" + out + ",
err=" + err)
+ Assert.assertEquals(code, 0)
+ def compactJson = parseJson(out.trim())
+ Assert.assertEquals("success", compactJson.status.toLowerCase())
+
+ // make the schema change run to complete and wait for it
+
GetDebugPoint().disableDebugPointForAllBEs("SchemaChangeJob::_do_process_alter_tablet.block")
+ waitForSchemaChangeDone {
+ sql """ SHOW ALTER TABLE COLUMN WHERE TableName='${table1}'
ORDER BY createtime DESC LIMIT 1 """
+ time 2000
+ }
+
+ Thread.sleep(2000)
+
+ // make the cumu compaction run to complete and wait for it
+
GetDebugPoint().disableDebugPointForAllBEs("CumulativeCompaction::execute_compact.block")
+
+
+ // BE should skip to check merged rows in cumu compaction,
otherwise it will cause coredump
+ // because [11-22] in new tablet will skip to calc delete bitmap
because tablet is in NOT_READY state
+ Thread.sleep(7000)
+
+ qt_sql "select * from ${table1} order by k;"
+
+ } catch(Exception e) {
+ logger.info(e.getMessage())
+ throw e
+ } finally {
+ GetDebugPoint().clearDebugPointsForAllFEs()
+ GetDebugPoint().clearDebugPointsForAllBEs()
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]