This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 914e1a551fb [feat](cloud) Add unused rowset state for CloudTablet (#51573)
914e1a551fb is described below

commit 914e1a551fb1ad922b3c350a3fcc4f60d2c70a74
Author: Lei Zhang <[email protected]>
AuthorDate: Tue Jun 17 14:42:15 2025 +0800

    [feat](cloud) Add unused rowset state for CloudTablet (#51573)
    
    * Add an unused rowset state for CloudTablet to recycle file cache data
      and delete_bitmap
    * This PR is an enhancement of https://github.com/apache/doris/pull/50973
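    
    For readers skimming the diff: rowsets replaced in add_rowsets (schema
    change double-write, single-rowset cumu compaction) or expired from the
    stale-rowset path are now parked in a tablet-level map instead of having
    their file cache and delete bitmap dropped immediately; a periodic vacuum
    pass (cf. vacuum_stale_rowsets_interval_s in the tests) recycles only the
    entries no query still references. Below is a minimal standalone sketch of
    that pattern; Rowset and TabletSketch are simplified stand-ins, not the
    actual Doris classes, and the real code also maintains bvar counters and
    removes delete bitmaps, as the diff shows:
    
        #include <cstdint>
        #include <map>
        #include <memory>
        #include <mutex>
        #include <vector>
    
        struct Rowset {
            int64_t id = 0;
            void clear_cache() { /* drop cached segment and index data */ }
        };
    
        class TabletSketch {
        public:
            // Rowsets replaced by compaction/schema change or expired from the
            // stale path are parked here instead of being recycled at once.
            void add_unused_rowsets(const std::vector<std::shared_ptr<Rowset>>& rowsets) {
                std::lock_guard<std::mutex> lock(_gc_mutex);
                for (const auto& rs : rowsets) {
                    _unused_rowsets[rs->id] = rs;
                }
            }
    
            // Called periodically by a vacuum pass: use_count() == 1 means only
            // this map still holds the rowset, so no query can be reading it and
            // its cached data (and delete bitmap) are safe to remove.
            void remove_unused_rowsets() {
                std::lock_guard<std::mutex> lock(_gc_mutex);
                for (auto it = _unused_rowsets.begin(); it != _unused_rowsets.end();) {
                    if (it->second.use_count() > 1) {
                        ++it; // still referenced by a reader, retry next round
                        continue;
                    }
                    it->second->clear_cache();
                    it = _unused_rowsets.erase(it);
                }
            }
    
        private:
            std::mutex _gc_mutex;
            std::map<int64_t, std::shared_ptr<Rowset>> _unused_rowsets;
        };
    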
---
 be/src/cloud/cloud_tablet.cpp                      |  70 ++--
 be/src/cloud/cloud_tablet.h                        |   2 +
 ...ache_compaction_and_read_stale_cloud_docker.out | Bin 0 -> 287 bytes
 ...e_compaction_and_read_stale_cloud_docker.groovy | 390 +++++++++++++++++++++
 ...ultisegments_and_read_stale_cloud_docker.groovy | 347 ++++++++++++++++++
 5 files changed, 787 insertions(+), 22 deletions(-)

diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp
index 43749157c9a..2b86d76cf31 100644
--- a/be/src/cloud/cloud_tablet.cpp
+++ b/be/src/cloud/cloud_tablet.cpp
@@ -63,6 +63,9 @@ bvar::LatencyRecorder g_cu_compaction_get_delete_bitmap_lock_time_ms(
 bvar::LatencyRecorder g_base_compaction_get_delete_bitmap_lock_time_ms(
         "base_compaction_get_delete_bitmap_lock_time_ms");
 
+bvar::Adder<int64_t> g_unused_rowsets_count("unused_rowsets_count");
+bvar::Adder<int64_t> g_unused_rowsets_bytes("unused_rowsets_bytes");
+
 static constexpr int LOAD_INITIATOR_ID = -1;
 
 CloudTablet::CloudTablet(CloudStorageEngine& engine, TabletMetaSharedPtr tablet_meta)
@@ -342,17 +345,27 @@ void CloudTablet::add_rowsets(std::vector<RowsetSharedPtr> to_add, bool version_
                 // replace existed rowset with `to_add` rowset. This may occur when:
                 //  1. schema change converts rowsets which have been double written to new tablet
                 //  2. cumu compaction picks single overlapping input rowset to perform compaction
-                if (keys_type() == UNIQUE_KEYS && enable_unique_key_merge_on_write()) {
-                    // add existed rowset to unused_rowsets to remove delete bitmap
-                    if (auto find_it = _rs_version_map.find(rs->version());
-                        find_it != _rs_version_map.end()) {
+
+                // add existed rowset to unused_rowsets to remove delete bitmap and recycle cached data
+
+                std::vector<RowsetSharedPtr> unused_rowsets;
+                if (auto find_it = _rs_version_map.find(rs->version());
+                    find_it != _rs_version_map.end()) {
+                    if (find_it->second->rowset_id() == rs->rowset_id()) {
+                        LOG(WARNING) << "tablet_id=" << tablet_id()
+                                     << ", rowset_id=" << 
rs->rowset_id().to_string()
+                                     << ", existed rowset_id="
+                                     << 
find_it->second->rowset_id().to_string();
                         DCHECK(find_it->second->rowset_id() != rs->rowset_id())
                                 << "tablet_id=" << tablet_id()
                                 << ", rowset_id=" << 
rs->rowset_id().to_string()
-                                << ", existed rowset=" << 
find_it->second->rowset_id().to_string();
-                        _unused_rowsets.emplace(find_it->second->rowset_id(), 
find_it->second);
+                                << ", existed rowset_id="
+                                << find_it->second->rowset_id().to_string();
                     }
+                    unused_rowsets.push_back(find_it->second);
                 }
+                add_unused_rowsets(unused_rowsets);
+
                 _tablet_meta->delete_rs_meta_by_version(rs->version(), nullptr);
                 _rs_version_map[rs->version()] = rs;
                 _tablet_meta->add_rowsets_unchecked({rs});
@@ -460,21 +473,16 @@ uint64_t CloudTablet::delete_expired_stale_rowsets() {
         }
         _reconstruct_version_tracker_if_necessary();
     }
+
+    // if the rowset is not used by any query, we can recycle its cached data early.
     recycle_cached_data(expired_rowsets);
     if (config::enable_mow_verbose_log) {
         LOG_INFO("finish delete_expired_stale_rowset for tablet={}", 
tablet_id());
     }
 
+    add_unused_rowsets(expired_rowsets);
     if (keys_type() == UNIQUE_KEYS && enable_unique_key_merge_on_write() &&
         !deleted_stale_rowsets.empty()) {
-        // record expired rowsets in unused rowsets
-        {
-            std::lock_guard<std::mutex> lock(_gc_mutex);
-            for (const auto& rowset : expired_rowsets) {
-                _unused_rowsets.emplace(rowset->rowset_id(), rowset);
-            }
-        }
-
         // agg delete bitmap for pre rowsets; record unused delete bitmap key ranges
         OlapStopWatch watch;
         for (const auto& [version, unused_rowsets] : deleted_stale_rowsets) {
@@ -504,19 +512,36 @@ bool CloudTablet::need_remove_unused_rowsets() {
     return !_unused_rowsets.empty() || !_unused_delete_bitmap.empty();
 }
 
+void CloudTablet::add_unused_rowsets(const std::vector<RowsetSharedPtr>& rowsets) {
+    std::lock_guard<std::mutex> lock(_gc_mutex);
+    for (const auto& rowset : rowsets) {
+        _unused_rowsets[rowset->rowset_id()] = rowset;
+        g_unused_rowsets_bytes << rowset->total_disk_size();
+    }
+    g_unused_rowsets_count << rowsets.size();
+}
+
 void CloudTablet::remove_unused_rowsets() {
+    int64_t removed_rowsets_num = 0;
+    int64_t removed_delete_bitmap_num = 0;
+    OlapStopWatch watch;
     std::lock_guard<std::mutex> lock(_gc_mutex);
-    // 1. remove unused rowsets and delete bitmap
+    // 1. remove unused rowsets' cached data and delete bitmap
     for (auto it = _unused_rowsets.begin(); it != _unused_rowsets.end();) {
+        // it->second is std::shared_ptr<Rowset>
         auto&& rs = it->second;
         if (rs.use_count() > 1) {
-            LOG(WARNING) << "Rowset " << rs->rowset_id() << " has " << 
rs.use_count()
-                         << " references. Can not remove delete bitmap.";
+            LOG(WARNING) << "tablet_id:" << tablet_id() << " rowset: " << 
rs->rowset_id() << " has "
+                         << rs.use_count() << " references, it cannot be 
removed";
             ++it;
             continue;
         }
         tablet_meta()->remove_rowset_delete_bitmap(rs->rowset_id(), rs->version());
+        rs->clear_cache();
+        g_unused_rowsets_count << -1;
+        g_unused_rowsets_bytes << -rs->total_disk_size();
         it = _unused_rowsets.erase(it);
+        removed_rowsets_num++;
     }
 
     // 2. remove delete bitmap of pre rowsets
@@ -538,13 +563,14 @@ void CloudTablet::remove_unused_rowsets() {
         auto& key_ranges = std::get<1>(*it);
         tablet_meta()->delete_bitmap().remove(key_ranges);
         it = _unused_delete_bitmap.erase(it);
+        removed_delete_bitmap_num++;
     }
 
-    if (!_unused_rowsets.empty() || !_unused_delete_bitmap.empty()) {
-        LOG(INFO) << "tablet_id=" << tablet_id()
-                  << ", unused_rowset size=" << _unused_rowsets.size()
-                  << ", unused_delete_bitmap size=" << 
_unused_delete_bitmap.size();
-    }
+    LOG(INFO) << "tablet_id=" << tablet_id() << ", unused_rowset size=" << 
_unused_rowsets.size()
+              << ", unused_delete_bitmap size=" << _unused_delete_bitmap.size()
+              << ", removed_rowsets_num=" << removed_rowsets_num
+              << ", removed_delete_bitmap_num=" << removed_delete_bitmap_num
+              << ", cost(us)=" << watch.get_elapse_time_us();
 }
 
 void CloudTablet::update_base_size(const Rowset& rs) {
diff --git a/be/src/cloud/cloud_tablet.h b/be/src/cloud/cloud_tablet.h
index d63506ddde0..3474857b36a 100644
--- a/be/src/cloud/cloud_tablet.h
+++ b/be/src/cloud/cloud_tablet.h
@@ -276,6 +276,8 @@ public:
                                           std::map<std::string, int64_t>& pre_rowset_to_versions);
 
     bool need_remove_unused_rowsets();
+
+    void add_unused_rowsets(const std::vector<RowsetSharedPtr>& rowsets);
     void remove_unused_rowsets();
 
 private:
diff --git a/regression-test/data/compaction/test_filecache_compaction_and_read_stale_cloud_docker.out b/regression-test/data/compaction/test_filecache_compaction_and_read_stale_cloud_docker.out
new file mode 100644
index 00000000000..8701d535cf3
Binary files /dev/null and b/regression-test/data/compaction/test_filecache_compaction_and_read_stale_cloud_docker.out differ
diff --git a/regression-test/suites/compaction/test_filecache_compaction_and_read_stale_cloud_docker.groovy b/regression-test/suites/compaction/test_filecache_compaction_and_read_stale_cloud_docker.groovy
new file mode 100644
index 00000000000..8673f249e1d
--- /dev/null
+++ b/regression-test/suites/compaction/test_filecache_compaction_and_read_stale_cloud_docker.groovy
@@ -0,0 +1,390 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.util.concurrent.atomic.AtomicBoolean
+import org.apache.doris.regression.suite.ClusterOptions
+import org.codehaus.groovy.runtime.IOGroovyMethods
+import org.apache.doris.regression.util.Http
+
+suite("test_filecache_compaction_and_read_stale_cloud_docker", "docker") {
+    def options = new ClusterOptions()
+    options.cloudMode = true
+    options.setFeNum(1)
+    options.setBeNum(1)
+    options.enableDebugPoints()
+    options.feConfigs.add("enable_workload_group=false")
+    options.beConfigs.add('compaction_promotion_version_count=5')
+    options.beConfigs.add('tablet_rowset_stale_sweep_time_sec=0')
+    options.beConfigs.add('vacuum_stale_rowsets_interval_s=10')
+    options.beConfigs.add('enable_java_support=false')
+
+    def dbName = ""
+    def testTable = "test_filecache_compaction_and_read_stale_cloud_docker"
+    def backendId_to_backendIP = [:]
+    def backendId_to_backendHttpPort = [:]
+    def backendId_to_backendBrpcPort = [:]
+
+    def triggerCompaction = { tablet ->
+        def compact_type = "cumulative"
+        String tablet_id = tablet.TabletId
+        String trigger_backend_id = tablet.BackendId
+        def be_host = backendId_to_backendIP[trigger_backend_id]
+        def be_http_port = backendId_to_backendHttpPort[trigger_backend_id]
+        if (compact_type == "cumulative") {
+            def (code_1, out_1, err_1) = be_run_cumulative_compaction(be_host, be_http_port, tablet_id)
+            logger.info("Run compaction: code=" + code_1 + ", out=" + out_1 + ", err=" + err_1)
+            assertEquals(code_1, 0)
+            return out_1
+        } else if (compact_type == "full") {
+            def (code_2, out_2, err_2) = be_run_full_compaction(be_host, be_http_port, tablet_id)
+            logger.info("Run compaction: code=" + code_2 + ", out=" + out_2 + ", err=" + err_2)
+            assertEquals(code_2, 0)
+            return out_2
+        } else {
+            assertFalse(true)
+        }
+    }
+
+    def getTabletStatus = { tablet ->
+        String tablet_id = tablet.TabletId
+        String trigger_backend_id = tablet.BackendId
+        def be_host = backendId_to_backendIP[trigger_backend_id]
+        def be_http_port = backendId_to_backendHttpPort[trigger_backend_id]
+        StringBuilder sb = new StringBuilder();
+        sb.append("curl -X GET http://${be_host}:${be_http_port}";)
+        sb.append("/api/compaction/show?tablet_id=")
+        sb.append(tablet_id)
+
+        String command = sb.toString()
+        logger.info(command)
+        def process = command.execute()
+        def code = process.waitFor()
+        def out = process.getText()
+        logger.info("Get tablet status:  =" + code + ", out=" + out)
+        assertEquals(code, 0)
+        def tabletStatus = parseJson(out.trim())
+        return tabletStatus
+    }
+
+    def waitForCompaction = { tablet ->
+        String tablet_id = tablet.TabletId
+        String trigger_backend_id = tablet.BackendId
+        def be_host = backendId_to_backendIP[trigger_backend_id]
+        def be_http_port = backendId_to_backendHttpPort[trigger_backend_id]
+        def running = true
+        do {
+            Thread.sleep(1000)
+            StringBuilder sb = new StringBuilder();
+            sb.append("curl -X GET http://${be_host}:${be_http_port}";)
+            sb.append("/api/compaction/run_status?tablet_id=")
+            sb.append(tablet_id)
+
+            String command = sb.toString()
+            logger.info(command)
+            def process = command.execute()
+            def code = process.waitFor()
+            def out = process.getText()
+            logger.info("Get compaction status: code=" + code + ", out=" + out)
+            assertEquals(code, 0)
+            def compactionStatus = parseJson(out.trim())
+            assertEquals("success", compactionStatus.status.toLowerCase())
+            running = compactionStatus.run_status
+        } while (running)
+    }
+
+    def getLocalDeleteBitmapStatus = { tablet ->
+        String tablet_id = tablet.TabletId
+        String trigger_backend_id = tablet.BackendId
+        def be_host = backendId_to_backendIP[trigger_backend_id]
+        def be_http_port = backendId_to_backendHttpPort[trigger_backend_id]
+        boolean running = true
+        StringBuilder sb = new StringBuilder();
+        sb.append("curl -X GET http://${be_host}:${be_http_port}";)
+        sb.append("/api/delete_bitmap/count_local?verbose=true&tablet_id=")
+        sb.append(tablet_id)
+
+        String command = sb.toString()
+        logger.info(command)
+        def process = command.execute()
+        def code = process.waitFor()
+        def out = process.getText()
+        logger.info("Get local delete bitmap count status:  =" + code + ", 
out=" + out)
+        assertEquals(code, 0)
+        def deleteBitmapStatus = parseJson(out.trim())
+        return deleteBitmapStatus
+    }
+
+    def getMsDeleteBitmapStatus = { tablet ->
+        String tablet_id = tablet.TabletId
+        String trigger_backend_id = tablet.BackendId
+        def be_host = backendId_to_backendIP[trigger_backend_id]
+        def be_http_port = backendId_to_backendHttpPort[trigger_backend_id]
+        boolean running = true
+        StringBuilder sb = new StringBuilder();
+        sb.append("curl -X GET http://${be_host}:${be_http_port}";)
+        sb.append("/api/delete_bitmap/count_ms?verbose=true&tablet_id=")
+        sb.append(tablet_id)
+
+        String command = sb.toString()
+        logger.info(command)
+        def process = command.execute()
+        def code = process.waitFor()
+        def out = process.getText()
+        logger.info("Get ms delete bitmap count status:  =" + code + ", out=" 
+ out)
+        assertEquals(code, 0)
+        def deleteBitmapStatus = parseJson(out.trim())
+        return deleteBitmapStatus
+    }
+
+    docker(options) {
+        def fes = sql_return_maparray "show frontends"
+        logger.info("frontends: ${fes}")
+        def url = "jdbc:mysql://${fes[0].Host}:${fes[0].QueryPort}/"
+        logger.info("url: " + url)
+        AtomicBoolean query_result = new AtomicBoolean(true)
+        def query = {
+            connect( context.config.jdbcUser,  context.config.jdbcPassword,  url) {
+                logger.info("query start")
+                def results = sql_return_maparray """ select * from ${dbName}.${testTable}; """
+                logger.info("query result: " + results)
+                Set<String> keys = new HashSet<>()
+                for (final def result in results) {
+                    if (keys.contains(result.k)) {
+                        logger.info("find duplicate key: " + result.k)
+                        query_result.set(false)
+                        break
+                    }
+                    keys.add(result.k)
+                }
+                logger.info("query finish. query_result: " + 
query_result.get())
+            }
+        }
+
+        def result = sql 'SELECT DATABASE()'
+        dbName = result[0][0]
+
+        sql """ DROP TABLE IF EXISTS ${testTable} """
+        sql """
+        create table ${testTable} (`k` int NOT NULL, `v` int NOT NULL)
+        UNIQUE KEY(`k`)
+        DISTRIBUTED BY HASH(`k`) BUCKETS 1
+        PROPERTIES (
+            "enable_unique_key_merge_on_write" = "true",
+            "replication_allocation" = "tag.location.default: 1",
+            "disable_auto_compaction" = "true"
+        );
+        """
+        // getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
+        getBackendIpHttpAndBrpcPort(backendId_to_backendIP, backendId_to_backendHttpPort, backendId_to_backendBrpcPort);
+
+        def tablets = sql_return_maparray """ show tablets from ${testTable}; """
+        logger.info("tablets: " + tablets)
+        assertEquals(1, tablets.size())
+        def tablet = tablets[0]
+        String tablet_id = tablet.TabletId
+
+        try {
+            Set<String> all_history_stale_rowsets = new HashSet<>();
+
+            // write some data
+            sql """ INSERT INTO ${testTable} VALUES (1,99); """
+            sql """ INSERT INTO ${testTable} VALUES (2,99); """
+            sql """ INSERT INTO ${testTable} VALUES (3,99); """
+            sql """ INSERT INTO ${testTable} VALUES (4,99); """
+            sql """ INSERT INTO ${testTable} VALUES (5,99); """
+            sql "sync"
+            order_qt_sql1 """ select * from ${testTable}; """
+
+            def tablet_status = getTabletStatus(tablet)
+
+            // after compaction, [1-6] versions will become stale rowsets
+            all_history_stale_rowsets.addAll(tablet_status["rowsets"])
+
+            // trigger compaction to generate base rowset
+            assertTrue(triggerCompaction(tablet).contains("Success"))
+            waitForCompaction(tablet)
+            tablet_status = getTabletStatus(tablet)
+            assertEquals(2, tablet_status["rowsets"].size())
+            all_history_stale_rowsets.addAll(tablet_status["rowsets"])
+
+            def ms_dm = getMsDeleteBitmapStatus(tablet)
+            assertEquals(0, ms_dm["delete_bitmap_count"])
+            order_qt_sql2 "select * from ${testTable}"
+
+            // write some data
+            sql """ INSERT INTO ${testTable} VALUES (1,99); """
+            sql """ INSERT INTO ${testTable} VALUES (2,99); """
+            sql """ INSERT INTO ${testTable} VALUES (3,99); """
+            sql """ INSERT INTO ${testTable} VALUES (4,99); """
+            sql """ INSERT INTO ${testTable} VALUES (5,99); """
+            sql """ sync """
+            order_qt_sql3 "select * from ${testTable}"
+            tablet_status = getTabletStatus(tablet)
+            assertEquals(7, tablet_status["rowsets"].size())
+            all_history_stale_rowsets.addAll(tablet_status["rowsets"])
+            ms_dm = getMsDeleteBitmapStatus(tablet)
+            assertEquals(5, ms_dm["delete_bitmap_count"])
+
+            // trigger and block one query
+            GetDebugPoint().enableDebugPointForAllBEs("NewOlapScanner::_init_tablet_reader_params.block")
+            GetDebugPoint().enableDebugPointForAllBEs("CumulativeCompaction.modify_rowsets.delete_expired_stale_rowset")
+            GetDebugPoint().enableDebugPointForAllBEs("CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets.set_input_rowsets",
+                    [tablet_id: "${tablet_id}", start_version: 7, end_version: 11]);
+            Thread query_thread = new Thread(() -> query())
+            query_thread.start()
+            sleep(100)
+
+            // trigger compaction
+            // getTabletStatus(tablet)
+            assertTrue(triggerCompaction(tablet).contains("Success"))
+            waitForCompaction(tablet)
+            logger.info("compaction2 finished")
+            // check rowset count
+            tablet_status = getTabletStatus(tablet)
+            assertEquals(3, tablet_status["rowsets"].size())
+            all_history_stale_rowsets.addAll(tablet_status["rowsets"])
+            // check ms delete bitmap count
+            ms_dm = getMsDeleteBitmapStatus(tablet)
+            assertEquals(1, ms_dm["delete_bitmap_count"])
+            assertEquals(5, ms_dm["cardinality"])
+            // check local delete bitmap count
+            def local_dm = getLocalDeleteBitmapStatus(tablet)
+            assertEquals(5, local_dm["delete_bitmap_count"])
+            assertEquals(9, local_dm["cardinality"])
+
+            // wait for stale rowsets are deleted
+            boolean is_stale_rowsets_deleted = false
+            for (int i = 0; i < 100; i++) {
+                tablet_status = getTabletStatus(tablet)
+                if (tablet_status["stale_rowsets"].size() == 0) {
+                    is_stale_rowsets_deleted = true
+                    break
+                }
+                sleep(500)
+            }
+            assertTrue(is_stale_rowsets_deleted, "stale rowsets are not deleted")
+            // check that the delete bitmap of stale rowsets is not deleted
+            sleep(1000)
+            def local_dm_status = getLocalDeleteBitmapStatus(tablet)
+            assertEquals(5, local_dm_status["delete_bitmap_count"])
+
+            // unlock query and check no duplicated keys
+            GetDebugPoint().disableDebugPointForAllBEs("NewOlapScanner::_init_tablet_reader_params.block")
+            query_thread.join()
+            assertTrue(query_result.get(), "find duplicated keys")
+
+            // check delete bitmap of compaction2 stale rowsets are deleted
+            // write some data
+            sql """ INSERT INTO ${testTable} VALUES (1,99); """
+            sql """ INSERT INTO ${testTable} VALUES (2,99); """
+            sql """ INSERT INTO ${testTable} VALUES (3,99); """
+            sql """ INSERT INTO ${testTable} VALUES (4,99); """
+            sql """ INSERT INTO ${testTable} VALUES (5,100); """
+            sql "sync"
+            order_qt_sql4 "select * from ${testTable}"
+            logger.info("order_qt_sql4 finished")
+            tablet_status = getTabletStatus(tablet)
+            all_history_stale_rowsets.addAll(tablet_status["rowsets"])
+            getMsDeleteBitmapStatus(tablet)
+            // trigger compaction
+            GetDebugPoint().enableDebugPointForAllBEs("CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets.set_input_rowsets",
+                    [tablet_id: "${tablet_id}", start_version: 12, end_version: 16]);
+            tablet_status = getTabletStatus(tablet)
+            all_history_stale_rowsets.addAll(tablet_status["rowsets"])
+            assertTrue(triggerCompaction(tablet).contains("Success"))
+            waitForCompaction(tablet)
+            boolean is_compaction_finished = false
+            for (int i = 0; i < 100; i++) {
+                tablet_status = getTabletStatus(tablet)
+                all_history_stale_rowsets.addAll(tablet_status["rowsets"])
+                if (tablet_status["rowsets"].size() == 4) {
+                    logger.info("final tablet status: ${tablet_status}")
+                    is_compaction_finished = true
+                    break
+                }
+                sleep(500)
+            }
+            assertTrue(is_compaction_finished, "compaction is not finished")
+            logger.info("compaction3 finished")
+            // check ms delete bitmap count
+            ms_dm = getMsDeleteBitmapStatus(tablet)
+            assertEquals(2, ms_dm["delete_bitmap_count"])
+            assertEquals(10, ms_dm["cardinality"])
+            // check delete bitmap count
+            logger.info("check local delete bitmap is deleted")
+            boolean is_local_dm_deleted = false
+            for (int i = 0; i < 100; i++) {
+                local_dm_status = getLocalDeleteBitmapStatus(tablet)
+                if (local_dm_status["delete_bitmap_count"] == 2) {
+                    assertEquals(10, local_dm_status["cardinality"])
+                    is_local_dm_deleted = true
+                    break
+                }
+                sleep(500)
+            }
+            assertTrue(is_local_dm_deleted, "delete bitmap of compaction2 stale rowsets are not deleted")
+            order_qt_sql5 "select * from ${testTable}"
+
+            tablet_status = getTabletStatus(tablet)
+            def final_rowsets = tablet_status["rowsets"]
+
+            // sleep for vacuum_stale_rowsets_interval_s=10 seconds to wait for unused rowsets to be deleted
+            sleep(21000)
+
+            def be_host = backendId_to_backendIP[tablet.BackendId]
+            def be_http_port = backendId_to_backendHttpPort[tablet.BackendId]
+            for (int i = 0; i < all_history_stale_rowsets.size(); i++) {
+                def rowsetStr = all_history_stale_rowsets[i]
+                // [12-12] 1 DATA NONOVERLAPPING 02000000000000124843c92c13625daa8296c20957119893 1011.00 B
+                def start_version = rowsetStr.split(" ")[0].replace('[', '').replace(']', '').split("-")[0].toInteger()
+                def end_version = rowsetStr.split(" ")[0].replace('[', '').replace(']', '').split("-")[1].toInteger()
+                def rowset_id = rowsetStr.split(" ")[4]
+                if (start_version == 0 || start_version != end_version) {
+                    continue
+                }
+
+                logger.info("rowset ${i}, start: ${start_version}, end: 
${end_version}, id: ${rowset_id}")
+                def data = 
Http.GET("http://${be_host}:${be_http_port}/api/file_cache?op=list_cache&value=${rowset_id}_0.dat";,
 true)
+                logger.info("file cache data: ${data}")
+                assertTrue(data.size() == 0)
+            }
+
+            for (int i = 0; i < final_rowsets.size(); i++) {
+                def rowsetStr = final_rowsets[i]
+                def start_version = rowsetStr.split(" ")[0].replace('[', 
'').replace(']', '').split("-")[0].toInteger()
+                def end_version = rowsetStr.split(" ")[0].replace('[', 
'').replace(']', '').split("-")[1].toInteger()
+                def rowset_id = rowsetStr.split(" ")[4]
+                if (start_version == 0) {
+                    continue
+                }
+
+                logger.info("final rowset ${i}, start: ${start_version}, end: 
${end_version}, id: ${rowset_id}")
+                def data = 
Http.GET("http://${be_host}:${be_http_port}/api/file_cache?op=list_cache&value=${rowset_id}_0.dat";,
 true)
+                logger.info("file cache data: ${data}")
+                assertTrue(data.size() > 0)
+            }
+
+            def (code_0, out_0, err_0) = curl("GET", "http://${be_host}:${backendId_to_backendBrpcPort[tablet.BackendId]}/vars/unused_rowsets_count")
+            logger.info("out_0: ${out_0}")
+            def unusedRowsetsCount = out_0.trim().split(":")[1].trim().toInteger()
+            assertEquals(0, unusedRowsetsCount)
+
+        } finally {
+            GetDebugPoint().clearDebugPointsForAllBEs()
+        }
+    }
+}
diff --git a/regression-test/suites/compaction/test_filecache_compaction_multisegments_and_read_stale_cloud_docker.groovy b/regression-test/suites/compaction/test_filecache_compaction_multisegments_and_read_stale_cloud_docker.groovy
new file mode 100644
index 00000000000..4b661316090
--- /dev/null
+++ b/regression-test/suites/compaction/test_filecache_compaction_multisegments_and_read_stale_cloud_docker.groovy
@@ -0,0 +1,347 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.util.concurrent.atomic.AtomicBoolean
+import org.apache.doris.regression.suite.ClusterOptions
+import org.codehaus.groovy.runtime.IOGroovyMethods
+import org.apache.doris.regression.util.Http
+
+suite("test_filecache_compaction_multisegments_and_read_stale_cloud_docker", 
"docker") {
+    def options = new ClusterOptions()
+    options.cloudMode = true
+    options.setFeNum(1)
+    options.setBeNum(1)
+    options.enableDebugPoints()
+    options.feConfigs.add("enable_workload_group=false")
+    options.beConfigs.add('compaction_promotion_version_count=5')
+    options.beConfigs.add('tablet_rowset_stale_sweep_time_sec=0')
+    options.beConfigs.add('vacuum_stale_rowsets_interval_s=10')
+    options.beConfigs.add('enable_java_support=false')
+    options.beConfigs.add('doris_scanner_row_bytes=1')
+
+    def dbName = ""
+    def testTable = "test_filecache_multisegments_and_read_stale"
+    def backendId_to_backendIP = [:]
+    def backendId_to_backendHttpPort = [:]
+    def backendId_to_backendBrpcPort = [:]
+
+    def triggerCompaction = { tablet ->
+        def compact_type = "cumulative"
+        String tablet_id = tablet.TabletId
+        String trigger_backend_id = tablet.BackendId
+        def be_host = backendId_to_backendIP[trigger_backend_id]
+        def be_http_port = backendId_to_backendHttpPort[trigger_backend_id]
+        if (compact_type == "cumulative") {
+            def (code_1, out_1, err_1) = be_run_cumulative_compaction(be_host, be_http_port, tablet_id)
+            logger.info("Run compaction: code=" + code_1 + ", out=" + out_1 + ", err=" + err_1)
+            assertEquals(code_1, 0)
+            return out_1
+        } else if (compact_type == "full") {
+            def (code_2, out_2, err_2) = be_run_full_compaction(be_host, be_http_port, tablet_id)
+            logger.info("Run compaction: code=" + code_2 + ", out=" + out_2 + ", err=" + err_2)
+            assertEquals(code_2, 0)
+            return out_2
+        } else {
+            assertFalse(true)
+        }
+    }
+
+    def getTabletStatus = { tablet, rowsetIndex, lastRowsetSegmentNum, enableAssert = false, outputRowsets = null ->
+        String compactionUrl = tablet["CompactionStatus"]
+        def (code, out, err) = curl("GET", compactionUrl)
+        logger.info("Show tablets status: code=" + code + ", out=" + out + ", 
err=" + err)
+        assertEquals(code, 0)
+        def tabletJson = parseJson(out.trim())
+        assert tabletJson.rowsets instanceof List
+        if (outputRowsets != null) {
+            outputRowsets.addAll(tabletJson.rowsets)
+        }
+
+        assertTrue(tabletJson.rowsets.size() >= rowsetIndex)
+        def rowset = tabletJson.rowsets.get(rowsetIndex - 1)
+        logger.info("rowset: ${rowset}")
+        int start_index = rowset.indexOf("]")
+        int end_index = rowset.indexOf("DATA")
+        def segmentNumStr = rowset.substring(start_index + 1, end_index).trim()
+        logger.info("segmentNumStr: ${segmentNumStr}")
+        if (enableAssert) {
+            assertEquals(lastRowsetSegmentNum, Integer.parseInt(segmentNumStr))
+        } else {
+            return lastRowsetSegmentNum == Integer.parseInt(segmentNumStr);
+        }
+    }
+
+    def waitForCompaction = { tablet ->
+        String tablet_id = tablet.TabletId
+        String trigger_backend_id = tablet.BackendId
+        def be_host = backendId_to_backendIP[trigger_backend_id]
+        def be_http_port = backendId_to_backendHttpPort[trigger_backend_id]
+        def running = true
+        do {
+            Thread.sleep(1000)
+            StringBuilder sb = new StringBuilder();
+            sb.append("curl -X GET http://${be_host}:${be_http_port}";)
+            sb.append("/api/compaction/run_status?tablet_id=")
+            sb.append(tablet_id)
+
+            String command = sb.toString()
+            logger.info(command)
+            def process = command.execute()
+            def code = process.waitFor()
+            def out = process.getText()
+            logger.info("Get compaction status: code=" + code + ", out=" + out)
+            assertEquals(code, 0)
+            def compactionStatus = parseJson(out.trim())
+            assertEquals("success", compactionStatus.status.toLowerCase())
+            running = compactionStatus.run_status
+        } while (running)
+    }
+
+    def getLocalDeleteBitmapStatus = { tablet ->
+        String tablet_id = tablet.TabletId
+        String trigger_backend_id = tablet.BackendId
+        def be_host = backendId_to_backendIP[trigger_backend_id]
+        def be_http_port = backendId_to_backendHttpPort[trigger_backend_id]
+        boolean running = true
+        StringBuilder sb = new StringBuilder();
+        sb.append("curl -X GET http://${be_host}:${be_http_port}";)
+        sb.append("/api/delete_bitmap/count_local?verbose=true&tablet_id=")
+        sb.append(tablet_id)
+
+        String command = sb.toString()
+        logger.info(command)
+        def process = command.execute()
+        def code = process.waitFor()
+        def out = process.getText()
+        logger.info("Get local delete bitmap count status:  =" + code + ", 
out=" + out)
+        assertEquals(code, 0)
+        def deleteBitmapStatus = parseJson(out.trim())
+        return deleteBitmapStatus
+    }
+
+    def getMsDeleteBitmapStatus = { tablet ->
+        String tablet_id = tablet.TabletId
+        String trigger_backend_id = tablet.BackendId
+        def be_host = backendId_to_backendIP[trigger_backend_id]
+        def be_http_port = backendId_to_backendHttpPort[trigger_backend_id]
+        boolean running = true
+        StringBuilder sb = new StringBuilder();
+        sb.append("curl -X GET http://${be_host}:${be_http_port}";)
+        sb.append("/api/delete_bitmap/count_ms?verbose=true&tablet_id=")
+        sb.append(tablet_id)
+
+        String command = sb.toString()
+        logger.info(command)
+        def process = command.execute()
+        def code = process.waitFor()
+        def out = process.getText()
+        logger.info("Get ms delete bitmap count status:  =" + code + ", out=" 
+ out)
+        assertEquals(code, 0)
+        def deleteBitmapStatus = parseJson(out.trim())
+        return deleteBitmapStatus
+    }
+
+    docker(options) {
+        def fes = sql_return_maparray "show frontends"
+        logger.info("frontends: ${fes}")
+        def url = "jdbc:mysql://${fes[0].Host}:${fes[0].QueryPort}/"
+        logger.info("url: " + url)
+        AtomicBoolean query_result = new AtomicBoolean(true)
+        def query = {
+            connect( context.config.jdbcUser,  context.config.jdbcPassword,  url) {
+                logger.info("query start")
+                def results = sql_return_maparray """ select * from ${dbName}.${testTable}; """
+                logger.info("query result: " + results)
+                Set<String> keys = new HashSet<>()
+                for (final def result in results) {
+                    if (keys.contains(result.k)) {
+                        logger.info("find duplicate key: " + result.k)
+                        query_result.set(false)
+                        break
+                    }
+                    keys.add(result.k)
+                }
+                logger.info("query finish. query_result: " + 
query_result.get())
+            }
+        }
+
+        def result = sql 'SELECT DATABASE()'
+        dbName = result[0][0]
+
+        sql """ DROP TABLE IF EXISTS ${testTable} """
+        sql """ CREATE TABLE IF NOT EXISTS ${testTable} (
+            `k1` int(11) NULL, 
+            `k2` int(11) NULL, 
+            `v3` int(11) NULL,
+            `v4` int(11) NULL
+        ) unique KEY(`k1`, `k2`) 
+        DISTRIBUTED BY HASH(`k1`) BUCKETS 1
+        PROPERTIES (
+            "replication_num" = "1",
+            "disable_auto_compaction" = "true"
+            );
+        """
+
+        // getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
+        getBackendIpHttpAndBrpcPort(backendId_to_backendIP, backendId_to_backendHttpPort, backendId_to_backendBrpcPort);
+
+        def tablets = sql_return_maparray """ show tablets from ${testTable}; """
+        logger.info("tablets: " + tablets)
+        assertEquals(1, tablets.size())
+        def tablet = tablets[0]
+        String tablet_id = tablet.TabletId
+        def backend_id = tablet.BackendId
+
+        GetDebugPoint().enableDebugPointForAllBEs("MemTable.need_flush")
+        GetDebugPoint().enableDebugPointForAllBEs("CumulativeCompaction.modify_rowsets.delete_expired_stale_rowset")
+        GetDebugPoint().enableDebugPointForAllBEs("Tablet.delete_expired_stale_rowset.start_delete_unused_rowset")
+
+        Set<String> all_history_stale_rowsets = new HashSet<>();
+        try {
+            // load 1
+            streamLoad {
+                table "${testTable}"
+                set 'column_separator', ','
+                set 'compress_type', 'GZ'
+                file 'test_schema_change_add_key_column.csv.gz'
+                time 10000 // limit inflight 10s
+
+                check { res, exception, startTime, endTime ->
+                    if (exception != null) {
+                        throw exception
+                    }
+                    def json = parseJson(res)
+                    assertEquals("success", json.Status.toLowerCase())
+                    assertEquals(8192, json.NumberTotalRows)
+                    assertEquals(0, json.NumberFilteredRows)
+                }
+            }
+            sql "sync"
+            def rowCount1 = sql """ select count() from ${testTable}; """
+            logger.info("rowCount1: ${rowCount1}")
+            // check generate 3 segments
+            getTabletStatus(tablet, 2, 3, true, all_history_stale_rowsets)
+
+            // trigger compaction
+            GetDebugPoint().enableDebugPointForAllBEs("CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets.set_input_rowsets",
+                    [tablet_id: "${tablet.TabletId}", start_version: 2, end_version: 2])
+            def (code, out, err) = be_run_cumulative_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
+            logger.info("Run compaction: code=" + code + ", out=" + out + ", 
err=" + err)
+            assertEquals(code, 0)
+            def compactJson = parseJson(out.trim())
+            logger.info("compact json: " + compactJson)
+            // check generate 1 segments
+            for (int i = 0; i < 20; i++) {
+                if (getTabletStatus(tablet, 2, 1, false, all_history_stale_rowsets)) {
+                    break
+                }
+                sleep(100)
+            }
+            getTabletStatus(tablet, 2, 1, false, all_history_stale_rowsets)
+            sql """ select * from ${testTable} limit 1; """
+
+            // load 2
+            streamLoad {
+                table "${testTable}"
+                set 'column_separator', ','
+                set 'compress_type', 'GZ'
+                file 'test_schema_change_add_key_column1.csv.gz'
+                time 10000 // limit inflight 10s
+
+                check { res, exception, startTime, endTime ->
+                    if (exception != null) {
+                        throw exception
+                    }
+                    def json = parseJson(res)
+                    assertEquals("success", json.Status.toLowerCase())
+                    assertEquals(20480, json.NumberTotalRows)
+                    assertEquals(0, json.NumberFilteredRows)
+                }
+            }
+            sql "sync"
+            def rowCount2 = sql """ select count() from ${testTable}; """
+            logger.info("rowCount2: ${rowCount2}")
+            // check generate 3 segments
+            getTabletStatus(tablet, 3, 6, false, all_history_stale_rowsets)
+            def local_dm = getLocalDeleteBitmapStatus(tablet)
+            logger.info("local delete bitmap 1: " + local_dm)
+
+            // trigger compaction for load 2
+            GetDebugPoint().enableDebugPointForAllBEs("CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets.set_input_rowsets",
+                    [tablet_id: "${tablet.TabletId}", start_version: 3, end_version: 3])
+            (code, out, err) = be_run_cumulative_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
+            logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
+            assertEquals(code, 0)
+            compactJson = parseJson(out.trim())
+            logger.info("compact json: " + compactJson)
+            waitForCompaction(tablet)
+            // check generate 1 segments
+            for (int i = 0; i < 20; i++) {
+                if (getTabletStatus(tablet, 3, 1, false, all_history_stale_rowsets)) {
+                    break
+                }
+                sleep(100)
+            }
+            getTabletStatus(tablet, 3, 1, false, all_history_stale_rowsets)
+
+            GetDebugPoint().enableDebugPointForAllBEs("DeleteBitmapAction._handle_show_local_delete_bitmap_count.vacuum_stale_rowsets") // cloud
+            GetDebugPoint().enableDebugPointForAllBEs("DeleteBitmapAction._handle_show_local_delete_bitmap_count.start_delete_unused_rowset") // local
+            local_dm = getLocalDeleteBitmapStatus(tablet)
+            logger.info("local delete bitmap 2: " + local_dm)
+            assertEquals(1, local_dm["delete_bitmap_count"])
+
+
+            // sleep for vacuum_stale_rowsets_interval_s=10 seconds to wait for unused rowsets to be deleted
+            sleep(21000)
+
+            def be_host = backendId_to_backendIP[tablet.BackendId]
+            def be_http_port = backendId_to_backendHttpPort[tablet.BackendId]
+            logger.info("be_host: ${be_host}, be_http_port: ${be_http_port}, 
BrpcPort: ${backendId_to_backendBrpcPort[tablet.BackendId]}")
+
+            for (int i = 0; i < all_history_stale_rowsets.size(); i++) {
+                def rowsetStr = all_history_stale_rowsets[i]
+                // [12-12] 1 DATA NONOVERLAPPING 02000000000000124843c92c13625daa8296c20957119893 1011.00 B
+                def start_version = rowsetStr.split(" ")[0].replace('[', '').replace(']', '').split("-")[0].toInteger()
+                def end_version = rowsetStr.split(" ")[0].replace('[', '').replace(']', '').split("-")[1].toInteger()
+                def rowset_id = rowsetStr.split(" ")[4]
+                if (start_version == 0) {
+                    continue
+                }
+
+                int start_index = rowsetStr.indexOf("]")
+                int end_index = rowsetStr.indexOf("DATA")
+                def segmentNum = rowsetStr.substring(start_index + 1, end_index).trim().toInteger()
+
+                logger.info("rowset ${i}, start: ${start_version}, end: 
${end_version}, id: ${rowset_id}, segment: ${segmentNum}")
+                def data = 
Http.GET("http://${be_host}:${be_http_port}/api/file_cache?op=list_cache&value=${rowset_id}_0.dat";,
 true)
+                logger.info("file cache data: ${data}")
+                if (segmentNum <= 1) {
+                    assertTrue(data.size() > 0)
+                } else {
+                    assertTrue(data.size() == 0)
+                }
+            }
+
+            def (code_0, out_0, err_0) = curl("GET", "http://${be_host}:${backendId_to_backendBrpcPort[tablet.BackendId]}/vars/unused_rowsets_count")
+            logger.info("out_0: ${out_0}")
+            def unusedRowsetsCount = out_0.trim().split(":")[1].trim().toInteger()
+            assertEquals(0, unusedRowsetsCount)
+        } finally {
+            GetDebugPoint().clearDebugPointsForAllBEs()
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

