This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 914e1a551fb [feat](cloud) Add unused rowset state for CloudTablet
(#51573)
914e1a551fb is described below
commit 914e1a551fb1ad922b3c350a3fcc4f60d2c70a74
Author: Lei Zhang <[email protected]>
AuthorDate: Tue Jun 17 14:42:15 2025 +0800
[feat](cloud) Add unused rowset state for CloudTablet (#51573)
* Add unused rowset state for CloudTablet to recycle file cache data and
delete_bitmap
* This PR is an enhancement of PR
`https://github.com/apache/doris/pull/50973`
---
be/src/cloud/cloud_tablet.cpp | 70 ++--
be/src/cloud/cloud_tablet.h | 2 +
...ache_compaction_and_read_stale_cloud_docker.out | Bin 0 -> 287 bytes
...e_compaction_and_read_stale_cloud_docker.groovy | 390 +++++++++++++++++++++
...ultisegments_and_read_stale_cloud_docker.groovy | 347 ++++++++++++++++++
5 files changed, 787 insertions(+), 22 deletions(-)
diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp
index 43749157c9a..2b86d76cf31 100644
--- a/be/src/cloud/cloud_tablet.cpp
+++ b/be/src/cloud/cloud_tablet.cpp
@@ -63,6 +63,9 @@ bvar::LatencyRecorder
g_cu_compaction_get_delete_bitmap_lock_time_ms(
bvar::LatencyRecorder g_base_compaction_get_delete_bitmap_lock_time_ms(
"base_compaction_get_delete_bitmap_lock_time_ms");
+bvar::Adder<int64_t> g_unused_rowsets_count("unused_rowsets_count");
+bvar::Adder<int64_t> g_unused_rowsets_bytes("unused_rowsets_bytes");
+
static constexpr int LOAD_INITIATOR_ID = -1;
CloudTablet::CloudTablet(CloudStorageEngine& engine, TabletMetaSharedPtr
tablet_meta)
@@ -342,17 +345,27 @@ void
CloudTablet::add_rowsets(std::vector<RowsetSharedPtr> to_add, bool version_
// replace existed rowset with `to_add` rowset. This may occur
when:
// 1. schema change converts rowsets which have been double
written to new tablet
// 2. cumu compaction picks single overlapping input rowset
to perform compaction
- if (keys_type() == UNIQUE_KEYS &&
enable_unique_key_merge_on_write()) {
- // add existed rowset to unused_rowsets to remove delete
bitmap
- if (auto find_it = _rs_version_map.find(rs->version());
- find_it != _rs_version_map.end()) {
+
+ // add existed rowset to unused_rowsets to remove delete
bitmap and recycle cached data
+
+ std::vector<RowsetSharedPtr> unused_rowsets;
+ if (auto find_it = _rs_version_map.find(rs->version());
+ find_it != _rs_version_map.end()) {
+ if (find_it->second->rowset_id() == rs->rowset_id()) {
+ LOG(WARNING) << "tablet_id=" << tablet_id()
+ << ", rowset_id=" <<
rs->rowset_id().to_string()
+ << ", existed rowset_id="
+ <<
find_it->second->rowset_id().to_string();
DCHECK(find_it->second->rowset_id() != rs->rowset_id())
<< "tablet_id=" << tablet_id()
<< ", rowset_id=" <<
rs->rowset_id().to_string()
- << ", existed rowset=" <<
find_it->second->rowset_id().to_string();
- _unused_rowsets.emplace(find_it->second->rowset_id(),
find_it->second);
+ << ", existed rowset_id="
+ << find_it->second->rowset_id().to_string();
}
+ unused_rowsets.push_back(find_it->second);
}
+ add_unused_rowsets(unused_rowsets);
+
_tablet_meta->delete_rs_meta_by_version(rs->version(),
nullptr);
_rs_version_map[rs->version()] = rs;
_tablet_meta->add_rowsets_unchecked({rs});
@@ -460,21 +473,16 @@ uint64_t CloudTablet::delete_expired_stale_rowsets() {
}
_reconstruct_version_tracker_if_necessary();
}
+
+ // if the rowset is not used by any query, we can recycle its cached data
early.
recycle_cached_data(expired_rowsets);
if (config::enable_mow_verbose_log) {
LOG_INFO("finish delete_expired_stale_rowset for tablet={}",
tablet_id());
}
+ add_unused_rowsets(expired_rowsets);
if (keys_type() == UNIQUE_KEYS && enable_unique_key_merge_on_write() &&
!deleted_stale_rowsets.empty()) {
- // record expired rowsets in unused rowsets
- {
- std::lock_guard<std::mutex> lock(_gc_mutex);
- for (const auto& rowset : expired_rowsets) {
- _unused_rowsets.emplace(rowset->rowset_id(), rowset);
- }
- }
-
// agg delete bitmap for pre rowsets; record unused delete bitmap key
ranges
OlapStopWatch watch;
for (const auto& [version, unused_rowsets] : deleted_stale_rowsets) {
@@ -504,19 +512,36 @@ bool CloudTablet::need_remove_unused_rowsets() {
return !_unused_rowsets.empty() || !_unused_delete_bitmap.empty();
}
+void CloudTablet::add_unused_rowsets(const std::vector<RowsetSharedPtr>&
rowsets) {
+ std::lock_guard<std::mutex> lock(_gc_mutex);
+ for (const auto& rowset : rowsets) {
+ _unused_rowsets[rowset->rowset_id()] = rowset;
+ g_unused_rowsets_bytes << rowset->total_disk_size();
+ }
+ g_unused_rowsets_count << rowsets.size();
+}
+
void CloudTablet::remove_unused_rowsets() {
+ int64_t removed_rowsets_num = 0;
+ int64_t removed_delete_bitmap_num = 0;
+ OlapStopWatch watch;
std::lock_guard<std::mutex> lock(_gc_mutex);
- // 1. remove unused rowsets and delete bitmap
+    // 1. remove unused rowsets' cache data and delete bitmap
for (auto it = _unused_rowsets.begin(); it != _unused_rowsets.end();) {
+ // it->second is std::shared_ptr<Rowset>
auto&& rs = it->second;
if (rs.use_count() > 1) {
- LOG(WARNING) << "Rowset " << rs->rowset_id() << " has " <<
rs.use_count()
- << " references. Can not remove delete bitmap.";
+ LOG(WARNING) << "tablet_id:" << tablet_id() << " rowset: " <<
rs->rowset_id() << " has "
+ << rs.use_count() << " references, it cannot be
removed";
++it;
continue;
}
tablet_meta()->remove_rowset_delete_bitmap(rs->rowset_id(),
rs->version());
+ rs->clear_cache();
+ g_unused_rowsets_count << -1;
+ g_unused_rowsets_bytes << -rs->total_disk_size();
it = _unused_rowsets.erase(it);
+ removed_rowsets_num++;
}
// 2. remove delete bitmap of pre rowsets
@@ -538,13 +563,14 @@ void CloudTablet::remove_unused_rowsets() {
auto& key_ranges = std::get<1>(*it);
tablet_meta()->delete_bitmap().remove(key_ranges);
it = _unused_delete_bitmap.erase(it);
+ removed_delete_bitmap_num++;
}
- if (!_unused_rowsets.empty() || !_unused_delete_bitmap.empty()) {
- LOG(INFO) << "tablet_id=" << tablet_id()
- << ", unused_rowset size=" << _unused_rowsets.size()
- << ", unused_delete_bitmap size=" <<
_unused_delete_bitmap.size();
- }
+ LOG(INFO) << "tablet_id=" << tablet_id() << ", unused_rowset size=" <<
_unused_rowsets.size()
+ << ", unused_delete_bitmap size=" << _unused_delete_bitmap.size()
+ << ", removed_rowsets_num=" << removed_rowsets_num
+ << ", removed_delete_bitmap_num=" << removed_delete_bitmap_num
+ << ", cost(us)=" << watch.get_elapse_time_us();
}
void CloudTablet::update_base_size(const Rowset& rs) {
diff --git a/be/src/cloud/cloud_tablet.h b/be/src/cloud/cloud_tablet.h
index d63506ddde0..3474857b36a 100644
--- a/be/src/cloud/cloud_tablet.h
+++ b/be/src/cloud/cloud_tablet.h
@@ -276,6 +276,8 @@ public:
std::map<std::string, int64_t>&
pre_rowset_to_versions);
bool need_remove_unused_rowsets();
+
+ void add_unused_rowsets(const std::vector<RowsetSharedPtr>& rowsets);
void remove_unused_rowsets();
private:
diff --git
a/regression-test/data/compaction/test_filecache_compaction_and_read_stale_cloud_docker.out
b/regression-test/data/compaction/test_filecache_compaction_and_read_stale_cloud_docker.out
new file mode 100644
index 00000000000..8701d535cf3
Binary files /dev/null and
b/regression-test/data/compaction/test_filecache_compaction_and_read_stale_cloud_docker.out
differ
diff --git
a/regression-test/suites/compaction/test_filecache_compaction_and_read_stale_cloud_docker.groovy
b/regression-test/suites/compaction/test_filecache_compaction_and_read_stale_cloud_docker.groovy
new file mode 100644
index 00000000000..8673f249e1d
--- /dev/null
+++
b/regression-test/suites/compaction/test_filecache_compaction_and_read_stale_cloud_docker.groovy
@@ -0,0 +1,390 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.util.concurrent.atomic.AtomicBoolean
+import org.apache.doris.regression.suite.ClusterOptions
+import org.codehaus.groovy.runtime.IOGroovyMethods
+import org.apache.doris.regression.util.Http
+
+suite("test_filecache_compaction_and_read_stale_cloud_docker", "docker") {
+ def options = new ClusterOptions()
+ options.cloudMode = true
+ options.setFeNum(1)
+ options.setBeNum(1)
+ options.enableDebugPoints()
+ options.feConfigs.add("enable_workload_group=false")
+ options.beConfigs.add('compaction_promotion_version_count=5')
+ options.beConfigs.add('tablet_rowset_stale_sweep_time_sec=0')
+ options.beConfigs.add('vacuum_stale_rowsets_interval_s=10')
+ options.beConfigs.add('enable_java_support=false')
+
+ def dbName = ""
+ def testTable = "test_filecache_compaction_and_read_stale_cloud_docker"
+ def backendId_to_backendIP = [:]
+ def backendId_to_backendHttpPort = [:]
+ def backendId_to_backendBrpcPort = [:]
+
+ def triggerCompaction = { tablet ->
+ def compact_type = "cumulative"
+ String tablet_id = tablet.TabletId
+ String trigger_backend_id = tablet.BackendId
+ def be_host = backendId_to_backendIP[trigger_backend_id]
+ def be_http_port = backendId_to_backendHttpPort[trigger_backend_id]
+ if (compact_type == "cumulative") {
+ def (code_1, out_1, err_1) = be_run_cumulative_compaction(be_host,
be_http_port, tablet_id)
+ logger.info("Run compaction: code=" + code_1 + ", out=" + out_1 +
", err=" + err_1)
+ assertEquals(code_1, 0)
+ return out_1
+ } else if (compact_type == "full") {
+ def (code_2, out_2, err_2) = be_run_full_compaction(be_host,
be_http_port, tablet_id)
+ logger.info("Run compaction: code=" + code_2 + ", out=" + out_2 +
", err=" + err_2)
+ assertEquals(code_2, 0)
+ return out_2
+ } else {
+ assertFalse(True)
+ }
+ }
+
+ def getTabletStatus = { tablet ->
+ String tablet_id = tablet.TabletId
+ String trigger_backend_id = tablet.BackendId
+ def be_host = backendId_to_backendIP[trigger_backend_id]
+ def be_http_port = backendId_to_backendHttpPort[trigger_backend_id]
+ StringBuilder sb = new StringBuilder();
+ sb.append("curl -X GET http://${be_host}:${be_http_port}")
+ sb.append("/api/compaction/show?tablet_id=")
+ sb.append(tablet_id)
+
+ String command = sb.toString()
+ logger.info(command)
+ def process = command.execute()
+ def code = process.waitFor()
+ def out = process.getText()
+ logger.info("Get tablet status: =" + code + ", out=" + out)
+ assertEquals(code, 0)
+ def tabletStatus = parseJson(out.trim())
+ return tabletStatus
+ }
+
+ def waitForCompaction = { tablet ->
+ String tablet_id = tablet.TabletId
+ String trigger_backend_id = tablet.BackendId
+ def be_host = backendId_to_backendIP[trigger_backend_id]
+ def be_http_port = backendId_to_backendHttpPort[trigger_backend_id]
+ def running = true
+ do {
+ Thread.sleep(1000)
+ StringBuilder sb = new StringBuilder();
+ sb.append("curl -X GET http://${be_host}:${be_http_port}")
+ sb.append("/api/compaction/run_status?tablet_id=")
+ sb.append(tablet_id)
+
+ String command = sb.toString()
+ logger.info(command)
+ def process = command.execute()
+ def code = process.waitFor()
+ def out = process.getText()
+ logger.info("Get compaction status: code=" + code + ", out=" + out)
+ assertEquals(code, 0)
+ def compactionStatus = parseJson(out.trim())
+ assertEquals("success", compactionStatus.status.toLowerCase())
+ running = compactionStatus.run_status
+ } while (running)
+ }
+
+ def getLocalDeleteBitmapStatus = { tablet ->
+ String tablet_id = tablet.TabletId
+ String trigger_backend_id = tablet.BackendId
+ def be_host = backendId_to_backendIP[trigger_backend_id]
+ def be_http_port = backendId_to_backendHttpPort[trigger_backend_id]
+ boolean running = true
+ StringBuilder sb = new StringBuilder();
+ sb.append("curl -X GET http://${be_host}:${be_http_port}")
+ sb.append("/api/delete_bitmap/count_local?verbose=true&tablet_id=")
+ sb.append(tablet_id)
+
+ String command = sb.toString()
+ logger.info(command)
+ def process = command.execute()
+ def code = process.waitFor()
+ def out = process.getText()
+ logger.info("Get local delete bitmap count status: =" + code + ",
out=" + out)
+ assertEquals(code, 0)
+ def deleteBitmapStatus = parseJson(out.trim())
+ return deleteBitmapStatus
+ }
+
+ def getMsDeleteBitmapStatus = { tablet ->
+ String tablet_id = tablet.TabletId
+ String trigger_backend_id = tablet.BackendId
+ def be_host = backendId_to_backendIP[trigger_backend_id]
+ def be_http_port = backendId_to_backendHttpPort[trigger_backend_id]
+ boolean running = true
+ StringBuilder sb = new StringBuilder();
+ sb.append("curl -X GET http://${be_host}:${be_http_port}")
+ sb.append("/api/delete_bitmap/count_ms?verbose=true&tablet_id=")
+ sb.append(tablet_id)
+
+ String command = sb.toString()
+ logger.info(command)
+ def process = command.execute()
+ def code = process.waitFor()
+ def out = process.getText()
+ logger.info("Get ms delete bitmap count status: =" + code + ", out="
+ out)
+ assertEquals(code, 0)
+ def deleteBitmapStatus = parseJson(out.trim())
+ return deleteBitmapStatus
+ }
+
+ docker(options) {
+ def fes = sql_return_maparray "show frontends"
+ logger.info("frontends: ${fes}")
+ def url = "jdbc:mysql://${fes[0].Host}:${fes[0].QueryPort}/"
+ logger.info("url: " + url)
+ AtomicBoolean query_result = new AtomicBoolean(true)
+ def query = {
+ connect( context.config.jdbcUser, context.config.jdbcPassword,
url) {
+ logger.info("query start")
+ def results = sql_return_maparray """ select * from
${dbName}.${testTable}; """
+ logger.info("query result: " + results)
+ Set<String> keys = new HashSet<>()
+ for (final def result in results) {
+ if (keys.contains(result.k)) {
+ logger.info("find duplicate key: " + result.k)
+ query_result.set(false)
+ break
+ }
+ keys.add(result.k)
+ }
+ logger.info("query finish. query_result: " +
query_result.get())
+ }
+ }
+
+ def result = sql 'SELECT DATABASE()'
+ dbName = result[0][0]
+
+ sql """ DROP TABLE IF EXISTS ${testTable} """
+ sql """
+ create table ${testTable} (`k` int NOT NULL, `v` int NOT NULL)
+ UNIQUE KEY(`k`)
+ DISTRIBUTED BY HASH(`k`) BUCKETS 1
+ PROPERTIES (
+ "enable_unique_key_merge_on_write" = "true",
+ "replication_allocation" = "tag.location.default: 1",
+ "disable_auto_compaction" = "true"
+ );
+ """
+ // getBackendIpHttpPort(backendId_to_backendIP,
backendId_to_backendHttpPort);
+ getBackendIpHttpAndBrpcPort(backendId_to_backendIP,
backendId_to_backendHttpPort, backendId_to_backendBrpcPort);
+
+ def tablets = sql_return_maparray """ show tablets from ${testTable};
"""
+ logger.info("tablets: " + tablets)
+ assertEquals(1, tablets.size())
+ def tablet = tablets[0]
+ String tablet_id = tablet.TabletId
+
+ try {
+ Set<String> all_history_stale_rowsets = new HashSet<>();
+
+ // write some data
+ sql """ INSERT INTO ${testTable} VALUES (1,99); """
+ sql """ INSERT INTO ${testTable} VALUES (2,99); """
+ sql """ INSERT INTO ${testTable} VALUES (3,99); """
+ sql """ INSERT INTO ${testTable} VALUES (4,99); """
+ sql """ INSERT INTO ${testTable} VALUES (5,99); """
+ sql "sync"
+ order_qt_sql1 """ select * from ${testTable}; """
+
+ def tablet_status = getTabletStatus(tablet)
+
+ // after compaction, [1-6] versions will become stale rowsets
+ all_history_stale_rowsets.addAll(tablet_status["rowsets"])
+
+ // trigger compaction to generate base rowset
+ assertTrue(triggerCompaction(tablet).contains("Success"))
+ waitForCompaction(tablet)
+ tablet_status = getTabletStatus(tablet)
+ assertEquals(2, tablet_status["rowsets"].size())
+ all_history_stale_rowsets.addAll(tablet_status["rowsets"])
+
+ def ms_dm = getMsDeleteBitmapStatus(tablet)
+ assertEquals(0, ms_dm["delete_bitmap_count"])
+ order_qt_sql2 "select * from ${testTable}"
+
+ // write some data
+ sql """ INSERT INTO ${testTable} VALUES (1,99); """
+ sql """ INSERT INTO ${testTable} VALUES (2,99); """
+ sql """ INSERT INTO ${testTable} VALUES (3,99); """
+ sql """ INSERT INTO ${testTable} VALUES (4,99); """
+ sql """ INSERT INTO ${testTable} VALUES (5,99); """
+ sql """ sync """
+ order_qt_sql3 "select * from ${testTable}"
+ tablet_status = getTabletStatus(tablet)
+ assertEquals(7, tablet_status["rowsets"].size())
+ all_history_stale_rowsets.addAll(tablet_status["rowsets"])
+ ms_dm = getMsDeleteBitmapStatus(tablet)
+ assertEquals(5, ms_dm["delete_bitmap_count"])
+
+ // trigger and block one query
+
GetDebugPoint().enableDebugPointForAllBEs("NewOlapScanner::_init_tablet_reader_params.block")
+
GetDebugPoint().enableDebugPointForAllBEs("CumulativeCompaction.modify_rowsets.delete_expired_stale_rowset")
+
GetDebugPoint().enableDebugPointForAllBEs("CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets.set_input_rowsets",
+ [tablet_id: "${tablet_id}", start_version: 7, end_version:
11]);
+ Thread query_thread = new Thread(() -> query())
+ query_thread.start()
+ sleep(100)
+
+ // trigger compaction
+ // getTabletStatus(tablet)
+ assertTrue(triggerCompaction(tablet).contains("Success"))
+ waitForCompaction(tablet)
+ logger.info("compaction2 finished")
+ // check rowset count
+ tablet_status = getTabletStatus(tablet)
+ assertEquals(3, tablet_status["rowsets"].size())
+ all_history_stale_rowsets.addAll(tablet_status["rowsets"])
+ // check ms delete bitmap count
+ ms_dm = getMsDeleteBitmapStatus(tablet)
+ assertEquals(1, ms_dm["delete_bitmap_count"])
+ assertEquals(5, ms_dm["cardinality"])
+ // check local delete bitmap count
+ def local_dm = getLocalDeleteBitmapStatus(tablet)
+ assertEquals(5, local_dm["delete_bitmap_count"])
+ assertEquals(9, local_dm["cardinality"])
+
+ // wait for stale rowsets are deleted
+ boolean is_stale_rowsets_deleted = false
+ for (int i = 0; i < 100; i++) {
+ tablet_status = getTabletStatus(tablet)
+ if (tablet_status["stale_rowsets"].size() == 0) {
+ is_stale_rowsets_deleted = true
+ break
+ }
+ sleep(500)
+ }
+ assertTrue(is_stale_rowsets_deleted, "stale rowsets are not
deleted")
+            // check that the delete bitmap of stale rowsets is not deleted
+ sleep(1000)
+ def local_dm_status = getLocalDeleteBitmapStatus(tablet)
+ assertEquals(5, local_dm_status["delete_bitmap_count"])
+
+            // unlock query and check no duplicated keys
+
GetDebugPoint().disableDebugPointForAllBEs("NewOlapScanner::_init_tablet_reader_params.block")
+ query_thread.join()
+ assertTrue(query_result.get(), "find duplicated keys")
+
+ // check delete bitmap of compaction2 stale rowsets are deleted
+ // write some data
+ sql """ INSERT INTO ${testTable} VALUES (1,99); """
+ sql """ INSERT INTO ${testTable} VALUES (2,99); """
+ sql """ INSERT INTO ${testTable} VALUES (3,99); """
+ sql """ INSERT INTO ${testTable} VALUES (4,99); """
+ sql """ INSERT INTO ${testTable} VALUES (5,100); """
+ sql "sync"
+ order_qt_sql4 "select * from ${testTable}"
+ logger.info("order_qt_sql4 finished")
+ tablet_status = getTabletStatus(tablet)
+ all_history_stale_rowsets.addAll(tablet_status["rowsets"])
+ getMsDeleteBitmapStatus(tablet)
+ // trigger compaction
+
GetDebugPoint().enableDebugPointForAllBEs("CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets.set_input_rowsets",
+ [tablet_id: "${tablet_id}", start_version: 12,
end_version: 16]);
+ tablet_status = getTabletStatus(tablet)
+ all_history_stale_rowsets.addAll(tablet_status["rowsets"])
+ assertTrue(triggerCompaction(tablet).contains("Success"))
+ waitForCompaction(tablet)
+ boolean is_compaction_finished = false
+ for (int i = 0; i < 100; i++) {
+ tablet_status = getTabletStatus(tablet)
+ all_history_stale_rowsets.addAll(tablet_status["rowsets"])
+ if (tablet_status["rowsets"].size() == 4) {
+ logger.info("final tablet status: ${tablet_status}")
+ is_compaction_finished = true
+ break
+ }
+ sleep(500)
+ }
+ assertTrue(is_compaction_finished, "compaction is not finished")
+ logger.info("compaction3 finished")
+ // check ms delete bitmap count
+ ms_dm = getMsDeleteBitmapStatus(tablet)
+ assertEquals(2, ms_dm["delete_bitmap_count"])
+ assertEquals(10, ms_dm["cardinality"])
+ // check delete bitmap count
+ logger.info("check local delete bitmap is deleted")
+ boolean is_local_dm_deleted = false
+ for (int i = 0; i < 100; i++) {
+ local_dm_status = getLocalDeleteBitmapStatus(tablet)
+ if (local_dm_status["delete_bitmap_count"] == 2) {
+ assertEquals(10, local_dm_status["cardinality"])
+ is_local_dm_deleted = true
+ break
+ }
+ sleep(500)
+ }
+ assertTrue(is_local_dm_deleted, "delete bitmap of compaction2
stale rowsets are not deleted")
+ order_qt_sql5 "select * from ${testTable}"
+
+ tablet_status = getTabletStatus(tablet)
+ def final_rowsets = tablet_status["rowsets"]
+
+ // sleep for vacuum_stale_rowsets_interval_s=10 seconds to wait
for unused rowsets are deleted
+ sleep(21000)
+
+ def be_host = backendId_to_backendIP[tablet.BackendId]
+ def be_http_port = backendId_to_backendHttpPort[tablet.BackendId]
+ for (int i = 0; i < all_history_stale_rowsets.size(); i++) {
+ def rowsetStr = all_history_stale_rowsets[i]
+ // [12-12] 1 DATA NONOVERLAPPING
02000000000000124843c92c13625daa8296c20957119893 1011.00 B
+ def start_version = rowsetStr.split(" ")[0].replace('[',
'').replace(']', '').split("-")[0].toInteger()
+ def end_version = rowsetStr.split(" ")[0].replace('[',
'').replace(']', '').split("-")[1].toInteger()
+ def rowset_id = rowsetStr.split(" ")[4]
+ if (start_version == 0 || start_version != end_version) {
+ continue
+ }
+
+ logger.info("rowset ${i}, start: ${start_version}, end:
${end_version}, id: ${rowset_id}")
+ def data =
Http.GET("http://${be_host}:${be_http_port}/api/file_cache?op=list_cache&value=${rowset_id}_0.dat",
true)
+ logger.info("file cache data: ${data}")
+ assertTrue(data.size() == 0)
+ }
+
+ for (int i = 0; i < final_rowsets.size(); i++) {
+ def rowsetStr = final_rowsets[i]
+ def start_version = rowsetStr.split(" ")[0].replace('[',
'').replace(']', '').split("-")[0].toInteger()
+ def end_version = rowsetStr.split(" ")[0].replace('[',
'').replace(']', '').split("-")[1].toInteger()
+ def rowset_id = rowsetStr.split(" ")[4]
+ if (start_version == 0) {
+ continue
+ }
+
+ logger.info("final rowset ${i}, start: ${start_version}, end:
${end_version}, id: ${rowset_id}")
+ def data =
Http.GET("http://${be_host}:${be_http_port}/api/file_cache?op=list_cache&value=${rowset_id}_0.dat",
true)
+ logger.info("file cache data: ${data}")
+ assertTrue(data.size() > 0)
+ }
+
+ def (code_0, out_0, err_0) = curl("GET",
"http://${be_host}:${backendId_to_backendBrpcPort[tablet.BackendId]}/vars/unused_rowsets_count")
+ logger.info("out_0: ${out_0}")
+ def unusedRowsetsCount =
out_0.trim().split(":")[1].trim().toInteger()
+ assertEquals(0, unusedRowsetsCount)
+
+ } finally {
+ GetDebugPoint().clearDebugPointsForAllBEs()
+ }
+ }
+}
diff --git
a/regression-test/suites/compaction/test_filecache_compaction_multisegments_and_read_stale_cloud_docker.groovy
b/regression-test/suites/compaction/test_filecache_compaction_multisegments_and_read_stale_cloud_docker.groovy
new file mode 100644
index 00000000000..4b661316090
--- /dev/null
+++
b/regression-test/suites/compaction/test_filecache_compaction_multisegments_and_read_stale_cloud_docker.groovy
@@ -0,0 +1,347 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.util.concurrent.atomic.AtomicBoolean
+import org.apache.doris.regression.suite.ClusterOptions
+import org.codehaus.groovy.runtime.IOGroovyMethods
+import org.apache.doris.regression.util.Http
+
+suite("test_filecache_compaction_multisegments_and_read_stale_cloud_docker",
"docker") {
+ def options = new ClusterOptions()
+ options.cloudMode = true
+ options.setFeNum(1)
+ options.setBeNum(1)
+ options.enableDebugPoints()
+ options.feConfigs.add("enable_workload_group=false")
+ options.beConfigs.add('compaction_promotion_version_count=5')
+ options.beConfigs.add('tablet_rowset_stale_sweep_time_sec=0')
+ options.beConfigs.add('vacuum_stale_rowsets_interval_s=10')
+ options.beConfigs.add('enable_java_support=false')
+ options.beConfigs.add('doris_scanner_row_bytes=1')
+
+ def dbName = ""
+ def testTable = "test_filecache_multisegments_and_read_stale"
+ def backendId_to_backendIP = [:]
+ def backendId_to_backendHttpPort = [:]
+ def backendId_to_backendBrpcPort = [:]
+
+ def triggerCompaction = { tablet ->
+ def compact_type = "cumulative"
+ String tablet_id = tablet.TabletId
+ String trigger_backend_id = tablet.BackendId
+ def be_host = backendId_to_backendIP[trigger_backend_id]
+ def be_http_port = backendId_to_backendHttpPort[trigger_backend_id]
+ if (compact_type == "cumulative") {
+ def (code_1, out_1, err_1) = be_run_cumulative_compaction(be_host,
be_http_port, tablet_id)
+ logger.info("Run compaction: code=" + code_1 + ", out=" + out_1 +
", err=" + err_1)
+ assertEquals(code_1, 0)
+ return out_1
+ } else if (compact_type == "full") {
+ def (code_2, out_2, err_2) = be_run_full_compaction(be_host,
be_http_port, tablet_id)
+ logger.info("Run compaction: code=" + code_2 + ", out=" + out_2 +
", err=" + err_2)
+ assertEquals(code_2, 0)
+ return out_2
+ } else {
+ assertFalse(True)
+ }
+ }
+
+ def getTabletStatus = { tablet, rowsetIndex, lastRowsetSegmentNum,
enableAssert = false, outputRowsets = null ->
+ String compactionUrl = tablet["CompactionStatus"]
+ def (code, out, err) = curl("GET", compactionUrl)
+ logger.info("Show tablets status: code=" + code + ", out=" + out + ",
err=" + err)
+ assertEquals(code, 0)
+ def tabletJson = parseJson(out.trim())
+ assert tabletJson.rowsets instanceof List
+ if (outputRowsets != null) {
+ outputRowsets.addAll(tabletJson.rowsets)
+ }
+
+ assertTrue(tabletJson.rowsets.size() >= rowsetIndex)
+ def rowset = tabletJson.rowsets.get(rowsetIndex - 1)
+ logger.info("rowset: ${rowset}")
+ int start_index = rowset.indexOf("]")
+ int end_index = rowset.indexOf("DATA")
+ def segmentNumStr = rowset.substring(start_index + 1, end_index).trim()
+ logger.info("segmentNumStr: ${segmentNumStr}")
+ if (enableAssert) {
+ assertEquals(lastRowsetSegmentNum, Integer.parseInt(segmentNumStr))
+ } else {
+ return lastRowsetSegmentNum == Integer.parseInt(segmentNumStr);
+ }
+ }
+
+ def waitForCompaction = { tablet ->
+ String tablet_id = tablet.TabletId
+ String trigger_backend_id = tablet.BackendId
+ def be_host = backendId_to_backendIP[trigger_backend_id]
+ def be_http_port = backendId_to_backendHttpPort[trigger_backend_id]
+ def running = true
+ do {
+ Thread.sleep(1000)
+ StringBuilder sb = new StringBuilder();
+ sb.append("curl -X GET http://${be_host}:${be_http_port}")
+ sb.append("/api/compaction/run_status?tablet_id=")
+ sb.append(tablet_id)
+
+ String command = sb.toString()
+ logger.info(command)
+ def process = command.execute()
+ def code = process.waitFor()
+ def out = process.getText()
+ logger.info("Get compaction status: code=" + code + ", out=" + out)
+ assertEquals(code, 0)
+ def compactionStatus = parseJson(out.trim())
+ assertEquals("success", compactionStatus.status.toLowerCase())
+ running = compactionStatus.run_status
+ } while (running)
+ }
+
+ def getLocalDeleteBitmapStatus = { tablet ->
+ String tablet_id = tablet.TabletId
+ String trigger_backend_id = tablet.BackendId
+ def be_host = backendId_to_backendIP[trigger_backend_id]
+ def be_http_port = backendId_to_backendHttpPort[trigger_backend_id]
+ boolean running = true
+ StringBuilder sb = new StringBuilder();
+ sb.append("curl -X GET http://${be_host}:${be_http_port}")
+ sb.append("/api/delete_bitmap/count_local?verbose=true&tablet_id=")
+ sb.append(tablet_id)
+
+ String command = sb.toString()
+ logger.info(command)
+ def process = command.execute()
+ def code = process.waitFor()
+ def out = process.getText()
+ logger.info("Get local delete bitmap count status: =" + code + ",
out=" + out)
+ assertEquals(code, 0)
+ def deleteBitmapStatus = parseJson(out.trim())
+ return deleteBitmapStatus
+ }
+
+ def getMsDeleteBitmapStatus = { tablet ->
+ String tablet_id = tablet.TabletId
+ String trigger_backend_id = tablet.BackendId
+ def be_host = backendId_to_backendIP[trigger_backend_id]
+ def be_http_port = backendId_to_backendHttpPort[trigger_backend_id]
+ boolean running = true
+ StringBuilder sb = new StringBuilder();
+ sb.append("curl -X GET http://${be_host}:${be_http_port}")
+ sb.append("/api/delete_bitmap/count_ms?verbose=true&tablet_id=")
+ sb.append(tablet_id)
+
+ String command = sb.toString()
+ logger.info(command)
+ def process = command.execute()
+ def code = process.waitFor()
+ def out = process.getText()
+ logger.info("Get ms delete bitmap count status: =" + code + ", out="
+ out)
+ assertEquals(code, 0)
+ def deleteBitmapStatus = parseJson(out.trim())
+ return deleteBitmapStatus
+ }
+
+ docker(options) {
+ def fes = sql_return_maparray "show frontends"
+ logger.info("frontends: ${fes}")
+ def url = "jdbc:mysql://${fes[0].Host}:${fes[0].QueryPort}/"
+ logger.info("url: " + url)
+ AtomicBoolean query_result = new AtomicBoolean(true)
+ def query = {
+ connect( context.config.jdbcUser, context.config.jdbcPassword,
url) {
+ logger.info("query start")
+ def results = sql_return_maparray """ select * from
${dbName}.${testTable}; """
+ logger.info("query result: " + results)
+ Set<String> keys = new HashSet<>()
+ for (final def result in results) {
+ if (keys.contains(result.k)) {
+ logger.info("find duplicate key: " + result.k)
+ query_result.set(false)
+ break
+ }
+ keys.add(result.k)
+ }
+ logger.info("query finish. query_result: " +
query_result.get())
+ }
+ }
+
+ def result = sql 'SELECT DATABASE()'
+ dbName = result[0][0]
+
+ sql """ DROP TABLE IF EXISTS ${testTable} """
+ sql """ CREATE TABLE IF NOT EXISTS ${testTable} (
+ `k1` int(11) NULL,
+ `k2` int(11) NULL,
+ `v3` int(11) NULL,
+ `v4` int(11) NULL
+ ) unique KEY(`k1`, `k2`)
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1",
+ "disable_auto_compaction" = "true"
+ );
+ """
+
+ // getBackendIpHttpPort(backendId_to_backendIP,
backendId_to_backendHttpPort);
+ getBackendIpHttpAndBrpcPort(backendId_to_backendIP,
backendId_to_backendHttpPort, backendId_to_backendBrpcPort);
+
+ def tablets = sql_return_maparray """ show tablets from ${testTable};
"""
+ logger.info("tablets: " + tablets)
+ assertEquals(1, tablets.size())
+ def tablet = tablets[0]
+ String tablet_id = tablet.TabletId
+ def backend_id = tablet.BackendId
+
+ GetDebugPoint().enableDebugPointForAllBEs("MemTable.need_flush")
+
GetDebugPoint().enableDebugPointForAllBEs("CumulativeCompaction.modify_rowsets.delete_expired_stale_rowset")
+
GetDebugPoint().enableDebugPointForAllBEs("Tablet.delete_expired_stale_rowset.start_delete_unused_rowset")
+
+ Set<String> all_history_stale_rowsets = new HashSet<>();
+ try {
+ // load 1
+ streamLoad {
+ table "${testTable}"
+ set 'column_separator', ','
+ set 'compress_type', 'GZ'
+ file 'test_schema_change_add_key_column.csv.gz'
+ time 10000 // limit inflight 10s
+
+ check { res, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ def json = parseJson(res)
+ assertEquals("success", json.Status.toLowerCase())
+ assertEquals(8192, json.NumberTotalRows)
+ assertEquals(0, json.NumberFilteredRows)
+ }
+ }
+ sql "sync"
+ def rowCount1 = sql """ select count() from ${testTable}; """
+ logger.info("rowCount1: ${rowCount1}")
+ // check generate 3 segments
+ getTabletStatus(tablet, 2, 3, true, all_history_stale_rowsets)
+
+ // trigger compaction
+
GetDebugPoint().enableDebugPointForAllBEs("CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets.set_input_rowsets",
+ [tablet_id: "${tablet.TabletId}", start_version: 2,
end_version: 2])
+ def (code, out, err) =
be_run_cumulative_compaction(backendId_to_backendIP.get(backend_id),
backendId_to_backendHttpPort.get(backend_id), tablet_id)
+ logger.info("Run compaction: code=" + code + ", out=" + out + ",
err=" + err)
+ assertEquals(code, 0)
+ def compactJson = parseJson(out.trim())
+ logger.info("compact json: " + compactJson)
+ // check generate 1 segments
+ for (int i = 0; i < 20; i++) {
+ if (getTabletStatus(tablet, 2, 1, false,
all_history_stale_rowsets)) {
+ break
+ }
+ sleep(100)
+ }
+ getTabletStatus(tablet, 2, 1, false, all_history_stale_rowsets)
+ sql """ select * from ${testTable} limit 1; """
+
+ // load 2
+ streamLoad {
+ table "${testTable}"
+ set 'column_separator', ','
+ set 'compress_type', 'GZ'
+ file 'test_schema_change_add_key_column1.csv.gz'
+ time 10000 // limit inflight 10s
+
+ check { res, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ def json = parseJson(res)
+ assertEquals("success", json.Status.toLowerCase())
+ assertEquals(20480, json.NumberTotalRows)
+ assertEquals(0, json.NumberFilteredRows)
+ }
+ }
+ sql "sync"
+ def rowCount2 = sql """ select count() from ${testTable}; """
+ logger.info("rowCount2: ${rowCount2}")
+ // check generate 3 segments
+ getTabletStatus(tablet, 3, 6, false, all_history_stale_rowsets)
+ def local_dm = getLocalDeleteBitmapStatus(tablet)
+ logger.info("local delete bitmap 1: " + local_dm)
+
+ // trigger compaction for load 2
+
GetDebugPoint().enableDebugPointForAllBEs("CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets.set_input_rowsets",
+ [tablet_id: "${tablet.TabletId}", start_version: 3,
end_version: 3])
+ (code, out, err) =
be_run_cumulative_compaction(backendId_to_backendIP.get(backend_id),
backendId_to_backendHttpPort.get(backend_id), tablet_id)
+ logger.info("Run compaction: code=" + code + ", out=" + out + ",
err=" + err)
+ assertEquals(code, 0)
+ compactJson = parseJson(out.trim())
+ logger.info("compact json: " + compactJson)
+ waitForCompaction(tablet)
+ // check generate 1 segments
+ for (int i = 0; i < 20; i++) {
+ if (getTabletStatus(tablet, 3, 1, false,
all_history_stale_rowsets)) {
+ break
+ }
+ sleep(100)
+ }
+ getTabletStatus(tablet, 3, 1, false, all_history_stale_rowsets)
+
+
GetDebugPoint().enableDebugPointForAllBEs("DeleteBitmapAction._handle_show_local_delete_bitmap_count.vacuum_stale_rowsets")
// cloud
+
GetDebugPoint().enableDebugPointForAllBEs("DeleteBitmapAction._handle_show_local_delete_bitmap_count.start_delete_unused_rowset")
// local
+ local_dm = getLocalDeleteBitmapStatus(tablet)
+ logger.info("local delete bitmap 2: " + local_dm)
+ assertEquals(1, local_dm["delete_bitmap_count"])
+
+
+ // sleep for vacuum_stale_rowsets_interval_s=10 seconds to wait
for unused rowsets are deleted
+ sleep(21000)
+
+ def be_host = backendId_to_backendIP[tablet.BackendId]
+ def be_http_port = backendId_to_backendHttpPort[tablet.BackendId]
+ logger.info("be_host: ${be_host}, be_http_port: ${be_http_port},
BrpcPort: ${backendId_to_backendBrpcPort[tablet.BackendId]}")
+
+ for (int i = 0; i < all_history_stale_rowsets.size(); i++) {
+ def rowsetStr = all_history_stale_rowsets[i]
+ // [12-12] 1 DATA NONOVERLAPPING
02000000000000124843c92c13625daa8296c20957119893 1011.00 B
+ def start_version = rowsetStr.split(" ")[0].replace('[',
'').replace(']', '').split("-")[0].toInteger()
+ def end_version = rowsetStr.split(" ")[0].replace('[',
'').replace(']', '').split("-")[1].toInteger()
+ def rowset_id = rowsetStr.split(" ")[4]
+ if (start_version == 0) {
+ continue
+ }
+
+ int start_index = rowsetStr.indexOf("]")
+ int end_index = rowsetStr.indexOf("DATA")
+ def segmentNum = rowsetStr.substring(start_index + 1,
end_index).trim().toInteger()
+
+ logger.info("rowset ${i}, start: ${start_version}, end:
${end_version}, id: ${rowset_id}, segment: ${segmentNum}")
+ def data =
Http.GET("http://${be_host}:${be_http_port}/api/file_cache?op=list_cache&value=${rowset_id}_0.dat",
true)
+ logger.info("file cache data: ${data}")
+ if (segmentNum <= 1) {
+ assertTrue(data.size() > 0)
+ } else {
+ assertTrue(data.size() == 0)
+ }
+ }
+
+ def (code_0, out_0, err_0) = curl("GET",
"http://${be_host}:${backendId_to_backendBrpcPort[tablet.BackendId]}/vars/unused_rowsets_count")
+ logger.info("out_0: ${out_0}")
+ def unusedRowsetsCount =
out_0.trim().split(":")[1].trim().toInteger()
+ assertEquals(0, unusedRowsetsCount)
+ } finally {
+ GetDebugPoint().clearDebugPointsForAllBEs()
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]