This is an automated email from the ASF dual-hosted git repository.

gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 0bf576b15e4 [improve](cloud) cloud reduce get_tablet_stats rpc to 
meta_service (#60543)
0bf576b15e4 is described below

commit 0bf576b15e4d931a7dc5e334a22f03b7347bad13
Author: meiyi <[email protected]>
AuthorDate: Wed Mar 18 20:36:22 2026 +0800

    [improve](cloud) cloud reduce get_tablet_stats rpc to meta_service (#60543)
---
 be/src/cloud/cloud_meta_mgr.cpp                    |  25 +-
 .../main/java/org/apache/doris/common/Config.java  |   9 +-
 .../org/apache/doris/alter/CloudRollupJobV2.java   |  11 +
 .../apache/doris/alter/CloudSchemaChangeJobV2.java |  10 +
 .../apache/doris/catalog/CloudTabletStatMgr.java   | 280 +++++++++++++++++++--
 .../apache/doris/cloud/catalog/CloudReplica.java   |  18 +-
 .../transaction/CloudGlobalTransactionMgr.java     |  31 ++-
 .../java/org/apache/doris/common/ClientPool.java   |   3 +
 .../apache/doris/common/proc/TabletsProcDir.java   |  17 ++
 .../java/org/apache/doris/qe/SessionVariable.java  |   3 +
 .../apache/doris/service/FrontendServiceImpl.java  |  56 ++++-
 .../doris/transaction/GlobalTransactionMgr.java    |   3 +-
 .../transaction/GlobalTransactionMgrIface.java     |   3 +-
 gensrc/thrift/FrontendService.thrift               |   8 +
 14 files changed, 427 insertions(+), 50 deletions(-)

diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp
index 04a50c75652..ef40c94048b 100644
--- a/be/src/cloud/cloud_meta_mgr.cpp
+++ b/be/src/cloud/cloud_meta_mgr.cpp
@@ -1419,15 +1419,18 @@ Status CloudMetaMgr::update_tmp_rowset(const 
RowsetMeta& rs_meta) {
 
 // async send TableStats(in res) to FE coz we are in streamload ctx, response 
to the user ASAP
 static void send_stats_to_fe_async(const int64_t db_id, const int64_t txn_id,
-                                   const std::string& label, 
CommitTxnResponse& res) {
+                                   const std::string& label, 
CommitTxnResponse& res,
+                                   const std::vector<int64_t>& tablet_ids) {
     std::string protobufBytes;
-    res.SerializeToString(&protobufBytes);
+    if (txn_id != -1) {
+        res.SerializeToString(&protobufBytes);
+    }
     auto st = 
ExecEnv::GetInstance()->send_table_stats_thread_pool()->submit_func(
-            [db_id, txn_id, label, protobufBytes]() -> Status {
+            [db_id, txn_id, label, protobufBytes, tablet_ids]() -> Status {
                 TReportCommitTxnResultRequest request;
                 TStatus result;
 
-                if (protobufBytes.length() <= 0) {
+                if (txn_id != -1 && protobufBytes.length() <= 0) {
                     LOG(WARNING) << "protobufBytes: " << 
protobufBytes.length();
                     return Status::OK(); // nobody cares the return status
                 }
@@ -1436,6 +1439,7 @@ static void send_stats_to_fe_async(const int64_t db_id, 
const int64_t txn_id,
                 request.__set_txnId(txn_id);
                 request.__set_label(label);
                 request.__set_payload(protobufBytes);
+                request.__set_tabletIds(tablet_ids);
 
                 Status status;
                 int64_t duration_ns = 0;
@@ -1489,7 +1493,11 @@ Status CloudMetaMgr::commit_txn(const StreamLoadContext& 
ctx, bool is_2pc) {
     auto st = retry_rpc("commit txn", req, &res, 
&MetaService_Stub::commit_txn);
 
     if (st.ok()) {
-        send_stats_to_fe_async(ctx.db_id, ctx.txn_id, ctx.label, res);
+        std::vector<int64_t> tablet_ids;
+        for (auto& commit_info : ctx.commit_infos) {
+            tablet_ids.emplace_back(commit_info.tabletId);
+        }
+        send_stats_to_fe_async(ctx.db_id, ctx.txn_id, ctx.label, res, 
tablet_ids);
     }
 
     return st;
@@ -1648,6 +1656,13 @@ Status CloudMetaMgr::commit_tablet_job(const 
TabletJobInfoPB& job, FinishTabletJ
         return Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR, false>(
                 "txn conflict when commit tablet job {}", 
job.ShortDebugString());
     }
+
+    if (st.ok() && !job.compaction().empty() && job.has_idx()) {
+        CommitTxnResponse commit_txn_resp;
+        std::vector<int64_t> tablet_ids = {job.idx().tablet_id()};
+        send_stats_to_fe_async(-1, -1, "", commit_txn_resp, tablet_ids);
+    }
+
     return st;
 }
 
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index ed5539d9388..f9be5d87559 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -3276,7 +3276,6 @@ public class Config extends ConfigBase {
                             + "to other BEs in cloud mode."})
     public static int rehash_tablet_after_be_dead_seconds = 3600;
 
-
     @ConfField(mutable = true, description = {
             "Whether to enable the automatic start-stop feature in cloud 
model, default is true."})
     public static boolean enable_auto_start_for_cloud_cluster = true;
@@ -3286,6 +3285,14 @@ public class Config extends ConfigBase {
                     + "model is set to 300 times, which is approximately 5 
minutes by default."})
     public static int auto_start_wait_to_resume_times = 300;
 
+    @ConfField(description = {
+            "Maximal concurrent num of master FE sync tablet stats task to 
observers and followers in cloud mode."})
+    public static int cloud_sync_tablet_stats_task_threads_num = 4;
+
+    @ConfField(mutable = true, description = {"Version of getting tablet stats 
in cloud mode. "
+            + "Version 1: get all tablets; Version 2: get active and interval 
expired tablets"})
+    public static int cloud_get_tablet_stats_version = 2;
+
     @ConfField(description = {"Maximum concurrent number of get tablet stat 
jobs."})
     public static int max_get_tablet_stat_task_threads_num = 4;
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/alter/CloudRollupJobV2.java 
b/fe/fe-core/src/main/java/org/apache/doris/alter/CloudRollupJobV2.java
index 8c6a306aedb..e3192f5e491 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/CloudRollupJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/CloudRollupJobV2.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.alter;
 
+import org.apache.doris.catalog.CloudTabletStatMgr;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
@@ -54,6 +55,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 public class CloudRollupJobV2 extends RollupJobV2 {
     private static final Logger LOG = 
LogManager.getLogger(CloudRollupJobV2.class);
@@ -119,6 +121,15 @@ public class CloudRollupJobV2 extends RollupJobV2 {
 
         LOG.info("onCreateRollupReplicaDone finished, dbId:{}, tableId:{}, 
jobId:{}, rollupIndexList:{}",
                 dbId, tableId, jobId, rollupIndexList);
+
+        List<Long> tabletIds = partitionIdToRollupIndex.values().stream()
+                .flatMap(rollupIndex -> 
rollupIndex.getTablets().stream()).map(Tablet::getId)
+                .collect(Collectors.toList());
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("force sync tablet stats for table: {}, index: {}, 
tabletNum: {}, tabletIds: {}", tableId,
+                    rollupIndexId, tabletIds.size(), tabletIds);
+        }
+        CloudTabletStatMgr.getInstance().addActiveTablets(tabletIds);
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/alter/CloudSchemaChangeJobV2.java 
b/fe/fe-core/src/main/java/org/apache/doris/alter/CloudSchemaChangeJobV2.java
index 9387a93aaa9..cd1b90fb923 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/alter/CloudSchemaChangeJobV2.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/alter/CloudSchemaChangeJobV2.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.alter;
 
+import org.apache.doris.catalog.CloudTabletStatMgr;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
@@ -91,6 +92,15 @@ public class CloudSchemaChangeJobV2 extends 
SchemaChangeJobV2 {
         }
         LOG.info("commitShadowIndex finished, dbId:{}, tableId:{}, jobId:{}, 
shadowIdxList:{}",
                 dbId, tableId, jobId, shadowIdxList);
+
+        List<Long> tabletIds = partitionIndexMap.cellSet().stream()
+                .flatMap(cell -> 
cell.getValue().getTablets().stream().map(Tablet::getId))
+                .collect(Collectors.toList());
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("force sync tablet stats for table: {}, tabletNum: {}, 
tabletIds: {}", tableId,
+                    tabletIds.size(), tabletIds);
+        }
+        CloudTabletStatMgr.getInstance().addActiveTablets(tabletIds);
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/CloudTabletStatMgr.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/CloudTabletStatMgr.java
index fd500fac1a0..d6f0f9516a6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/CloudTabletStatMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/CloudTabletStatMgr.java
@@ -18,34 +18,56 @@
 package org.apache.doris.catalog;
 
 import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
+import org.apache.doris.cloud.catalog.CloudReplica;
+import org.apache.doris.cloud.catalog.CloudTablet;
 import org.apache.doris.cloud.proto.Cloud.GetTabletStatsRequest;
 import org.apache.doris.cloud.proto.Cloud.GetTabletStatsResponse;
 import org.apache.doris.cloud.proto.Cloud.MetaServiceCode;
 import org.apache.doris.cloud.proto.Cloud.TabletIndexPB;
 import org.apache.doris.cloud.proto.Cloud.TabletStatsPB;
 import org.apache.doris.cloud.rpc.MetaServiceProxy;
+import org.apache.doris.common.ClientPool;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.util.MasterDaemon;
 import org.apache.doris.metric.MetricRepo;
 import org.apache.doris.rpc.RpcException;
 import org.apache.doris.service.FrontendOptions;
-
+import org.apache.doris.system.Frontend;
+import org.apache.doris.system.SystemInfoService.HostInfo;
+import org.apache.doris.thrift.FrontendService;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TStatus;
+import org.apache.doris.thrift.TStatusCode;
+import org.apache.doris.thrift.TSyncCloudTabletStatsRequest;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashSet;
 import java.util.List;
 import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 /*
  * CloudTabletStatMgr is for collecting tablet(replica) statistics from 
backends.
- * Each FE will collect by itself.
+ * Config.cloud_get_tablet_stats_version:
+ * 1: Each FE will collect by itself.
+ * 2: Master FE collects active tablet stats and pushes to followers and 
observers,
+ *    and each FE will collect tablet stats by interval ladder.
  */
 public class CloudTabletStatMgr extends MasterDaemon {
     private static final Logger LOG = 
LogManager.getLogger(CloudTabletStatMgr.class);
@@ -55,7 +77,30 @@ public class CloudTabletStatMgr extends MasterDaemon {
     private volatile List<OlapTable.Statistics> cloudTableStatsList = new 
ArrayList<>();
 
     private static final ExecutorService GET_TABLET_STATS_THREAD_POOL = 
Executors.newFixedThreadPool(
-            Config.max_get_tablet_stat_task_threads_num);
+            Config.max_get_tablet_stat_task_threads_num,
+            new 
ThreadFactoryBuilder().setNameFormat("get-tablet-stats-%d").setDaemon(true).build());
+    // Master: send tablet stats to followers and observers
+    // Follower and observer: receive tablet stats from master
+    private static final ExecutorService SYNC_TABLET_STATS_THREAD_POOL = 
Executors.newFixedThreadPool(
+            Config.cloud_sync_tablet_stats_task_threads_num,
+            new 
ThreadFactoryBuilder().setNameFormat("sync-tablet-stats-%d").setDaemon(true).build());
+    private Set<Long> activeTablets = ConcurrentHashMap.newKeySet();
+
+    /**
+     * Interval ladder in milliseconds: 1m, 5m, 10m, 30m, 2h, 6h, 12h, 3d, 
infinite.
+     * Tablets with changing stats stay at lower intervals; stable tablets 
move to higher intervals.
+     */
+    private static final long[] DEFAULT_INTERVAL_LADDER_MS = {
+            TimeUnit.MINUTES.toMillis(1),    // 1 minute
+            TimeUnit.MINUTES.toMillis(5),    // 5 minutes
+            TimeUnit.MINUTES.toMillis(10),   // 10 minutes
+            TimeUnit.MINUTES.toMillis(30),   // 30 minutes
+            TimeUnit.HOURS.toMillis(2),      // 2 hours
+            TimeUnit.HOURS.toMillis(6),      // 6 hours
+            TimeUnit.HOURS.toMillis(12),     // 12 hours
+            TimeUnit.DAYS.toMillis(3),       // 3 days
+            Long.MAX_VALUE                   // infinite (never auto-fetch)
+    };
 
     public CloudTabletStatMgr() {
         super("cloud tablet stat mgr", 
Config.tablet_stat_update_interval_second * 1000);
@@ -63,12 +108,50 @@ public class CloudTabletStatMgr extends MasterDaemon {
 
     @Override
     protected void runAfterCatalogReady() {
-        LOG.info("cloud tablet stat begin");
-        List<Long> dbIds = getAllTabletStats();
+        int version = Config.cloud_get_tablet_stats_version;
+        LOG.info("cloud tablet stat begin with version: {}", version);
+
+        // version1: get all tablet stats
+        if (version == 1) {
+            this.activeTablets.clear();
+            List<Long> dbIds = getAllTabletStats(null);
+            updateStatInfo(dbIds);
+            return;
+        }
+
+        // version2: get stats for active tablets
+        Set<Long> copiedTablets = new HashSet<>(activeTablets);
+        activeTablets.removeAll(copiedTablets);
+        getActiveTabletStats(copiedTablets);
+
+        // get stats by interval
+        List<Long> dbIds = getAllTabletStats(cloudTablet -> {
+            if (copiedTablets.contains(cloudTablet.getId())) {
+                return false;
+            }
+            List<Replica> replicas = 
Env.getCurrentInvertedIndex().getReplicas(cloudTablet.getId());
+            if (replicas == null || replicas.isEmpty()) {
+                return false;
+            }
+            CloudReplica cloudReplica = (CloudReplica) replicas.get(0);
+            int index = cloudReplica.getStatsIntervalIndex();
+            if (index >= DEFAULT_INTERVAL_LADDER_MS.length) {
+                LOG.warn("get tablet stats interval index out of range, 
tabletId: {}, index: {}",
+                        cloudTablet.getId(), index);
+                index = DEFAULT_INTERVAL_LADDER_MS.length - 1;
+            }
+            long interval = DEFAULT_INTERVAL_LADDER_MS[index];
+            if (interval == Long.MAX_VALUE
+                    || System.currentTimeMillis() - 
cloudReplica.getLastGetTabletStatsTime() < interval) {
+                return false;
+            }
+            return true;
+        });
         updateStatInfo(dbIds);
     }
 
-    private List<Long> getAllTabletStats() {
+    private List<Long> getAllTabletStats(Function<CloudTablet, Boolean> 
filter) {
+        long getStatsTabletNum = 0;
         long start = System.currentTimeMillis();
         List<Future<Void>> futures = new ArrayList<>();
         GetTabletStatsRequest.Builder builder =
@@ -91,17 +174,21 @@ public class CloudTabletStatMgr extends MasterDaemon {
                     OlapTable tbl = (OlapTable) table;
                     for (Partition partition : tbl.getAllPartitions()) {
                         for (MaterializedIndex index : 
partition.getMaterializedIndices(IndexExtState.VISIBLE)) {
-                            for (Long tabletId : index.getTabletIdsInOrder()) {
+                            for (Tablet tablet : index.getTablets()) {
+                                if (filter != null && 
!filter.apply((CloudTablet) tablet)) {
+                                    continue;
+                                }
+                                getStatsTabletNum++;
                                 TabletIndexPB.Builder tabletBuilder = 
TabletIndexPB.newBuilder();
                                 tabletBuilder.setDbId(dbId);
                                 tabletBuilder.setTableId(table.getId());
                                 tabletBuilder.setIndexId(index.getId());
                                 
tabletBuilder.setPartitionId(partition.getId());
-                                tabletBuilder.setTabletId(tabletId);
+                                tabletBuilder.setTabletId(tablet.getId());
                                 builder.addTabletIdx(tabletBuilder);
 
                                 if (builder.getTabletIdxCount() >= 
Config.get_tablet_stat_batch_size) {
-                                    
futures.add(submitGetTabletStatsTask(builder.build()));
+                                    
futures.add(submitGetTabletStatsTask(builder.build(), filter == null));
                                     builder = 
GetTabletStatsRequest.newBuilder()
                                             
.setRequestIp(FrontendOptions.getLocalHostAddressCached());
                                 }
@@ -115,7 +202,7 @@ public class CloudTabletStatMgr extends MasterDaemon {
         } // end for dbs
 
         if (builder.getTabletIdxCount() > 0) {
-            futures.add(submitGetTabletStatsTask(builder.build()));
+            futures.add(submitGetTabletStatsTask(builder.build(), filter == 
null));
         }
 
         try {
@@ -126,16 +213,62 @@ public class CloudTabletStatMgr extends MasterDaemon {
             LOG.error("Error waiting for get tablet stats tasks to complete", 
e);
         }
 
-        LOG.info("finished to get tablet stat of all backends. cost: {} ms",
-                (System.currentTimeMillis() - start));
+        LOG.info("finished to get tablet stats. getStatsTabletNum: {}, cost: 
{} ms",
+                getStatsTabletNum, (System.currentTimeMillis() - start));
         return dbIds;
     }
 
-    private Future<Void> submitGetTabletStatsTask(GetTabletStatsRequest req) {
+    private void getActiveTabletStats(Set<Long> tablets) {
+        List<Long> tabletIds = new ArrayList<>(tablets);
+        Collections.sort(tabletIds);
+        List<TabletMeta> tabletMetas = 
Env.getCurrentInvertedIndex().getTabletMetaList(tabletIds);
+        long start = System.currentTimeMillis();
+        List<Future<Void>> futures = new ArrayList<>();
+        GetTabletStatsRequest.Builder builder =
+                
GetTabletStatsRequest.newBuilder().setRequestIp(FrontendOptions.getLocalHostAddressCached());
+        long activeTabletNum = 0;
+        for (int i = 0; i < tabletIds.size(); i++) {
+            TabletIndexPB tabletIndexPB = getTabletIndexPB(tabletIds.get(i), 
tabletMetas.get(i));
+            if (tabletIndexPB == null) {
+                continue;
+            }
+            activeTabletNum++;
+            builder.addTabletIdx(tabletIndexPB);
+            if (builder.getTabletIdxCount() >= 
Config.get_tablet_stat_batch_size) {
+                futures.add(submitGetTabletStatsTask(builder.build(), true));
+                builder = GetTabletStatsRequest.newBuilder()
+                        
.setRequestIp(FrontendOptions.getLocalHostAddressCached());
+            }
+        }
+        if (builder.getTabletIdxCount() > 0) {
+            futures.add(submitGetTabletStatsTask(builder.build(), true));
+        }
+
+        try {
+            for (Future<Void> future : futures) {
+                future.get();
+            }
+        } catch (InterruptedException | ExecutionException e) {
+            LOG.error("Error waiting for get tablet stats tasks to complete", 
e);
+        }
+        LOG.info("finished to get {} active tablets stats, cost {}ms", 
activeTabletNum,
+                System.currentTimeMillis() - start);
+    }
+
+    private TabletIndexPB getTabletIndexPB(long tabletId, TabletMeta 
tabletMeta) {
+        if (tabletMeta == null || tabletMeta == 
TabletInvertedIndex.NOT_EXIST_TABLET_META) {
+            return null;
+        }
+        return 
TabletIndexPB.newBuilder().setDbId(tabletMeta.getDbId()).setTableId(tabletMeta.getTableId())
+                
.setIndexId(tabletMeta.getIndexId()).setPartitionId(tabletMeta.getPartitionId()).setTabletId(tabletId)
+                .build();
+    }
+
+    private Future<Void> submitGetTabletStatsTask(GetTabletStatsRequest req, 
boolean activeUpdate) {
         return GET_TABLET_STATS_THREAD_POOL.submit(() -> {
             GetTabletStatsResponse resp;
             try {
-                resp = getTabletStats(req);
+                resp = getTabletStatsFromMs(req);
             } catch (RpcException e) {
                 LOG.warn("get tablet stats exception:", e);
                 return null;
@@ -144,15 +277,7 @@ public class CloudTabletStatMgr extends MasterDaemon {
                 LOG.warn("get tablet stats return failed.");
                 return null;
             }
-            if (LOG.isDebugEnabled()) {
-                int i = 0;
-                for (TabletIndexPB idx : req.getTabletIdxList()) {
-                    LOG.debug("db_id: {} table_id: {} index_id: {} tablet_id: 
{} size: {}",
-                            idx.getDbId(), idx.getTableId(), idx.getIndexId(),
-                            idx.getTabletId(), 
resp.getTabletStats(i++).getDataSize());
-                }
-            }
-            updateTabletStat(resp);
+            updateTabletStat(resp, activeUpdate);
             return null;
         });
     }
@@ -286,7 +411,7 @@ public class CloudTabletStatMgr extends MasterDaemon {
                             tableTotalLocalIndexSize, 
tableTotalLocalSegmentSize, 0L, 0L);
                     olapTable.setStatistics(tableStats);
                     LOG.debug("finished to set row num for table: {} in 
database: {}",
-                             table.getName(), db.getFullName());
+                            table.getName(), db.getFullName());
                 } finally {
                     table.readUnlock();
                 }
@@ -338,7 +463,7 @@ public class CloudTabletStatMgr extends MasterDaemon {
                 (System.currentTimeMillis() - start));
     }
 
-    private void updateTabletStat(GetTabletStatsResponse response) {
+    private void updateTabletStat(GetTabletStatsResponse response, boolean 
activeUpdate) {
         TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
         for (TabletStatsPB stat : response.getTabletStatsList()) {
             List<Replica> replicas = 
invertedIndex.getReplicasByTabletId(stat.getIdx().getTabletId());
@@ -346,16 +471,45 @@ public class CloudTabletStatMgr extends MasterDaemon {
                 continue;
             }
             Replica replica = replicas.get(0);
+            boolean statsChanged = replica.getDataSize() != stat.getDataSize()
+                    || replica.getRowsetCount() != stat.getNumRowsets()
+                    || replica.getSegmentCount() != stat.getNumSegments()
+                    || replica.getRowCount() != stat.getNumRows()
+                    || replica.getLocalInvertedIndexSize() != 
stat.getIndexSize()
+                    || replica.getLocalSegmentSize() != stat.getSegmentSize();
             replica.setDataSize(stat.getDataSize());
             replica.setRowsetCount(stat.getNumRowsets());
             replica.setSegmentCount(stat.getNumSegments());
             replica.setRowCount(stat.getNumRows());
             replica.setLocalInvertedIndexSize(stat.getIndexSize());
             replica.setLocalSegmentSize(stat.getSegmentSize());
+
+            CloudReplica cloudReplica = (CloudReplica) replica;
+            cloudReplica.setLastGetTabletStatsTime(System.currentTimeMillis());
+            int statsIntervalIndex = cloudReplica.getStatsIntervalIndex();
+            if (activeUpdate || statsChanged) {
+                statsIntervalIndex = 0;
+                if (!activeUpdate && statsChanged && LOG.isDebugEnabled()) {
+                    LOG.debug("tablet stats changed, reset interval index to 
0, dbId: {}, tableId: {}, "
+                                    + "indexId: {}, partitionId: {}, tabletId: 
{}, dataSize: {}, rowCount: {}, "
+                                    + "rowsetCount: {}, segmentCount: {}, 
indexSize: {}, segmentSize: {}. lastIdx: {}",
+                            stat.getIdx().getDbId(), 
stat.getIdx().getTableId(), stat.getIdx().getIndexId(),
+                            stat.getIdx().getPartitionId(), 
stat.getIdx().getTabletId(), stat.getDataSize(),
+                            stat.getNumRows(), stat.getNumRowsets(), 
stat.getNumSegments(), stat.getIndexSize(),
+                            stat.getSegmentSize(), 
cloudReplica.getStatsIntervalIndex());
+                }
+            } else {
+                statsIntervalIndex = Math.min(statsIntervalIndex + 1, 
DEFAULT_INTERVAL_LADDER_MS.length - 1);
+            }
+            cloudReplica.setStatsIntervalIndex(statsIntervalIndex);
+        }
+        // push tablet stats to other fes
+        if (Config.cloud_get_tablet_stats_version == 2 && activeUpdate && 
Env.getCurrentEnv().isMaster()) {
+            pushTabletStats(response);
         }
     }
 
-    private GetTabletStatsResponse getTabletStats(GetTabletStatsRequest 
request)
+    private GetTabletStatsResponse getTabletStatsFromMs(GetTabletStatsRequest 
request)
             throws RpcException {
         GetTabletStatsResponse response;
         try {
@@ -394,4 +548,78 @@ public class CloudTabletStatMgr extends MasterDaemon {
         }
         this.cloudTableStatsList = new ArrayList<>(topStats);
     }
+
+    public void addActiveTablets(List<Long> tabletIds) {
+        if (Config.cloud_get_tablet_stats_version == 1 || tabletIds == null || 
tabletIds.isEmpty()) {
+            return;
+        }
+        activeTablets.addAll(tabletIds);
+    }
+
+    // master FE send update tablet stats rpc to other FEs
+    private void pushTabletStats(GetTabletStatsResponse response) {
+        List<Frontend> frontends = getFrontends();
+        if (frontends == null || frontends.isEmpty()) {
+            return;
+        }
+        TSyncCloudTabletStatsRequest request = new 
TSyncCloudTabletStatsRequest();
+        request.setTabletStatsPb(ByteBuffer.wrap(response.toByteArray()));
+        for (Frontend fe : frontends) {
+            SYNC_TABLET_STATS_THREAD_POOL.submit(() -> {
+                try {
+                    pushTabletStatsToFe(request, fe);
+                } catch (Exception e) {
+                    LOG.warn("push tablet stats to frontend {}:{} error", 
fe.getHost(), fe.getRpcPort(), e);
+                }
+            });
+        }
+    }
+
+    private void pushTabletStatsToFe(TSyncCloudTabletStatsRequest request, 
Frontend fe) {
+        FrontendService.Client client = null;
+        TNetworkAddress addr = new TNetworkAddress(fe.getHost(), 
fe.getRpcPort());
+        boolean ok = false;
+        try {
+            client = ClientPool.frontendStatsPool.borrowObject(addr);
+            TStatus status = client.syncCloudTabletStats(request);
+            ok = true;
+            if (status.getStatusCode() != TStatusCode.OK) {
+                LOG.warn("failed to push cloud tablet stats to frontend {}:{}, 
err: {}", fe.getHost(),
+                        fe.getRpcPort(), status.getErrorMsgs());
+            }
+        } catch (Exception e) {
+            LOG.warn("failed to push update cloud tablet stats to frontend 
{}:{}", fe.getHost(), fe.getRpcPort(), e);
+        } finally {
+            if (ok) {
+                ClientPool.frontendStatsPool.returnObject(addr, client);
+            } else {
+                ClientPool.frontendStatsPool.invalidateObject(addr, client);
+            }
+        }
+    }
+
+    // follower and observer FE receive sync tablet stats rpc from master FE
+    public void syncTabletStats(GetTabletStatsResponse response) {
+        if (Config.cloud_get_tablet_stats_version == 1 || 
response.getTabletStatsList().isEmpty()) {
+            return;
+        }
+        SYNC_TABLET_STATS_THREAD_POOL.submit(() -> {
+            updateTabletStat(response, true);
+        });
+    }
+
+    private List<Frontend> getFrontends() {
+        if (!Env.getCurrentEnv().isMaster()) {
+            return Collections.emptyList();
+        }
+        HostInfo selfNode = Env.getCurrentEnv().getSelfNode();
+        return Env.getCurrentEnv().getFrontends(null).stream()
+                .filter(fe -> fe.isAlive() && 
!(fe.getHost().equals(selfNode.getHost())
+                        && fe.getEditLogPort() == selfNode.getPort())).collect(
+                        Collectors.toList());
+    }
+
+    public static CloudTabletStatMgr getInstance() {
+        return (CloudTabletStatMgr) Env.getCurrentEnv().getTabletStatMgr();
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
index afd2fd2501f..3a9699e8f84 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
@@ -36,6 +36,8 @@ import com.google.common.base.Strings;
 import com.google.common.hash.HashCode;
 import com.google.common.hash.Hashing;
 import com.google.gson.annotations.SerializedName;
+import lombok.Getter;
+import lombok.Setter;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -67,9 +69,23 @@ public class CloudReplica extends Replica implements 
GsonPostProcessable {
     private long indexId = -1;
     @SerializedName(value = "idx")
     private long idx = -1;
+    // last time to get tablet stats
+    @Getter
+    @Setter
+    long lastGetTabletStatsTime = 0;
+    /**
+     * The index of {@link 
org.apache.doris.catalog.CloudTabletStatMgr#DEFAULT_INTERVAL_LADDER_MS} array.
+     * Used to control the interval of getting tablet stats.
+     * When get tablet stats:
+     * if the stats is unchanged, will update this index to next value to get 
stats less frequently;
+     * if the stats is changed, will update this index to 0 to get stats more 
frequently.
+     */
+    @Getter
+    @Setter
+    int statsIntervalIndex = 0;
 
     private long segmentCount = 0L;
-    private long rowsetCount = 0L;
+    private long rowsetCount = 1L; // [0-1] rowset
 
     private static final Random rand = new Random();
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
index 043128d02fd..a8baff54700 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.cloud.transaction;
 
+import org.apache.doris.catalog.CloudTabletStatMgr;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.DatabaseIf;
 import org.apache.doris.catalog.Env;
@@ -147,6 +148,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -501,7 +503,8 @@ public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface {
      * @param commitTxnResponse commit txn call response from meta-service
      * @param tabletCommitInfos tablet commit infos containing backend and 
tablet mapping
      */
-    public void afterCommitTxnResp(CommitTxnResponse commitTxnResponse, 
List<TabletCommitInfo> tabletCommitInfos) {
+    public void afterCommitTxnResp(CommitTxnResponse commitTxnResponse, 
List<TabletCommitInfo> tabletCommitInfos,
+            List<Long> tabletIds) {
         // ========================================
         // notify BEs to make temporary rowsets visible
         // ========================================
@@ -542,6 +545,12 @@ public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface {
         if (sb.length() > 0) {
             LOG.info("notify partition first load. {}", sb);
         }
+        // 4. notify update tablet stats
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("force sync tablet stats for txnId: {}, tabletNum: {}, 
tabletIds: {}", txnId,
+                    tabletIds.size(), tabletIds);
+        }
+        CloudTabletStatMgr.getInstance().addActiveTablets(tabletIds);
 
         // ========================================
         // produce event
@@ -733,12 +742,14 @@ public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface {
         }
 
         final CommitTxnRequest commitTxnRequest = builder.build();
-        executeCommitTxnRequest(commitTxnRequest, transactionId, is2PC, 
txnCommitAttachment, tabletCommitInfos);
+        executeCommitTxnRequest(commitTxnRequest, transactionId, is2PC, 
txnCommitAttachment, tabletCommitInfos,
+                tabletCommitInfos == null ? Collections.emptyList()
+                        : tabletCommitInfos.stream().map(t -> 
t.getTabletId()).collect(Collectors.toList()));
     }
 
     private void executeCommitTxnRequest(CommitTxnRequest commitTxnRequest, 
long transactionId, boolean is2PC,
-            TxnCommitAttachment txnCommitAttachment, List<TabletCommitInfo> 
tabletCommitInfos)
-                throws UserException {
+            TxnCommitAttachment txnCommitAttachment, List<TabletCommitInfo> 
tabletCommitInfos, List<Long> tabletIds)
+            throws UserException {
         if (DebugPointUtil.isEnable("FE.mow.commit.exception")) {
             LOG.info("debug point FE.mow.commit.exception, throw e");
             throw new UserException(InternalErrorCode.INTERNAL_ERR, "debug 
point FE.mow.commit.exception");
@@ -761,7 +772,7 @@ public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface {
         StopWatch stopWatch = new StopWatch();
         stopWatch.start();
         try {
-            txnState = commitTxn(commitTxnRequest, transactionId, is2PC, 
tabletCommitInfos);
+            txnState = commitTxn(commitTxnRequest, transactionId, is2PC, 
tabletCommitInfos, tabletIds);
             txnOperated = true;
             if 
(DebugPointUtil.isEnable("CloudGlobalTransactionMgr.commitTransaction.timeout"))
 {
                 throw new 
UserException(InternalErrorCode.DELETE_BITMAP_LOCK_ERR,
@@ -801,7 +812,7 @@ public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface {
     }
 
     private TransactionState commitTxn(CommitTxnRequest commitTxnRequest, long 
transactionId, boolean is2PC,
-            List<TabletCommitInfo> tabletCommitInfos) throws UserException {
+            List<TabletCommitInfo> tabletCommitInfos, List<Long> tabletIds) 
throws UserException {
         checkCommitInfo(commitTxnRequest);
 
         CommitTxnResponse commitTxnResponse = null;
@@ -861,7 +872,7 @@ public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface {
             MetricRepo.COUNTER_TXN_SUCCESS.increase(1L);
             MetricRepo.HISTO_TXN_EXEC_LATENCY.update(txnState.getCommitTime() 
- txnState.getPrepareTime());
         }
-        afterCommitTxnResp(commitTxnResponse, tabletCommitInfos);
+        afterCommitTxnResp(commitTxnResponse, tabletCommitInfos, tabletIds);
         return txnState;
     }
 
@@ -1638,6 +1649,7 @@ public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface {
             builder.addMowTableIds(olapTable.getId());
         }
         // add sub txn infos
+        Set<Long> tabletIds = new HashSet<>();
         for (SubTransactionState subTransactionState : subTransactionStates) {
             
builder.addSubTxnInfos(SubTxnInfo.newBuilder().setSubTxnId(subTransactionState.getSubTransactionId())
                     .setTableId(subTransactionState.getTable().getId())
@@ -1647,10 +1659,13 @@ public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface {
                                             .map(c -> new 
TabletCommitInfo(c.getTabletId(), c.getBackendId()))
                                             .collect(Collectors.toList())))
                     .build());
+            for (TTabletCommitInfo tabletCommitInfo : 
subTransactionState.getTabletCommitInfos()) {
+                tabletIds.add(tabletCommitInfo.getTabletId());
+            }
         }
 
         final CommitTxnRequest commitTxnRequest = builder.build();
-        executeCommitTxnRequest(commitTxnRequest, transactionId, false, null, 
null);
+        executeCommitTxnRequest(commitTxnRequest, transactionId, false, null, 
null, new ArrayList<>(tabletIds));
     }
 
     private List<Table> getTablesNeedCommitLock(List<Table> tableList) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/ClientPool.java b/fe/fe-core/src/main/java/org/apache/doris/common/ClientPool.java
index 2b769ae2e62..f10ad74f33b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/ClientPool.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/ClientPool.java
@@ -72,6 +72,9 @@ public class ClientPool {
     public static GenericPool<FrontendService.Client> frontendPool =
             new GenericPool("FrontendService", backendConfig, 
Config.backend_rpc_timeout_ms,
                     
Config.thrift_server_type.equalsIgnoreCase(ThriftServer.THREADED_SELECTOR));
+    public static GenericPool<FrontendService.Client> frontendStatsPool =
+            new GenericPool<>("FrontendService", heartbeatConfig, 
heartbeatTimeoutMs,
+                    
Config.thrift_server_type.equalsIgnoreCase(ThriftServer.THREADED_SELECTOR));
     public static GenericPool<BackendService.Client> backendPool =
             new GenericPool("BackendService", backendConfig, 
Config.backend_rpc_timeout_ms);
     public static GenericPool<TPaloBrokerService.Client> brokerPool =
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletsProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletsProcDir.java
index 50fb5e98610..34eff696fd0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletsProcDir.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletsProcDir.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.common.proc;
 
+import org.apache.doris.catalog.CloudTabletStatMgr;
 import org.apache.doris.catalog.DiskInfo;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.MaterializedIndex;
@@ -31,12 +32,15 @@ import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.util.ListComparator;
 import org.apache.doris.common.util.NetUtils;
 import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.statistics.query.QueryStatsUtil;
 import org.apache.doris.system.Backend;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -49,6 +53,7 @@ import java.util.Map;
  * show tablets' detail info within an index
  */
 public class TabletsProcDir implements ProcDirInterface {
+    private static final Logger LOG = 
LogManager.getLogger(TabletsProcDir.class);
     public static final ImmutableList<String> TITLE_NAMES;
 
     static {
@@ -90,6 +95,11 @@ public class TabletsProcDir implements ProcDirInterface {
                 pathHashToRoot.put(diskInfo.getPathHash(), 
diskInfo.getRootPath());
             }
         }
+        List<Long> tabletIds = null;
+        if (Config.isCloudMode() && ConnectContext.get() != null && 
ConnectContext.get()
+                .getSessionVariable().cloudForceSyncTabletStats) {
+            tabletIds = new ArrayList<>();
+        }
         table.readLock();
         try {
             Map<Long, Long> replicaIdToQueryHits = new HashMap<>();
@@ -105,6 +115,9 @@ public class TabletsProcDir implements ProcDirInterface {
 
             // get infos
             for (Tablet tablet : index.getTablets()) {
+                if (tabletIds != null) {
+                    tabletIds.add(tablet.getId());
+                }
                 long tabletId = tablet.getId();
                 if (tablet.getReplicas().size() == 0) {
                     List<Comparable> tabletInfo = new ArrayList<Comparable>();
@@ -208,6 +221,10 @@ public class TabletsProcDir implements ProcDirInterface {
         } finally {
             table.readUnlock();
         }
+        if (tabletIds != null && !tabletIds.isEmpty()) {
+            LOG.info("force sync tablet stats for table: {}, tabletNum: {}", 
table, tabletIds.size());
+            CloudTabletStatMgr.getInstance().addActiveTablets(tabletIds);
+        }
         return tabletInfos;
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 67e45b7d146..576b3a392be 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -822,6 +822,7 @@ public class SessionVariable implements Serializable, Writable {
             "cloud_partition_version_cache_ttl_ms";
     public static final String CLOUD_TABLE_VERSION_CACHE_TTL_MS =
             "cloud_table_version_cache_ttl_ms";
+    public static final String CLOUD_FORCE_SYNC_TABLET_STATS = 
"cloud_force_sync_tablet_stats";
     // CLOUD_VARIABLES_BEGIN
 
     public static final String ENABLE_MATCH_WITHOUT_INVERTED_INDEX = 
"enable_match_without_inverted_index";
@@ -3119,6 +3120,8 @@ public class SessionVariable implements Serializable, Writable {
     public String cloudCluster = "";
     @VariableMgr.VarAttr(name = DISABLE_EMPTY_PARTITION_PRUNE)
     public boolean disableEmptyPartitionPrune = false;
+    @VariableMgr.VarAttr(name = CLOUD_FORCE_SYNC_TABLET_STATS)
+    public boolean cloudForceSyncTabletStats = false;
     @VariableMgr.VarAttr(name = CLOUD_PARTITION_VERSION_CACHE_TTL_MS)
     public long cloudPartitionVersionCacheTtlMs = Long.MAX_VALUE;
     @VariableMgr.VarAttr(name = CLOUD_TABLE_VERSION_CACHE_TTL_MS)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 56040809b5b..f074711854c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -25,6 +25,7 @@ import org.apache.doris.backup.BackupMeta;
 import org.apache.doris.backup.Snapshot;
 import org.apache.doris.binlog.BinlogLagInfo;
 import org.apache.doris.catalog.AutoIncrementGenerator;
+import org.apache.doris.catalog.CloudTabletStatMgr;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.DatabaseIf;
@@ -48,6 +49,7 @@ import org.apache.doris.cloud.catalog.CloudPartition;
 import org.apache.doris.cloud.catalog.CloudReplica;
 import org.apache.doris.cloud.catalog.CloudTablet;
 import org.apache.doris.cloud.proto.Cloud.CommitTxnResponse;
+import org.apache.doris.cloud.proto.Cloud.GetTabletStatsResponse;
 import org.apache.doris.cloud.system.CloudSystemInfoService;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.AuthenticationException;
@@ -285,6 +287,7 @@ import org.apache.doris.thrift.TStreamLoadMultiTablePutResult;
 import org.apache.doris.thrift.TStreamLoadPutRequest;
 import org.apache.doris.thrift.TStreamLoadPutResult;
 import org.apache.doris.thrift.TSubTxnInfo;
+import org.apache.doris.thrift.TSyncCloudTabletStatsRequest;
 import org.apache.doris.thrift.TSyncQueryColumns;
 import org.apache.doris.thrift.TTableIndexQueryStats;
 import org.apache.doris.thrift.TTableMetadataNameIds;
@@ -5089,16 +5092,28 @@ public class FrontendServiceImpl implements FrontendService.Iface {
             return new TStatus(TStatusCode.NOT_MASTER);
         }
 
-        LOG.info("receive load stats report request: {}, backend: {}, dbId: 
{}, txnId: {}, label: {}",
-                request, clientAddr, request.getDbId(), request.getTxnId(), 
request.getLabel());
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("receive load stats report from backend: {}, dbId: {}, 
txnId: {}, label: {}, tabletIds: {}",
+                    clientAddr, request.getDbId(), request.getTxnId(), 
request.getLabel(), request.getTabletIds());
+        }
 
         try {
-            byte[] receivedProtobufBytes = request.getPayload();
-            if (receivedProtobufBytes == null || receivedProtobufBytes.length 
<= 0) {
-                return new TStatus(TStatusCode.INVALID_ARGUMENT);
+            List<Long> tabletIds = request.isSetTabletIds() ? 
request.getTabletIds() : Collections.emptyList();
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("force sync tablet stats for txnId: {}, tabletNum: 
{}, tabletIds: {}", request.txnId,
+                        tabletIds.size(), tabletIds);
+            }
+            if (request.isSetTxnId() && request.getTxnId() != -1) {
+                byte[] receivedProtobufBytes = request.getPayload();
+                if (receivedProtobufBytes == null || 
receivedProtobufBytes.length <= 0) {
+                    return new TStatus(TStatusCode.INVALID_ARGUMENT);
+                }
+                CommitTxnResponse commitTxnResponse = 
CommitTxnResponse.parseFrom(receivedProtobufBytes);
+                
Env.getCurrentGlobalTransactionMgr().afterCommitTxnResp(commitTxnResponse, 
null, tabletIds);
+            } else {
+                // compaction notify update tablet stats
+                CloudTabletStatMgr.getInstance().addActiveTablets(tabletIds);
             }
-            CommitTxnResponse commitTxnResponse = 
CommitTxnResponse.parseFrom(receivedProtobufBytes);
-            
Env.getCurrentGlobalTransactionMgr().afterCommitTxnResp(commitTxnResponse, 
null);
         } catch (InvalidProtocolBufferException e) {
             // Handle the exception, log it, or take appropriate action
             e.printStackTrace();
@@ -5462,6 +5477,33 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         }
     }
 
+    @Override
+    public TStatus syncCloudTabletStats(TSyncCloudTabletStatsRequest request)
+            throws TException {
+        TStatus status = new TStatus(TStatusCode.OK);
+        if (Env.getCurrentEnv().isMaster()) {
+            LOG.warn("syncCloudTabletStats called on master, ignoring");
+            return status;
+        }
+
+        byte[] receivedProtobufBytes = request.getTabletStatsPb();
+        if (receivedProtobufBytes == null || receivedProtobufBytes.length <= 
0) {
+            status.setStatusCode(TStatusCode.INVALID_ARGUMENT);
+            status.addToErrorMsgs("TabletStatsPb is null or empty");
+            return status;
+        }
+        GetTabletStatsResponse getTabletStatsResponse;
+        try {
+            getTabletStatsResponse = 
GetTabletStatsResponse.parseFrom(receivedProtobufBytes);
+        } catch (Exception e) {
+            status.setStatusCode(TStatusCode.INVALID_ARGUMENT);
+            status.addToErrorMsgs("parse GetTabletStatsResponse error: " + 
e.getMessage());
+            return status;
+        }
+        
CloudTabletStatMgr.getInstance().syncTabletStats(getTabletStatsResponse);
+        return status;
+    }
+
     private TStatus checkMaster() {
         TStatus status = new TStatus(TStatusCode.OK);
         if (!Env.getCurrentEnv().isMaster()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
index b5e30e9893d..2ce7717912f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
@@ -230,7 +230,8 @@ public class GlobalTransactionMgr implements GlobalTransactionMgrIface {
     }
 
     @Override
-    public void afterCommitTxnResp(CommitTxnResponse commitTxnResponse, 
List<TabletCommitInfo> tabletCommitInfos) {
+    public void afterCommitTxnResp(CommitTxnResponse commitTxnResponse, 
List<TabletCommitInfo> tabletCommitInfos,
+            List<Long> tabletIds) {
     }
 
     /**
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgrIface.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgrIface.java
index b611ff4e588..d05291e93e2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgrIface.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgrIface.java
@@ -210,7 +210,8 @@ public interface GlobalTransactionMgrIface extends Writable {
 
     public void 
replayBatchRemoveTransactionV2(BatchRemoveTransactionsOperationV2 operation) 
throws Exception;
 
-    public void afterCommitTxnResp(CommitTxnResponse commitTxnResponse, 
List<TabletCommitInfo> tabletCommitInfos);
+    public void afterCommitTxnResp(CommitTxnResponse commitTxnResponse, 
List<TabletCommitInfo> tabletCommitInfos,
+            List<Long> tabletIds);
 
     public void addSubTransaction(long dbId, long transactionId, long 
subTransactionId);
 
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index aeb5b06efb6..674ffc35a12 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -1534,6 +1534,8 @@ struct TReportCommitTxnResultRequest {
     2: optional i64 txnId
     3: optional string label
     4: optional binary payload
+    // tablets which need to update stats
+    5: optional list<i64> tabletIds
 }
 
 struct TQueryColumn {
@@ -1868,6 +1870,10 @@ struct TMasterAddressResult {
     2: optional Types.TNetworkAddress master_address
 }
 
+struct TSyncCloudTabletStatsRequest {
+    1: optional binary tablet_stats_pb
+}
+
 service FrontendService {
     TGetDbsResult getDbNames(1: TGetDbsParams params)
     TGetTablesResult getTableNames(1: TGetTablesParams params)
@@ -1990,4 +1996,6 @@ service FrontendService {
     TInsertOverwriteRecordResult addOrDropInsertOverwriteRecord(1: 
TInsertOverwriteRecordRequest request)
 
     TRecordFinishedLoadJobResult recordFinishedLoadJobRequest(1: 
TRecordFinishedLoadJobRequest request)
+
+    Status.TStatus syncCloudTabletStats(1: TSyncCloudTabletStatsRequest 
request)
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to